use std::ops::{Deref, DerefMut}; #[cfg(unix)] use std::os::unix::io::AsRawFd; #[cfg(windows)] use std::os::windows::io::AsRawSocket; #[cfg(debug_assertions)] use std::sync::atomic::{AtomicUsize, Ordering}; use std::{fmt, io}; use crate::sys::IoSourceState; use crate::{event, Interest, Registry, Token}; /// Adapter for a [`RawFd`] or [`RawSocket`] providing an [`event::Source`] /// implementation. /// /// `IoSource` enables registering any FD or socket wrapper with [`Poll`]. /// /// While only implementations for TCP, UDP, and UDS (Unix only) are provided, /// Mio supports registering any FD or socket that can be registered with the /// underlying OS selector. `IoSource` provides the necessary bridge. /// /// [`RawFd`]: std::os::unix::io::RawFd /// [`RawSocket`]: std::os::windows::io::RawSocket /// /// # Notes /// /// To handle the registrations and events properly **all** I/O operations (such /// as `read`, `write`, etc.) must go through the [`do_io`] method to ensure the /// internal state is updated accordingly. /// /// [`Poll`]: crate::Poll /// [`do_io`]: IoSource::do_io /* /// /// # Examples /// /// Basic usage. /// /// ``` /// # use std::error::Error; /// # fn main() -> Result<(), Box> { /// use mio::{Interest, Poll, Token}; /// use mio::IoSource; /// /// use std::net; /// /// let poll = Poll::new()?; /// /// // Bind a std TCP listener. /// let listener = net::TcpListener::bind("127.0.0.1:0")?; /// // Wrap it in the `IoSource` type. /// let mut listener = IoSource::new(listener); /// /// // Register the listener. /// poll.registry().register(&mut listener, Token(0), Interest::READABLE)?; /// # Ok(()) /// # } /// ``` */ pub struct IoSource { state: IoSourceState, inner: T, #[cfg(debug_assertions)] selector_id: SelectorId, } impl IoSource { /// Create a new `IoSource`. pub fn new(io: T) -> IoSource { IoSource { state: IoSourceState::new(), inner: io, #[cfg(debug_assertions)] selector_id: SelectorId::new(), } } /// Execute an I/O operations ensuring that the socket receives more events /// if it hits a [`WouldBlock`] error. /// /// # Notes /// /// This method is required to be called for **all** I/O operations to /// ensure the user will receive events once the socket is ready again after /// returning a [`WouldBlock`] error. /// /// [`WouldBlock`]: io::ErrorKind::WouldBlock pub fn do_io(&self, f: F) -> io::Result where F: FnOnce(&T) -> io::Result, { self.state.do_io(f, &self.inner) } /// Returns the I/O source, dropping the state. /// /// # Notes /// /// To ensure no more events are to be received for this I/O source first /// [`deregister`] it. /// /// [`deregister`]: Registry::deregister pub fn into_inner(self) -> T { self.inner } } /// Be careful when using this method. All I/O operations that may block must go /// through the [`do_io`] method. /// /// [`do_io`]: IoSource::do_io impl Deref for IoSource { type Target = T; fn deref(&self) -> &Self::Target { &self.inner } } /// Be careful when using this method. All I/O operations that may block must go /// through the [`do_io`] method. /// /// [`do_io`]: IoSource::do_io impl DerefMut for IoSource { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner } } #[cfg(unix)] impl event::Source for IoSource where T: AsRawFd, { fn register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.associate(registry)?; registry .selector() .register(self.inner.as_raw_fd(), token, interests) } fn reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.check_association(registry)?; registry .selector() .reregister(self.inner.as_raw_fd(), token, interests) } fn deregister(&mut self, registry: &Registry) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.remove_association(registry)?; registry.selector().deregister(self.inner.as_raw_fd()) } } #[cfg(windows)] impl event::Source for IoSource where T: AsRawSocket, { fn register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.associate(registry)?; self.state .register(registry, token, interests, self.inner.as_raw_socket()) } fn reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.check_association(registry)?; self.state.reregister(registry, token, interests) } fn deregister(&mut self, _registry: &Registry) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.remove_association(_registry)?; self.state.deregister() } } impl fmt::Debug for IoSource where T: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.inner.fmt(f) } } /// Used to associate an `IoSource` with a `sys::Selector`. #[cfg(debug_assertions)] #[derive(Debug)] struct SelectorId { id: AtomicUsize, } #[cfg(debug_assertions)] impl SelectorId { /// Value of `id` if `SelectorId` is not associated with any /// `sys::Selector`. Valid selector ids start at 1. const UNASSOCIATED: usize = 0; /// Create a new `SelectorId`. const fn new() -> SelectorId { SelectorId { id: AtomicUsize::new(Self::UNASSOCIATED), } } /// Associate an I/O source with `registry`, returning an error if its /// already registered. fn associate(&self, registry: &Registry) -> io::Result<()> { let registry_id = registry.selector().id(); let previous_id = self.id.swap(registry_id, Ordering::AcqRel); if previous_id == Self::UNASSOCIATED { Ok(()) } else { Err(io::Error::new( io::ErrorKind::AlreadyExists, "I/O source already registered with a `Registry`", )) } } /// Check the association of an I/O source with `registry`, returning an /// error if its registered with a different `Registry` or not registered at /// all. fn check_association(&self, registry: &Registry) -> io::Result<()> { let registry_id = registry.selector().id(); let id = self.id.load(Ordering::Acquire); if id == registry_id { Ok(()) } else if id == Self::UNASSOCIATED { Err(io::Error::new( io::ErrorKind::NotFound, "I/O source not registered with `Registry`", )) } else { Err(io::Error::new( io::ErrorKind::AlreadyExists, "I/O source already registered with a different `Registry`", )) } } /// Remove a previously made association from `registry`, returns an error /// if it was not previously associated with `registry`. fn remove_association(&self, registry: &Registry) -> io::Result<()> { let registry_id = registry.selector().id(); let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel); if previous_id == registry_id { Ok(()) } else { Err(io::Error::new( io::ErrorKind::NotFound, "I/O source not registered with `Registry`", )) } } } #[cfg(debug_assertions)] impl Clone for SelectorId { fn clone(&self) -> SelectorId { SelectorId { id: AtomicUsize::new(self.id.load(Ordering::Acquire)), } } }