diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/futures-core/src | |
parent | Initial commit. (diff) | |
download | firefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/futures-core/src')
-rw-r--r-- | third_party/rust/futures-core/src/future.rs | 103 | ||||
-rw-r--r-- | third_party/rust/futures-core/src/lib.rs | 27 | ||||
-rw-r--r-- | third_party/rust/futures-core/src/stream.rs | 235 | ||||
-rw-r--r-- | third_party/rust/futures-core/src/task/__internal/atomic_waker.rs | 409 | ||||
-rw-r--r-- | third_party/rust/futures-core/src/task/__internal/mod.rs | 4 | ||||
-rw-r--r-- | third_party/rust/futures-core/src/task/mod.rs | 10 | ||||
-rw-r--r-- | third_party/rust/futures-core/src/task/poll.rs | 12 |
7 files changed, 800 insertions, 0 deletions
diff --git a/third_party/rust/futures-core/src/future.rs b/third_party/rust/futures-core/src/future.rs new file mode 100644 index 0000000000..7540cd027e --- /dev/null +++ b/third_party/rust/futures-core/src/future.rs @@ -0,0 +1,103 @@ +//! Futures. + +use core::ops::DerefMut; +use core::pin::Pin; +use core::task::{Context, Poll}; + +#[doc(no_inline)] +pub use core::future::Future; + +/// An owned dynamically typed [`Future`] for use in cases where you can't +/// statically type your result or need to add some indirection. +#[cfg(feature = "alloc")] +pub type BoxFuture<'a, T> = Pin<alloc::boxed::Box<dyn Future<Output = T> + Send + 'a>>; + +/// `BoxFuture`, but without the `Send` requirement. +#[cfg(feature = "alloc")] +pub type LocalBoxFuture<'a, T> = Pin<alloc::boxed::Box<dyn Future<Output = T> + 'a>>; + +/// A future which tracks whether or not the underlying future +/// should no longer be polled. +/// +/// `is_terminated` will return `true` if a future should no longer be polled. +/// Usually, this state occurs after `poll` (or `try_poll`) returned +/// `Poll::Ready`. However, `is_terminated` may also return `true` if a future +/// has become inactive and can no longer make progress and should be ignored +/// or dropped rather than being `poll`ed again. +pub trait FusedFuture: Future { + /// Returns `true` if the underlying future should no longer be polled. + fn is_terminated(&self) -> bool; +} + +impl<F: FusedFuture + ?Sized + Unpin> FusedFuture for &mut F { + fn is_terminated(&self) -> bool { + <F as FusedFuture>::is_terminated(&**self) + } +} + +impl<P> FusedFuture for Pin<P> +where + P: DerefMut + Unpin, + P::Target: FusedFuture, +{ + fn is_terminated(&self) -> bool { + <P::Target as FusedFuture>::is_terminated(&**self) + } +} + +mod private_try_future { + use super::Future; + + pub trait Sealed {} + + impl<F, T, E> Sealed for F where F: ?Sized + Future<Output = Result<T, E>> {} +} + +/// A convenience for futures that return `Result` values that includes +/// a variety of adapters tailored to such futures. +pub trait TryFuture: Future + private_try_future::Sealed { + /// The type of successful values yielded by this future + type Ok; + + /// The type of failures yielded by this future + type Error; + + /// Poll this `TryFuture` as if it were a `Future`. + /// + /// This method is a stopgap for a compiler limitation that prevents us from + /// directly inheriting from the `Future` trait; in the future it won't be + /// needed. + fn try_poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Self::Ok, Self::Error>>; +} + +impl<F, T, E> TryFuture for F +where + F: ?Sized + Future<Output = Result<T, E>>, +{ + type Ok = T; + type Error = E; + + #[inline] + fn try_poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + self.poll(cx) + } +} + +#[cfg(feature = "alloc")] +mod if_alloc { + use super::*; + use alloc::boxed::Box; + + impl<F: FusedFuture + ?Sized + Unpin> FusedFuture for Box<F> { + fn is_terminated(&self) -> bool { + <F as FusedFuture>::is_terminated(&**self) + } + } + + #[cfg(feature = "std")] + impl<F: FusedFuture> FusedFuture for std::panic::AssertUnwindSafe<F> { + fn is_terminated(&self) -> bool { + <F as FusedFuture>::is_terminated(&**self) + } + } +} diff --git a/third_party/rust/futures-core/src/lib.rs b/third_party/rust/futures-core/src/lib.rs new file mode 100644 index 0000000000..9c31d8d90b --- /dev/null +++ b/third_party/rust/futures-core/src/lib.rs @@ -0,0 +1,27 @@ +//! Core traits and types for asynchronous operations in Rust. + +#![cfg_attr(not(feature = "std"), no_std)] +#![warn(missing_debug_implementations, missing_docs, rust_2018_idioms, unreachable_pub)] +// It cannot be included in the published code because this lints have false positives in the minimum required version. +#![cfg_attr(test, warn(single_use_lifetimes))] +#![doc(test( + no_crate_inject, + attr( + deny(warnings, rust_2018_idioms, single_use_lifetimes), + allow(dead_code, unused_assignments, unused_variables) + ) +))] + +#[cfg(feature = "alloc")] +extern crate alloc; + +pub mod future; +#[doc(no_inline)] +pub use self::future::{FusedFuture, Future, TryFuture}; + +pub mod stream; +#[doc(no_inline)] +pub use self::stream::{FusedStream, Stream, TryStream}; + +#[macro_use] +pub mod task; diff --git a/third_party/rust/futures-core/src/stream.rs b/third_party/rust/futures-core/src/stream.rs new file mode 100644 index 0000000000..ad5350b795 --- /dev/null +++ b/third_party/rust/futures-core/src/stream.rs @@ -0,0 +1,235 @@ +//! Asynchronous streams. + +use core::ops::DerefMut; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// An owned dynamically typed [`Stream`] for use in cases where you can't +/// statically type your result or need to add some indirection. +#[cfg(feature = "alloc")] +pub type BoxStream<'a, T> = Pin<alloc::boxed::Box<dyn Stream<Item = T> + Send + 'a>>; + +/// `BoxStream`, but without the `Send` requirement. +#[cfg(feature = "alloc")] +pub type LocalBoxStream<'a, T> = Pin<alloc::boxed::Box<dyn Stream<Item = T> + 'a>>; + +/// A stream of values produced asynchronously. +/// +/// If `Future<Output = T>` is an asynchronous version of `T`, then `Stream<Item +/// = T>` is an asynchronous version of `Iterator<Item = T>`. A stream +/// represents a sequence of value-producing events that occur asynchronously to +/// the caller. +/// +/// The trait is modeled after `Future`, but allows `poll_next` to be called +/// even after a value has been produced, yielding `None` once the stream has +/// been fully exhausted. +#[must_use = "streams do nothing unless polled"] +pub trait Stream { + /// Values yielded by the stream. + type Item; + + /// Attempt to pull out the next value of this stream, registering the + /// current task for wakeup if the value is not yet available, and returning + /// `None` if the stream is exhausted. + /// + /// # Return value + /// + /// There are several possible return values, each indicating a distinct + /// stream state: + /// + /// - `Poll::Pending` means that this stream's next value is not ready + /// yet. Implementations will ensure that the current task will be notified + /// when the next value may be ready. + /// + /// - `Poll::Ready(Some(val))` means that the stream has successfully + /// produced a value, `val`, and may produce further values on subsequent + /// `poll_next` calls. + /// + /// - `Poll::Ready(None)` means that the stream has terminated, and + /// `poll_next` should not be invoked again. + /// + /// # Panics + /// + /// Once a stream has finished (returned `Ready(None)` from `poll_next`), calling its + /// `poll_next` method again may panic, block forever, or cause other kinds of + /// problems; the `Stream` trait places no requirements on the effects of + /// such a call. However, as the `poll_next` method is not marked `unsafe`, + /// Rust's usual rules apply: calls must never cause undefined behavior + /// (memory corruption, incorrect use of `unsafe` functions, or the like), + /// regardless of the stream's state. + /// + /// If this is difficult to guard against then the [`fuse`] adapter can be used + /// to ensure that `poll_next` always returns `Ready(None)` in subsequent + /// calls. + /// + /// [`fuse`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.fuse + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; + + /// Returns the bounds on the remaining length of the stream. + /// + /// Specifically, `size_hint()` returns a tuple where the first element + /// is the lower bound, and the second element is the upper bound. + /// + /// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`. + /// A [`None`] here means that either there is no known upper bound, or the + /// upper bound is larger than [`usize`]. + /// + /// # Implementation notes + /// + /// It is not enforced that a stream implementation yields the declared + /// number of elements. A buggy stream may yield less than the lower bound + /// or more than the upper bound of elements. + /// + /// `size_hint()` is primarily intended to be used for optimizations such as + /// reserving space for the elements of the stream, but must not be + /// trusted to e.g., omit bounds checks in unsafe code. An incorrect + /// implementation of `size_hint()` should not lead to memory safety + /// violations. + /// + /// That said, the implementation should provide a correct estimation, + /// because otherwise it would be a violation of the trait's protocol. + /// + /// The default implementation returns `(0, `[`None`]`)` which is correct for any + /// stream. + #[inline] + fn size_hint(&self) -> (usize, Option<usize>) { + (0, None) + } +} + +impl<S: ?Sized + Stream + Unpin> Stream for &mut S { + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + S::poll_next(Pin::new(&mut **self), cx) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (**self).size_hint() + } +} + +impl<P> Stream for Pin<P> +where + P: DerefMut + Unpin, + P::Target: Stream, +{ + type Item = <P::Target as Stream>::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.get_mut().as_mut().poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (**self).size_hint() + } +} + +/// A stream which tracks whether or not the underlying stream +/// should no longer be polled. +/// +/// `is_terminated` will return `true` if a future should no longer be polled. +/// Usually, this state occurs after `poll_next` (or `try_poll_next`) returned +/// `Poll::Ready(None)`. However, `is_terminated` may also return `true` if a +/// stream has become inactive and can no longer make progress and should be +/// ignored or dropped rather than being polled again. +pub trait FusedStream: Stream { + /// Returns `true` if the stream should no longer be polled. + fn is_terminated(&self) -> bool; +} + +impl<F: ?Sized + FusedStream + Unpin> FusedStream for &mut F { + fn is_terminated(&self) -> bool { + <F as FusedStream>::is_terminated(&**self) + } +} + +impl<P> FusedStream for Pin<P> +where + P: DerefMut + Unpin, + P::Target: FusedStream, +{ + fn is_terminated(&self) -> bool { + <P::Target as FusedStream>::is_terminated(&**self) + } +} + +mod private_try_stream { + use super::Stream; + + pub trait Sealed {} + + impl<S, T, E> Sealed for S where S: ?Sized + Stream<Item = Result<T, E>> {} +} + +/// A convenience for streams that return `Result` values that includes +/// a variety of adapters tailored to such futures. +pub trait TryStream: Stream + private_try_stream::Sealed { + /// The type of successful values yielded by this future + type Ok; + + /// The type of failures yielded by this future + type Error; + + /// Poll this `TryStream` as if it were a `Stream`. + /// + /// This method is a stopgap for a compiler limitation that prevents us from + /// directly inheriting from the `Stream` trait; in the future it won't be + /// needed. + fn try_poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Result<Self::Ok, Self::Error>>>; +} + +impl<S, T, E> TryStream for S +where + S: ?Sized + Stream<Item = Result<T, E>>, +{ + type Ok = T; + type Error = E; + + fn try_poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Result<Self::Ok, Self::Error>>> { + self.poll_next(cx) + } +} + +#[cfg(feature = "alloc")] +mod if_alloc { + use super::*; + use alloc::boxed::Box; + + impl<S: ?Sized + Stream + Unpin> Stream for Box<S> { + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + Pin::new(&mut **self).poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (**self).size_hint() + } + } + + #[cfg(feature = "std")] + impl<S: Stream> Stream for std::panic::AssertUnwindSafe<S> { + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { + unsafe { self.map_unchecked_mut(|x| &mut x.0) }.poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.0.size_hint() + } + } + + impl<S: ?Sized + FusedStream + Unpin> FusedStream for Box<S> { + fn is_terminated(&self) -> bool { + <S as FusedStream>::is_terminated(&**self) + } + } +} diff --git a/third_party/rust/futures-core/src/task/__internal/atomic_waker.rs b/third_party/rust/futures-core/src/task/__internal/atomic_waker.rs new file mode 100644 index 0000000000..d49d043619 --- /dev/null +++ b/third_party/rust/futures-core/src/task/__internal/atomic_waker.rs @@ -0,0 +1,409 @@ +use core::cell::UnsafeCell; +use core::fmt; +use core::sync::atomic::AtomicUsize; +use core::sync::atomic::Ordering::{AcqRel, Acquire, Release}; +use core::task::Waker; + +/// A synchronization primitive for task wakeup. +/// +/// Sometimes the task interested in a given event will change over time. +/// An `AtomicWaker` can coordinate concurrent notifications with the consumer +/// potentially "updating" the underlying task to wake up. This is useful in +/// scenarios where a computation completes in another thread and wants to +/// notify the consumer, but the consumer is in the process of being migrated to +/// a new logical task. +/// +/// Consumers should call `register` before checking the result of a computation +/// and producers should call `wake` after producing the computation (this +/// differs from the usual `thread::park` pattern). It is also permitted for +/// `wake` to be called **before** `register`. This results in a no-op. +/// +/// A single `AtomicWaker` may be reused for any number of calls to `register` or +/// `wake`. +/// +/// # Memory ordering +/// +/// Calling `register` "acquires" all memory "released" by calls to `wake` +/// before the call to `register`. Later calls to `wake` will wake the +/// registered waker (on contention this wake might be triggered in `register`). +/// +/// For concurrent calls to `register` (should be avoided) the ordering is only +/// guaranteed for the winning call. +/// +/// # Examples +/// +/// Here is a simple example providing a `Flag` that can be signalled manually +/// when it is ready. +/// +/// ``` +/// use futures::future::Future; +/// use futures::task::{Context, Poll, AtomicWaker}; +/// use std::sync::Arc; +/// use std::sync::atomic::AtomicBool; +/// use std::sync::atomic::Ordering::Relaxed; +/// use std::pin::Pin; +/// +/// struct Inner { +/// waker: AtomicWaker, +/// set: AtomicBool, +/// } +/// +/// #[derive(Clone)] +/// struct Flag(Arc<Inner>); +/// +/// impl Flag { +/// pub fn new() -> Self { +/// Self(Arc::new(Inner { +/// waker: AtomicWaker::new(), +/// set: AtomicBool::new(false), +/// })) +/// } +/// +/// pub fn signal(&self) { +/// self.0.set.store(true, Relaxed); +/// self.0.waker.wake(); +/// } +/// } +/// +/// impl Future for Flag { +/// type Output = (); +/// +/// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { +/// // quick check to avoid registration if already done. +/// if self.0.set.load(Relaxed) { +/// return Poll::Ready(()); +/// } +/// +/// self.0.waker.register(cx.waker()); +/// +/// // Need to check condition **after** `register` to avoid a race +/// // condition that would result in lost notifications. +/// if self.0.set.load(Relaxed) { +/// Poll::Ready(()) +/// } else { +/// Poll::Pending +/// } +/// } +/// } +/// ``` +pub struct AtomicWaker { + state: AtomicUsize, + waker: UnsafeCell<Option<Waker>>, +} + +// `AtomicWaker` is a multi-consumer, single-producer transfer cell. The cell +// stores a `Waker` value produced by calls to `register` and many threads can +// race to take the waker (to wake it) by calling `wake`. +// +// If a new `Waker` instance is produced by calling `register` before an +// existing one is consumed, then the existing one is overwritten. +// +// While `AtomicWaker` is single-producer, the implementation ensures memory +// safety. In the event of concurrent calls to `register`, there will be a +// single winner whose waker will get stored in the cell. The losers will not +// have their tasks woken. As such, callers should ensure to add synchronization +// to calls to `register`. +// +// The implementation uses a single `AtomicUsize` value to coordinate access to +// the `Waker` cell. There are two bits that are operated on independently. +// These are represented by `REGISTERING` and `WAKING`. +// +// The `REGISTERING` bit is set when a producer enters the critical section. The +// `WAKING` bit is set when a consumer enters the critical section. Neither bit +// being set is represented by `WAITING`. +// +// A thread obtains an exclusive lock on the waker cell by transitioning the +// state from `WAITING` to `REGISTERING` or `WAKING`, depending on the operation +// the thread wishes to perform. When this transition is made, it is guaranteed +// that no other thread will access the waker cell. +// +// # Registering +// +// On a call to `register`, an attempt to transition the state from WAITING to +// REGISTERING is made. On success, the caller obtains a lock on the waker cell. +// +// If the lock is obtained, then the thread sets the waker cell to the waker +// provided as an argument. Then it attempts to transition the state back from +// `REGISTERING` -> `WAITING`. +// +// If this transition is successful, then the registering process is complete +// and the next call to `wake` will observe the waker. +// +// If the transition fails, then there was a concurrent call to `wake` that was +// unable to access the waker cell (due to the registering thread holding the +// lock). To handle this, the registering thread removes the waker it just set +// from the cell and calls `wake` on it. This call to wake represents the +// attempt to wake by the other thread (that set the `WAKING` bit). The state is +// then transitioned from `REGISTERING | WAKING` back to `WAITING`. This +// transition must succeed because, at this point, the state cannot be +// transitioned by another thread. +// +// # Waking +// +// On a call to `wake`, an attempt to transition the state from `WAITING` to +// `WAKING` is made. On success, the caller obtains a lock on the waker cell. +// +// If the lock is obtained, then the thread takes ownership of the current value +// in the waker cell, and calls `wake` on it. The state is then transitioned +// back to `WAITING`. This transition must succeed as, at this point, the state +// cannot be transitioned by another thread. +// +// If the thread is unable to obtain the lock, the `WAKING` bit is still. This +// is because it has either been set by the current thread but the previous +// value included the `REGISTERING` bit **or** a concurrent thread is in the +// `WAKING` critical section. Either way, no action must be taken. +// +// If the current thread is the only concurrent call to `wake` and another +// thread is in the `register` critical section, when the other thread **exits** +// the `register` critical section, it will observe the `WAKING` bit and handle +// the wake itself. +// +// If another thread is in the `wake` critical section, then it will handle +// waking the task. +// +// # A potential race (is safely handled). +// +// Imagine the following situation: +// +// * Thread A obtains the `wake` lock and wakes a task. +// +// * Before thread A releases the `wake` lock, the woken task is scheduled. +// +// * Thread B attempts to wake the task. In theory this should result in the +// task being woken, but it cannot because thread A still holds the wake lock. +// +// This case is handled by requiring users of `AtomicWaker` to call `register` +// **before** attempting to observe the application state change that resulted +// in the task being awoken. The wakers also change the application state before +// calling wake. +// +// Because of this, the waker will do one of two things. +// +// 1) Observe the application state change that Thread B is woken for. In this +// case, it is OK for Thread B's wake to be lost. +// +// 2) Call register before attempting to observe the application state. Since +// Thread A still holds the `wake` lock, the call to `register` will result +// in the task waking itself and get scheduled again. + +/// Idle state +const WAITING: usize = 0; + +/// A new waker value is being registered with the `AtomicWaker` cell. +const REGISTERING: usize = 0b01; + +/// The waker currently registered with the `AtomicWaker` cell is being woken. +const WAKING: usize = 0b10; + +impl AtomicWaker { + /// Create an `AtomicWaker`. + pub const fn new() -> Self { + // Make sure that task is Sync + trait AssertSync: Sync {} + impl AssertSync for Waker {} + + Self { state: AtomicUsize::new(WAITING), waker: UnsafeCell::new(None) } + } + + /// Registers the waker to be notified on calls to `wake`. + /// + /// The new task will take place of any previous tasks that were registered + /// by previous calls to `register`. Any calls to `wake` that happen after + /// a call to `register` (as defined by the memory ordering rules), will + /// notify the `register` caller's task and deregister the waker from future + /// notifications. Because of this, callers should ensure `register` gets + /// invoked with a new `Waker` **each** time they require a wakeup. + /// + /// It is safe to call `register` with multiple other threads concurrently + /// calling `wake`. This will result in the `register` caller's current + /// task being notified once. + /// + /// This function is safe to call concurrently, but this is generally a bad + /// idea. Concurrent calls to `register` will attempt to register different + /// tasks to be notified. One of the callers will win and have its task set, + /// but there is no guarantee as to which caller will succeed. + /// + /// # Examples + /// + /// Here is how `register` is used when implementing a flag. + /// + /// ``` + /// use futures::future::Future; + /// use futures::task::{Context, Poll, AtomicWaker}; + /// use std::sync::atomic::AtomicBool; + /// use std::sync::atomic::Ordering::Relaxed; + /// use std::pin::Pin; + /// + /// struct Flag { + /// waker: AtomicWaker, + /// set: AtomicBool, + /// } + /// + /// impl Future for Flag { + /// type Output = (); + /// + /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + /// // Register **before** checking `set` to avoid a race condition + /// // that would result in lost notifications. + /// self.waker.register(cx.waker()); + /// + /// if self.set.load(Relaxed) { + /// Poll::Ready(()) + /// } else { + /// Poll::Pending + /// } + /// } + /// } + /// ``` + pub fn register(&self, waker: &Waker) { + match self + .state + .compare_exchange(WAITING, REGISTERING, Acquire, Acquire) + .unwrap_or_else(|x| x) + { + WAITING => { + unsafe { + // Locked acquired, update the waker cell + *self.waker.get() = Some(waker.clone()); + + // Release the lock. If the state transitioned to include + // the `WAKING` bit, this means that at least one wake has + // been called concurrently. + // + // Start by assuming that the state is `REGISTERING` as this + // is what we just set it to. If this holds, we know that no + // other writes were performed in the meantime, so there is + // nothing to acquire, only release. In case of concurrent + // wakers, we need to acquire their releases, so success needs + // to do both. + let res = self.state.compare_exchange(REGISTERING, WAITING, AcqRel, Acquire); + + match res { + Ok(_) => { + // memory ordering: acquired self.state during CAS + // - if previous wakes went through it syncs with + // their final release (`fetch_and`) + // - if there was no previous wake the next wake + // will wake us, no sync needed. + } + Err(actual) => { + // This branch can only be reached if at least one + // concurrent thread called `wake`. In this + // case, `actual` **must** be `REGISTERING | + // `WAKING`. + debug_assert_eq!(actual, REGISTERING | WAKING); + + // Take the waker to wake once the atomic operation has + // completed. + let waker = (*self.waker.get()).take().unwrap(); + + // We need to return to WAITING state (clear our lock and + // concurrent WAKING flag). This needs to acquire all + // WAKING fetch_or releases and it needs to release our + // update to self.waker, so we need a `swap` operation. + self.state.swap(WAITING, AcqRel); + + // memory ordering: we acquired the state for all + // concurrent wakes, but future wakes might still + // need to wake us in case we can't make progress + // from the pending wakes. + // + // So we simply schedule to come back later (we could + // also simply leave the registration in place above). + waker.wake(); + } + } + } + } + WAKING => { + // Currently in the process of waking the task, i.e., + // `wake` is currently being called on the old task handle. + // + // memory ordering: we acquired the state for all + // concurrent wakes, but future wakes might still + // need to wake us in case we can't make progress + // from the pending wakes. + // + // So we simply schedule to come back later (we + // could also spin here trying to acquire the lock + // to register). + waker.wake_by_ref(); + } + state => { + // In this case, a concurrent thread is holding the + // "registering" lock. This probably indicates a bug in the + // caller's code as racing to call `register` doesn't make much + // sense. + // + // memory ordering: don't care. a concurrent register() is going + // to succeed and provide proper memory ordering. + // + // We just want to maintain memory safety. It is ok to drop the + // call to `register`. + debug_assert!(state == REGISTERING || state == REGISTERING | WAKING); + } + } + } + + /// Calls `wake` on the last `Waker` passed to `register`. + /// + /// If `register` has not been called yet, then this does nothing. + pub fn wake(&self) { + if let Some(waker) = self.take() { + waker.wake(); + } + } + + /// Returns the last `Waker` passed to `register`, so that the user can wake it. + /// + /// + /// Sometimes, just waking the AtomicWaker is not fine grained enough. This allows the user + /// to take the waker and then wake it separately, rather than performing both steps in one + /// atomic action. + /// + /// If a waker has not been registered, this returns `None`. + pub fn take(&self) -> Option<Waker> { + // AcqRel ordering is used in order to acquire the value of the `task` + // cell as well as to establish a `release` ordering with whatever + // memory the `AtomicWaker` is associated with. + match self.state.fetch_or(WAKING, AcqRel) { + WAITING => { + // The waking lock has been acquired. + let waker = unsafe { (*self.waker.get()).take() }; + + // Release the lock + self.state.fetch_and(!WAKING, Release); + + waker + } + state => { + // There is a concurrent thread currently updating the + // associated task. + // + // Nothing more to do as the `WAKING` bit has been set. It + // doesn't matter if there are concurrent registering threads or + // not. + // + debug_assert!( + state == REGISTERING || state == REGISTERING | WAKING || state == WAKING + ); + None + } + } + } +} + +impl Default for AtomicWaker { + fn default() -> Self { + Self::new() + } +} + +impl fmt::Debug for AtomicWaker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "AtomicWaker") + } +} + +unsafe impl Send for AtomicWaker {} +unsafe impl Sync for AtomicWaker {} diff --git a/third_party/rust/futures-core/src/task/__internal/mod.rs b/third_party/rust/futures-core/src/task/__internal/mod.rs new file mode 100644 index 0000000000..c902eb4bfb --- /dev/null +++ b/third_party/rust/futures-core/src/task/__internal/mod.rs @@ -0,0 +1,4 @@ +#[cfg(not(futures_no_atomic_cas))] +mod atomic_waker; +#[cfg(not(futures_no_atomic_cas))] +pub use self::atomic_waker::AtomicWaker; diff --git a/third_party/rust/futures-core/src/task/mod.rs b/third_party/rust/futures-core/src/task/mod.rs new file mode 100644 index 0000000000..19e4eaecdd --- /dev/null +++ b/third_party/rust/futures-core/src/task/mod.rs @@ -0,0 +1,10 @@ +//! Task notification. + +#[macro_use] +mod poll; + +#[doc(hidden)] +pub mod __internal; + +#[doc(no_inline)] +pub use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; diff --git a/third_party/rust/futures-core/src/task/poll.rs b/third_party/rust/futures-core/src/task/poll.rs new file mode 100644 index 0000000000..607e78e060 --- /dev/null +++ b/third_party/rust/futures-core/src/task/poll.rs @@ -0,0 +1,12 @@ +/// Extracts the successful type of a `Poll<T>`. +/// +/// This macro bakes in propagation of `Pending` signals by returning early. +#[macro_export] +macro_rules! ready { + ($e:expr $(,)?) => { + match $e { + $crate::task::Poll::Ready(t) => t, + $crate::task::Poll::Pending => return $crate::task::Poll::Pending, + } + }; +} |