summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/src/sync/oneshot.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio/src/sync/oneshot.rs')
-rw-r--r--third_party/rust/tokio/src/sync/oneshot.rs1366
1 files changed, 1366 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/sync/oneshot.rs b/third_party/rust/tokio/src/sync/oneshot.rs
new file mode 100644
index 0000000000..2240074e73
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/oneshot.rs
@@ -0,0 +1,1366 @@
+#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
+
+//! A one-shot channel is used for sending a single message between
+//! asynchronous tasks. The [`channel`] function is used to create a
+//! [`Sender`] and [`Receiver`] handle pair that form the channel.
+//!
+//! The `Sender` handle is used by the producer to send the value.
+//! The `Receiver` handle is used by the consumer to receive the value.
+//!
+//! Each handle can be used on separate tasks.
+//!
+//! Since the `send` method is not async, it can be used anywhere. This includes
+//! sending between two runtimes, and using it from non-async code.
+//!
+//! # Examples
+//!
+//! ```
+//! use tokio::sync::oneshot;
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! let (tx, rx) = oneshot::channel();
+//!
+//! tokio::spawn(async move {
+//! if let Err(_) = tx.send(3) {
+//! println!("the receiver dropped");
+//! }
+//! });
+//!
+//! match rx.await {
+//! Ok(v) => println!("got = {:?}", v),
+//! Err(_) => println!("the sender dropped"),
+//! }
+//! }
+//! ```
+//!
+//! If the sender is dropped without sending, the receiver will fail with
+//! [`error::RecvError`]:
+//!
+//! ```
+//! use tokio::sync::oneshot;
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! let (tx, rx) = oneshot::channel::<u32>();
+//!
+//! tokio::spawn(async move {
+//! drop(tx);
+//! });
+//!
+//! match rx.await {
+//! Ok(_) => panic!("This doesn't happen"),
+//! Err(_) => println!("the sender dropped"),
+//! }
+//! }
+//! ```
+//!
+//! To use a oneshot channel in a `tokio::select!` loop, add `&mut` in front of
+//! the channel.
+//!
+//! ```
+//! use tokio::sync::oneshot;
+//! use tokio::time::{interval, sleep, Duration};
+//!
+//! #[tokio::main]
+//! # async fn _doc() {}
+//! # #[tokio::main(flavor = "current_thread", start_paused = true)]
+//! async fn main() {
+//! let (send, mut recv) = oneshot::channel();
+//! let mut interval = interval(Duration::from_millis(100));
+//!
+//! # let handle =
+//! tokio::spawn(async move {
+//! sleep(Duration::from_secs(1)).await;
+//! send.send("shut down").unwrap();
+//! });
+//!
+//! loop {
+//! tokio::select! {
+//! _ = interval.tick() => println!("Another 100ms"),
+//! msg = &mut recv => {
+//! println!("Got message: {}", msg.unwrap());
+//! break;
+//! }
+//! }
+//! }
+//! # handle.await.unwrap();
+//! }
+//! ```
+//!
+//! To use a `Sender` from a destructor, put it in an [`Option`] and call
+//! [`Option::take`].
+//!
+//! ```
+//! use tokio::sync::oneshot;
+//!
+//! struct SendOnDrop {
+//! sender: Option<oneshot::Sender<&'static str>>,
+//! }
+//! impl Drop for SendOnDrop {
+//! fn drop(&mut self) {
+//! if let Some(sender) = self.sender.take() {
+//! // Using `let _ =` to ignore send errors.
+//! let _ = sender.send("I got dropped!");
+//! }
+//! }
+//! }
+//!
+//! #[tokio::main]
+//! # async fn _doc() {}
+//! # #[tokio::main(flavor = "current_thread")]
+//! async fn main() {
+//! let (send, recv) = oneshot::channel();
+//!
+//! let send_on_drop = SendOnDrop { sender: Some(send) };
+//! drop(send_on_drop);
+//!
+//! assert_eq!(recv.await, Ok("I got dropped!"));
+//! }
+//! ```
+
+use crate::loom::cell::UnsafeCell;
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::loom::sync::Arc;
+#[cfg(all(tokio_unstable, feature = "tracing"))]
+use crate::util::trace;
+
+use std::fmt;
+use std::future::Future;
+use std::mem::MaybeUninit;
+use std::pin::Pin;
+use std::sync::atomic::Ordering::{self, AcqRel, Acquire};
+use std::task::Poll::{Pending, Ready};
+use std::task::{Context, Poll, Waker};
+
+/// Sends a value to the associated [`Receiver`].
+///
+/// A pair of both a [`Sender`] and a [`Receiver`] are created by the
+/// [`channel`](fn@channel) function.
+///
+/// # Examples
+///
+/// ```
+/// use tokio::sync::oneshot;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let (tx, rx) = oneshot::channel();
+///
+/// tokio::spawn(async move {
+/// if let Err(_) = tx.send(3) {
+/// println!("the receiver dropped");
+/// }
+/// });
+///
+/// match rx.await {
+/// Ok(v) => println!("got = {:?}", v),
+/// Err(_) => println!("the sender dropped"),
+/// }
+/// }
+/// ```
+///
+/// If the sender is dropped without sending, the receiver will fail with
+/// [`error::RecvError`]:
+///
+/// ```
+/// use tokio::sync::oneshot;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let (tx, rx) = oneshot::channel::<u32>();
+///
+/// tokio::spawn(async move {
+/// drop(tx);
+/// });
+///
+/// match rx.await {
+/// Ok(_) => panic!("This doesn't happen"),
+/// Err(_) => println!("the sender dropped"),
+/// }
+/// }
+/// ```
+///
+/// To use a `Sender` from a destructor, put it in an [`Option`] and call
+/// [`Option::take`].
+///
+/// ```
+/// use tokio::sync::oneshot;
+///
+/// struct SendOnDrop {
+/// sender: Option<oneshot::Sender<&'static str>>,
+/// }
+/// impl Drop for SendOnDrop {
+/// fn drop(&mut self) {
+/// if let Some(sender) = self.sender.take() {
+/// // Using `let _ =` to ignore send errors.
+/// let _ = sender.send("I got dropped!");
+/// }
+/// }
+/// }
+///
+/// #[tokio::main]
+/// # async fn _doc() {}
+/// # #[tokio::main(flavor = "current_thread")]
+/// async fn main() {
+/// let (send, recv) = oneshot::channel();
+///
+/// let send_on_drop = SendOnDrop { sender: Some(send) };
+/// drop(send_on_drop);
+///
+/// assert_eq!(recv.await, Ok("I got dropped!"));
+/// }
+/// ```
+///
+/// [`Option`]: std::option::Option
+/// [`Option::take`]: std::option::Option::take
+#[derive(Debug)]
+pub struct Sender<T> {
+ inner: Option<Arc<Inner<T>>>,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: tracing::Span,
+}
+
+/// Receives a value from the associated [`Sender`].
+///
+/// A pair of both a [`Sender`] and a [`Receiver`] are created by the
+/// [`channel`](fn@channel) function.
+///
+/// This channel has no `recv` method because the receiver itself implements the
+/// [`Future`] trait. To receive a value, `.await` the `Receiver` object directly.
+///
+/// [`Future`]: trait@std::future::Future
+///
+/// # Examples
+///
+/// ```
+/// use tokio::sync::oneshot;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let (tx, rx) = oneshot::channel();
+///
+/// tokio::spawn(async move {
+/// if let Err(_) = tx.send(3) {
+/// println!("the receiver dropped");
+/// }
+/// });
+///
+/// match rx.await {
+/// Ok(v) => println!("got = {:?}", v),
+/// Err(_) => println!("the sender dropped"),
+/// }
+/// }
+/// ```
+///
+/// If the sender is dropped without sending, the receiver will fail with
+/// [`error::RecvError`]:
+///
+/// ```
+/// use tokio::sync::oneshot;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let (tx, rx) = oneshot::channel::<u32>();
+///
+/// tokio::spawn(async move {
+/// drop(tx);
+/// });
+///
+/// match rx.await {
+/// Ok(_) => panic!("This doesn't happen"),
+/// Err(_) => println!("the sender dropped"),
+/// }
+/// }
+/// ```
+///
+/// To use a `Receiver` in a `tokio::select!` loop, add `&mut` in front of the
+/// channel.
+///
+/// ```
+/// use tokio::sync::oneshot;
+/// use tokio::time::{interval, sleep, Duration};
+///
+/// #[tokio::main]
+/// # async fn _doc() {}
+/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
+/// async fn main() {
+/// let (send, mut recv) = oneshot::channel();
+/// let mut interval = interval(Duration::from_millis(100));
+///
+/// # let handle =
+/// tokio::spawn(async move {
+/// sleep(Duration::from_secs(1)).await;
+/// send.send("shut down").unwrap();
+/// });
+///
+/// loop {
+/// tokio::select! {
+/// _ = interval.tick() => println!("Another 100ms"),
+/// msg = &mut recv => {
+/// println!("Got message: {}", msg.unwrap());
+/// break;
+/// }
+/// }
+/// }
+/// # handle.await.unwrap();
+/// }
+/// ```
+#[derive(Debug)]
+pub struct Receiver<T> {
+ inner: Option<Arc<Inner<T>>>,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: tracing::Span,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ async_op_span: tracing::Span,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ async_op_poll_span: tracing::Span,
+}
+
+pub mod error {
+ //! Oneshot error types.
+
+ use std::fmt;
+
+ /// Error returned by the `Future` implementation for `Receiver`.
+ #[derive(Debug, Eq, PartialEq)]
+ pub struct RecvError(pub(super) ());
+
+ /// Error returned by the `try_recv` function on `Receiver`.
+ #[derive(Debug, Eq, PartialEq)]
+ pub enum TryRecvError {
+ /// The send half of the channel has not yet sent a value.
+ Empty,
+
+ /// The send half of the channel was dropped without sending a value.
+ Closed,
+ }
+
+ // ===== impl RecvError =====
+
+ impl fmt::Display for RecvError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "channel closed")
+ }
+ }
+
+ impl std::error::Error for RecvError {}
+
+ // ===== impl TryRecvError =====
+
+ impl fmt::Display for TryRecvError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ TryRecvError::Empty => write!(fmt, "channel empty"),
+ TryRecvError::Closed => write!(fmt, "channel closed"),
+ }
+ }
+ }
+
+ impl std::error::Error for TryRecvError {}
+}
+
+use self::error::*;
+
+struct Inner<T> {
+ /// Manages the state of the inner cell.
+ state: AtomicUsize,
+
+ /// The value. This is set by `Sender` and read by `Receiver`. The state of
+ /// the cell is tracked by `state`.
+ value: UnsafeCell<Option<T>>,
+
+ /// The task to notify when the receiver drops without consuming the value.
+ ///
+ /// ## Safety
+ ///
+ /// The `TX_TASK_SET` bit in the `state` field is set if this field is
+ /// initialized. If that bit is unset, this field may be uninitialized.
+ tx_task: Task,
+
+ /// The task to notify when the value is sent.
+ ///
+ /// ## Safety
+ ///
+ /// The `RX_TASK_SET` bit in the `state` field is set if this field is
+ /// initialized. If that bit is unset, this field may be uninitialized.
+ rx_task: Task,
+}
+
+struct Task(UnsafeCell<MaybeUninit<Waker>>);
+
+impl Task {
+ unsafe fn will_wake(&self, cx: &mut Context<'_>) -> bool {
+ self.with_task(|w| w.will_wake(cx.waker()))
+ }
+
+ unsafe fn with_task<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce(&Waker) -> R,
+ {
+ self.0.with(|ptr| {
+ let waker: *const Waker = (&*ptr).as_ptr();
+ f(&*waker)
+ })
+ }
+
+ unsafe fn drop_task(&self) {
+ self.0.with_mut(|ptr| {
+ let ptr: *mut Waker = (&mut *ptr).as_mut_ptr();
+ ptr.drop_in_place();
+ });
+ }
+
+ unsafe fn set_task(&self, cx: &mut Context<'_>) {
+ self.0.with_mut(|ptr| {
+ let ptr: *mut Waker = (&mut *ptr).as_mut_ptr();
+ ptr.write(cx.waker().clone());
+ });
+ }
+}
+
+#[derive(Clone, Copy)]
+struct State(usize);
+
+/// Creates a new one-shot channel for sending single values across asynchronous
+/// tasks.
+///
+/// The function returns separate "send" and "receive" handles. The `Sender`
+/// handle is used by the producer to send the value. The `Receiver` handle is
+/// used by the consumer to receive the value.
+///
+/// Each handle can be used on separate tasks.
+///
+/// # Examples
+///
+/// ```
+/// use tokio::sync::oneshot;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let (tx, rx) = oneshot::channel();
+///
+/// tokio::spawn(async move {
+/// if let Err(_) = tx.send(3) {
+/// println!("the receiver dropped");
+/// }
+/// });
+///
+/// match rx.await {
+/// Ok(v) => println!("got = {:?}", v),
+/// Err(_) => println!("the sender dropped"),
+/// }
+/// }
+/// ```
+#[track_caller]
+pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let resource_span = {
+ let location = std::panic::Location::caller();
+
+ let resource_span = tracing::trace_span!(
+ "runtime.resource",
+ concrete_type = "Sender|Receiver",
+ kind = "Sync",
+ loc.file = location.file(),
+ loc.line = location.line(),
+ loc.col = location.column(),
+ );
+
+ resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ tx_dropped = false,
+ tx_dropped.op = "override",
+ )
+ });
+
+ resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ rx_dropped = false,
+ rx_dropped.op = "override",
+ )
+ });
+
+ resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ value_sent = false,
+ value_sent.op = "override",
+ )
+ });
+
+ resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ value_received = false,
+ value_received.op = "override",
+ )
+ });
+
+ resource_span
+ };
+
+ let inner = Arc::new(Inner {
+ state: AtomicUsize::new(State::new().as_usize()),
+ value: UnsafeCell::new(None),
+ tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
+ rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
+ });
+
+ let tx = Sender {
+ inner: Some(inner.clone()),
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: resource_span.clone(),
+ };
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let async_op_span = resource_span
+ .in_scope(|| tracing::trace_span!("runtime.resource.async_op", source = "Receiver::await"));
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let async_op_poll_span =
+ async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll"));
+
+ let rx = Receiver {
+ inner: Some(inner),
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ resource_span: resource_span,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ async_op_span,
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ async_op_poll_span,
+ };
+
+ (tx, rx)
+}
+
+impl<T> Sender<T> {
+ /// Attempts to send a value on this channel, returning it back if it could
+ /// not be sent.
+ ///
+ /// This method consumes `self` as only one value may ever be sent on a oneshot
+ /// channel. It is not marked async because sending a message to an oneshot
+ /// channel never requires any form of waiting. Because of this, the `send`
+ /// method can be used in both synchronous and asynchronous code without
+ /// problems.
+ ///
+ /// A successful send occurs when it is determined that the other end of the
+ /// channel has not hung up already. An unsuccessful send would be one where
+ /// the corresponding receiver has already been deallocated. Note that a
+ /// return value of `Err` means that the data will never be received, but
+ /// a return value of `Ok` does *not* mean that the data will be received.
+ /// It is possible for the corresponding receiver to hang up immediately
+ /// after this function returns `Ok`.
+ ///
+ /// # Examples
+ ///
+ /// Send a value to another task
+ ///
+ /// ```
+ /// use tokio::sync::oneshot;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, rx) = oneshot::channel();
+ ///
+ /// tokio::spawn(async move {
+ /// if let Err(_) = tx.send(3) {
+ /// println!("the receiver dropped");
+ /// }
+ /// });
+ ///
+ /// match rx.await {
+ /// Ok(v) => println!("got = {:?}", v),
+ /// Err(_) => println!("the sender dropped"),
+ /// }
+ /// }
+ /// ```
+ pub fn send(mut self, t: T) -> Result<(), T> {
+ let inner = self.inner.take().unwrap();
+
+ inner.value.with_mut(|ptr| unsafe {
+ // SAFETY: The receiver will not access the `UnsafeCell` unless the
+ // channel has been marked as "complete" (the `VALUE_SENT` state bit
+ // is set).
+ // That bit is only set by the sender later on in this method, and
+ // calling this method consumes `self`. Therefore, if it was possible to
+ // call this method, we know that the `VALUE_SENT` bit is unset, and
+ // the receiver is not currently accessing the `UnsafeCell`.
+ *ptr = Some(t);
+ });
+
+ if !inner.complete() {
+ unsafe {
+ // SAFETY: The receiver will not access the `UnsafeCell` unless
+ // the channel has been marked as "complete". Calling
+ // `complete()` will return true if this bit is set, and false
+ // if it is not set. Thus, if `complete()` returned false, it is
+ // safe for us to access the value, because we know that the
+ // receiver will not.
+ return Err(inner.consume_value().unwrap());
+ }
+ }
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ self.resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ value_sent = true,
+ value_sent.op = "override",
+ )
+ });
+
+ Ok(())
+ }
+
+ /// Waits for the associated [`Receiver`] handle to close.
+ ///
+ /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
+ /// [`Receiver`] value is dropped.
+ ///
+ /// This function is useful when paired with `select!` to abort a
+ /// computation when the receiver is no longer interested in the result.
+ ///
+ /// # Return
+ ///
+ /// Returns a `Future` which must be awaited on.
+ ///
+ /// [`Receiver`]: Receiver
+ /// [`close`]: Receiver::close
+ ///
+ /// # Examples
+ ///
+ /// Basic usage
+ ///
+ /// ```
+ /// use tokio::sync::oneshot;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, rx) = oneshot::channel::<()>();
+ ///
+ /// tokio::spawn(async move {
+ /// drop(rx);
+ /// });
+ ///
+ /// tx.closed().await;
+ /// println!("the receiver dropped");
+ /// }
+ /// ```
+ ///
+ /// Paired with select
+ ///
+ /// ```
+ /// use tokio::sync::oneshot;
+ /// use tokio::time::{self, Duration};
+ ///
+ /// async fn compute() -> String {
+ /// // Complex computation returning a `String`
+ /// # "hello".to_string()
+ /// }
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, rx) = oneshot::channel();
+ ///
+ /// tokio::spawn(async move {
+ /// tokio::select! {
+ /// _ = tx.closed() => {
+ /// // The receiver dropped, no need to do any further work
+ /// }
+ /// value = compute() => {
+ /// // The send can fail if the channel was closed at the exact same
+ /// // time as when compute() finished, so just ignore the failure.
+ /// let _ = tx.send(value);
+ /// }
+ /// }
+ /// });
+ ///
+ /// // Wait for up to 10 seconds
+ /// let _ = time::timeout(Duration::from_secs(10), rx).await;
+ /// }
+ /// ```
+ pub async fn closed(&mut self) {
+ use crate::future::poll_fn;
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let resource_span = self.resource_span.clone();
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let closed = trace::async_op(
+ || poll_fn(|cx| self.poll_closed(cx)),
+ resource_span,
+ "Sender::closed",
+ "poll_closed",
+ false,
+ );
+ #[cfg(not(all(tokio_unstable, feature = "tracing")))]
+ let closed = poll_fn(|cx| self.poll_closed(cx));
+
+ closed.await
+ }
+
+ /// Returns `true` if the associated [`Receiver`] handle has been dropped.
+ ///
+ /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
+ /// [`Receiver`] value is dropped.
+ ///
+ /// If `true` is returned, a call to `send` will always result in an error.
+ ///
+ /// [`Receiver`]: Receiver
+ /// [`close`]: Receiver::close
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::oneshot;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, rx) = oneshot::channel();
+ ///
+ /// assert!(!tx.is_closed());
+ ///
+ /// drop(rx);
+ ///
+ /// assert!(tx.is_closed());
+ /// assert!(tx.send("never received").is_err());
+ /// }
+ /// ```
+ pub fn is_closed(&self) -> bool {
+ let inner = self.inner.as_ref().unwrap();
+
+ let state = State::load(&inner.state, Acquire);
+ state.is_closed()
+ }
+
+ /// Checks whether the oneshot channel has been closed, and if not, schedules the
+ /// `Waker` in the provided `Context` to receive a notification when the channel is
+ /// closed.
+ ///
+ /// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the
+ /// [`Receiver`] value is dropped.
+ ///
+ /// Note that on multiple calls to poll, only the `Waker` from the `Context` passed
+ /// to the most recent call will be scheduled to receive a wakeup.
+ ///
+ /// [`Receiver`]: struct@crate::sync::oneshot::Receiver
+ /// [`close`]: fn@crate::sync::oneshot::Receiver::close
+ ///
+ /// # Return value
+ ///
+ /// This function returns:
+ ///
+ /// * `Poll::Pending` if the channel is still open.
+ /// * `Poll::Ready(())` if the channel is closed.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::oneshot;
+ ///
+ /// use futures::future::poll_fn;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, mut rx) = oneshot::channel::<()>();
+ ///
+ /// tokio::spawn(async move {
+ /// rx.close();
+ /// });
+ ///
+ /// poll_fn(|cx| tx.poll_closed(cx)).await;
+ ///
+ /// println!("the receiver dropped");
+ /// }
+ /// ```
+ pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+ // Keep track of task budget
+ let coop = ready!(crate::coop::poll_proceed(cx));
+
+ let inner = self.inner.as_ref().unwrap();
+
+ let mut state = State::load(&inner.state, Acquire);
+
+ if state.is_closed() {
+ coop.made_progress();
+ return Poll::Ready(());
+ }
+
+ if state.is_tx_task_set() {
+ let will_notify = unsafe { inner.tx_task.will_wake(cx) };
+
+ if !will_notify {
+ state = State::unset_tx_task(&inner.state);
+
+ if state.is_closed() {
+ // Set the flag again so that the waker is released in drop
+ State::set_tx_task(&inner.state);
+ coop.made_progress();
+ return Ready(());
+ } else {
+ unsafe { inner.tx_task.drop_task() };
+ }
+ }
+ }
+
+ if !state.is_tx_task_set() {
+ // Attempt to set the task
+ unsafe {
+ inner.tx_task.set_task(cx);
+ }
+
+ // Update the state
+ state = State::set_tx_task(&inner.state);
+
+ if state.is_closed() {
+ coop.made_progress();
+ return Ready(());
+ }
+ }
+
+ Pending
+ }
+}
+
+impl<T> Drop for Sender<T> {
+ fn drop(&mut self) {
+ if let Some(inner) = self.inner.as_ref() {
+ inner.complete();
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ self.resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ tx_dropped = true,
+ tx_dropped.op = "override",
+ )
+ });
+ }
+ }
+}
+
+impl<T> Receiver<T> {
+ /// Prevents the associated [`Sender`] handle from sending a value.
+ ///
+ /// Any `send` operation which happens after calling `close` is guaranteed
+ /// to fail. After calling `close`, [`try_recv`] should be called to
+ /// receive a value if one was sent **before** the call to `close`
+ /// completed.
+ ///
+ /// This function is useful to perform a graceful shutdown and ensure that a
+ /// value will not be sent into the channel and never received.
+ ///
+ /// `close` is no-op if a message is already received or the channel
+ /// is already closed.
+ ///
+ /// [`Sender`]: Sender
+ /// [`try_recv`]: Receiver::try_recv
+ ///
+ /// # Examples
+ ///
+ /// Prevent a value from being sent
+ ///
+ /// ```
+ /// use tokio::sync::oneshot;
+ /// use tokio::sync::oneshot::error::TryRecvError;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = oneshot::channel();
+ ///
+ /// assert!(!tx.is_closed());
+ ///
+ /// rx.close();
+ ///
+ /// assert!(tx.is_closed());
+ /// assert!(tx.send("never received").is_err());
+ ///
+ /// match rx.try_recv() {
+ /// Err(TryRecvError::Closed) => {}
+ /// _ => unreachable!(),
+ /// }
+ /// }
+ /// ```
+ ///
+ /// Receive a value sent **before** calling `close`
+ ///
+ /// ```
+ /// use tokio::sync::oneshot;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = oneshot::channel();
+ ///
+ /// assert!(tx.send("will receive").is_ok());
+ ///
+ /// rx.close();
+ ///
+ /// let msg = rx.try_recv().unwrap();
+ /// assert_eq!(msg, "will receive");
+ /// }
+ /// ```
+ pub fn close(&mut self) {
+ if let Some(inner) = self.inner.as_ref() {
+ inner.close();
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ self.resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ rx_dropped = true,
+ rx_dropped.op = "override",
+ )
+ });
+ }
+ }
+
+ /// Attempts to receive a value.
+ ///
+ /// If a pending value exists in the channel, it is returned. If no value
+ /// has been sent, the current task **will not** be registered for
+ /// future notification.
+ ///
+ /// This function is useful to call from outside the context of an
+ /// asynchronous task.
+ ///
+ /// # Return
+ ///
+ /// - `Ok(T)` if a value is pending in the channel.
+ /// - `Err(TryRecvError::Empty)` if no value has been sent yet.
+ /// - `Err(TryRecvError::Closed)` if the sender has dropped without sending
+ /// a value.
+ ///
+ /// # Examples
+ ///
+ /// `try_recv` before a value is sent, then after.
+ ///
+ /// ```
+ /// use tokio::sync::oneshot;
+ /// use tokio::sync::oneshot::error::TryRecvError;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = oneshot::channel();
+ ///
+ /// match rx.try_recv() {
+ /// // The channel is currently empty
+ /// Err(TryRecvError::Empty) => {}
+ /// _ => unreachable!(),
+ /// }
+ ///
+ /// // Send a value
+ /// tx.send("hello").unwrap();
+ ///
+ /// match rx.try_recv() {
+ /// Ok(value) => assert_eq!(value, "hello"),
+ /// _ => unreachable!(),
+ /// }
+ /// }
+ /// ```
+ ///
+ /// `try_recv` when the sender dropped before sending a value
+ ///
+ /// ```
+ /// use tokio::sync::oneshot;
+ /// use tokio::sync::oneshot::error::TryRecvError;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = oneshot::channel::<()>();
+ ///
+ /// drop(tx);
+ ///
+ /// match rx.try_recv() {
+ /// // The channel will never receive a value.
+ /// Err(TryRecvError::Closed) => {}
+ /// _ => unreachable!(),
+ /// }
+ /// }
+ /// ```
+ pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
+ let result = if let Some(inner) = self.inner.as_ref() {
+ let state = State::load(&inner.state, Acquire);
+
+ if state.is_complete() {
+ // SAFETY: If `state.is_complete()` returns true, then the
+ // `VALUE_SENT` bit has been set and the sender side of the
+ // channel will no longer attempt to access the inner
+ // `UnsafeCell`. Therefore, it is now safe for us to access the
+ // cell.
+ match unsafe { inner.consume_value() } {
+ Some(value) => {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ self.resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ value_received = true,
+ value_received.op = "override",
+ )
+ });
+ Ok(value)
+ }
+ None => Err(TryRecvError::Closed),
+ }
+ } else if state.is_closed() {
+ Err(TryRecvError::Closed)
+ } else {
+ // Not ready, this does not clear `inner`
+ return Err(TryRecvError::Empty);
+ }
+ } else {
+ Err(TryRecvError::Closed)
+ };
+
+ self.inner = None;
+ result
+ }
+
+ /// Blocking receive to call outside of asynchronous contexts.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if called within an asynchronous execution
+ /// context.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use tokio::sync::oneshot;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, rx) = oneshot::channel::<u8>();
+ ///
+ /// let sync_code = thread::spawn(move || {
+ /// assert_eq!(Ok(10), rx.blocking_recv());
+ /// });
+ ///
+ /// let _ = tx.send(10);
+ /// sync_code.join().unwrap();
+ /// }
+ /// ```
+ #[cfg(feature = "sync")]
+ pub fn blocking_recv(self) -> Result<T, RecvError> {
+ crate::future::block_on(self)
+ }
+}
+
+impl<T> Drop for Receiver<T> {
+ fn drop(&mut self) {
+ if let Some(inner) = self.inner.as_ref() {
+ inner.close();
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ self.resource_span.in_scope(|| {
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ rx_dropped = true,
+ rx_dropped.op = "override",
+ )
+ });
+ }
+ }
+}
+
+impl<T> Future for Receiver<T> {
+ type Output = Result<T, RecvError>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ // If `inner` is `None`, then `poll()` has already completed.
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let _res_span = self.resource_span.clone().entered();
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let _ao_span = self.async_op_span.clone().entered();
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let _ao_poll_span = self.async_op_poll_span.clone().entered();
+
+ let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?;
+
+ #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
+ let res = ready!(inner.poll_recv(cx))?;
+
+ res
+ } else {
+ panic!("called after complete");
+ };
+
+ self.inner = None;
+ Ready(Ok(ret))
+ }
+}
+
+impl<T> Inner<T> {
+ fn complete(&self) -> bool {
+ let prev = State::set_complete(&self.state);
+
+ if prev.is_closed() {
+ return false;
+ }
+
+ if prev.is_rx_task_set() {
+ // TODO: Consume waker?
+ unsafe {
+ self.rx_task.with_task(Waker::wake_by_ref);
+ }
+ }
+
+ true
+ }
+
+ fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
+ // Keep track of task budget
+ let coop = ready!(crate::coop::poll_proceed(cx));
+
+ // Load the state
+ let mut state = State::load(&self.state, Acquire);
+
+ if state.is_complete() {
+ coop.made_progress();
+ match unsafe { self.consume_value() } {
+ Some(value) => Ready(Ok(value)),
+ None => Ready(Err(RecvError(()))),
+ }
+ } else if state.is_closed() {
+ coop.made_progress();
+ Ready(Err(RecvError(())))
+ } else {
+ if state.is_rx_task_set() {
+ let will_notify = unsafe { self.rx_task.will_wake(cx) };
+
+ // Check if the task is still the same
+ if !will_notify {
+ // Unset the task
+ state = State::unset_rx_task(&self.state);
+ if state.is_complete() {
+ // Set the flag again so that the waker is released in drop
+ State::set_rx_task(&self.state);
+
+ coop.made_progress();
+ // SAFETY: If `state.is_complete()` returns true, then the
+ // `VALUE_SENT` bit has been set and the sender side of the
+ // channel will no longer attempt to access the inner
+ // `UnsafeCell`. Therefore, it is now safe for us to access the
+ // cell.
+ return match unsafe { self.consume_value() } {
+ Some(value) => Ready(Ok(value)),
+ None => Ready(Err(RecvError(()))),
+ };
+ } else {
+ unsafe { self.rx_task.drop_task() };
+ }
+ }
+ }
+
+ if !state.is_rx_task_set() {
+ // Attempt to set the task
+ unsafe {
+ self.rx_task.set_task(cx);
+ }
+
+ // Update the state
+ state = State::set_rx_task(&self.state);
+
+ if state.is_complete() {
+ coop.made_progress();
+ match unsafe { self.consume_value() } {
+ Some(value) => Ready(Ok(value)),
+ None => Ready(Err(RecvError(()))),
+ }
+ } else {
+ Pending
+ }
+ } else {
+ Pending
+ }
+ }
+ }
+
+ /// Called by `Receiver` to indicate that the value will never be received.
+ fn close(&self) {
+ let prev = State::set_closed(&self.state);
+
+ if prev.is_tx_task_set() && !prev.is_complete() {
+ unsafe {
+ self.tx_task.with_task(Waker::wake_by_ref);
+ }
+ }
+ }
+
+ /// Consumes the value. This function does not check `state`.
+ ///
+ /// # Safety
+ ///
+ /// Calling this method concurrently on multiple threads will result in a
+ /// data race. The `VALUE_SENT` state bit is used to ensure that only the
+ /// sender *or* the receiver will call this method at a given point in time.
+ /// If `VALUE_SENT` is not set, then only the sender may call this method;
+ /// if it is set, then only the receiver may call this method.
+ unsafe fn consume_value(&self) -> Option<T> {
+ self.value.with_mut(|ptr| (*ptr).take())
+ }
+}
+
+unsafe impl<T: Send> Send for Inner<T> {}
+unsafe impl<T: Send> Sync for Inner<T> {}
+
+fn mut_load(this: &mut AtomicUsize) -> usize {
+ this.with_mut(|v| *v)
+}
+
+impl<T> Drop for Inner<T> {
+ fn drop(&mut self) {
+ let state = State(mut_load(&mut self.state));
+
+ if state.is_rx_task_set() {
+ unsafe {
+ self.rx_task.drop_task();
+ }
+ }
+
+ if state.is_tx_task_set() {
+ unsafe {
+ self.tx_task.drop_task();
+ }
+ }
+ }
+}
+
+impl<T: fmt::Debug> fmt::Debug for Inner<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ use std::sync::atomic::Ordering::Relaxed;
+
+ fmt.debug_struct("Inner")
+ .field("state", &State::load(&self.state, Relaxed))
+ .finish()
+ }
+}
+
+/// Indicates that a waker for the receiving task has been set.
+///
+/// # Safety
+///
+/// If this bit is not set, the `rx_task` field may be uninitialized.
+const RX_TASK_SET: usize = 0b00001;
+/// Indicates that a value has been stored in the channel's inner `UnsafeCell`.
+///
+/// # Safety
+///
+/// This bit controls which side of the channel is permitted to access the
+/// `UnsafeCell`. If it is set, the `UnsafeCell` may ONLY be accessed by the
+/// receiver. If this bit is NOT set, the `UnsafeCell` may ONLY be accessed by
+/// the sender.
+const VALUE_SENT: usize = 0b00010;
+const CLOSED: usize = 0b00100;
+
+/// Indicates that a waker for the sending task has been set.
+///
+/// # Safety
+///
+/// If this bit is not set, the `tx_task` field may be uninitialized.
+const TX_TASK_SET: usize = 0b01000;
+
+impl State {
+ fn new() -> State {
+ State(0)
+ }
+
+ fn is_complete(self) -> bool {
+ self.0 & VALUE_SENT == VALUE_SENT
+ }
+
+ fn set_complete(cell: &AtomicUsize) -> State {
+ // This method is a compare-and-swap loop rather than a fetch-or like
+ // other `set_$WHATEVER` methods on `State`. This is because we must
+ // check if the state has been closed before setting the `VALUE_SENT`
+ // bit.
+ //
+ // We don't want to set both the `VALUE_SENT` bit if the `CLOSED`
+ // bit is already set, because `VALUE_SENT` will tell the receiver that
+ // it's okay to access the inner `UnsafeCell`. Immediately after calling
+ // `set_complete`, if the channel was closed, the sender will _also_
+ // access the `UnsafeCell` to take the value back out, so if a
+ // `poll_recv` or `try_recv` call is occurring concurrently, both
+ // threads may try to access the `UnsafeCell` if we were to set the
+ // `VALUE_SENT` bit on a closed channel.
+ let mut state = cell.load(Ordering::Relaxed);
+ loop {
+ if State(state).is_closed() {
+ break;
+ }
+ // TODO: This could be `Release`, followed by an `Acquire` fence *if*
+ // the `RX_TASK_SET` flag is set. However, `loom` does not support
+ // fences yet.
+ match cell.compare_exchange_weak(
+ state,
+ state | VALUE_SENT,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => break,
+ Err(actual) => state = actual,
+ }
+ }
+ State(state)
+ }
+
+ fn is_rx_task_set(self) -> bool {
+ self.0 & RX_TASK_SET == RX_TASK_SET
+ }
+
+ fn set_rx_task(cell: &AtomicUsize) -> State {
+ let val = cell.fetch_or(RX_TASK_SET, AcqRel);
+ State(val | RX_TASK_SET)
+ }
+
+ fn unset_rx_task(cell: &AtomicUsize) -> State {
+ let val = cell.fetch_and(!RX_TASK_SET, AcqRel);
+ State(val & !RX_TASK_SET)
+ }
+
+ fn is_closed(self) -> bool {
+ self.0 & CLOSED == CLOSED
+ }
+
+ fn set_closed(cell: &AtomicUsize) -> State {
+ // Acquire because we want all later writes (attempting to poll) to be
+ // ordered after this.
+ let val = cell.fetch_or(CLOSED, Acquire);
+ State(val)
+ }
+
+ fn set_tx_task(cell: &AtomicUsize) -> State {
+ let val = cell.fetch_or(TX_TASK_SET, AcqRel);
+ State(val | TX_TASK_SET)
+ }
+
+ fn unset_tx_task(cell: &AtomicUsize) -> State {
+ let val = cell.fetch_and(!TX_TASK_SET, AcqRel);
+ State(val & !TX_TASK_SET)
+ }
+
+ fn is_tx_task_set(self) -> bool {
+ self.0 & TX_TASK_SET == TX_TASK_SET
+ }
+
+ fn as_usize(self) -> usize {
+ self.0
+ }
+
+ fn load(cell: &AtomicUsize, order: Ordering) -> State {
+ let val = cell.load(order);
+ State(val)
+ }
+}
+
+impl fmt::Debug for State {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("State")
+ .field("is_complete", &self.is_complete())
+ .field("is_closed", &self.is_closed())
+ .field("is_rx_task_set", &self.is_rx_task_set())
+ .field("is_tx_task_set", &self.is_tx_task_set())
+ .finish()
+ }
+}