#![cfg_attr(not(feature = "net"), allow(dead_code))] use crate::io::driver::{Direction, Handle, Interest, ReadyEvent, ScheduledIo}; use crate::util::slab; use mio::event::Source; use std::io; use std::task::{Context, Poll}; cfg_io_driver! { /// Associates an I/O resource with the reactor instance that drives it. /// /// A registration represents an I/O resource registered with a Reactor such /// that it will receive task notifications on readiness. This is the lowest /// level API for integrating with a reactor. /// /// The association between an I/O resource is made by calling /// [`new_with_interest_and_handle`]. /// Once the association is established, it remains established until the /// registration instance is dropped. /// /// A registration instance represents two separate readiness streams. One /// for the read readiness and one for write readiness. These streams are /// independent and can be consumed from separate tasks. /// /// **Note**: while `Registration` is `Sync`, the caller must ensure that /// there are at most two tasks that use a registration instance /// concurrently. One task for [`poll_read_ready`] and one task for /// [`poll_write_ready`]. While violating this requirement is "safe" from a /// Rust memory safety point of view, it will result in unexpected behavior /// in the form of lost notifications and tasks hanging. /// /// ## Platform-specific events /// /// `Registration` also allows receiving platform-specific `mio::Ready` /// events. These events are included as part of the read readiness event /// stream. The write readiness event stream is only for `Ready::writable()` /// events. /// /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle /// [`poll_read_ready`]: method@Self::poll_read_ready` /// [`poll_write_ready`]: method@Self::poll_write_ready` #[derive(Debug)] pub(crate) struct Registration { /// Handle to the associated driver. handle: Handle, /// Reference to state stored by the driver. shared: slab::Ref, } } unsafe impl Send for Registration {} unsafe impl Sync for Registration {} // ===== impl Registration ===== impl Registration { /// Registers the I/O resource with the default reactor, for a specific /// `Interest`. `new_with_interest` should be used over `new` when you need /// control over the readiness state, such as when a file descriptor only /// allows reads. This does not add `hup` or `error` so if you are /// interested in those states, you will need to add them to the readiness /// state passed to this function. /// /// # Return /// /// - `Ok` if the registration happened successfully /// - `Err` if an error was encountered during registration pub(crate) fn new_with_interest_and_handle( io: &mut impl Source, interest: Interest, handle: Handle, ) -> io::Result { let shared = if let Some(inner) = handle.inner() { inner.add_source(io, interest)? } else { return Err(io::Error::new( io::ErrorKind::Other, "failed to find event loop", )); }; Ok(Registration { handle, shared }) } /// Deregisters the I/O resource from the reactor it is associated with. /// /// This function must be called before the I/O resource associated with the /// registration is dropped. /// /// Note that deregistering does not guarantee that the I/O resource can be /// registered with a different reactor. Some I/O resource types can only be /// associated with a single reactor instance for their lifetime. /// /// # Return /// /// If the deregistration was successful, `Ok` is returned. Any calls to /// `Reactor::turn` that happen after a successful call to `deregister` will /// no longer result in notifications getting sent for this registration. /// /// `Err` is returned if an error is encountered. pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> { let inner = match self.handle.inner() { Some(inner) => inner, None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), }; inner.deregister_source(io) } pub(crate) fn clear_readiness(&self, event: ReadyEvent) { self.shared.clear_readiness(event); } // Uses the poll path, requiring the caller to ensure mutual exclusion for // correctness. Only the last task to call this function is notified. pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { self.poll_ready(cx, Direction::Read) } // Uses the poll path, requiring the caller to ensure mutual exclusion for // correctness. Only the last task to call this function is notified. pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { self.poll_ready(cx, Direction::Write) } // Uses the poll path, requiring the caller to ensure mutual exclusion for // correctness. Only the last task to call this function is notified. pub(crate) fn poll_read_io( &self, cx: &mut Context<'_>, f: impl FnMut() -> io::Result, ) -> Poll> { self.poll_io(cx, Direction::Read, f) } // Uses the poll path, requiring the caller to ensure mutual exclusion for // correctness. Only the last task to call this function is notified. pub(crate) fn poll_write_io( &self, cx: &mut Context<'_>, f: impl FnMut() -> io::Result, ) -> Poll> { self.poll_io(cx, Direction::Write, f) } /// Polls for events on the I/O resource's `direction` readiness stream. /// /// If called with a task context, notify the task when a new event is /// received. fn poll_ready( &self, cx: &mut Context<'_>, direction: Direction, ) -> Poll> { // Keep track of task budget let coop = ready!(crate::coop::poll_proceed(cx)); let ev = ready!(self.shared.poll_readiness(cx, direction)); if self.handle.inner().is_none() { return Poll::Ready(Err(gone())); } coop.made_progress(); Poll::Ready(Ok(ev)) } fn poll_io( &self, cx: &mut Context<'_>, direction: Direction, mut f: impl FnMut() -> io::Result, ) -> Poll> { loop { let ev = ready!(self.poll_ready(cx, direction))?; match f() { Ok(ret) => { return Poll::Ready(Ok(ret)); } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.clear_readiness(ev); } Err(e) => return Poll::Ready(Err(e)), } } } pub(crate) fn try_io( &self, interest: Interest, f: impl FnOnce() -> io::Result, ) -> io::Result { let ev = self.shared.ready_event(interest); // Don't attempt the operation if the resource is not ready. if ev.ready.is_empty() { return Err(io::ErrorKind::WouldBlock.into()); } match f() { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.clear_readiness(ev); Err(io::ErrorKind::WouldBlock.into()) } res => res, } } } impl Drop for Registration { fn drop(&mut self) { // It is possible for a cycle to be created between wakers stored in // `ScheduledIo` instances and `Arc`. To break this // cycle, wakers are cleared. This is an imperfect solution as it is // possible to store a `Registration` in a waker. In this case, the // cycle would remain. // // See tokio-rs/tokio#3481 for more details. self.shared.clear_wakers(); } } fn gone() -> io::Error { io::Error::new(io::ErrorKind::Other, "IO driver has terminated") } cfg_io_readiness! { impl Registration { pub(crate) async fn readiness(&self, interest: Interest) -> io::Result { use std::future::Future; use std::pin::Pin; let fut = self.shared.readiness(interest); pin!(fut); crate::future::poll_fn(|cx| { if self.handle.inner().is_none() { return Poll::Ready(Err(io::Error::new( io::ErrorKind::Other, crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR ))); } Pin::new(&mut fut).poll(cx).map(Ok) }).await } pub(crate) async fn async_io(&self, interest: Interest, mut f: impl FnMut() -> io::Result) -> io::Result { loop { let event = self.readiness(interest).await?; match f() { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.clear_readiness(event); } x => return x, } } } } }