summaryrefslogtreecommitdiffstats
path: root/library/std/src/sync/mpsc/oneshot.rs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--library/std/src/sync/mpsc/oneshot.rs315
1 files changed, 0 insertions, 315 deletions
diff --git a/library/std/src/sync/mpsc/oneshot.rs b/library/std/src/sync/mpsc/oneshot.rs
deleted file mode 100644
index 0e259b8ae..000000000
--- a/library/std/src/sync/mpsc/oneshot.rs
+++ /dev/null
@@ -1,315 +0,0 @@
-/// Oneshot channels/ports
-///
-/// This is the initial flavor of channels/ports used for comm module. This is
-/// an optimization for the one-use case of a channel. The major optimization of
-/// this type is to have one and exactly one allocation when the chan/port pair
-/// is created.
-///
-/// Another possible optimization would be to not use an Arc box because
-/// in theory we know when the shared packet can be deallocated (no real need
-/// for the atomic reference counting), but I was having trouble how to destroy
-/// the data early in a drop of a Port.
-///
-/// # Implementation
-///
-/// Oneshots are implemented around one atomic usize variable. This variable
-/// indicates both the state of the port/chan but also contains any threads
-/// blocked on the port. All atomic operations happen on this one word.
-///
-/// In order to upgrade a oneshot channel, an upgrade is considered a disconnect
-/// on behalf of the channel side of things (it can be mentally thought of as
-/// consuming the port). This upgrade is then also stored in the shared packet.
-/// The one caveat to consider is that when a port sees a disconnected channel
-/// it must check for data because there is no "data plus upgrade" state.
-pub use self::Failure::*;
-use self::MyUpgrade::*;
-pub use self::UpgradeResult::*;
-
-use crate::cell::UnsafeCell;
-use crate::ptr;
-use crate::sync::atomic::{AtomicPtr, Ordering};
-use crate::sync::mpsc::blocking::{self, SignalToken};
-use crate::sync::mpsc::Receiver;
-use crate::time::Instant;
-
-// Various states you can find a port in.
-const EMPTY: *mut u8 = ptr::invalid_mut::<u8>(0); // initial state: no data, no blocked receiver
-const DATA: *mut u8 = ptr::invalid_mut::<u8>(1); // data ready for receiver to take
-const DISCONNECTED: *mut u8 = ptr::invalid_mut::<u8>(2); // channel is disconnected OR upgraded
-// Any other value represents a pointer to a SignalToken value. The
-// protocol ensures that when the state moves *to* a pointer,
-// ownership of the token is given to the packet, and when the state
-// moves *from* a pointer, ownership of the token is transferred to
-// whoever changed the state.
-
-pub struct Packet<T> {
- // Internal state of the chan/port pair (stores the blocked thread as well)
- state: AtomicPtr<u8>,
- // One-shot data slot location
- data: UnsafeCell<Option<T>>,
- // when used for the second time, a oneshot channel must be upgraded, and
- // this contains the slot for the upgrade
- upgrade: UnsafeCell<MyUpgrade<T>>,
-}
-
-pub enum Failure<T> {
- Empty,
- Disconnected,
- Upgraded(Receiver<T>),
-}
-
-pub enum UpgradeResult {
- UpSuccess,
- UpDisconnected,
- UpWoke(SignalToken),
-}
-
-enum MyUpgrade<T> {
- NothingSent,
- SendUsed,
- GoUp(Receiver<T>),
-}
-
-impl<T> Packet<T> {
- pub fn new() -> Packet<T> {
- Packet {
- data: UnsafeCell::new(None),
- upgrade: UnsafeCell::new(NothingSent),
- state: AtomicPtr::new(EMPTY),
- }
- }
-
- pub fn send(&self, t: T) -> Result<(), T> {
- unsafe {
- // Sanity check
- match *self.upgrade.get() {
- NothingSent => {}
- _ => panic!("sending on a oneshot that's already sent on "),
- }
- assert!((*self.data.get()).is_none());
- ptr::write(self.data.get(), Some(t));
- ptr::write(self.upgrade.get(), SendUsed);
-
- match self.state.swap(DATA, Ordering::SeqCst) {
- // Sent the data, no one was waiting
- EMPTY => Ok(()),
-
- // Couldn't send the data, the port hung up first. Return the data
- // back up the stack.
- DISCONNECTED => {
- self.state.swap(DISCONNECTED, Ordering::SeqCst);
- ptr::write(self.upgrade.get(), NothingSent);
- Err((&mut *self.data.get()).take().unwrap())
- }
-
- // Not possible, these are one-use channels
- DATA => unreachable!(),
-
- // There is a thread waiting on the other end. We leave the 'DATA'
- // state inside so it'll pick it up on the other end.
- ptr => {
- SignalToken::from_raw(ptr).signal();
- Ok(())
- }
- }
- }
- }
-
- // Just tests whether this channel has been sent on or not, this is only
- // safe to use from the sender.
- pub fn sent(&self) -> bool {
- unsafe { !matches!(*self.upgrade.get(), NothingSent) }
- }
-
- pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
- // Attempt to not block the thread (it's a little expensive). If it looks
- // like we're not empty, then immediately go through to `try_recv`.
- if self.state.load(Ordering::SeqCst) == EMPTY {
- let (wait_token, signal_token) = blocking::tokens();
- let ptr = unsafe { signal_token.to_raw() };
-
- // race with senders to enter the blocking state
- if self.state.compare_exchange(EMPTY, ptr, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
- if let Some(deadline) = deadline {
- let timed_out = !wait_token.wait_max_until(deadline);
- // Try to reset the state
- if timed_out {
- self.abort_selection().map_err(Upgraded)?;
- }
- } else {
- wait_token.wait();
- debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY);
- }
- } else {
- // drop the signal token, since we never blocked
- drop(unsafe { SignalToken::from_raw(ptr) });
- }
- }
-
- self.try_recv()
- }
-
- pub fn try_recv(&self) -> Result<T, Failure<T>> {
- unsafe {
- match self.state.load(Ordering::SeqCst) {
- EMPTY => Err(Empty),
-
- // We saw some data on the channel, but the channel can be used
- // again to send us an upgrade. As a result, we need to re-insert
- // into the channel that there's no data available (otherwise we'll
- // just see DATA next time). This is done as a cmpxchg because if
- // the state changes under our feet we'd rather just see that state
- // change.
- DATA => {
- let _ = self.state.compare_exchange(
- DATA,
- EMPTY,
- Ordering::SeqCst,
- Ordering::SeqCst,
- );
- match (&mut *self.data.get()).take() {
- Some(data) => Ok(data),
- None => unreachable!(),
- }
- }
-
- // There's no guarantee that we receive before an upgrade happens,
- // and an upgrade flags the channel as disconnected, so when we see
- // this we first need to check if there's data available and *then*
- // we go through and process the upgrade.
- DISCONNECTED => match (&mut *self.data.get()).take() {
- Some(data) => Ok(data),
- None => match ptr::replace(self.upgrade.get(), SendUsed) {
- SendUsed | NothingSent => Err(Disconnected),
- GoUp(upgrade) => Err(Upgraded(upgrade)),
- },
- },
-
- // We are the sole receiver; there cannot be a blocking
- // receiver already.
- _ => unreachable!(),
- }
- }
- }
-
- // Returns whether the upgrade was completed. If the upgrade wasn't
- // completed, then the port couldn't get sent to the other half (it will
- // never receive it).
- pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
- unsafe {
- let prev = match *self.upgrade.get() {
- NothingSent => NothingSent,
- SendUsed => SendUsed,
- _ => panic!("upgrading again"),
- };
- ptr::write(self.upgrade.get(), GoUp(up));
-
- match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
- // If the channel is empty or has data on it, then we're good to go.
- // Senders will check the data before the upgrade (in case we
- // plastered over the DATA state).
- DATA | EMPTY => UpSuccess,
-
- // If the other end is already disconnected, then we failed the
- // upgrade. Be sure to trash the port we were given.
- DISCONNECTED => {
- ptr::replace(self.upgrade.get(), prev);
- UpDisconnected
- }
-
- // If someone's waiting, we gotta wake them up
- ptr => UpWoke(SignalToken::from_raw(ptr)),
- }
- }
- }
-
- pub fn drop_chan(&self) {
- match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
- DATA | DISCONNECTED | EMPTY => {}
-
- // If someone's waiting, we gotta wake them up
- ptr => unsafe {
- SignalToken::from_raw(ptr).signal();
- },
- }
- }
-
- pub fn drop_port(&self) {
- match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
- // An empty channel has nothing to do, and a remotely disconnected
- // channel also has nothing to do b/c we're about to run the drop
- // glue
- DISCONNECTED | EMPTY => {}
-
- // There's data on the channel, so make sure we destroy it promptly.
- // This is why not using an arc is a little difficult (need the box
- // to stay valid while we take the data).
- DATA => unsafe {
- (&mut *self.data.get()).take().unwrap();
- },
-
- // We're the only ones that can block on this port
- _ => unreachable!(),
- }
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // select implementation
- ////////////////////////////////////////////////////////////////////////////
-
- // Remove a previous selecting thread from this port. This ensures that the
- // blocked thread will no longer be visible to any other threads.
- //
- // The return value indicates whether there's data on this port.
- pub fn abort_selection(&self) -> Result<bool, Receiver<T>> {
- let state = match self.state.load(Ordering::SeqCst) {
- // Each of these states means that no further activity will happen
- // with regard to abortion selection
- s @ (EMPTY | DATA | DISCONNECTED) => s,
-
- // If we've got a blocked thread, then use an atomic to gain ownership
- // of it (may fail)
- ptr => self
- .state
- .compare_exchange(ptr, EMPTY, Ordering::SeqCst, Ordering::SeqCst)
- .unwrap_or_else(|x| x),
- };
-
- // Now that we've got ownership of our state, figure out what to do
- // about it.
- match state {
- EMPTY => unreachable!(),
- // our thread used for select was stolen
- DATA => Ok(true),
-
- // If the other end has hung up, then we have complete ownership
- // of the port. First, check if there was data waiting for us. This
- // is possible if the other end sent something and then hung up.
- //
- // We then need to check to see if there was an upgrade requested,
- // and if so, the upgraded port needs to have its selection aborted.
- DISCONNECTED => unsafe {
- if (*self.data.get()).is_some() {
- Ok(true)
- } else {
- match ptr::replace(self.upgrade.get(), SendUsed) {
- GoUp(port) => Err(port),
- _ => Ok(true),
- }
- }
- },
-
- // We woke ourselves up from select.
- ptr => unsafe {
- drop(SignalToken::from_raw(ptr));
- Ok(false)
- },
- }
- }
-}
-
-impl<T> Drop for Packet<T> {
- fn drop(&mut self) {
- assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED);
- }
-}