From 26a029d407be480d791972afb5975cf62c9360a6 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 02:47:55 +0200 Subject: Adding upstream version 124.0.1. Signed-off-by: Daniel Baumann --- third_party/rust/mio-extras/src/channel.rs | 431 +++++++++++++++++++++++++++++ 1 file changed, 431 insertions(+) create mode 100644 third_party/rust/mio-extras/src/channel.rs (limited to 'third_party/rust/mio-extras/src/channel.rs') diff --git a/third_party/rust/mio-extras/src/channel.rs b/third_party/rust/mio-extras/src/channel.rs new file mode 100644 index 0000000000..561317ecbf --- /dev/null +++ b/third_party/rust/mio-extras/src/channel.rs @@ -0,0 +1,431 @@ +//! Thread safe communication channel implementing `Evented` +use lazycell::{AtomicLazyCell, LazyCell}; +use mio::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token}; +use std::any::Any; +use std::error; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{mpsc, Arc}; +use std::{fmt, io}; + +/// Creates a new asynchronous channel, where the `Receiver` can be registered +/// with `Poll`. +pub fn channel() -> (Sender, Receiver) { + let (tx_ctl, rx_ctl) = ctl_pair(); + let (tx, rx) = mpsc::channel(); + + let tx = Sender { tx, ctl: tx_ctl }; + + let rx = Receiver { rx, ctl: rx_ctl }; + + (tx, rx) +} + +/// Creates a new synchronous, bounded channel where the `Receiver` can be +/// registered with `Poll`. +pub fn sync_channel(bound: usize) -> (SyncSender, Receiver) { + let (tx_ctl, rx_ctl) = ctl_pair(); + let (tx, rx) = mpsc::sync_channel(bound); + + let tx = SyncSender { tx, ctl: tx_ctl }; + + let rx = Receiver { rx, ctl: rx_ctl }; + + (tx, rx) +} + +fn ctl_pair() -> (SenderCtl, ReceiverCtl) { + let inner = Arc::new(Inner { + pending: AtomicUsize::new(0), + senders: AtomicUsize::new(1), + set_readiness: AtomicLazyCell::new(), + }); + + let tx = SenderCtl { + inner: Arc::clone(&inner), + }; + + let rx = ReceiverCtl { + registration: LazyCell::new(), + inner, + }; + + (tx, rx) +} + +/// Tracks messages sent on a channel in order to update readiness. +struct SenderCtl { + inner: Arc, +} + +/// Tracks messages received on a channel in order to track readiness. +struct ReceiverCtl { + registration: LazyCell, + inner: Arc, +} + +/// The sending half of a channel. +pub struct Sender { + tx: mpsc::Sender, + ctl: SenderCtl, +} + +/// The sending half of a synchronous channel. +pub struct SyncSender { + tx: mpsc::SyncSender, + ctl: SenderCtl, +} + +/// The receiving half of a channel. +pub struct Receiver { + rx: mpsc::Receiver, + ctl: ReceiverCtl, +} + +/// An error returned from the `Sender::send` or `SyncSender::send` function. +pub enum SendError { + /// An IO error. + Io(io::Error), + + /// The receiving half of the channel has disconnected. + Disconnected(T), +} + +/// An error returned from the `SyncSender::try_send` function. +pub enum TrySendError { + /// An IO error. + Io(io::Error), + + /// Data could not be sent because it would require the callee to block. + Full(T), + + /// The receiving half of the channel has disconnected. + Disconnected(T), +} + +struct Inner { + // The number of outstanding messages for the receiver to read + pending: AtomicUsize, + // The number of sender handles + senders: AtomicUsize, + // The set readiness handle + set_readiness: AtomicLazyCell, +} + +impl Sender { + /// Attempts to send a value on this channel, returning it back if it could not be sent. + pub fn send(&self, t: T) -> Result<(), SendError> { + self.tx.send(t).map_err(SendError::from).and_then(|_| { + self.ctl.inc()?; + Ok(()) + }) + } +} + +impl Clone for Sender { + fn clone(&self) -> Sender { + Sender { + tx: self.tx.clone(), + ctl: self.ctl.clone(), + } + } +} + +impl SyncSender { + /// Sends a value on this synchronous channel. + /// + /// This function will *block* until space in the internal buffer becomes + /// available or a receiver is available to hand off the message to. + pub fn send(&self, t: T) -> Result<(), SendError> { + self.tx.send(t).map_err(From::from).and_then(|_| { + self.ctl.inc()?; + Ok(()) + }) + } + + /// Attempts to send a value on this channel without blocking. + /// + /// This method differs from `send` by returning immediately if the channel's + /// buffer is full or no receiver is waiting to acquire some data. + pub fn try_send(&self, t: T) -> Result<(), TrySendError> { + self.tx.try_send(t).map_err(From::from).and_then(|_| { + self.ctl.inc()?; + Ok(()) + }) + } +} + +impl Clone for SyncSender { + fn clone(&self) -> SyncSender { + SyncSender { + tx: self.tx.clone(), + ctl: self.ctl.clone(), + } + } +} + +impl Receiver { + /// Attempts to return a pending value on this receiver without blocking. + pub fn try_recv(&self) -> Result { + self.rx.try_recv().and_then(|res| { + let _ = self.ctl.dec(); + Ok(res) + }) + } +} + +impl Evented for Receiver { + fn register( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + self.ctl.register(poll, token, interest, opts) + } + + fn reregister( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + self.ctl.reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + self.ctl.deregister(poll) + } +} + +/* + * + * ===== SenderCtl / ReceiverCtl ===== + * + */ + +impl SenderCtl { + /// Call to track that a message has been sent + fn inc(&self) -> io::Result<()> { + let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire); + + if 0 == cnt { + // Toggle readiness to readable + if let Some(set_readiness) = self.inner.set_readiness.borrow() { + set_readiness.set_readiness(Ready::readable())?; + } + } + + Ok(()) + } +} + +impl Clone for SenderCtl { + fn clone(&self) -> SenderCtl { + self.inner.senders.fetch_add(1, Ordering::Relaxed); + SenderCtl { + inner: Arc::clone(&self.inner), + } + } +} + +impl Drop for SenderCtl { + fn drop(&mut self) { + if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 { + let _ = self.inc(); + } + } +} + +impl ReceiverCtl { + fn dec(&self) -> io::Result<()> { + let first = self.inner.pending.load(Ordering::Acquire); + + if first == 1 { + // Unset readiness + if let Some(set_readiness) = self.inner.set_readiness.borrow() { + set_readiness.set_readiness(Ready::empty())?; + } + } + + // Decrement + let second = self.inner.pending.fetch_sub(1, Ordering::AcqRel); + + if first == 1 && second > 1 { + // There are still pending messages. Since readiness was + // previously unset, it must be reset here + if let Some(set_readiness) = self.inner.set_readiness.borrow() { + set_readiness.set_readiness(Ready::readable())?; + } + } + + Ok(()) + } +} + +impl Evented for ReceiverCtl { + fn register( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + if self.registration.borrow().is_some() { + return Err(io::Error::new( + io::ErrorKind::Other, + "receiver already registered", + )); + } + + let (registration, set_readiness) = Registration::new2(); + poll.register(®istration, token, interest, opts)?; + + if self.inner.pending.load(Ordering::Relaxed) > 0 { + // TODO: Don't drop readiness + let _ = set_readiness.set_readiness(Ready::readable()); + } + + self.registration + .fill(registration) + .expect("unexpected state encountered"); + self.inner + .set_readiness + .fill(set_readiness) + .expect("unexpected state encountered"); + + Ok(()) + } + + fn reregister( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + match self.registration.borrow() { + Some(registration) => poll.reregister(registration, token, interest, opts), + None => Err(io::Error::new( + io::ErrorKind::Other, + "receiver not registered", + )), + } + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + match self.registration.borrow() { + Some(registration) => poll.deregister(registration), + None => Err(io::Error::new( + io::ErrorKind::Other, + "receiver not registered", + )), + } + } +} + +/* + * + * ===== Error conversions ===== + * + */ + +impl From> for SendError { + fn from(src: mpsc::SendError) -> SendError { + SendError::Disconnected(src.0) + } +} + +impl From for SendError { + fn from(src: io::Error) -> SendError { + SendError::Io(src) + } +} + +impl From> for TrySendError { + fn from(src: mpsc::TrySendError) -> TrySendError { + match src { + mpsc::TrySendError::Full(v) => TrySendError::Full(v), + mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v), + } + } +} + +impl From> for TrySendError { + fn from(src: mpsc::SendError) -> TrySendError { + TrySendError::Disconnected(src.0) + } +} + +impl From for TrySendError { + fn from(src: io::Error) -> TrySendError { + TrySendError::Io(src) + } +} + +/* + * + * ===== Implement Error, Debug and Display for Errors ===== + * + */ + +impl error::Error for SendError { + fn description(&self) -> &str { + match *self { + SendError::Io(ref io_err) => io_err.description(), + SendError::Disconnected(..) => "Disconnected", + } + } +} + +impl error::Error for TrySendError { + fn description(&self) -> &str { + match *self { + TrySendError::Io(ref io_err) => io_err.description(), + TrySendError::Full(..) => "Full", + TrySendError::Disconnected(..) => "Disconnected", + } + } +} + +impl fmt::Debug for SendError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + format_send_error(self, f) + } +} + +impl fmt::Display for SendError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + format_send_error(self, f) + } +} + +impl fmt::Debug for TrySendError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + format_try_send_error(self, f) + } +} + +impl fmt::Display for TrySendError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + format_try_send_error(self, f) + } +} + +#[inline] +fn format_send_error(e: &SendError, f: &mut fmt::Formatter) -> fmt::Result { + match *e { + SendError::Io(ref io_err) => write!(f, "{}", io_err), + SendError::Disconnected(..) => write!(f, "Disconnected"), + } +} + +#[inline] +fn format_try_send_error(e: &TrySendError, f: &mut fmt::Formatter) -> fmt::Result { + match *e { + TrySendError::Io(ref io_err) => write!(f, "{}", io_err), + TrySendError::Full(..) => write!(f, "Full"), + TrySendError::Disconnected(..) => write!(f, "Disconnected"), + } +} -- cgit v1.2.3