summaryrefslogtreecommitdiffstats
path: root/third_party/rust/mio-extras/src/channel.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/mio-extras/src/channel.rs')
-rw-r--r--third_party/rust/mio-extras/src/channel.rs431
1 files changed, 431 insertions, 0 deletions
diff --git a/third_party/rust/mio-extras/src/channel.rs b/third_party/rust/mio-extras/src/channel.rs
new file mode 100644
index 0000000000..9ebc73bb7f
--- /dev/null
+++ b/third_party/rust/mio-extras/src/channel.rs
@@ -0,0 +1,431 @@
+//! Thread safe communication channel implementing `Evented`
+use lazycell::{AtomicLazyCell, LazyCell};
+use mio::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
+use std::any::Any;
+use std::error;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{mpsc, Arc};
+use std::{fmt, io};
+
+/// Creates a new asynchronous channel, where the `Receiver` can be registered
+/// with `Poll`.
+pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
+ let (tx_ctl, rx_ctl) = ctl_pair();
+ let (tx, rx) = mpsc::channel();
+
+ let tx = Sender { tx, ctl: tx_ctl };
+
+ let rx = Receiver { rx, ctl: rx_ctl };
+
+ (tx, rx)
+}
+
+/// Creates a new synchronous, bounded channel where the `Receiver` can be
+/// registered with `Poll`.
+pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
+ let (tx_ctl, rx_ctl) = ctl_pair();
+ let (tx, rx) = mpsc::sync_channel(bound);
+
+ let tx = SyncSender { tx, ctl: tx_ctl };
+
+ let rx = Receiver { rx, ctl: rx_ctl };
+
+ (tx, rx)
+}
+
+fn ctl_pair() -> (SenderCtl, ReceiverCtl) {
+ let inner = Arc::new(Inner {
+ pending: AtomicUsize::new(0),
+ senders: AtomicUsize::new(1),
+ set_readiness: AtomicLazyCell::new(),
+ });
+
+ let tx = SenderCtl {
+ inner: Arc::clone(&inner),
+ };
+
+ let rx = ReceiverCtl {
+ registration: LazyCell::new(),
+ inner,
+ };
+
+ (tx, rx)
+}
+
+/// Tracks messages sent on a channel in order to update readiness.
+struct SenderCtl {
+ inner: Arc<Inner>,
+}
+
+/// Tracks messages received on a channel in order to track readiness.
+struct ReceiverCtl {
+ registration: LazyCell<Registration>,
+ inner: Arc<Inner>,
+}
+
+/// The sending half of a channel.
+pub struct Sender<T> {
+ tx: mpsc::Sender<T>,
+ ctl: SenderCtl,
+}
+
+/// The sending half of a synchronous channel.
+pub struct SyncSender<T> {
+ tx: mpsc::SyncSender<T>,
+ ctl: SenderCtl,
+}
+
+/// The receiving half of a channel.
+pub struct Receiver<T> {
+ rx: mpsc::Receiver<T>,
+ ctl: ReceiverCtl,
+}
+
+/// An error returned from the `Sender::send` or `SyncSender::send` function.
+pub enum SendError<T> {
+ /// An IO error.
+ Io(io::Error),
+
+ /// The receiving half of the channel has disconnected.
+ Disconnected(T),
+}
+
+/// An error returned from the `SyncSender::try_send` function.
+pub enum TrySendError<T> {
+ /// An IO error.
+ Io(io::Error),
+
+ /// Data could not be sent because it would require the callee to block.
+ Full(T),
+
+ /// The receiving half of the channel has disconnected.
+ Disconnected(T),
+}
+
+struct Inner {
+ // The number of outstanding messages for the receiver to read
+ pending: AtomicUsize,
+ // The number of sender handles
+ senders: AtomicUsize,
+ // The set readiness handle
+ set_readiness: AtomicLazyCell<SetReadiness>,
+}
+
+impl<T> Sender<T> {
+ /// Attempts to send a value on this channel, returning it back if it could not be sent.
+ pub fn send(&self, t: T) -> Result<(), SendError<T>> {
+ self.tx.send(t).map_err(SendError::from).and_then(|_| {
+ try!(self.ctl.inc());
+ Ok(())
+ })
+ }
+}
+
+impl<T> Clone for Sender<T> {
+ fn clone(&self) -> Sender<T> {
+ Sender {
+ tx: self.tx.clone(),
+ ctl: self.ctl.clone(),
+ }
+ }
+}
+
+impl<T> SyncSender<T> {
+ /// Sends a value on this synchronous channel.
+ ///
+ /// This function will *block* until space in the internal buffer becomes
+ /// available or a receiver is available to hand off the message to.
+ pub fn send(&self, t: T) -> Result<(), SendError<T>> {
+ self.tx.send(t).map_err(From::from).and_then(|_| {
+ try!(self.ctl.inc());
+ Ok(())
+ })
+ }
+
+ /// Attempts to send a value on this channel without blocking.
+ ///
+ /// This method differs from `send` by returning immediately if the channel's
+ /// buffer is full or no receiver is waiting to acquire some data.
+ pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
+ self.tx.try_send(t).map_err(From::from).and_then(|_| {
+ try!(self.ctl.inc());
+ Ok(())
+ })
+ }
+}
+
+impl<T> Clone for SyncSender<T> {
+ fn clone(&self) -> SyncSender<T> {
+ SyncSender {
+ tx: self.tx.clone(),
+ ctl: self.ctl.clone(),
+ }
+ }
+}
+
+impl<T> Receiver<T> {
+ /// Attempts to return a pending value on this receiver without blocking.
+ pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
+ self.rx.try_recv().and_then(|res| {
+ let _ = self.ctl.dec();
+ Ok(res)
+ })
+ }
+}
+
+impl<T> Evented for Receiver<T> {
+ fn register(
+ &self,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt,
+ ) -> io::Result<()> {
+ self.ctl.register(poll, token, interest, opts)
+ }
+
+ fn reregister(
+ &self,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt,
+ ) -> io::Result<()> {
+ self.ctl.reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ self.ctl.deregister(poll)
+ }
+}
+
+/*
+ *
+ * ===== SenderCtl / ReceiverCtl =====
+ *
+ */
+
+impl SenderCtl {
+ /// Call to track that a message has been sent
+ fn inc(&self) -> io::Result<()> {
+ let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire);
+
+ if 0 == cnt {
+ // Toggle readiness to readable
+ if let Some(set_readiness) = self.inner.set_readiness.borrow() {
+ try!(set_readiness.set_readiness(Ready::readable()));
+ }
+ }
+
+ Ok(())
+ }
+}
+
+impl Clone for SenderCtl {
+ fn clone(&self) -> SenderCtl {
+ self.inner.senders.fetch_add(1, Ordering::Relaxed);
+ SenderCtl {
+ inner: Arc::clone(&self.inner),
+ }
+ }
+}
+
+impl Drop for SenderCtl {
+ fn drop(&mut self) {
+ if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 {
+ let _ = self.inc();
+ }
+ }
+}
+
+impl ReceiverCtl {
+ fn dec(&self) -> io::Result<()> {
+ let first = self.inner.pending.load(Ordering::Acquire);
+
+ if first == 1 {
+ // Unset readiness
+ if let Some(set_readiness) = self.inner.set_readiness.borrow() {
+ try!(set_readiness.set_readiness(Ready::empty()));
+ }
+ }
+
+ // Decrement
+ let second = self.inner.pending.fetch_sub(1, Ordering::AcqRel);
+
+ if first == 1 && second > 1 {
+ // There are still pending messages. Since readiness was
+ // previously unset, it must be reset here
+ if let Some(set_readiness) = self.inner.set_readiness.borrow() {
+ try!(set_readiness.set_readiness(Ready::readable()));
+ }
+ }
+
+ Ok(())
+ }
+}
+
+impl Evented for ReceiverCtl {
+ fn register(
+ &self,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt,
+ ) -> io::Result<()> {
+ if self.registration.borrow().is_some() {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "receiver already registered",
+ ));
+ }
+
+ let (registration, set_readiness) = Registration::new2();
+ poll.register(&registration, token, interest, opts)?;
+
+ if self.inner.pending.load(Ordering::Relaxed) > 0 {
+ // TODO: Don't drop readiness
+ let _ = set_readiness.set_readiness(Ready::readable());
+ }
+
+ self.registration
+ .fill(registration)
+ .expect("unexpected state encountered");
+ self.inner
+ .set_readiness
+ .fill(set_readiness)
+ .expect("unexpected state encountered");
+
+ Ok(())
+ }
+
+ fn reregister(
+ &self,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt,
+ ) -> io::Result<()> {
+ match self.registration.borrow() {
+ Some(registration) => poll.reregister(registration, token, interest, opts),
+ None => Err(io::Error::new(
+ io::ErrorKind::Other,
+ "receiver not registered",
+ )),
+ }
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ match self.registration.borrow() {
+ Some(registration) => poll.deregister(registration),
+ None => Err(io::Error::new(
+ io::ErrorKind::Other,
+ "receiver not registered",
+ )),
+ }
+ }
+}
+
+/*
+ *
+ * ===== Error conversions =====
+ *
+ */
+
+impl<T> From<mpsc::SendError<T>> for SendError<T> {
+ fn from(src: mpsc::SendError<T>) -> SendError<T> {
+ SendError::Disconnected(src.0)
+ }
+}
+
+impl<T> From<io::Error> for SendError<T> {
+ fn from(src: io::Error) -> SendError<T> {
+ SendError::Io(src)
+ }
+}
+
+impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
+ fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> {
+ match src {
+ mpsc::TrySendError::Full(v) => TrySendError::Full(v),
+ mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
+ }
+ }
+}
+
+impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
+ fn from(src: mpsc::SendError<T>) -> TrySendError<T> {
+ TrySendError::Disconnected(src.0)
+ }
+}
+
+impl<T> From<io::Error> for TrySendError<T> {
+ fn from(src: io::Error) -> TrySendError<T> {
+ TrySendError::Io(src)
+ }
+}
+
+/*
+ *
+ * ===== Implement Error, Debug and Display for Errors =====
+ *
+ */
+
+impl<T: Any> error::Error for SendError<T> {
+ fn description(&self) -> &str {
+ match *self {
+ SendError::Io(ref io_err) => io_err.description(),
+ SendError::Disconnected(..) => "Disconnected",
+ }
+ }
+}
+
+impl<T: Any> error::Error for TrySendError<T> {
+ fn description(&self) -> &str {
+ match *self {
+ TrySendError::Io(ref io_err) => io_err.description(),
+ TrySendError::Full(..) => "Full",
+ TrySendError::Disconnected(..) => "Disconnected",
+ }
+ }
+}
+
+impl<T> fmt::Debug for SendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ format_send_error(self, f)
+ }
+}
+
+impl<T> fmt::Display for SendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ format_send_error(self, f)
+ }
+}
+
+impl<T> fmt::Debug for TrySendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ format_try_send_error(self, f)
+ }
+}
+
+impl<T> fmt::Display for TrySendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ format_try_send_error(self, f)
+ }
+}
+
+#[inline]
+fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
+ match *e {
+ SendError::Io(ref io_err) => write!(f, "{}", io_err),
+ SendError::Disconnected(..) => write!(f, "Disconnected"),
+ }
+}
+
+#[inline]
+fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
+ match *e {
+ TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
+ TrySendError::Full(..) => write!(f, "Full"),
+ TrySendError::Disconnected(..) => write!(f, "Disconnected"),
+ }
+}