From 698f8c2f01ea549d77d7dc3338a12e04c11057b9 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 17 Apr 2024 14:02:58 +0200 Subject: Adding upstream version 1.64.0+dfsg1. Signed-off-by: Daniel Baumann --- library/std/src/sync/mpsc/oneshot.rs | 315 +++++++++++++++++++++++++++++++++++ 1 file changed, 315 insertions(+) create mode 100644 library/std/src/sync/mpsc/oneshot.rs (limited to 'library/std/src/sync/mpsc/oneshot.rs') diff --git a/library/std/src/sync/mpsc/oneshot.rs b/library/std/src/sync/mpsc/oneshot.rs new file mode 100644 index 000000000..0e259b8ae --- /dev/null +++ b/library/std/src/sync/mpsc/oneshot.rs @@ -0,0 +1,315 @@ +/// Oneshot channels/ports +/// +/// This is the initial flavor of channels/ports used for comm module. This is +/// an optimization for the one-use case of a channel. The major optimization of +/// this type is to have one and exactly one allocation when the chan/port pair +/// is created. +/// +/// Another possible optimization would be to not use an Arc box because +/// in theory we know when the shared packet can be deallocated (no real need +/// for the atomic reference counting), but I was having trouble how to destroy +/// the data early in a drop of a Port. +/// +/// # Implementation +/// +/// Oneshots are implemented around one atomic usize variable. This variable +/// indicates both the state of the port/chan but also contains any threads +/// blocked on the port. All atomic operations happen on this one word. +/// +/// In order to upgrade a oneshot channel, an upgrade is considered a disconnect +/// on behalf of the channel side of things (it can be mentally thought of as +/// consuming the port). This upgrade is then also stored in the shared packet. +/// The one caveat to consider is that when a port sees a disconnected channel +/// it must check for data because there is no "data plus upgrade" state. +pub use self::Failure::*; +use self::MyUpgrade::*; +pub use self::UpgradeResult::*; + +use crate::cell::UnsafeCell; +use crate::ptr; +use crate::sync::atomic::{AtomicPtr, Ordering}; +use crate::sync::mpsc::blocking::{self, SignalToken}; +use crate::sync::mpsc::Receiver; +use crate::time::Instant; + +// Various states you can find a port in. +const EMPTY: *mut u8 = ptr::invalid_mut::(0); // initial state: no data, no blocked receiver +const DATA: *mut u8 = ptr::invalid_mut::(1); // data ready for receiver to take +const DISCONNECTED: *mut u8 = ptr::invalid_mut::(2); // channel is disconnected OR upgraded +// Any other value represents a pointer to a SignalToken value. The +// protocol ensures that when the state moves *to* a pointer, +// ownership of the token is given to the packet, and when the state +// moves *from* a pointer, ownership of the token is transferred to +// whoever changed the state. + +pub struct Packet { + // Internal state of the chan/port pair (stores the blocked thread as well) + state: AtomicPtr, + // One-shot data slot location + data: UnsafeCell>, + // when used for the second time, a oneshot channel must be upgraded, and + // this contains the slot for the upgrade + upgrade: UnsafeCell>, +} + +pub enum Failure { + Empty, + Disconnected, + Upgraded(Receiver), +} + +pub enum UpgradeResult { + UpSuccess, + UpDisconnected, + UpWoke(SignalToken), +} + +enum MyUpgrade { + NothingSent, + SendUsed, + GoUp(Receiver), +} + +impl Packet { + pub fn new() -> Packet { + Packet { + data: UnsafeCell::new(None), + upgrade: UnsafeCell::new(NothingSent), + state: AtomicPtr::new(EMPTY), + } + } + + pub fn send(&self, t: T) -> Result<(), T> { + unsafe { + // Sanity check + match *self.upgrade.get() { + NothingSent => {} + _ => panic!("sending on a oneshot that's already sent on "), + } + assert!((*self.data.get()).is_none()); + ptr::write(self.data.get(), Some(t)); + ptr::write(self.upgrade.get(), SendUsed); + + match self.state.swap(DATA, Ordering::SeqCst) { + // Sent the data, no one was waiting + EMPTY => Ok(()), + + // Couldn't send the data, the port hung up first. Return the data + // back up the stack. + DISCONNECTED => { + self.state.swap(DISCONNECTED, Ordering::SeqCst); + ptr::write(self.upgrade.get(), NothingSent); + Err((&mut *self.data.get()).take().unwrap()) + } + + // Not possible, these are one-use channels + DATA => unreachable!(), + + // There is a thread waiting on the other end. We leave the 'DATA' + // state inside so it'll pick it up on the other end. + ptr => { + SignalToken::from_raw(ptr).signal(); + Ok(()) + } + } + } + } + + // Just tests whether this channel has been sent on or not, this is only + // safe to use from the sender. + pub fn sent(&self) -> bool { + unsafe { !matches!(*self.upgrade.get(), NothingSent) } + } + + pub fn recv(&self, deadline: Option) -> Result> { + // Attempt to not block the thread (it's a little expensive). If it looks + // like we're not empty, then immediately go through to `try_recv`. + if self.state.load(Ordering::SeqCst) == EMPTY { + let (wait_token, signal_token) = blocking::tokens(); + let ptr = unsafe { signal_token.to_raw() }; + + // race with senders to enter the blocking state + if self.state.compare_exchange(EMPTY, ptr, Ordering::SeqCst, Ordering::SeqCst).is_ok() { + if let Some(deadline) = deadline { + let timed_out = !wait_token.wait_max_until(deadline); + // Try to reset the state + if timed_out { + self.abort_selection().map_err(Upgraded)?; + } + } else { + wait_token.wait(); + debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY); + } + } else { + // drop the signal token, since we never blocked + drop(unsafe { SignalToken::from_raw(ptr) }); + } + } + + self.try_recv() + } + + pub fn try_recv(&self) -> Result> { + unsafe { + match self.state.load(Ordering::SeqCst) { + EMPTY => Err(Empty), + + // We saw some data on the channel, but the channel can be used + // again to send us an upgrade. As a result, we need to re-insert + // into the channel that there's no data available (otherwise we'll + // just see DATA next time). This is done as a cmpxchg because if + // the state changes under our feet we'd rather just see that state + // change. + DATA => { + let _ = self.state.compare_exchange( + DATA, + EMPTY, + Ordering::SeqCst, + Ordering::SeqCst, + ); + match (&mut *self.data.get()).take() { + Some(data) => Ok(data), + None => unreachable!(), + } + } + + // There's no guarantee that we receive before an upgrade happens, + // and an upgrade flags the channel as disconnected, so when we see + // this we first need to check if there's data available and *then* + // we go through and process the upgrade. + DISCONNECTED => match (&mut *self.data.get()).take() { + Some(data) => Ok(data), + None => match ptr::replace(self.upgrade.get(), SendUsed) { + SendUsed | NothingSent => Err(Disconnected), + GoUp(upgrade) => Err(Upgraded(upgrade)), + }, + }, + + // We are the sole receiver; there cannot be a blocking + // receiver already. + _ => unreachable!(), + } + } + } + + // Returns whether the upgrade was completed. If the upgrade wasn't + // completed, then the port couldn't get sent to the other half (it will + // never receive it). + pub fn upgrade(&self, up: Receiver) -> UpgradeResult { + unsafe { + let prev = match *self.upgrade.get() { + NothingSent => NothingSent, + SendUsed => SendUsed, + _ => panic!("upgrading again"), + }; + ptr::write(self.upgrade.get(), GoUp(up)); + + match self.state.swap(DISCONNECTED, Ordering::SeqCst) { + // If the channel is empty or has data on it, then we're good to go. + // Senders will check the data before the upgrade (in case we + // plastered over the DATA state). + DATA | EMPTY => UpSuccess, + + // If the other end is already disconnected, then we failed the + // upgrade. Be sure to trash the port we were given. + DISCONNECTED => { + ptr::replace(self.upgrade.get(), prev); + UpDisconnected + } + + // If someone's waiting, we gotta wake them up + ptr => UpWoke(SignalToken::from_raw(ptr)), + } + } + } + + pub fn drop_chan(&self) { + match self.state.swap(DISCONNECTED, Ordering::SeqCst) { + DATA | DISCONNECTED | EMPTY => {} + + // If someone's waiting, we gotta wake them up + ptr => unsafe { + SignalToken::from_raw(ptr).signal(); + }, + } + } + + pub fn drop_port(&self) { + match self.state.swap(DISCONNECTED, Ordering::SeqCst) { + // An empty channel has nothing to do, and a remotely disconnected + // channel also has nothing to do b/c we're about to run the drop + // glue + DISCONNECTED | EMPTY => {} + + // There's data on the channel, so make sure we destroy it promptly. + // This is why not using an arc is a little difficult (need the box + // to stay valid while we take the data). + DATA => unsafe { + (&mut *self.data.get()).take().unwrap(); + }, + + // We're the only ones that can block on this port + _ => unreachable!(), + } + } + + //////////////////////////////////////////////////////////////////////////// + // select implementation + //////////////////////////////////////////////////////////////////////////// + + // Remove a previous selecting thread from this port. This ensures that the + // blocked thread will no longer be visible to any other threads. + // + // The return value indicates whether there's data on this port. + pub fn abort_selection(&self) -> Result> { + let state = match self.state.load(Ordering::SeqCst) { + // Each of these states means that no further activity will happen + // with regard to abortion selection + s @ (EMPTY | DATA | DISCONNECTED) => s, + + // If we've got a blocked thread, then use an atomic to gain ownership + // of it (may fail) + ptr => self + .state + .compare_exchange(ptr, EMPTY, Ordering::SeqCst, Ordering::SeqCst) + .unwrap_or_else(|x| x), + }; + + // Now that we've got ownership of our state, figure out what to do + // about it. + match state { + EMPTY => unreachable!(), + // our thread used for select was stolen + DATA => Ok(true), + + // If the other end has hung up, then we have complete ownership + // of the port. First, check if there was data waiting for us. This + // is possible if the other end sent something and then hung up. + // + // We then need to check to see if there was an upgrade requested, + // and if so, the upgraded port needs to have its selection aborted. + DISCONNECTED => unsafe { + if (*self.data.get()).is_some() { + Ok(true) + } else { + match ptr::replace(self.upgrade.get(), SendUsed) { + GoUp(port) => Err(port), + _ => Ok(true), + } + } + }, + + // We woke ourselves up from select. + ptr => unsafe { + drop(SignalToken::from_raw(ptr)); + Ok(false) + }, + } + } +} + +impl Drop for Packet { + fn drop(&mut self) { + assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED); + } +} -- cgit v1.2.3