diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/mio-extras/src/channel.rs | |
parent | Initial commit. (diff) | |
download | firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/mio-extras/src/channel.rs')
-rw-r--r-- | third_party/rust/mio-extras/src/channel.rs | 431 |
1 files changed, 431 insertions, 0 deletions
diff --git a/third_party/rust/mio-extras/src/channel.rs b/third_party/rust/mio-extras/src/channel.rs new file mode 100644 index 0000000000..561317ecbf --- /dev/null +++ b/third_party/rust/mio-extras/src/channel.rs @@ -0,0 +1,431 @@ +//! Thread safe communication channel implementing `Evented` +use lazycell::{AtomicLazyCell, LazyCell}; +use mio::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token}; +use std::any::Any; +use std::error; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{mpsc, Arc}; +use std::{fmt, io}; + +/// Creates a new asynchronous channel, where the `Receiver` can be registered +/// with `Poll`. +pub fn channel<T>() -> (Sender<T>, Receiver<T>) { + let (tx_ctl, rx_ctl) = ctl_pair(); + let (tx, rx) = mpsc::channel(); + + let tx = Sender { tx, ctl: tx_ctl }; + + let rx = Receiver { rx, ctl: rx_ctl }; + + (tx, rx) +} + +/// Creates a new synchronous, bounded channel where the `Receiver` can be +/// registered with `Poll`. +pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) { + let (tx_ctl, rx_ctl) = ctl_pair(); + let (tx, rx) = mpsc::sync_channel(bound); + + let tx = SyncSender { tx, ctl: tx_ctl }; + + let rx = Receiver { rx, ctl: rx_ctl }; + + (tx, rx) +} + +fn ctl_pair() -> (SenderCtl, ReceiverCtl) { + let inner = Arc::new(Inner { + pending: AtomicUsize::new(0), + senders: AtomicUsize::new(1), + set_readiness: AtomicLazyCell::new(), + }); + + let tx = SenderCtl { + inner: Arc::clone(&inner), + }; + + let rx = ReceiverCtl { + registration: LazyCell::new(), + inner, + }; + + (tx, rx) +} + +/// Tracks messages sent on a channel in order to update readiness. +struct SenderCtl { + inner: Arc<Inner>, +} + +/// Tracks messages received on a channel in order to track readiness. +struct ReceiverCtl { + registration: LazyCell<Registration>, + inner: Arc<Inner>, +} + +/// The sending half of a channel. +pub struct Sender<T> { + tx: mpsc::Sender<T>, + ctl: SenderCtl, +} + +/// The sending half of a synchronous channel. +pub struct SyncSender<T> { + tx: mpsc::SyncSender<T>, + ctl: SenderCtl, +} + +/// The receiving half of a channel. +pub struct Receiver<T> { + rx: mpsc::Receiver<T>, + ctl: ReceiverCtl, +} + +/// An error returned from the `Sender::send` or `SyncSender::send` function. +pub enum SendError<T> { + /// An IO error. + Io(io::Error), + + /// The receiving half of the channel has disconnected. + Disconnected(T), +} + +/// An error returned from the `SyncSender::try_send` function. +pub enum TrySendError<T> { + /// An IO error. + Io(io::Error), + + /// Data could not be sent because it would require the callee to block. + Full(T), + + /// The receiving half of the channel has disconnected. + Disconnected(T), +} + +struct Inner { + // The number of outstanding messages for the receiver to read + pending: AtomicUsize, + // The number of sender handles + senders: AtomicUsize, + // The set readiness handle + set_readiness: AtomicLazyCell<SetReadiness>, +} + +impl<T> Sender<T> { + /// Attempts to send a value on this channel, returning it back if it could not be sent. + pub fn send(&self, t: T) -> Result<(), SendError<T>> { + self.tx.send(t).map_err(SendError::from).and_then(|_| { + self.ctl.inc()?; + Ok(()) + }) + } +} + +impl<T> Clone for Sender<T> { + fn clone(&self) -> Sender<T> { + Sender { + tx: self.tx.clone(), + ctl: self.ctl.clone(), + } + } +} + +impl<T> SyncSender<T> { + /// Sends a value on this synchronous channel. + /// + /// This function will *block* until space in the internal buffer becomes + /// available or a receiver is available to hand off the message to. + pub fn send(&self, t: T) -> Result<(), SendError<T>> { + self.tx.send(t).map_err(From::from).and_then(|_| { + self.ctl.inc()?; + Ok(()) + }) + } + + /// Attempts to send a value on this channel without blocking. + /// + /// This method differs from `send` by returning immediately if the channel's + /// buffer is full or no receiver is waiting to acquire some data. + pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> { + self.tx.try_send(t).map_err(From::from).and_then(|_| { + self.ctl.inc()?; + Ok(()) + }) + } +} + +impl<T> Clone for SyncSender<T> { + fn clone(&self) -> SyncSender<T> { + SyncSender { + tx: self.tx.clone(), + ctl: self.ctl.clone(), + } + } +} + +impl<T> Receiver<T> { + /// Attempts to return a pending value on this receiver without blocking. + pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> { + self.rx.try_recv().and_then(|res| { + let _ = self.ctl.dec(); + Ok(res) + }) + } +} + +impl<T> Evented for Receiver<T> { + fn register( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + self.ctl.register(poll, token, interest, opts) + } + + fn reregister( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + self.ctl.reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + self.ctl.deregister(poll) + } +} + +/* + * + * ===== SenderCtl / ReceiverCtl ===== + * + */ + +impl SenderCtl { + /// Call to track that a message has been sent + fn inc(&self) -> io::Result<()> { + let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire); + + if 0 == cnt { + // Toggle readiness to readable + if let Some(set_readiness) = self.inner.set_readiness.borrow() { + set_readiness.set_readiness(Ready::readable())?; + } + } + + Ok(()) + } +} + +impl Clone for SenderCtl { + fn clone(&self) -> SenderCtl { + self.inner.senders.fetch_add(1, Ordering::Relaxed); + SenderCtl { + inner: Arc::clone(&self.inner), + } + } +} + +impl Drop for SenderCtl { + fn drop(&mut self) { + if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 { + let _ = self.inc(); + } + } +} + +impl ReceiverCtl { + fn dec(&self) -> io::Result<()> { + let first = self.inner.pending.load(Ordering::Acquire); + + if first == 1 { + // Unset readiness + if let Some(set_readiness) = self.inner.set_readiness.borrow() { + set_readiness.set_readiness(Ready::empty())?; + } + } + + // Decrement + let second = self.inner.pending.fetch_sub(1, Ordering::AcqRel); + + if first == 1 && second > 1 { + // There are still pending messages. Since readiness was + // previously unset, it must be reset here + if let Some(set_readiness) = self.inner.set_readiness.borrow() { + set_readiness.set_readiness(Ready::readable())?; + } + } + + Ok(()) + } +} + +impl Evented for ReceiverCtl { + fn register( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + if self.registration.borrow().is_some() { + return Err(io::Error::new( + io::ErrorKind::Other, + "receiver already registered", + )); + } + + let (registration, set_readiness) = Registration::new2(); + poll.register(®istration, token, interest, opts)?; + + if self.inner.pending.load(Ordering::Relaxed) > 0 { + // TODO: Don't drop readiness + let _ = set_readiness.set_readiness(Ready::readable()); + } + + self.registration + .fill(registration) + .expect("unexpected state encountered"); + self.inner + .set_readiness + .fill(set_readiness) + .expect("unexpected state encountered"); + + Ok(()) + } + + fn reregister( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + match self.registration.borrow() { + Some(registration) => poll.reregister(registration, token, interest, opts), + None => Err(io::Error::new( + io::ErrorKind::Other, + "receiver not registered", + )), + } + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + match self.registration.borrow() { + Some(registration) => poll.deregister(registration), + None => Err(io::Error::new( + io::ErrorKind::Other, + "receiver not registered", + )), + } + } +} + +/* + * + * ===== Error conversions ===== + * + */ + +impl<T> From<mpsc::SendError<T>> for SendError<T> { + fn from(src: mpsc::SendError<T>) -> SendError<T> { + SendError::Disconnected(src.0) + } +} + +impl<T> From<io::Error> for SendError<T> { + fn from(src: io::Error) -> SendError<T> { + SendError::Io(src) + } +} + +impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> { + fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> { + match src { + mpsc::TrySendError::Full(v) => TrySendError::Full(v), + mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v), + } + } +} + +impl<T> From<mpsc::SendError<T>> for TrySendError<T> { + fn from(src: mpsc::SendError<T>) -> TrySendError<T> { + TrySendError::Disconnected(src.0) + } +} + +impl<T> From<io::Error> for TrySendError<T> { + fn from(src: io::Error) -> TrySendError<T> { + TrySendError::Io(src) + } +} + +/* + * + * ===== Implement Error, Debug and Display for Errors ===== + * + */ + +impl<T: Any> error::Error for SendError<T> { + fn description(&self) -> &str { + match *self { + SendError::Io(ref io_err) => io_err.description(), + SendError::Disconnected(..) => "Disconnected", + } + } +} + +impl<T: Any> error::Error for TrySendError<T> { + fn description(&self) -> &str { + match *self { + TrySendError::Io(ref io_err) => io_err.description(), + TrySendError::Full(..) => "Full", + TrySendError::Disconnected(..) => "Disconnected", + } + } +} + +impl<T> fmt::Debug for SendError<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + format_send_error(self, f) + } +} + +impl<T> fmt::Display for SendError<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + format_send_error(self, f) + } +} + +impl<T> fmt::Debug for TrySendError<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + format_try_send_error(self, f) + } +} + +impl<T> fmt::Display for TrySendError<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + format_try_send_error(self, f) + } +} + +#[inline] +fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result { + match *e { + SendError::Io(ref io_err) => write!(f, "{}", io_err), + SendError::Disconnected(..) => write!(f, "Disconnected"), + } +} + +#[inline] +fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result { + match *e { + TrySendError::Io(ref io_err) => write!(f, "{}", io_err), + TrySendError::Full(..) => write!(f, "Full"), + TrySendError::Disconnected(..) => write!(f, "Disconnected"), + } +} |