From 2aa4a82499d4becd2284cdb482213d541b8804dd Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 28 Apr 2024 16:29:10 +0200 Subject: Adding upstream version 86.0.1. Signed-off-by: Daniel Baumann --- third_party/rust/mio/src/channel.rs | 390 ++++++++++++++++++++++++++++++++++++ 1 file changed, 390 insertions(+) create mode 100644 third_party/rust/mio/src/channel.rs (limited to 'third_party/rust/mio/src/channel.rs') diff --git a/third_party/rust/mio/src/channel.rs b/third_party/rust/mio/src/channel.rs new file mode 100644 index 0000000000..7077c51f86 --- /dev/null +++ b/third_party/rust/mio/src/channel.rs @@ -0,0 +1,390 @@ +//! Thread safe communication channel implementing `Evented` + +#![allow(unused_imports, deprecated, missing_debug_implementations)] + +use {io, Ready, Poll, PollOpt, Registration, SetReadiness, Token}; +use event::Evented; +use lazycell::{LazyCell, AtomicLazyCell}; +use std::any::Any; +use std::fmt; +use std::error; +use std::sync::{mpsc, Arc}; +use std::sync::atomic::{AtomicUsize, Ordering}; + +/// Creates a new asynchronous channel, where the `Receiver` can be registered +/// with `Poll`. +pub fn channel() -> (Sender, Receiver) { + let (tx_ctl, rx_ctl) = ctl_pair(); + let (tx, rx) = mpsc::channel(); + + let tx = Sender { + tx, + ctl: tx_ctl, + }; + + let rx = Receiver { + rx, + ctl: rx_ctl, + }; + + (tx, rx) +} + +/// Creates a new synchronous, bounded channel where the `Receiver` can be +/// registered with `Poll`. +pub fn sync_channel(bound: usize) -> (SyncSender, Receiver) { + let (tx_ctl, rx_ctl) = ctl_pair(); + let (tx, rx) = mpsc::sync_channel(bound); + + let tx = SyncSender { + tx, + ctl: tx_ctl, + }; + + let rx = Receiver { + rx, + ctl: rx_ctl, + }; + + (tx, rx) +} + +pub fn ctl_pair() -> (SenderCtl, ReceiverCtl) { + let inner = Arc::new(Inner { + pending: AtomicUsize::new(0), + senders: AtomicUsize::new(1), + set_readiness: AtomicLazyCell::new(), + }); + + let tx = SenderCtl { + inner: inner.clone(), + }; + + let rx = ReceiverCtl { + registration: LazyCell::new(), + inner, + }; + + (tx, rx) +} + +/// Tracks messages sent on a channel in order to update readiness. +pub struct SenderCtl { + inner: Arc, +} + +/// Tracks messages received on a channel in order to track readiness. +pub struct ReceiverCtl { + registration: LazyCell, + inner: Arc, +} + +pub struct Sender { + tx: mpsc::Sender, + ctl: SenderCtl, +} + +pub struct SyncSender { + tx: mpsc::SyncSender, + ctl: SenderCtl, +} + +pub struct Receiver { + rx: mpsc::Receiver, + ctl: ReceiverCtl, +} + +pub enum SendError { + Io(io::Error), + Disconnected(T), +} + +pub enum TrySendError { + Io(io::Error), + Full(T), + Disconnected(T), +} + +struct Inner { + // The number of outstanding messages for the receiver to read + pending: AtomicUsize, + // The number of sender handles + senders: AtomicUsize, + // The set readiness handle + set_readiness: AtomicLazyCell, +} + +impl Sender { + pub fn send(&self, t: T) -> Result<(), SendError> { + self.tx.send(t) + .map_err(SendError::from) + .and_then(|_| { + self.ctl.inc()?; + Ok(()) + }) + } +} + +impl Clone for Sender { + fn clone(&self) -> Sender { + Sender { + tx: self.tx.clone(), + ctl: self.ctl.clone(), + } + } +} + +impl SyncSender { + pub fn send(&self, t: T) -> Result<(), SendError> { + self.tx.send(t) + .map_err(From::from) + .and_then(|_| { + self.ctl.inc()?; + Ok(()) + }) + } + + pub fn try_send(&self, t: T) -> Result<(), TrySendError> { + self.tx.try_send(t) + .map_err(From::from) + .and_then(|_| { + self.ctl.inc()?; + Ok(()) + }) + } +} + +impl Clone for SyncSender { + fn clone(&self) -> SyncSender { + SyncSender { + tx: self.tx.clone(), + ctl: self.ctl.clone(), + } + } +} + +impl Receiver { + pub fn try_recv(&self) -> Result { + self.rx.try_recv().and_then(|res| { + let _ = self.ctl.dec(); + Ok(res) + }) + } +} + +impl Evented for Receiver { + fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + self.ctl.register(poll, token, interest, opts) + } + + fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + self.ctl.reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + self.ctl.deregister(poll) + } +} + +/* + * + * ===== SenderCtl / ReceiverCtl ===== + * + */ + +impl SenderCtl { + /// Call to track that a message has been sent + pub fn inc(&self) -> io::Result<()> { + let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire); + + if 0 == cnt { + // Toggle readiness to readable + if let Some(set_readiness) = self.inner.set_readiness.borrow() { + set_readiness.set_readiness(Ready::readable())?; + } + } + + Ok(()) + } +} + +impl Clone for SenderCtl { + fn clone(&self) -> SenderCtl { + self.inner.senders.fetch_add(1, Ordering::Relaxed); + SenderCtl { inner: self.inner.clone() } + } +} + +impl Drop for SenderCtl { + fn drop(&mut self) { + if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 { + let _ = self.inc(); + } + } +} + +impl ReceiverCtl { + pub fn dec(&self) -> io::Result<()> { + let first = self.inner.pending.load(Ordering::Acquire); + + if first == 1 { + // Unset readiness + if let Some(set_readiness) = self.inner.set_readiness.borrow() { + set_readiness.set_readiness(Ready::empty())?; + } + } + + // Decrement + let second = self.inner.pending.fetch_sub(1, Ordering::AcqRel); + + if first == 1 && second > 1 { + // There are still pending messages. Since readiness was + // previously unset, it must be reset here + if let Some(set_readiness) = self.inner.set_readiness.borrow() { + set_readiness.set_readiness(Ready::readable())?; + } + } + + Ok(()) + } +} + +impl Evented for ReceiverCtl { + fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + if self.registration.borrow().is_some() { + return Err(io::Error::new(io::ErrorKind::Other, "receiver already registered")); + } + + let (registration, set_readiness) = Registration::new(poll, token, interest, opts); + + + if self.inner.pending.load(Ordering::Relaxed) > 0 { + // TODO: Don't drop readiness + let _ = set_readiness.set_readiness(Ready::readable()); + } + + self.registration.fill(registration).expect("unexpected state encountered"); + self.inner.set_readiness.fill(set_readiness).expect("unexpected state encountered"); + + Ok(()) + } + + fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + match self.registration.borrow() { + Some(registration) => registration.update(poll, token, interest, opts), + None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")), + } + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + match self.registration.borrow() { + Some(registration) => registration.deregister(poll), + None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")), + } + } +} + +/* + * + * ===== Error conversions ===== + * + */ + +impl From> for SendError { + fn from(src: mpsc::SendError) -> SendError { + SendError::Disconnected(src.0) + } +} + +impl From for SendError { + fn from(src: io::Error) -> SendError { + SendError::Io(src) + } +} + +impl From> for TrySendError { + fn from(src: mpsc::TrySendError) -> TrySendError { + match src { + mpsc::TrySendError::Full(v) => TrySendError::Full(v), + mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v), + } + } +} + +impl From> for TrySendError { + fn from(src: mpsc::SendError) -> TrySendError { + TrySendError::Disconnected(src.0) + } +} + +impl From for TrySendError { + fn from(src: io::Error) -> TrySendError { + TrySendError::Io(src) + } +} + +/* + * + * ===== Implement Error, Debug and Display for Errors ===== + * + */ + +impl error::Error for SendError { + fn description(&self) -> &str { + match *self { + SendError::Io(ref io_err) => io_err.description(), + SendError::Disconnected(..) => "Disconnected", + } + } +} + +impl error::Error for TrySendError { + fn description(&self) -> &str { + match *self { + TrySendError::Io(ref io_err) => io_err.description(), + TrySendError::Full(..) => "Full", + TrySendError::Disconnected(..) => "Disconnected", + } + } +} + +impl fmt::Debug for SendError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + format_send_error(self, f) + } +} + +impl fmt::Display for SendError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + format_send_error(self, f) + } +} + +impl fmt::Debug for TrySendError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + format_try_send_error(self, f) + } +} + +impl fmt::Display for TrySendError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + format_try_send_error(self, f) + } +} + +#[inline] +fn format_send_error(e: &SendError, f: &mut fmt::Formatter) -> fmt::Result { + match *e { + SendError::Io(ref io_err) => write!(f, "{}", io_err), + SendError::Disconnected(..) => write!(f, "Disconnected"), + } +} + +#[inline] +fn format_try_send_error(e: &TrySendError, f: &mut fmt::Formatter) -> fmt::Result { + match *e { + TrySendError::Io(ref io_err) => write!(f, "{}", io_err), + TrySendError::Full(..) => write!(f, "Full"), + TrySendError::Disconnected(..) => write!(f, "Disconnected"), + } +} -- cgit v1.2.3