summaryrefslogtreecommitdiffstats
path: root/third_party/rust/oneshot-uniffi/src
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 00:47:55 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 00:47:55 +0000
commit26a029d407be480d791972afb5975cf62c9360a6 (patch)
treef435a8308119effd964b339f76abb83a57c29483 /third_party/rust/oneshot-uniffi/src
parentInitial commit. (diff)
downloadfirefox-26a029d407be480d791972afb5975cf62c9360a6.tar.xz
firefox-26a029d407be480d791972afb5975cf62c9360a6.zip
Adding upstream version 124.0.1.upstream/124.0.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/oneshot-uniffi/src')
-rw-r--r--third_party/rust/oneshot-uniffi/src/errors.rs144
-rw-r--r--third_party/rust/oneshot-uniffi/src/lib.rs1193
-rw-r--r--third_party/rust/oneshot-uniffi/src/loombox.rs151
3 files changed, 1488 insertions, 0 deletions
diff --git a/third_party/rust/oneshot-uniffi/src/errors.rs b/third_party/rust/oneshot-uniffi/src/errors.rs
new file mode 100644
index 0000000000..afc48acd03
--- /dev/null
+++ b/third_party/rust/oneshot-uniffi/src/errors.rs
@@ -0,0 +1,144 @@
+use super::{dealloc, Channel};
+use core::fmt;
+use core::mem;
+use core::ptr::NonNull;
+
+/// An error returned when trying to send on a closed channel. Returned from
+/// [`Sender::send`] if the corresponding [`Receiver`] has already been dropped.
+///
+/// The message that could not be sent can be retreived again with [`SendError::into_inner`].
+pub struct SendError<T> {
+ channel_ptr: NonNull<Channel<T>>,
+}
+
+unsafe impl<T: Send> Send for SendError<T> {}
+unsafe impl<T: Sync> Sync for SendError<T> {}
+
+impl<T> SendError<T> {
+ /// # Safety
+ ///
+ /// By calling this function, the caller semantically transfers ownership of the
+ /// channel's resources to the created `SendError`. Thus the caller must ensure that the
+ /// pointer is not used in a way which would violate this ownership transfer. Moreover,
+ /// the caller must assert that the channel contains a valid, initialized message.
+ pub(crate) const unsafe fn new(channel_ptr: NonNull<Channel<T>>) -> Self {
+ Self { channel_ptr }
+ }
+
+ /// Consumes the error and returns the message that failed to be sent.
+ #[inline]
+ pub fn into_inner(self) -> T {
+ let channel_ptr = self.channel_ptr;
+
+ // Don't run destructor if we consumed ourselves. Freeing happens here.
+ mem::forget(self);
+
+ // SAFETY: we have ownership of the channel
+ let channel: &Channel<T> = unsafe { channel_ptr.as_ref() };
+
+ // SAFETY: we know that the message is initialized according to the safety requirements of
+ // `new`
+ let message = unsafe { channel.take_message() };
+
+ // SAFETY: we own the channel
+ unsafe { dealloc(channel_ptr) };
+
+ message
+ }
+
+ /// Get a reference to the message that failed to be sent.
+ #[inline]
+ pub fn as_inner(&self) -> &T {
+ unsafe { self.channel_ptr.as_ref().message().assume_init_ref() }
+ }
+}
+
+impl<T> Drop for SendError<T> {
+ fn drop(&mut self) {
+ // SAFETY: we have ownership of the channel and require that the message is initialized
+ // upon construction
+ unsafe {
+ self.channel_ptr.as_ref().drop_message();
+ dealloc(self.channel_ptr);
+ }
+ }
+}
+
+impl<T> fmt::Display for SendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ "sending on a closed channel".fmt(f)
+ }
+}
+
+impl<T> fmt::Debug for SendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "SendError<{}>(_)", stringify!(T))
+ }
+}
+
+#[cfg(feature = "std")]
+impl<T> std::error::Error for SendError<T> {}
+
+/// An error returned from the indefinitely blocking recv functions on a [`Receiver`].
+///
+/// The recv operation can only fail if the corresponding [`Sender`] was dropped before sending
+/// any message. Or if a message has already been sent and received on the channel.
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
+pub struct RecvError;
+
+impl fmt::Display for RecvError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ "receiving on a closed channel".fmt(f)
+ }
+}
+
+#[cfg(feature = "std")]
+impl std::error::Error for RecvError {}
+
+/// An error returned when trying a non blocking receive on a [`Receiver`].
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
+pub enum TryRecvError {
+ /// The channel is still open, but there was no message present in it.
+ Empty,
+
+ /// The channel is closed. Either the sender was dropped before sending any message, or the
+ /// message has already been extracted from the receiver.
+ Disconnected,
+}
+
+impl fmt::Display for TryRecvError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let msg = match self {
+ TryRecvError::Empty => "receiving on an empty channel",
+ TryRecvError::Disconnected => "receiving on a closed channel",
+ };
+ msg.fmt(f)
+ }
+}
+
+#[cfg(feature = "std")]
+impl std::error::Error for TryRecvError {}
+
+/// An error returned when trying a time limited blocking receive on a [`Receiver`].
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
+pub enum RecvTimeoutError {
+ /// No message arrived on the channel before the timeout was reached. The channel is still open.
+ Timeout,
+
+ /// The channel is closed. Either the sender was dropped before sending any message, or the
+ /// message has already been extracted from the receiver.
+ Disconnected,
+}
+
+impl fmt::Display for RecvTimeoutError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let msg = match self {
+ RecvTimeoutError::Timeout => "timed out waiting on channel",
+ RecvTimeoutError::Disconnected => "channel is empty and sending half is closed",
+ };
+ msg.fmt(f)
+ }
+}
+
+#[cfg(feature = "std")]
+impl std::error::Error for RecvTimeoutError {}
diff --git a/third_party/rust/oneshot-uniffi/src/lib.rs b/third_party/rust/oneshot-uniffi/src/lib.rs
new file mode 100644
index 0000000000..94bb35d12a
--- /dev/null
+++ b/third_party/rust/oneshot-uniffi/src/lib.rs
@@ -0,0 +1,1193 @@
+//! Oneshot spsc (single producer, single consumer) channel. Meaning each channel instance
+//! can only transport a single message. This has a few nice outcomes. One thing is that
+//! the implementation can be very efficient, utilizing the knowledge that there will
+//! only be one message. But more importantly, it allows the API to be expressed in such
+//! a way that certain edge cases that you don't want to care about when only sending a
+//! single message on a channel does not exist. For example: The sender can't be copied
+//! or cloned, and the send method takes ownership and consumes the sender.
+//! So you are guaranteed, at the type level, that there can only be one message sent.
+//!
+//! The sender's send method is non-blocking, and potentially lock- and wait-free.
+//! See documentation on [Sender::send] for situations where it might not be fully wait-free.
+//! The receiver supports both lock- and wait-free `try_recv` as well as indefinite and time
+//! limited thread blocking receive operations. The receiver also implements `Future` and
+//! supports asynchronously awaiting the message.
+//!
+//!
+//! # Examples
+//!
+//! This example sets up a background worker that processes requests coming in on a standard
+//! mpsc channel and replies on a oneshot channel provided with each request. The worker can
+//! be interacted with both from sync and async contexts since the oneshot receiver
+//! can receive both blocking and async.
+//!
+//! ```rust
+//! use std::sync::mpsc;
+//! use std::thread;
+//! use std::time::Duration;
+//!
+//! type Request = String;
+//!
+//! // Starts a background thread performing some computation on requests sent to it.
+//! // Delivers the response back over a oneshot channel.
+//! fn spawn_processing_thread() -> mpsc::Sender<(Request, oneshot::Sender<usize>)> {
+//! let (request_sender, request_receiver) = mpsc::channel::<(Request, oneshot::Sender<usize>)>();
+//! thread::spawn(move || {
+//! for (request_data, response_sender) in request_receiver.iter() {
+//! let compute_operation = || request_data.len();
+//! let _ = response_sender.send(compute_operation()); // <- Send on the oneshot channel
+//! }
+//! });
+//! request_sender
+//! }
+//!
+//! let processor = spawn_processing_thread();
+//!
+//! // If compiled with `std` the library can receive messages with timeout on regular threads
+//! #[cfg(feature = "std")] {
+//! let (response_sender, response_receiver) = oneshot::channel();
+//! let request = Request::from("data from sync thread");
+//!
+//! processor.send((request, response_sender)).expect("Processor down");
+//! match response_receiver.recv_timeout(Duration::from_secs(1)) { // <- Receive on the oneshot channel
+//! Ok(result) => println!("Processor returned {}", result),
+//! Err(oneshot::RecvTimeoutError::Timeout) => eprintln!("Processor was too slow"),
+//! Err(oneshot::RecvTimeoutError::Disconnected) => panic!("Processor exited"),
+//! }
+//! }
+//!
+//! // If compiled with the `async` feature, the `Receiver` can be awaited in an async context
+//! #[cfg(feature = "async")] {
+//! tokio::runtime::Runtime::new()
+//! .unwrap()
+//! .block_on(async move {
+//! let (response_sender, response_receiver) = oneshot::channel();
+//! let request = Request::from("data from sync thread");
+//!
+//! processor.send((request, response_sender)).expect("Processor down");
+//! match response_receiver.await { // <- Receive on the oneshot channel asynchronously
+//! Ok(result) => println!("Processor returned {}", result),
+//! Err(_e) => panic!("Processor exited"),
+//! }
+//! });
+//! }
+//! ```
+//!
+//! # Sync vs async
+//!
+//! The main motivation for writing this library was that there were no (known to me) channel
+//! implementations allowing you to seamlessly send messages between a normal thread and an async
+//! task, or the other way around. If message passing is the way you are communicating, of course
+//! that should work smoothly between the sync and async parts of the program!
+//!
+//! This library achieves that by having a fast and cheap send operation that can
+//! be used in both sync threads and async tasks. The receiver has both thread blocking
+//! receive methods for synchronous usage, and implements `Future` for asynchronous usage.
+//!
+//! The receiving endpoint of this channel implements Rust's `Future` trait and can be waited on
+//! in an asynchronous task. This implementation is completely executor/runtime agnostic. It should
+//! be possible to use this library with any executor.
+//!
+
+// # Implementation description
+//
+// When a channel is created via the channel function, it creates a single heap allocation
+// containing:
+// * A one byte atomic integer that represents the current channel state,
+// * Uninitialized memory to fit the message,
+// * Uninitialized memory to fit the waker that can wake the receiving task or thread up.
+//
+// The size of the waker depends on which features are activated, it ranges from 0 to 24 bytes[1].
+// So with all features enabled (the default) each channel allocates 25 bytes plus the size of the
+// message, plus any padding needed to get correct memory alignment.
+//
+// The Sender and Receiver only holds a raw pointer to the heap channel object. The last endpoint
+// to be consumed or dropped is responsible for freeing the heap memory. The first endpoint to
+// be consumed or dropped signal via the state that it is gone. And the second one see this and
+// frees the memory.
+//
+// ## Footnotes
+//
+// [1]: Mind that the waker only takes zero bytes when all features are disabled, making it
+// impossible to *wait* for the message. `try_recv` the only available method in this scenario.
+
+#![deny(rust_2018_idioms)]
+#![cfg_attr(not(feature = "std"), no_std)]
+
+#[cfg(not(loom))]
+extern crate alloc;
+
+use core::{
+ marker::PhantomData,
+ mem::{self, MaybeUninit},
+ ptr::{self, NonNull},
+};
+
+#[cfg(not(loom))]
+use core::{
+ cell::UnsafeCell,
+ sync::atomic::{fence, AtomicU8, Ordering::*},
+};
+#[cfg(loom)]
+use loom::{
+ cell::UnsafeCell,
+ sync::atomic::{fence, AtomicU8, Ordering::*},
+};
+
+#[cfg(all(feature = "async", not(loom)))]
+use core::hint;
+#[cfg(all(feature = "async", loom))]
+use loom::hint;
+
+#[cfg(feature = "async")]
+use core::{
+ pin::Pin,
+ task::{self, Poll},
+};
+#[cfg(feature = "std")]
+use std::time::{Duration, Instant};
+
+#[cfg(feature = "std")]
+mod thread {
+ #[cfg(not(loom))]
+ pub use std::thread::{current, park, park_timeout, yield_now, Thread};
+
+ #[cfg(loom)]
+ pub use loom::thread::{current, park, yield_now, Thread};
+
+ // loom does not support parking with a timeout. So we just
+ // yield. This means that the "park" will "spuriously" wake up
+ // way too early. But the code should properly handle this.
+ // One thing to note is that very short timeouts are needed
+ // when using loom, since otherwise the looping will cause
+ // an overflow in loom.
+ #[cfg(loom)]
+ pub fn park_timeout(_timeout: std::time::Duration) {
+ loom::thread::yield_now()
+ }
+}
+
+#[cfg(loom)]
+mod loombox;
+#[cfg(not(loom))]
+use alloc::boxed::Box;
+#[cfg(loom)]
+use loombox::Box;
+
+mod errors;
+pub use errors::{RecvError, RecvTimeoutError, SendError, TryRecvError};
+
+/// Creates a new oneshot channel and returns the two endpoints, [`Sender`] and [`Receiver`].
+pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
+ // Allocate the channel on the heap and get the pointer.
+ // The last endpoint of the channel to be alive is responsible for freeing the channel
+ // and dropping any object that might have been written to it.
+
+ let channel_ptr = Box::into_raw(Box::new(Channel::new()));
+
+ // SAFETY: `channel_ptr` came from a Box and thus is not null
+ let channel_ptr = unsafe { NonNull::new_unchecked(channel_ptr) };
+
+ (
+ Sender {
+ channel_ptr,
+ _invariant: PhantomData,
+ },
+ Receiver { channel_ptr },
+ )
+}
+
+#[derive(Debug)]
+pub struct Sender<T> {
+ channel_ptr: NonNull<Channel<T>>,
+ // In reality we want contravariance, however we can't obtain that.
+ //
+ // Consider the following scenario:
+ // ```
+ // let (mut tx, rx) = channel::<&'short u8>();
+ // let (tx2, rx2) = channel::<&'long u8>();
+ //
+ // tx = tx2;
+ //
+ // // Pretend short_ref is some &'short u8
+ // tx.send(short_ref).unwrap();
+ // let long_ref = rx2.recv().unwrap();
+ // ```
+ //
+ // If this type were covariant then we could safely extend lifetimes, which is not okay.
+ // Hence, we enforce invariance.
+ _invariant: PhantomData<fn(T) -> T>,
+}
+
+#[derive(Debug)]
+pub struct Receiver<T> {
+ // Covariance is the right choice here. Consider the example presented in Sender, and you'll
+ // see that if we replaced `rx` instead then we would get the expected behavior
+ channel_ptr: NonNull<Channel<T>>,
+}
+
+unsafe impl<T: Send> Send for Sender<T> {}
+unsafe impl<T: Send> Send for Receiver<T> {}
+impl<T> Unpin for Receiver<T> {}
+
+impl<T> Sender<T> {
+ /// Sends `message` over the channel to the corresponding [`Receiver`].
+ ///
+ /// Returns an error if the receiver has already been dropped. The message can
+ /// be extracted from the error.
+ ///
+ /// This method is lock-free and wait-free when sending on a channel that the
+ /// receiver is currently not receiving on. If the receiver is receiving during the send
+ /// operation this method includes waking up the thread/task. Unparking a thread involves
+ /// a mutex in Rust's standard library at the time of writing this.
+ /// How lock-free waking up an async task is
+ /// depends on your executor. If this method returns a `SendError`, please mind that dropping
+ /// the error involves running any drop implementation on the message type, and freeing the
+ /// channel's heap allocation, which might or might not be lock-free.
+ pub fn send(self, message: T) -> Result<(), SendError<T>> {
+ let channel_ptr = self.channel_ptr;
+
+ // Don't run our Drop implementation if send was called, any cleanup now happens here
+ mem::forget(self);
+
+ // SAFETY: The channel exists on the heap for the entire duration of this method and we
+ // only ever acquire shared references to it. Note that if the receiver disconnects it
+ // does not free the channel.
+ let channel = unsafe { channel_ptr.as_ref() };
+
+ // Write the message into the channel on the heap.
+ // SAFETY: The receiver only ever accesses this memory location if we are in the MESSAGE
+ // state, and since we're responsible for setting that state, we can guarantee that we have
+ // exclusive access to this memory location to perform this write.
+ unsafe { channel.write_message(message) };
+
+ // Set the state to signal there is a message on the channel.
+ // ORDERING: we use release ordering to ensure the write of the message is visible to the
+ // receiving thread. The EMPTY and DISCONNECTED branches do not observe any shared state,
+ // and thus we do not need acquire orderng. The RECEIVING branch manages synchronization
+ // independent of this operation.
+ //
+ // EMPTY + 1 = MESSAGE
+ // RECEIVING + 1 = UNPARKING
+ // DISCONNECTED + 1 = invalid, however this state is never observed
+ match channel.state.fetch_add(1, Release) {
+ // The receiver is alive and has not started waiting. Send done.
+ EMPTY => Ok(()),
+ // The receiver is waiting. Wake it up so it can return the message.
+ RECEIVING => {
+ // ORDERING: Synchronizes with the write of the waker to memory, and prevents the
+ // taking of the waker from being ordered before this operation.
+ fence(Acquire);
+
+ // Take the waker, but critically do not unpark it. If we unparked now, then the
+ // receiving thread could still observe the UNPARKING state and re-park, meaning
+ // that after we change to the MESSAGE state, it would remain parked indefinitely
+ // or until a spurious wakeup.
+ // SAFETY: at this point we are in the UNPARKING state, and the receiving thread
+ // does not access the waker while in this state, nor does it free the channel
+ // allocation in this state.
+ let waker = unsafe { channel.take_waker() };
+
+ // ORDERING: this ordering serves two-fold: it synchronizes with the acquire load
+ // in the receiving thread, ensuring that both our read of the waker and write of
+ // the message happen-before the taking of the message and freeing of the channel.
+ // Furthermore, we need acquire ordering to ensure the unparking of the receiver
+ // happens after the channel state is updated.
+ channel.state.swap(MESSAGE, AcqRel);
+
+ // Note: it is possible that between the store above and this statement that
+ // the receiving thread is spuriously unparked, takes the message, and frees
+ // the channel allocation. However, we took ownership of the channel out of
+ // that allocation, and freeing the channel does not drop the waker since the
+ // waker is wrapped in MaybeUninit. Therefore this data is valid regardless of
+ // whether or not the receive has completed by this point.
+ waker.unpark();
+
+ Ok(())
+ }
+ // The receiver was already dropped. The error is responsible for freeing the channel.
+ // SAFETY: since the receiver disconnected it will no longer access `channel_ptr`, so
+ // we can transfer exclusive ownership of the channel's resources to the error.
+ // Moreover, since we just placed the message in the channel, the channel contains a
+ // valid message.
+ DISCONNECTED => Err(unsafe { SendError::new(channel_ptr) }),
+ _ => unreachable!(),
+ }
+ }
+}
+
+impl<T> Drop for Sender<T> {
+ fn drop(&mut self) {
+ // SAFETY: The receiver only ever frees the channel if we are in the MESSAGE or
+ // DISCONNECTED states. If we are in the MESSAGE state, then we called
+ // mem::forget(self), so we should not be in this function call. If we are in the
+ // DISCONNECTED state, then the receiver either received a MESSAGE so this statement is
+ // unreachable, or was dropped and observed that our side was still alive, and thus didn't
+ // free the channel.
+ let channel = unsafe { self.channel_ptr.as_ref() };
+
+ // Set the channel state to disconnected and read what state the receiver was in
+ // ORDERING: we don't need release ordering here since there are no modifications we
+ // need to make visible to other thread, and the Err(RECEIVING) branch handles
+ // synchronization independent of this cmpxchg
+ //
+ // EMPTY ^ 001 = DISCONNECTED
+ // RECEIVING ^ 001 = UNPARKING
+ // DISCONNECTED ^ 001 = EMPTY (invalid), but this state is never observed
+ match channel.state.fetch_xor(0b001, Relaxed) {
+ // The receiver has not started waiting, nor is it dropped.
+ EMPTY => (),
+ // The receiver is waiting. Wake it up so it can detect that the channel disconnected.
+ RECEIVING => {
+ // See comments in Sender::send
+
+ fence(Acquire);
+
+ let waker = unsafe { channel.take_waker() };
+
+ // We still need release ordering here to make sure our read of the waker happens
+ // before this, and acquire ordering to ensure the unparking of the receiver
+ // happens after this.
+ channel.state.swap(DISCONNECTED, AcqRel);
+
+ // The Acquire ordering above ensures that the write of the DISCONNECTED state
+ // happens-before unparking the receiver.
+ waker.unpark();
+ }
+ // The receiver was already dropped. We are responsible for freeing the channel.
+ DISCONNECTED => {
+ // SAFETY: when the receiver switches the state to DISCONNECTED they have received
+ // the message or will no longer be trying to receive the message, and have
+ // observed that the sender is still alive, meaning that we're responsible for
+ // freeing the channel allocation.
+ unsafe { dealloc(self.channel_ptr) };
+ }
+ _ => unreachable!(),
+ }
+ }
+}
+
+impl<T> Receiver<T> {
+ /// Checks if there is a message in the channel without blocking. Returns:
+ /// * `Ok(message)` if there was a message in the channel.
+ /// * `Err(Empty)` if the [`Sender`] is alive, but has not yet sent a message.
+ /// * `Err(Disconnected)` if the [`Sender`] was dropped before sending anything or if the
+ /// message has already been extracted by a previous receive call.
+ ///
+ /// If a message is returned, the channel is disconnected and any subsequent receive operation
+ /// using this receiver will return an error.
+ ///
+ /// This method is completely lock-free and wait-free. The only thing it does is an atomic
+ /// integer load of the channel state. And if there is a message in the channel it additionally
+ /// performs one atomic integer store and copies the message from the heap to the stack for
+ /// returning it.
+ pub fn try_recv(&self) -> Result<T, TryRecvError> {
+ // SAFETY: The channel will not be freed while this method is still running.
+ let channel = unsafe { self.channel_ptr.as_ref() };
+
+ // ORDERING: we use acquire ordering to synchronize with the store of the message.
+ match channel.state.load(Acquire) {
+ MESSAGE => {
+ // It's okay to break up the load and store since once we're in the message state
+ // the sender no longer modifies the state
+ // ORDERING: at this point the sender has done its job and is no longer active, so
+ // we don't need to make any side effects visible to it
+ channel.state.store(DISCONNECTED, Relaxed);
+
+ // SAFETY: we are in the MESSAGE state so the message is present
+ Ok(unsafe { channel.take_message() })
+ }
+ EMPTY => Err(TryRecvError::Empty),
+ DISCONNECTED => Err(TryRecvError::Disconnected),
+ #[cfg(feature = "async")]
+ RECEIVING | UNPARKING => Err(TryRecvError::Empty),
+ _ => unreachable!(),
+ }
+ }
+
+ /// Attempts to wait for a message from the [`Sender`], returning an error if the channel is
+ /// disconnected.
+ ///
+ /// This method will always block the current thread if there is no data available and it is
+ /// still possible for the message to be sent. Once the message is sent to the corresponding
+ /// [`Sender`], then this receiver will wake up and return that message.
+ ///
+ /// If the corresponding [`Sender`] has disconnected (been dropped), or it disconnects while
+ /// this call is blocking, this call will wake up and return `Err` to indicate that the message
+ /// can never be received on this channel.
+ ///
+ /// If a sent message has already been extracted from this channel this method will return an
+ /// error.
+ ///
+ /// # Panics
+ ///
+ /// Panics if called after this receiver has been polled asynchronously.
+ #[cfg(feature = "std")]
+ pub fn recv(self) -> Result<T, RecvError> {
+ // Note that we don't need to worry about changing the state to disconnected or setting the
+ // state to an invalid value at any point in this function because we take ownership of
+ // self, and this function does not exit until the message has been received or both side
+ // of the channel are inactive and cleaned up.
+
+ let channel_ptr = self.channel_ptr;
+
+ // Don't run our Drop implementation if we are receiving consuming ourselves.
+ mem::forget(self);
+
+ // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
+ // is still alive, meaning that even if the sender was dropped then it would have observed
+ // the fact that we're still alive and left the responsibility of deallocating the
+ // channel to us, so channel_ptr is valid
+ let channel = unsafe { channel_ptr.as_ref() };
+
+ // ORDERING: we use acquire ordering to synchronize with the write of the message in the
+ // case that it's available
+ match channel.state.load(Acquire) {
+ // The sender is alive but has not sent anything yet. We prepare to park.
+ EMPTY => {
+ // Conditionally add a delay here to help the tests trigger the edge cases where
+ // the sender manages to be dropped or send something before we are able to store
+ // our waker object in the channel.
+ #[cfg(oneshot_test_delay)]
+ std::thread::sleep(std::time::Duration::from_millis(10));
+
+ // Write our waker instance to the channel.
+ // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
+ // try to access the waker until it sees the state set to RECEIVING below
+ unsafe { channel.write_waker(ReceiverWaker::current_thread()) };
+
+ // Switch the state to RECEIVING. We need to do this in one atomic step in case the
+ // sender disconnected or sent the message while we wrote the waker to memory. We
+ // don't need to do a compare exchange here however because if the original state
+ // was not EMPTY, then the sender has either finished sending the message or is
+ // being dropped, so the RECEIVING state will never be observed after we return.
+ // ORDERING: we use release ordering so the sender can synchronize with our writing
+ // of the waker to memory. The individual branches handle any additional
+ // synchronizaton
+ match channel.state.swap(RECEIVING, Release) {
+ // We stored our waker, now we park until the sender has changed the state
+ EMPTY => loop {
+ thread::park();
+
+ // ORDERING: synchronize with the write of the message
+ match channel.state.load(Acquire) {
+ // The sender sent the message while we were parked.
+ MESSAGE => {
+ // SAFETY: we are in the message state so the message is valid
+ let message = unsafe { channel.take_message() };
+
+ // SAFETY: the Sender delegates the responsibility of deallocating
+ // the channel to us upon sending the message
+ unsafe { dealloc(channel_ptr) };
+
+ break Ok(message);
+ }
+ // The sender was dropped while we were parked.
+ DISCONNECTED => {
+ // SAFETY: the Sender doesn't deallocate the channel allocation in
+ // its drop implementation if we're receiving
+ unsafe { dealloc(channel_ptr) };
+
+ break Err(RecvError);
+ }
+ // State did not change, spurious wakeup, park again.
+ RECEIVING | UNPARKING => (),
+ _ => unreachable!(),
+ }
+ },
+ // The sender sent the message while we prepared to park.
+ MESSAGE => {
+ // ORDERING: Synchronize with the write of the message. This branch is
+ // unlikely to be taken, so it's likely more efficient to use a fence here
+ // instead of AcqRel ordering on the RMW operation
+ fence(Acquire);
+
+ // SAFETY: we started in the empty state and the sender switched us to the
+ // message state. This means that it did not take the waker, so we're
+ // responsible for dropping it.
+ unsafe { channel.drop_waker() };
+
+ // SAFETY: we are in the message state so the message is valid
+ let message = unsafe { channel.take_message() };
+
+ // SAFETY: the Sender delegates the responsibility of deallocating the
+ // channel to us upon sending the message
+ unsafe { dealloc(channel_ptr) };
+
+ Ok(message)
+ }
+ // The sender was dropped before sending anything while we prepared to park.
+ DISCONNECTED => {
+ // SAFETY: we started in the empty state and the sender switched us to the
+ // disconnected state. It does not take the waker when it does this so we
+ // need to drop it.
+ unsafe { channel.drop_waker() };
+
+ // SAFETY: the sender does not deallocate the channel if it switches from
+ // empty to disconnected so we need to free the allocation
+ unsafe { dealloc(channel_ptr) };
+
+ Err(RecvError)
+ }
+ _ => unreachable!(),
+ }
+ }
+ // The sender already sent the message.
+ MESSAGE => {
+ // SAFETY: we are in the message state so the message is valid
+ let message = unsafe { channel.take_message() };
+
+ // SAFETY: we are already in the message state so the sender has been forgotten
+ // and it's our job to clean up resources
+ unsafe { dealloc(channel_ptr) };
+
+ Ok(message)
+ }
+ // The sender was dropped before sending anything, or we already received the message.
+ DISCONNECTED => {
+ // SAFETY: the sender does not deallocate the channel if it switches from empty to
+ // disconnected so we need to free the allocation
+ unsafe { dealloc(channel_ptr) };
+
+ Err(RecvError)
+ }
+ // The receiver must have been `Future::poll`ed prior to this call.
+ #[cfg(feature = "async")]
+ RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR),
+ _ => unreachable!(),
+ }
+ }
+
+ /// Attempts to wait for a message from the [`Sender`], returning an error if the channel is
+ /// disconnected. This is a non consuming version of [`Receiver::recv`], but with a bit
+ /// worse performance. Prefer `[`Receiver::recv`]` if your code allows consuming the receiver.
+ ///
+ /// If a message is returned, the channel is disconnected and any subsequent receive operation
+ /// using this receiver will return an error.
+ ///
+ /// # Panics
+ ///
+ /// Panics if called after this receiver has been polled asynchronously.
+ #[cfg(feature = "std")]
+ pub fn recv_ref(&self) -> Result<T, RecvError> {
+ self.start_recv_ref(RecvError, |channel| {
+ loop {
+ thread::park();
+
+ // ORDERING: we use acquire ordering to synchronize with the write of the message
+ match channel.state.load(Acquire) {
+ // The sender sent the message while we were parked.
+ // We take the message and mark the channel disconnected.
+ MESSAGE => {
+ // ORDERING: the sender is inactive at this point so we don't need to make
+ // any reads or writes visible to the sending thread
+ channel.state.store(DISCONNECTED, Relaxed);
+
+ // SAFETY: we were just in the message state so the message is valid
+ break Ok(unsafe { channel.take_message() });
+ }
+ // The sender was dropped while we were parked.
+ DISCONNECTED => break Err(RecvError),
+ // State did not change, spurious wakeup, park again.
+ RECEIVING | UNPARKING => (),
+ _ => unreachable!(),
+ }
+ }
+ })
+ }
+
+ /// Like [`Receiver::recv`], but will not block longer than `timeout`. Returns:
+ /// * `Ok(message)` if there was a message in the channel before the timeout was reached.
+ /// * `Err(Timeout)` if no message arrived on the channel before the timeout was reached.
+ /// * `Err(Disconnected)` if the sender was dropped before sending anything or if the message
+ /// has already been extracted by a previous receive call.
+ ///
+ /// If a message is returned, the channel is disconnected and any subsequent receive operation
+ /// using this receiver will return an error.
+ ///
+ /// If the supplied `timeout` is so large that Rust's `Instant` type can't represent this point
+ /// in the future this falls back to an indefinitely blocking receive operation.
+ ///
+ /// # Panics
+ ///
+ /// Panics if called after this receiver has been polled asynchronously.
+ #[cfg(feature = "std")]
+ pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
+ match Instant::now().checked_add(timeout) {
+ Some(deadline) => self.recv_deadline(deadline),
+ None => self.recv_ref().map_err(|_| RecvTimeoutError::Disconnected),
+ }
+ }
+
+ /// Like [`Receiver::recv`], but will not block longer than until `deadline`. Returns:
+ /// * `Ok(message)` if there was a message in the channel before the deadline was reached.
+ /// * `Err(Timeout)` if no message arrived on the channel before the deadline was reached.
+ /// * `Err(Disconnected)` if the sender was dropped before sending anything or if the message
+ /// has already been extracted by a previous receive call.
+ ///
+ /// If a message is returned, the channel is disconnected and any subsequent receive operation
+ /// using this receiver will return an error.
+ ///
+ /// # Panics
+ ///
+ /// Panics if called after this receiver has been polled asynchronously.
+ #[cfg(feature = "std")]
+ pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
+ /// # Safety
+ ///
+ /// If the sender is unparking us after a message send, the message must already have been
+ /// written to the channel and an acquire memory barrier issued before calling this function
+ #[cold]
+ unsafe fn wait_for_unpark<T>(channel: &Channel<T>) -> Result<T, RecvTimeoutError> {
+ loop {
+ thread::park();
+
+ // ORDERING: The callee has already synchronized with any message write
+ match channel.state.load(Relaxed) {
+ MESSAGE => {
+ // ORDERING: the sender has been dropped, so this update only
+ // needs to be visible to us
+ channel.state.store(DISCONNECTED, Relaxed);
+ break Ok(channel.take_message());
+ }
+ DISCONNECTED => break Err(RecvTimeoutError::Disconnected),
+ // The sender is still unparking us. We continue on the empty state here since
+ // the current implementation eagerly sets the state to EMPTY upon timeout.
+ EMPTY => (),
+ _ => unreachable!(),
+ }
+ }
+ }
+
+ self.start_recv_ref(RecvTimeoutError::Disconnected, |channel| {
+ loop {
+ match deadline.checked_duration_since(Instant::now()) {
+ Some(timeout) => {
+ thread::park_timeout(timeout);
+
+ // ORDERING: synchronize with the write of the message
+ match channel.state.load(Acquire) {
+ // The sender sent the message while we were parked.
+ MESSAGE => {
+ // ORDERING: the sender has been `mem::forget`-ed so this update
+ // only needs to be visible to us.
+ channel.state.store(DISCONNECTED, Relaxed);
+
+ // SAFETY: we either are in the message state or were just in the
+ // message state
+ break Ok(unsafe { channel.take_message() });
+ }
+ // The sender was dropped while we were parked.
+ DISCONNECTED => break Err(RecvTimeoutError::Disconnected),
+ // State did not change, spurious wakeup, park again.
+ RECEIVING | UNPARKING => (),
+ _ => unreachable!(),
+ }
+ }
+ None => {
+ // ORDERING: synchronize with the write of the message
+ match channel.state.swap(EMPTY, Acquire) {
+ // We reached the end of the timeout without receiving a message
+ RECEIVING => {
+ // SAFETY: we were in the receiving state and are now in the empty
+ // state, so the sender has not and will not try to read the waker,
+ // so we have exclusive access to drop it.
+ unsafe { channel.drop_waker() };
+
+ break Err(RecvTimeoutError::Timeout);
+ }
+ // The sender sent the message while we were parked.
+ MESSAGE => {
+ // Same safety and ordering as the Some branch
+
+ channel.state.store(DISCONNECTED, Relaxed);
+ break Ok(unsafe { channel.take_message() });
+ }
+ // The sender was dropped while we were parked.
+ DISCONNECTED => {
+ // ORDERING: we were originally in the disconnected state meaning
+ // that the sender is inactive and no longer observing the state,
+ // so we only need to change it back to DISCONNECTED for if the
+ // receiver is dropped or a recv* method is called again
+ channel.state.store(DISCONNECTED, Relaxed);
+
+ break Err(RecvTimeoutError::Disconnected);
+ }
+ // The sender sent the message and started unparking us
+ UNPARKING => {
+ // We were in the UNPARKING state and are now in the EMPTY state.
+ // We wait to be properly unparked and to observe if the sender
+ // sets MESSAGE or DISCONNECTED state.
+ // SAFETY: The load above has synchronized with any message write.
+ break unsafe { wait_for_unpark(channel) };
+ }
+ _ => unreachable!(),
+ }
+ }
+ }
+ }
+ })
+ }
+
+ /// Begins the process of receiving on the channel by reference. If the message is already
+ /// ready, or the sender has disconnected, then this function will return the appropriate
+ /// Result immediately. Otherwise, it will write the waker to memory, check to see if the
+ /// sender has finished or disconnected again, and then will call `finish`. `finish` is
+ /// thus responsible for cleaning up the channel's resources appropriately before it returns,
+ /// such as destroying the waker, for instance.
+ #[cfg(feature = "std")]
+ #[inline]
+ fn start_recv_ref<E>(
+ &self,
+ disconnected_error: E,
+ finish: impl FnOnce(&Channel<T>) -> Result<T, E>,
+ ) -> Result<T, E> {
+ // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
+ // is still alive, meaning that even if the sender was dropped then it would have observed
+ // the fact that we're still alive and left the responsibility of deallocating the
+ // channel to us, so `self.channel` is valid
+ let channel = unsafe { self.channel_ptr.as_ref() };
+
+ // ORDERING: synchronize with the write of the message
+ match channel.state.load(Acquire) {
+ // The sender is alive but has not sent anything yet. We prepare to park.
+ EMPTY => {
+ // Conditionally add a delay here to help the tests trigger the edge cases where
+ // the sender manages to be dropped or send something before we are able to store
+ // our waker object in the channel.
+ #[cfg(oneshot_test_delay)]
+ std::thread::sleep(std::time::Duration::from_millis(10));
+
+ // Write our waker instance to the channel.
+ // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
+ // try to access the waker until it sees the state set to RECEIVING below
+ unsafe { channel.write_waker(ReceiverWaker::current_thread()) };
+
+ // ORDERING: we use release ordering on success so the sender can synchronize with
+ // our write of the waker. We use relaxed ordering on failure since the sender does
+ // not need to synchronize with our write and the individual match arms handle any
+ // additional synchronization
+ match channel
+ .state
+ .compare_exchange(EMPTY, RECEIVING, Release, Relaxed)
+ {
+ // We stored our waker, now we delegate to the callback to finish the receive
+ // operation
+ Ok(_) => finish(channel),
+ // The sender sent the message while we prepared to finish
+ Err(MESSAGE) => {
+ // See comments in `recv` for ordering and safety
+
+ fence(Acquire);
+
+ unsafe { channel.drop_waker() };
+
+ // ORDERING: the sender has been `mem::forget`-ed so this update only
+ // needs to be visible to us
+ channel.state.store(DISCONNECTED, Relaxed);
+
+ // SAFETY: The MESSAGE state tells us there is a correctly initialized
+ // message
+ Ok(unsafe { channel.take_message() })
+ }
+ // The sender was dropped before sending anything while we prepared to park.
+ Err(DISCONNECTED) => {
+ // See comments in `recv` for safety
+ unsafe { channel.drop_waker() };
+ Err(disconnected_error)
+ }
+ _ => unreachable!(),
+ }
+ }
+ // The sender sent the message. We take the message and mark the channel disconnected.
+ MESSAGE => {
+ // ORDERING: the sender has been `mem::forget`-ed so this update only needs to be
+ // visible to us
+ channel.state.store(DISCONNECTED, Relaxed);
+
+ // SAFETY: we are in the message state so the message is valid
+ Ok(unsafe { channel.take_message() })
+ }
+ // The sender was dropped before sending anything, or we already received the message.
+ DISCONNECTED => Err(disconnected_error),
+ // The receiver must have been `Future::poll`ed prior to this call.
+ #[cfg(feature = "async")]
+ RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR),
+ _ => unreachable!(),
+ }
+ }
+}
+
+#[cfg(feature = "async")]
+impl<T> core::future::Future for Receiver<T> {
+ type Output = Result<T, RecvError>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
+ // is still alive, meaning that even if the sender was dropped then it would have observed
+ // the fact that we're still alive and left the responsibility of deallocating the
+ // channel to us, so `self.channel` is valid
+ let channel = unsafe { self.channel_ptr.as_ref() };
+
+ // ORDERING: we use acquire ordering to synchronize with the store of the message.
+ match channel.state.load(Acquire) {
+ // The sender is alive but has not sent anything yet.
+ EMPTY => {
+ // SAFETY: We can't be in the forbidden states, and no waker in the channel.
+ unsafe { channel.write_async_waker(cx) }
+ }
+ // We were polled again while waiting for the sender. Replace the waker with the new one.
+ RECEIVING => {
+ // ORDERING: We use relaxed ordering on both success and failure since we have not
+ // written anything above that must be released, and the individual match arms
+ // handle any additional synchronization.
+ match channel
+ .state
+ .compare_exchange(RECEIVING, EMPTY, Relaxed, Relaxed)
+ {
+ // We successfully changed the state back to EMPTY. Replace the waker.
+ // This is the most likely branch to be taken, which is why we don't use any
+ // memory barriers in the compare_exchange above.
+ Ok(_) => {
+ // SAFETY: We wrote the waker in a previous call to poll. We do not need
+ // a memory barrier since the previous write here was by ourselves.
+ unsafe { channel.drop_waker() };
+ // SAFETY: We can't be in the forbidden states, and no waker in the channel.
+ unsafe { channel.write_async_waker(cx) }
+ }
+ // The sender sent the message while we prepared to replace the waker.
+ // We take the message and mark the channel disconnected.
+ // The sender has already taken the waker.
+ Err(MESSAGE) => {
+ // ORDERING: Synchronize with the write of the message. This branch is
+ // unlikely to be taken.
+ channel.state.swap(DISCONNECTED, Acquire);
+ // SAFETY: The state tells us the sender has initialized the message.
+ Poll::Ready(Ok(unsafe { channel.take_message() }))
+ }
+ // The sender was dropped before sending anything while we prepared to park.
+ // The sender has taken the waker already.
+ Err(DISCONNECTED) => Poll::Ready(Err(RecvError)),
+ // The sender is currently waking us up.
+ Err(UNPARKING) => {
+ // We can't trust that the old waker that the sender has access to
+ // is honored by the async runtime at this point. So we wake ourselves
+ // up to get polled instantly again.
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ _ => unreachable!(),
+ }
+ }
+ // The sender sent the message.
+ MESSAGE => {
+ // ORDERING: the sender has been dropped so this update only needs to be
+ // visible to us
+ channel.state.store(DISCONNECTED, Relaxed);
+ Poll::Ready(Ok(unsafe { channel.take_message() }))
+ }
+ // The sender was dropped before sending anything, or we already received the message.
+ DISCONNECTED => Poll::Ready(Err(RecvError)),
+ // The sender has observed the RECEIVING state and is currently reading the waker from
+ // a previous poll. We need to loop here until we observe the MESSAGE or DISCONNECTED
+ // state. We busy loop here since we know the sender is done very soon.
+ UNPARKING => loop {
+ hint::spin_loop();
+ // ORDERING: The load above has already synchronized with the write of the message.
+ match channel.state.load(Relaxed) {
+ MESSAGE => {
+ // ORDERING: the sender has been dropped, so this update only
+ // needs to be visible to us
+ channel.state.store(DISCONNECTED, Relaxed);
+ // SAFETY: We observed the MESSAGE state
+ break Poll::Ready(Ok(unsafe { channel.take_message() }));
+ }
+ DISCONNECTED => break Poll::Ready(Err(RecvError)),
+ UNPARKING => (),
+ _ => unreachable!(),
+ }
+ },
+ _ => unreachable!(),
+ }
+ }
+}
+
+impl<T> Drop for Receiver<T> {
+ fn drop(&mut self) {
+ // SAFETY: since the receiving side is still alive the sender would have observed that and
+ // left deallocating the channel allocation to us.
+ let channel = unsafe { self.channel_ptr.as_ref() };
+
+ // Set the channel state to disconnected and read what state the receiver was in
+ match channel.state.swap(DISCONNECTED, Acquire) {
+ // The sender has not sent anything, nor is it dropped.
+ EMPTY => (),
+ // The sender already sent something. We must drop it, and free the channel.
+ MESSAGE => {
+ // SAFETY: we are in the message state so the message is initialized
+ unsafe { channel.drop_message() };
+
+ // SAFETY: see safety comment at top of function
+ unsafe { dealloc(self.channel_ptr) };
+ }
+ // The receiver has been polled.
+ #[cfg(feature = "async")]
+ RECEIVING => {
+ // TODO: figure this out when async is fixed
+ unsafe { channel.drop_waker() };
+ }
+ // The sender was already dropped. We are responsible for freeing the channel.
+ DISCONNECTED => {
+ // SAFETY: see safety comment at top of function
+ unsafe { dealloc(self.channel_ptr) };
+ }
+ _ => unreachable!(),
+ }
+ }
+}
+
+/// All the values that the `Channel::state` field can have during the lifetime of a channel.
+mod states {
+ // These values are very explicitly chosen so that we can replace some cmpxchg calls with
+ // fetch_* calls.
+
+ /// The initial channel state. Active while both endpoints are still alive, no message has been
+ /// sent, and the receiver is not receiving.
+ pub const EMPTY: u8 = 0b011;
+ /// A message has been sent to the channel, but the receiver has not yet read it.
+ pub const MESSAGE: u8 = 0b100;
+ /// No message has yet been sent on the channel, but the receiver is currently receiving.
+ pub const RECEIVING: u8 = 0b000;
+ #[cfg(any(feature = "std", feature = "async"))]
+ pub const UNPARKING: u8 = 0b001;
+ /// The channel has been closed. This means that either the sender or receiver has been dropped,
+ /// or the message sent to the channel has already been received. Since this is a oneshot
+ /// channel, it is disconnected after the one message it is supposed to hold has been
+ /// transmitted.
+ pub const DISCONNECTED: u8 = 0b010;
+}
+use states::*;
+
+/// Internal channel data structure structure. the `channel` method allocates and puts one instance
+/// of this struct on the heap for each oneshot channel instance. The struct holds:
+/// * The current state of the channel.
+/// * The message in the channel. This memory is uninitialized until the message is sent.
+/// * The waker instance for the thread or task that is currently receiving on this channel.
+/// This memory is uninitialized until the receiver starts receiving.
+struct Channel<T> {
+ state: AtomicU8,
+ message: UnsafeCell<MaybeUninit<T>>,
+ waker: UnsafeCell<MaybeUninit<ReceiverWaker>>,
+}
+
+impl<T> Channel<T> {
+ pub fn new() -> Self {
+ Self {
+ state: AtomicU8::new(EMPTY),
+ message: UnsafeCell::new(MaybeUninit::uninit()),
+ waker: UnsafeCell::new(MaybeUninit::uninit()),
+ }
+ }
+
+ #[inline(always)]
+ unsafe fn message(&self) -> &MaybeUninit<T> {
+ #[cfg(loom)]
+ {
+ self.message.with(|ptr| &*ptr)
+ }
+
+ #[cfg(not(loom))]
+ {
+ &*self.message.get()
+ }
+ }
+
+ #[inline(always)]
+ unsafe fn with_message_mut<F>(&self, op: F)
+ where
+ F: FnOnce(&mut MaybeUninit<T>),
+ {
+ #[cfg(loom)]
+ {
+ self.message.with_mut(|ptr| op(&mut *ptr))
+ }
+
+ #[cfg(not(loom))]
+ {
+ op(&mut *self.message.get())
+ }
+ }
+
+ #[inline(always)]
+ #[cfg(any(feature = "std", feature = "async"))]
+ unsafe fn with_waker_mut<F>(&self, op: F)
+ where
+ F: FnOnce(&mut MaybeUninit<ReceiverWaker>),
+ {
+ #[cfg(loom)]
+ {
+ self.waker.with_mut(|ptr| op(&mut *ptr))
+ }
+
+ #[cfg(not(loom))]
+ {
+ op(&mut *self.waker.get())
+ }
+ }
+
+ #[inline(always)]
+ unsafe fn write_message(&self, message: T) {
+ self.with_message_mut(|slot| slot.as_mut_ptr().write(message));
+ }
+
+ #[inline(always)]
+ unsafe fn take_message(&self) -> T {
+ #[cfg(loom)]
+ {
+ self.message.with(|ptr| ptr::read(ptr)).assume_init()
+ }
+
+ #[cfg(not(loom))]
+ {
+ ptr::read(self.message.get()).assume_init()
+ }
+ }
+
+ #[inline(always)]
+ unsafe fn drop_message(&self) {
+ self.with_message_mut(|slot| slot.assume_init_drop());
+ }
+
+ #[cfg(any(feature = "std", feature = "async"))]
+ #[inline(always)]
+ unsafe fn write_waker(&self, waker: ReceiverWaker) {
+ self.with_waker_mut(|slot| slot.as_mut_ptr().write(waker));
+ }
+
+ #[inline(always)]
+ unsafe fn take_waker(&self) -> ReceiverWaker {
+ #[cfg(loom)]
+ {
+ self.waker.with(|ptr| ptr::read(ptr)).assume_init()
+ }
+
+ #[cfg(not(loom))]
+ {
+ ptr::read(self.waker.get()).assume_init()
+ }
+ }
+
+ #[cfg(any(feature = "std", feature = "async"))]
+ #[inline(always)]
+ unsafe fn drop_waker(&self) {
+ self.with_waker_mut(|slot| slot.assume_init_drop());
+ }
+
+ /// # Safety
+ ///
+ /// * `Channel::waker` must not have a waker stored in it when calling this method.
+ /// * Channel state must not be RECEIVING or UNPARKING when calling this method.
+ #[cfg(feature = "async")]
+ unsafe fn write_async_waker(&self, cx: &mut task::Context<'_>) -> Poll<Result<T, RecvError>> {
+ // Write our thread instance to the channel.
+ // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
+ // try to access the waker until it sees the state set to RECEIVING below
+ self.write_waker(ReceiverWaker::task_waker(cx));
+
+ // ORDERING: we use release ordering on success so the sender can synchronize with
+ // our write of the waker. We use relaxed ordering on failure since the sender does
+ // not need to synchronize with our write and the individual match arms handle any
+ // additional synchronization
+ match self
+ .state
+ .compare_exchange(EMPTY, RECEIVING, Release, Relaxed)
+ {
+ // We stored our waker, now we return and let the sender wake us up
+ Ok(_) => Poll::Pending,
+ // The sender sent the message while we prepared to park.
+ // We take the message and mark the channel disconnected.
+ Err(MESSAGE) => {
+ // ORDERING: Synchronize with the write of the message. This branch is
+ // unlikely to be taken, so it's likely more efficient to use a fence here
+ // instead of AcqRel ordering on the compare_exchange operation
+ fence(Acquire);
+
+ // SAFETY: we started in the EMPTY state and the sender switched us to the
+ // MESSAGE state. This means that it did not take the waker, so we're
+ // responsible for dropping it.
+ self.drop_waker();
+
+ // ORDERING: sender does not exist, so this update only needs to be visible to us
+ self.state.store(DISCONNECTED, Relaxed);
+
+ // SAFETY: The MESSAGE state tells us there is a correctly initialized message
+ Poll::Ready(Ok(self.take_message()))
+ }
+ // The sender was dropped before sending anything while we prepared to park.
+ Err(DISCONNECTED) => {
+ // SAFETY: we started in the EMPTY state and the sender switched us to the
+ // DISCONNECTED state. This means that it did not take the waker, so we're
+ // responsible for dropping it.
+ self.drop_waker();
+ Poll::Ready(Err(RecvError))
+ }
+ _ => unreachable!(),
+ }
+ }
+}
+
+enum ReceiverWaker {
+ /// The receiver is waiting synchronously. Its thread is parked.
+ #[cfg(feature = "std")]
+ Thread(thread::Thread),
+ /// The receiver is waiting asynchronously. Its task can be woken up with this `Waker`.
+ #[cfg(feature = "async")]
+ Task(task::Waker),
+ /// A little hack to not make this enum an uninhibitable type when no features are enabled.
+ #[cfg(not(any(feature = "async", feature = "std")))]
+ _Uninhabited,
+}
+
+impl ReceiverWaker {
+ #[cfg(feature = "std")]
+ pub fn current_thread() -> Self {
+ Self::Thread(thread::current())
+ }
+
+ #[cfg(feature = "async")]
+ pub fn task_waker(cx: &task::Context<'_>) -> Self {
+ Self::Task(cx.waker().clone())
+ }
+
+ pub fn unpark(self) {
+ match self {
+ #[cfg(feature = "std")]
+ ReceiverWaker::Thread(thread) => thread.unpark(),
+ #[cfg(feature = "async")]
+ ReceiverWaker::Task(waker) => waker.wake(),
+ #[cfg(not(any(feature = "async", feature = "std")))]
+ ReceiverWaker::_Uninhabited => unreachable!(),
+ }
+ }
+}
+
+#[cfg(not(loom))]
+#[test]
+fn receiver_waker_size() {
+ let expected: usize = match (cfg!(feature = "std"), cfg!(feature = "async")) {
+ (false, false) => 0,
+ (false, true) => 16,
+ (true, false) => 8,
+ (true, true) => 24,
+ };
+ assert_eq!(mem::size_of::<ReceiverWaker>(), expected);
+}
+
+#[cfg(all(feature = "std", feature = "async"))]
+const RECEIVER_USED_SYNC_AND_ASYNC_ERROR: &str =
+ "Invalid to call a blocking receive method on oneshot::Receiver after it has been polled";
+
+#[inline]
+pub(crate) unsafe fn dealloc<T>(channel: NonNull<Channel<T>>) {
+ drop(Box::from_raw(channel.as_ptr()))
+}
diff --git a/third_party/rust/oneshot-uniffi/src/loombox.rs b/third_party/rust/oneshot-uniffi/src/loombox.rs
new file mode 100644
index 0000000000..615db30174
--- /dev/null
+++ b/third_party/rust/oneshot-uniffi/src/loombox.rs
@@ -0,0 +1,151 @@
+use core::{borrow, fmt, hash, mem, ptr};
+use loom::alloc;
+
+pub struct Box<T: ?Sized> {
+ ptr: *mut T,
+}
+
+impl<T> Box<T> {
+ pub fn new(value: T) -> Self {
+ let layout = alloc::Layout::new::<T>();
+ let ptr = unsafe { alloc::alloc(layout) } as *mut T;
+ unsafe { ptr::write(ptr, value) };
+ Self { ptr }
+ }
+}
+
+impl<T: ?Sized> Box<T> {
+ #[inline]
+ pub fn into_raw(b: Box<T>) -> *mut T {
+ let ptr = b.ptr;
+ mem::forget(b);
+ ptr
+ }
+
+ pub const unsafe fn from_raw(ptr: *mut T) -> Box<T> {
+ Self { ptr }
+ }
+}
+
+impl<T: ?Sized> Drop for Box<T> {
+ fn drop(&mut self) {
+ unsafe {
+ let size = mem::size_of_val(&*self.ptr);
+ let align = mem::align_of_val(&*self.ptr);
+ let layout = alloc::Layout::from_size_align(size, align).unwrap();
+ ptr::drop_in_place(self.ptr);
+ alloc::dealloc(self.ptr as *mut u8, layout);
+ }
+ }
+}
+
+unsafe impl<T: Send> Send for Box<T> {}
+unsafe impl<T: Sync> Sync for Box<T> {}
+
+impl<T: ?Sized> core::ops::Deref for Box<T> {
+ type Target = T;
+
+ fn deref(&self) -> &T {
+ unsafe { &*self.ptr }
+ }
+}
+
+impl<T: ?Sized> core::ops::DerefMut for Box<T> {
+ fn deref_mut(&mut self) -> &mut T {
+ unsafe { &mut *self.ptr }
+ }
+}
+
+impl<T: ?Sized> borrow::Borrow<T> for Box<T> {
+ fn borrow(&self) -> &T {
+ &**self
+ }
+}
+
+impl<T: ?Sized> borrow::BorrowMut<T> for Box<T> {
+ fn borrow_mut(&mut self) -> &mut T {
+ &mut **self
+ }
+}
+
+impl<T: ?Sized> AsRef<T> for Box<T> {
+ fn as_ref(&self) -> &T {
+ &**self
+ }
+}
+
+impl<T: ?Sized> AsMut<T> for Box<T> {
+ fn as_mut(&mut self) -> &mut T {
+ &mut **self
+ }
+}
+
+impl<T: fmt::Display + ?Sized> fmt::Display for Box<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Display::fmt(&**self, f)
+ }
+}
+
+impl<T: fmt::Debug + ?Sized> fmt::Debug for Box<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Debug::fmt(&**self, f)
+ }
+}
+
+impl<T: Clone> Clone for Box<T> {
+ #[inline]
+ fn clone(&self) -> Box<T> {
+ Self::new(self.as_ref().clone())
+ }
+}
+
+impl<T: ?Sized + PartialEq> PartialEq for Box<T> {
+ #[inline]
+ fn eq(&self, other: &Box<T>) -> bool {
+ PartialEq::eq(&**self, &**other)
+ }
+
+ #[allow(clippy::partialeq_ne_impl)]
+ #[inline]
+ fn ne(&self, other: &Box<T>) -> bool {
+ PartialEq::ne(&**self, &**other)
+ }
+}
+
+impl<T: ?Sized + Eq> Eq for Box<T> {}
+
+impl<T: ?Sized + PartialOrd> PartialOrd for Box<T> {
+ #[inline]
+ fn partial_cmp(&self, other: &Box<T>) -> Option<core::cmp::Ordering> {
+ PartialOrd::partial_cmp(&**self, &**other)
+ }
+ #[inline]
+ fn lt(&self, other: &Box<T>) -> bool {
+ PartialOrd::lt(&**self, &**other)
+ }
+ #[inline]
+ fn le(&self, other: &Box<T>) -> bool {
+ PartialOrd::le(&**self, &**other)
+ }
+ #[inline]
+ fn ge(&self, other: &Box<T>) -> bool {
+ PartialOrd::ge(&**self, &**other)
+ }
+ #[inline]
+ fn gt(&self, other: &Box<T>) -> bool {
+ PartialOrd::gt(&**self, &**other)
+ }
+}
+
+impl<T: ?Sized + Ord> Ord for Box<T> {
+ #[inline]
+ fn cmp(&self, other: &Box<T>) -> core::cmp::Ordering {
+ Ord::cmp(&**self, &**other)
+ }
+}
+
+impl<T: ?Sized + hash::Hash> hash::Hash for Box<T> {
+ fn hash<H: hash::Hasher>(&self, state: &mut H) {
+ (**self).hash(state);
+ }
+}