diff options
Diffstat (limited to 'third_party/rust/tokio/src/io/poll_evented.rs')
-rw-r--r-- | third_party/rust/tokio/src/io/poll_evented.rs | 214 |
1 files changed, 214 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/io/poll_evented.rs b/third_party/rust/tokio/src/io/poll_evented.rs new file mode 100644 index 0000000000..ce4c1426ac --- /dev/null +++ b/third_party/rust/tokio/src/io/poll_evented.rs @@ -0,0 +1,214 @@ +use crate::io::driver::{Handle, Interest, Registration}; + +use mio::event::Source; +use std::fmt; +use std::io; +use std::ops::Deref; +use std::panic::{RefUnwindSafe, UnwindSafe}; + +cfg_io_driver! { + /// Associates an I/O resource that implements the [`std::io::Read`] and/or + /// [`std::io::Write`] traits with the reactor that drives it. + /// + /// `PollEvented` uses [`Registration`] internally to take a type that + /// implements [`mio::event::Source`] as well as [`std::io::Read`] and or + /// [`std::io::Write`] and associate it with a reactor that will drive it. + /// + /// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be + /// used from within the future's execution model. As such, the + /// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`] + /// implementations using the underlying I/O resource as well as readiness + /// events provided by the reactor. + /// + /// **Note**: While `PollEvented` is `Sync` (if the underlying I/O type is + /// `Sync`), the caller must ensure that there are at most two tasks that + /// use a `PollEvented` instance concurrently. One for reading and one for + /// writing. While violating this requirement is "safe" from a Rust memory + /// model point of view, it will result in unexpected behavior in the form + /// of lost notifications and tasks hanging. + /// + /// ## Readiness events + /// + /// Besides just providing [`AsyncRead`] and [`AsyncWrite`] implementations, + /// this type also supports access to the underlying readiness event stream. + /// While similar in function to what [`Registration`] provides, the + /// semantics are a bit different. + /// + /// Two functions are provided to access the readiness events: + /// [`poll_read_ready`] and [`poll_write_ready`]. These functions return the + /// current readiness state of the `PollEvented` instance. If + /// [`poll_read_ready`] indicates read readiness, immediately calling + /// [`poll_read_ready`] again will also indicate read readiness. + /// + /// When the operation is attempted and is unable to succeed due to the I/O + /// resource not being ready, the caller must call `clear_readiness`. + /// This clears the readiness state until a new readiness event is received. + /// + /// This allows the caller to implement additional functions. For example, + /// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and + /// `clear_read_ready`. + /// + /// ## Platform-specific events + /// + /// `PollEvented` also allows receiving platform-specific `mio::Ready` events. + /// These events are included as part of the read readiness event stream. The + /// write readiness event stream is only for `Ready::writable()` events. + /// + /// [`AsyncRead`]: crate::io::AsyncRead + /// [`AsyncWrite`]: crate::io::AsyncWrite + /// [`TcpListener`]: crate::net::TcpListener + /// [`poll_read_ready`]: Registration::poll_read_ready + /// [`poll_write_ready`]: Registration::poll_write_ready + pub(crate) struct PollEvented<E: Source> { + io: Option<E>, + registration: Registration, + } +} + +// ===== impl PollEvented ===== + +impl<E: Source> PollEvented<E> { + /// Creates a new `PollEvented` associated with the default reactor. + /// + /// # Panics + /// + /// This function panics if thread-local runtime is not set. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + #[cfg_attr(feature = "signal", allow(unused))] + pub(crate) fn new(io: E) -> io::Result<Self> { + PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE) + } + + /// Creates a new `PollEvented` associated with the default reactor, for + /// specific `Interest` state. `new_with_interest` should be used over `new` + /// when you need control over the readiness state, such as when a file + /// descriptor only allows reads. This does not add `hup` or `error` so if + /// you are interested in those states, you will need to add them to the + /// readiness state passed to this function. + /// + /// # Panics + /// + /// This function panics if thread-local runtime is not set. + /// + /// The runtime is usually set implicitly when this function is called from + /// a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) + /// function. + #[cfg_attr(feature = "signal", allow(unused))] + pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> { + Self::new_with_interest_and_handle(io, interest, Handle::current()) + } + + pub(crate) fn new_with_interest_and_handle( + mut io: E, + interest: Interest, + handle: Handle, + ) -> io::Result<Self> { + let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?; + Ok(Self { + io: Some(io), + registration, + }) + } + + /// Returns a reference to the registration. + #[cfg(any( + feature = "net", + all(unix, feature = "process"), + all(unix, feature = "signal"), + ))] + pub(crate) fn registration(&self) -> &Registration { + &self.registration + } + + /// Deregisters the inner io from the registration and returns a Result containing the inner io. + #[cfg(any(feature = "net", feature = "process"))] + pub(crate) fn into_inner(mut self) -> io::Result<E> { + let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here. + self.registration.deregister(&mut inner)?; + Ok(inner) + } +} + +feature! { + #![any(feature = "net", feature = "process")] + + use crate::io::ReadBuf; + use std::task::{Context, Poll}; + + impl<E: Source> PollEvented<E> { + // Safety: The caller must ensure that `E` can read into uninitialized memory + pub(crate) unsafe fn poll_read<'a>( + &'a self, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> + where + &'a E: io::Read + 'a, + { + use std::io::Read; + + let n = ready!(self.registration.poll_read_io(cx, || { + let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]); + self.io.as_ref().unwrap().read(b) + }))?; + + // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the + // buffer. + buf.assume_init(n); + buf.advance(n); + Poll::Ready(Ok(())) + } + + pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> + where + &'a E: io::Write + 'a, + { + use std::io::Write; + self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write(buf)) + } + + #[cfg(feature = "net")] + pub(crate) fn poll_write_vectored<'a>( + &'a self, + cx: &mut Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll<io::Result<usize>> + where + &'a E: io::Write + 'a, + { + use std::io::Write; + self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write_vectored(bufs)) + } + } +} + +impl<E: Source> UnwindSafe for PollEvented<E> {} + +impl<E: Source> RefUnwindSafe for PollEvented<E> {} + +impl<E: Source> Deref for PollEvented<E> { + type Target = E; + + fn deref(&self) -> &E { + self.io.as_ref().unwrap() + } +} + +impl<E: Source + fmt::Debug> fmt::Debug for PollEvented<E> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PollEvented").field("io", &self.io).finish() + } +} + +impl<E: Source> Drop for PollEvented<E> { + fn drop(&mut self) { + if let Some(mut io) = self.io.take() { + // Ignore errors + let _ = self.registration.deregister(&mut io); + } + } +} |