diff options
Diffstat (limited to 'third_party/rust/tokio/src/io/driver')
-rw-r--r-- | third_party/rust/tokio/src/io/driver/interest.rs | 132 | ||||
-rw-r--r-- | third_party/rust/tokio/src/io/driver/mod.rs | 354 | ||||
-rw-r--r-- | third_party/rust/tokio/src/io/driver/platform.rs | 44 | ||||
-rw-r--r-- | third_party/rust/tokio/src/io/driver/ready.rs | 250 | ||||
-rw-r--r-- | third_party/rust/tokio/src/io/driver/registration.rs | 262 | ||||
-rw-r--r-- | third_party/rust/tokio/src/io/driver/scheduled_io.rs | 533 |
6 files changed, 1575 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/io/driver/interest.rs b/third_party/rust/tokio/src/io/driver/interest.rs new file mode 100644 index 0000000000..d6b46dfb7c --- /dev/null +++ b/third_party/rust/tokio/src/io/driver/interest.rs @@ -0,0 +1,132 @@ +#![cfg_attr(not(feature = "net"), allow(dead_code, unreachable_pub))] + +use crate::io::driver::Ready; + +use std::fmt; +use std::ops; + +/// Readiness event interest. +/// +/// Specifies the readiness events the caller is interested in when awaiting on +/// I/O resource readiness states. +#[cfg_attr(docsrs, doc(cfg(feature = "net")))] +#[derive(Clone, Copy, Eq, PartialEq)] +pub struct Interest(mio::Interest); + +impl Interest { + // The non-FreeBSD definitions in this block are active only when + // building documentation. + cfg_aio! { + /// Interest for POSIX AIO. + #[cfg(target_os = "freebsd")] + pub const AIO: Interest = Interest(mio::Interest::AIO); + + /// Interest for POSIX AIO. + #[cfg(not(target_os = "freebsd"))] + pub const AIO: Interest = Interest(mio::Interest::READABLE); + + /// Interest for POSIX AIO lio_listio events. + #[cfg(target_os = "freebsd")] + pub const LIO: Interest = Interest(mio::Interest::LIO); + + /// Interest for POSIX AIO lio_listio events. + #[cfg(not(target_os = "freebsd"))] + pub const LIO: Interest = Interest(mio::Interest::READABLE); + } + + /// Interest in all readable events. + /// + /// Readable interest includes read-closed events. + pub const READABLE: Interest = Interest(mio::Interest::READABLE); + + /// Interest in all writable events. + /// + /// Writable interest includes write-closed events. + pub const WRITABLE: Interest = Interest(mio::Interest::WRITABLE); + + /// Returns true if the value includes readable interest. + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Interest; + /// + /// assert!(Interest::READABLE.is_readable()); + /// assert!(!Interest::WRITABLE.is_readable()); + /// + /// let both = Interest::READABLE | Interest::WRITABLE; + /// assert!(both.is_readable()); + /// ``` + pub const fn is_readable(self) -> bool { + self.0.is_readable() + } + + /// Returns true if the value includes writable interest. + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Interest; + /// + /// assert!(!Interest::READABLE.is_writable()); + /// assert!(Interest::WRITABLE.is_writable()); + /// + /// let both = Interest::READABLE | Interest::WRITABLE; + /// assert!(both.is_writable()); + /// ``` + pub const fn is_writable(self) -> bool { + self.0.is_writable() + } + + /// Add together two `Interest` values. + /// + /// This function works from a `const` context. + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Interest; + /// + /// const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); + /// + /// assert!(BOTH.is_readable()); + /// assert!(BOTH.is_writable()); + pub const fn add(self, other: Interest) -> Interest { + Interest(self.0.add(other.0)) + } + + // This function must be crate-private to avoid exposing a `mio` dependency. + pub(crate) const fn to_mio(self) -> mio::Interest { + self.0 + } + + pub(super) fn mask(self) -> Ready { + match self { + Interest::READABLE => Ready::READABLE | Ready::READ_CLOSED, + Interest::WRITABLE => Ready::WRITABLE | Ready::WRITE_CLOSED, + _ => Ready::EMPTY, + } + } +} + +impl ops::BitOr for Interest { + type Output = Self; + + #[inline] + fn bitor(self, other: Self) -> Self { + self.add(other) + } +} + +impl ops::BitOrAssign for Interest { + #[inline] + fn bitor_assign(&mut self, other: Self) { + self.0 = (*self | other).0; + } +} + +impl fmt::Debug for Interest { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(fmt) + } +} diff --git a/third_party/rust/tokio/src/io/driver/mod.rs b/third_party/rust/tokio/src/io/driver/mod.rs new file mode 100644 index 0000000000..19f67a24e7 --- /dev/null +++ b/third_party/rust/tokio/src/io/driver/mod.rs @@ -0,0 +1,354 @@ +#![cfg_attr(not(feature = "rt"), allow(dead_code))] + +mod interest; +#[allow(unreachable_pub)] +pub use interest::Interest; + +mod ready; +#[allow(unreachable_pub)] +pub use ready::Ready; + +mod registration; +pub(crate) use registration::Registration; + +mod scheduled_io; +use scheduled_io::ScheduledIo; + +use crate::park::{Park, Unpark}; +use crate::util::slab::{self, Slab}; +use crate::{loom::sync::Mutex, util::bit}; + +use std::fmt; +use std::io; +use std::sync::{Arc, Weak}; +use std::time::Duration; + +/// I/O driver, backed by Mio. +pub(crate) struct Driver { + /// Tracks the number of times `turn` is called. It is safe for this to wrap + /// as it is mostly used to determine when to call `compact()`. + tick: u8, + + /// Reuse the `mio::Events` value across calls to poll. + events: Option<mio::Events>, + + /// Primary slab handle containing the state for each resource registered + /// with this driver. During Drop this is moved into the Inner structure, so + /// this is an Option to allow it to be vacated (until Drop this is always + /// Some). + resources: Option<Slab<ScheduledIo>>, + + /// The system event queue. + poll: mio::Poll, + + /// State shared between the reactor and the handles. + inner: Arc<Inner>, +} + +/// A reference to an I/O driver. +#[derive(Clone)] +pub(crate) struct Handle { + inner: Weak<Inner>, +} + +#[derive(Debug)] +pub(crate) struct ReadyEvent { + tick: u8, + pub(crate) ready: Ready, +} + +pub(super) struct Inner { + /// Primary slab handle containing the state for each resource registered + /// with this driver. + /// + /// The ownership of this slab is moved into this structure during + /// `Driver::drop`, so that `Inner::drop` can notify all outstanding handles + /// without risking new ones being registered in the meantime. + resources: Mutex<Option<Slab<ScheduledIo>>>, + + /// Registers I/O resources. + registry: mio::Registry, + + /// Allocates `ScheduledIo` handles when creating new resources. + pub(super) io_dispatch: slab::Allocator<ScheduledIo>, + + /// Used to wake up the reactor from a call to `turn`. + waker: mio::Waker, +} + +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +enum Direction { + Read, + Write, +} + +enum Tick { + Set(u8), + Clear(u8), +} + +// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup +// token. +const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31); + +const ADDRESS: bit::Pack = bit::Pack::least_significant(24); + +// Packs the generation value in the `readiness` field. +// +// The generation prevents a race condition where a slab slot is reused for a +// new socket while the I/O driver is about to apply a readiness event. The +// generation value is checked when setting new readiness. If the generation do +// not match, then the readiness event is discarded. +const GENERATION: bit::Pack = ADDRESS.then(7); + +fn _assert_kinds() { + fn _assert<T: Send + Sync>() {} + + _assert::<Handle>(); +} + +// ===== impl Driver ===== + +impl Driver { + /// Creates a new event loop, returning any error that happened during the + /// creation. + pub(crate) fn new() -> io::Result<Driver> { + let poll = mio::Poll::new()?; + let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?; + let registry = poll.registry().try_clone()?; + + let slab = Slab::new(); + let allocator = slab.allocator(); + + Ok(Driver { + tick: 0, + events: Some(mio::Events::with_capacity(1024)), + poll, + resources: Some(slab), + inner: Arc::new(Inner { + resources: Mutex::new(None), + registry, + io_dispatch: allocator, + waker, + }), + }) + } + + /// Returns a handle to this event loop which can be sent across threads + /// and can be used as a proxy to the event loop itself. + /// + /// Handles are cloneable and clones always refer to the same event loop. + /// This handle is typically passed into functions that create I/O objects + /// to bind them to this event loop. + pub(crate) fn handle(&self) -> Handle { + Handle { + inner: Arc::downgrade(&self.inner), + } + } + + fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> { + // How often to call `compact()` on the resource slab + const COMPACT_INTERVAL: u8 = 255; + + self.tick = self.tick.wrapping_add(1); + + if self.tick == COMPACT_INTERVAL { + self.resources.as_mut().unwrap().compact() + } + + let mut events = self.events.take().expect("i/o driver event store missing"); + + // Block waiting for an event to happen, peeling out how many events + // happened. + match self.poll.poll(&mut events, max_wait) { + Ok(_) => {} + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + + // Process all the events that came in, dispatching appropriately + for event in events.iter() { + let token = event.token(); + + if token != TOKEN_WAKEUP { + self.dispatch(token, Ready::from_mio(event)); + } + } + + self.events = Some(events); + + Ok(()) + } + + fn dispatch(&mut self, token: mio::Token, ready: Ready) { + let addr = slab::Address::from_usize(ADDRESS.unpack(token.0)); + + let resources = self.resources.as_mut().unwrap(); + + let io = match resources.get(addr) { + Some(io) => io, + None => return, + }; + + let res = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| curr | ready); + + if res.is_err() { + // token no longer valid! + return; + } + + io.wake(ready); + } +} + +impl Drop for Driver { + fn drop(&mut self) { + (*self.inner.resources.lock()) = self.resources.take(); + } +} + +impl Drop for Inner { + fn drop(&mut self) { + let resources = self.resources.lock().take(); + + if let Some(mut slab) = resources { + slab.for_each(|io| { + // If a task is waiting on the I/O resource, notify it. The task + // will then attempt to use the I/O resource and fail due to the + // driver being shutdown. + io.shutdown(); + }); + } + } +} + +impl Park for Driver { + type Unpark = Handle; + type Error = io::Error; + + fn unpark(&self) -> Self::Unpark { + self.handle() + } + + fn park(&mut self) -> io::Result<()> { + self.turn(None)?; + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> io::Result<()> { + self.turn(Some(duration))?; + Ok(()) + } + + fn shutdown(&mut self) {} +} + +impl fmt::Debug for Driver { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Driver") + } +} + +// ===== impl Handle ===== + +cfg_rt! { + impl Handle { + /// Returns a handle to the current reactor. + /// + /// # Panics + /// + /// This function panics if there is no current reactor set and `rt` feature + /// flag is not enabled. + pub(super) fn current() -> Self { + crate::runtime::context::io_handle().expect("A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO.") + } + } +} + +cfg_not_rt! { + impl Handle { + /// Returns a handle to the current reactor. + /// + /// # Panics + /// + /// This function panics if there is no current reactor set, or if the `rt` + /// feature flag is not enabled. + pub(super) fn current() -> Self { + panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR) + } + } +} + +impl Handle { + /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise + /// makes the next call to `turn` return immediately. + /// + /// This method is intended to be used in situations where a notification + /// needs to otherwise be sent to the main reactor. If the reactor is + /// currently blocked inside of `turn` then it will wake up and soon return + /// after this method has been called. If the reactor is not currently + /// blocked in `turn`, then the next call to `turn` will not block and + /// return immediately. + fn wakeup(&self) { + if let Some(inner) = self.inner() { + inner.waker.wake().expect("failed to wake I/O driver"); + } + } + + pub(super) fn inner(&self) -> Option<Arc<Inner>> { + self.inner.upgrade() + } +} + +impl Unpark for Handle { + fn unpark(&self) { + self.wakeup(); + } +} + +impl fmt::Debug for Handle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Handle") + } +} + +// ===== impl Inner ===== + +impl Inner { + /// Registers an I/O resource with the reactor for a given `mio::Ready` state. + /// + /// The registration token is returned. + pub(super) fn add_source( + &self, + source: &mut impl mio::event::Source, + interest: Interest, + ) -> io::Result<slab::Ref<ScheduledIo>> { + let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + "reactor at max registered I/O resources", + ) + })?; + + let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0)); + + self.registry + .register(source, mio::Token(token), interest.to_mio())?; + + Ok(shared) + } + + /// Deregisters an I/O resource from the reactor. + pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> { + self.registry.deregister(source) + } +} + +impl Direction { + pub(super) fn mask(self) -> Ready { + match self { + Direction::Read => Ready::READABLE | Ready::READ_CLOSED, + Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED, + } + } +} diff --git a/third_party/rust/tokio/src/io/driver/platform.rs b/third_party/rust/tokio/src/io/driver/platform.rs new file mode 100644 index 0000000000..6b27988ce6 --- /dev/null +++ b/third_party/rust/tokio/src/io/driver/platform.rs @@ -0,0 +1,44 @@ +pub(crate) use self::sys::*; + +#[cfg(unix)] +mod sys { + use mio::unix::UnixReady; + use mio::Ready; + + pub(crate) fn hup() -> Ready { + UnixReady::hup().into() + } + + pub(crate) fn is_hup(ready: Ready) -> bool { + UnixReady::from(ready).is_hup() + } + + pub(crate) fn error() -> Ready { + UnixReady::error().into() + } + + pub(crate) fn is_error(ready: Ready) -> bool { + UnixReady::from(ready).is_error() + } +} + +#[cfg(windows)] +mod sys { + use mio::Ready; + + pub(crate) fn hup() -> Ready { + Ready::empty() + } + + pub(crate) fn is_hup(_: Ready) -> bool { + false + } + + pub(crate) fn error() -> Ready { + Ready::empty() + } + + pub(crate) fn is_error(_: Ready) -> bool { + false + } +} diff --git a/third_party/rust/tokio/src/io/driver/ready.rs b/third_party/rust/tokio/src/io/driver/ready.rs new file mode 100644 index 0000000000..2430d3022f --- /dev/null +++ b/third_party/rust/tokio/src/io/driver/ready.rs @@ -0,0 +1,250 @@ +#![cfg_attr(not(feature = "net"), allow(unreachable_pub))] + +use std::fmt; +use std::ops; + +const READABLE: usize = 0b0_01; +const WRITABLE: usize = 0b0_10; +const READ_CLOSED: usize = 0b0_0100; +const WRITE_CLOSED: usize = 0b0_1000; + +/// Describes the readiness state of an I/O resources. +/// +/// `Ready` tracks which operation an I/O resource is ready to perform. +#[cfg_attr(docsrs, doc(cfg(feature = "net")))] +#[derive(Clone, Copy, PartialEq, PartialOrd)] +pub struct Ready(usize); + +impl Ready { + /// Returns the empty `Ready` set. + pub const EMPTY: Ready = Ready(0); + + /// Returns a `Ready` representing readable readiness. + pub const READABLE: Ready = Ready(READABLE); + + /// Returns a `Ready` representing writable readiness. + pub const WRITABLE: Ready = Ready(WRITABLE); + + /// Returns a `Ready` representing read closed readiness. + pub const READ_CLOSED: Ready = Ready(READ_CLOSED); + + /// Returns a `Ready` representing write closed readiness. + pub const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED); + + /// Returns a `Ready` representing readiness for all operations. + pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED); + + // Must remain crate-private to avoid adding a public dependency on Mio. + pub(crate) fn from_mio(event: &mio::event::Event) -> Ready { + let mut ready = Ready::EMPTY; + + #[cfg(all(target_os = "freebsd", feature = "net"))] + { + if event.is_aio() { + ready |= Ready::READABLE; + } + + if event.is_lio() { + ready |= Ready::READABLE; + } + } + + if event.is_readable() { + ready |= Ready::READABLE; + } + + if event.is_writable() { + ready |= Ready::WRITABLE; + } + + if event.is_read_closed() { + ready |= Ready::READ_CLOSED; + } + + if event.is_write_closed() { + ready |= Ready::WRITE_CLOSED; + } + + ready + } + + /// Returns true if `Ready` is the empty set. + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(Ready::EMPTY.is_empty()); + /// assert!(!Ready::READABLE.is_empty()); + /// ``` + pub fn is_empty(self) -> bool { + self == Ready::EMPTY + } + + /// Returns `true` if the value includes `readable`. + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(!Ready::EMPTY.is_readable()); + /// assert!(Ready::READABLE.is_readable()); + /// assert!(Ready::READ_CLOSED.is_readable()); + /// assert!(!Ready::WRITABLE.is_readable()); + /// ``` + pub fn is_readable(self) -> bool { + self.contains(Ready::READABLE) || self.is_read_closed() + } + + /// Returns `true` if the value includes writable `readiness`. + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(!Ready::EMPTY.is_writable()); + /// assert!(!Ready::READABLE.is_writable()); + /// assert!(Ready::WRITABLE.is_writable()); + /// assert!(Ready::WRITE_CLOSED.is_writable()); + /// ``` + pub fn is_writable(self) -> bool { + self.contains(Ready::WRITABLE) || self.is_write_closed() + } + + /// Returns `true` if the value includes read-closed `readiness`. + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(!Ready::EMPTY.is_read_closed()); + /// assert!(!Ready::READABLE.is_read_closed()); + /// assert!(Ready::READ_CLOSED.is_read_closed()); + /// ``` + pub fn is_read_closed(self) -> bool { + self.contains(Ready::READ_CLOSED) + } + + /// Returns `true` if the value includes write-closed `readiness`. + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(!Ready::EMPTY.is_write_closed()); + /// assert!(!Ready::WRITABLE.is_write_closed()); + /// assert!(Ready::WRITE_CLOSED.is_write_closed()); + /// ``` + pub fn is_write_closed(self) -> bool { + self.contains(Ready::WRITE_CLOSED) + } + + /// Returns true if `self` is a superset of `other`. + /// + /// `other` may represent more than one readiness operations, in which case + /// the function only returns true if `self` contains all readiness + /// specified in `other`. + pub(crate) fn contains<T: Into<Self>>(self, other: T) -> bool { + let other = other.into(); + (self & other) == other + } + + /// Creates a `Ready` instance using the given `usize` representation. + /// + /// The `usize` representation must have been obtained from a call to + /// `Readiness::as_usize`. + /// + /// This function is mainly provided to allow the caller to get a + /// readiness value from an `AtomicUsize`. + pub(crate) fn from_usize(val: usize) -> Ready { + Ready(val & Ready::ALL.as_usize()) + } + + /// Returns a `usize` representation of the `Ready` value. + /// + /// This function is mainly provided to allow the caller to store a + /// readiness value in an `AtomicUsize`. + pub(crate) fn as_usize(self) -> usize { + self.0 + } +} + +cfg_io_readiness! { + use crate::io::Interest; + + impl Ready { + pub(crate) fn from_interest(interest: Interest) -> Ready { + let mut ready = Ready::EMPTY; + + if interest.is_readable() { + ready |= Ready::READABLE; + ready |= Ready::READ_CLOSED; + } + + if interest.is_writable() { + ready |= Ready::WRITABLE; + ready |= Ready::WRITE_CLOSED; + } + + ready + } + + pub(crate) fn intersection(self, interest: Interest) -> Ready { + Ready(self.0 & Ready::from_interest(interest).0) + } + + pub(crate) fn satisfies(self, interest: Interest) -> bool { + self.0 & Ready::from_interest(interest).0 != 0 + } + } +} + +impl ops::BitOr<Ready> for Ready { + type Output = Ready; + + #[inline] + fn bitor(self, other: Ready) -> Ready { + Ready(self.0 | other.0) + } +} + +impl ops::BitOrAssign<Ready> for Ready { + #[inline] + fn bitor_assign(&mut self, other: Ready) { + self.0 |= other.0; + } +} + +impl ops::BitAnd<Ready> for Ready { + type Output = Ready; + + #[inline] + fn bitand(self, other: Ready) -> Ready { + Ready(self.0 & other.0) + } +} + +impl ops::Sub<Ready> for Ready { + type Output = Ready; + + #[inline] + fn sub(self, other: Ready) -> Ready { + Ready(self.0 & !other.0) + } +} + +impl fmt::Debug for Ready { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Ready") + .field("is_readable", &self.is_readable()) + .field("is_writable", &self.is_writable()) + .field("is_read_closed", &self.is_read_closed()) + .field("is_write_closed", &self.is_write_closed()) + .finish() + } +} diff --git a/third_party/rust/tokio/src/io/driver/registration.rs b/third_party/rust/tokio/src/io/driver/registration.rs new file mode 100644 index 0000000000..7350be6345 --- /dev/null +++ b/third_party/rust/tokio/src/io/driver/registration.rs @@ -0,0 +1,262 @@ +#![cfg_attr(not(feature = "net"), allow(dead_code))] + +use crate::io::driver::{Direction, Handle, Interest, ReadyEvent, ScheduledIo}; +use crate::util::slab; + +use mio::event::Source; +use std::io; +use std::task::{Context, Poll}; + +cfg_io_driver! { + /// Associates an I/O resource with the reactor instance that drives it. + /// + /// A registration represents an I/O resource registered with a Reactor such + /// that it will receive task notifications on readiness. This is the lowest + /// level API for integrating with a reactor. + /// + /// The association between an I/O resource is made by calling + /// [`new_with_interest_and_handle`]. + /// Once the association is established, it remains established until the + /// registration instance is dropped. + /// + /// A registration instance represents two separate readiness streams. One + /// for the read readiness and one for write readiness. These streams are + /// independent and can be consumed from separate tasks. + /// + /// **Note**: while `Registration` is `Sync`, the caller must ensure that + /// there are at most two tasks that use a registration instance + /// concurrently. One task for [`poll_read_ready`] and one task for + /// [`poll_write_ready`]. While violating this requirement is "safe" from a + /// Rust memory safety point of view, it will result in unexpected behavior + /// in the form of lost notifications and tasks hanging. + /// + /// ## Platform-specific events + /// + /// `Registration` also allows receiving platform-specific `mio::Ready` + /// events. These events are included as part of the read readiness event + /// stream. The write readiness event stream is only for `Ready::writable()` + /// events. + /// + /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle + /// [`poll_read_ready`]: method@Self::poll_read_ready` + /// [`poll_write_ready`]: method@Self::poll_write_ready` + #[derive(Debug)] + pub(crate) struct Registration { + /// Handle to the associated driver. + handle: Handle, + + /// Reference to state stored by the driver. + shared: slab::Ref<ScheduledIo>, + } +} + +unsafe impl Send for Registration {} +unsafe impl Sync for Registration {} + +// ===== impl Registration ===== + +impl Registration { + /// Registers the I/O resource with the default reactor, for a specific + /// `Interest`. `new_with_interest` should be used over `new` when you need + /// control over the readiness state, such as when a file descriptor only + /// allows reads. This does not add `hup` or `error` so if you are + /// interested in those states, you will need to add them to the readiness + /// state passed to this function. + /// + /// # Return + /// + /// - `Ok` if the registration happened successfully + /// - `Err` if an error was encountered during registration + pub(crate) fn new_with_interest_and_handle( + io: &mut impl Source, + interest: Interest, + handle: Handle, + ) -> io::Result<Registration> { + let shared = if let Some(inner) = handle.inner() { + inner.add_source(io, interest)? + } else { + return Err(io::Error::new( + io::ErrorKind::Other, + "failed to find event loop", + )); + }; + + Ok(Registration { handle, shared }) + } + + /// Deregisters the I/O resource from the reactor it is associated with. + /// + /// This function must be called before the I/O resource associated with the + /// registration is dropped. + /// + /// Note that deregistering does not guarantee that the I/O resource can be + /// registered with a different reactor. Some I/O resource types can only be + /// associated with a single reactor instance for their lifetime. + /// + /// # Return + /// + /// If the deregistration was successful, `Ok` is returned. Any calls to + /// `Reactor::turn` that happen after a successful call to `deregister` will + /// no longer result in notifications getting sent for this registration. + /// + /// `Err` is returned if an error is encountered. + pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> { + let inner = match self.handle.inner() { + Some(inner) => inner, + None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), + }; + inner.deregister_source(io) + } + + pub(crate) fn clear_readiness(&self, event: ReadyEvent) { + self.shared.clear_readiness(event); + } + + // Uses the poll path, requiring the caller to ensure mutual exclusion for + // correctness. Only the last task to call this function is notified. + pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> { + self.poll_ready(cx, Direction::Read) + } + + // Uses the poll path, requiring the caller to ensure mutual exclusion for + // correctness. Only the last task to call this function is notified. + pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> { + self.poll_ready(cx, Direction::Write) + } + + // Uses the poll path, requiring the caller to ensure mutual exclusion for + // correctness. Only the last task to call this function is notified. + pub(crate) fn poll_read_io<R>( + &self, + cx: &mut Context<'_>, + f: impl FnMut() -> io::Result<R>, + ) -> Poll<io::Result<R>> { + self.poll_io(cx, Direction::Read, f) + } + + // Uses the poll path, requiring the caller to ensure mutual exclusion for + // correctness. Only the last task to call this function is notified. + pub(crate) fn poll_write_io<R>( + &self, + cx: &mut Context<'_>, + f: impl FnMut() -> io::Result<R>, + ) -> Poll<io::Result<R>> { + self.poll_io(cx, Direction::Write, f) + } + + /// Polls for events on the I/O resource's `direction` readiness stream. + /// + /// If called with a task context, notify the task when a new event is + /// received. + fn poll_ready( + &self, + cx: &mut Context<'_>, + direction: Direction, + ) -> Poll<io::Result<ReadyEvent>> { + // Keep track of task budget + let coop = ready!(crate::coop::poll_proceed(cx)); + let ev = ready!(self.shared.poll_readiness(cx, direction)); + + if self.handle.inner().is_none() { + return Poll::Ready(Err(gone())); + } + + coop.made_progress(); + Poll::Ready(Ok(ev)) + } + + fn poll_io<R>( + &self, + cx: &mut Context<'_>, + direction: Direction, + mut f: impl FnMut() -> io::Result<R>, + ) -> Poll<io::Result<R>> { + loop { + let ev = ready!(self.poll_ready(cx, direction))?; + + match f() { + Ok(ret) => { + return Poll::Ready(Ok(ret)); + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_readiness(ev); + } + Err(e) => return Poll::Ready(Err(e)), + } + } + } + + pub(crate) fn try_io<R>( + &self, + interest: Interest, + f: impl FnOnce() -> io::Result<R>, + ) -> io::Result<R> { + let ev = self.shared.ready_event(interest); + + // Don't attempt the operation if the resource is not ready. + if ev.ready.is_empty() { + return Err(io::ErrorKind::WouldBlock.into()); + } + + match f() { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_readiness(ev); + Err(io::ErrorKind::WouldBlock.into()) + } + res => res, + } + } +} + +impl Drop for Registration { + fn drop(&mut self) { + // It is possible for a cycle to be created between wakers stored in + // `ScheduledIo` instances and `Arc<driver::Inner>`. To break this + // cycle, wakers are cleared. This is an imperfect solution as it is + // possible to store a `Registration` in a waker. In this case, the + // cycle would remain. + // + // See tokio-rs/tokio#3481 for more details. + self.shared.clear_wakers(); + } +} + +fn gone() -> io::Error { + io::Error::new(io::ErrorKind::Other, "IO driver has terminated") +} + +cfg_io_readiness! { + impl Registration { + pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> { + use std::future::Future; + use std::pin::Pin; + + let fut = self.shared.readiness(interest); + pin!(fut); + + crate::future::poll_fn(|cx| { + if self.handle.inner().is_none() { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::Other, + crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR + ))); + } + + Pin::new(&mut fut).poll(cx).map(Ok) + }).await + } + + pub(crate) async fn async_io<R>(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> { + loop { + let event = self.readiness(interest).await?; + + match f() { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_readiness(event); + } + x => return x, + } + } + } + } +} diff --git a/third_party/rust/tokio/src/io/driver/scheduled_io.rs b/third_party/rust/tokio/src/io/driver/scheduled_io.rs new file mode 100644 index 0000000000..76f93431ba --- /dev/null +++ b/third_party/rust/tokio/src/io/driver/scheduled_io.rs @@ -0,0 +1,533 @@ +use super::{Interest, Ready, ReadyEvent, Tick}; +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::Mutex; +use crate::util::bit; +use crate::util::slab::Entry; +use crate::util::WakeList; + +use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; +use std::task::{Context, Poll, Waker}; + +use super::Direction; + +cfg_io_readiness! { + use crate::util::linked_list::{self, LinkedList}; + + use std::cell::UnsafeCell; + use std::future::Future; + use std::marker::PhantomPinned; + use std::pin::Pin; + use std::ptr::NonNull; +} + +/// Stored in the I/O driver resource slab. +#[derive(Debug)] +pub(crate) struct ScheduledIo { + /// Packs the resource's readiness with the resource's generation. + readiness: AtomicUsize, + + waiters: Mutex<Waiters>, +} + +cfg_io_readiness! { + type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>; +} + +#[derive(Debug, Default)] +struct Waiters { + #[cfg(feature = "net")] + /// List of all current waiters. + list: WaitList, + + /// Waker used for AsyncRead. + reader: Option<Waker>, + + /// Waker used for AsyncWrite. + writer: Option<Waker>, + + /// True if this ScheduledIo has been killed due to IO driver shutdown. + is_shutdown: bool, +} + +cfg_io_readiness! { + #[derive(Debug)] + struct Waiter { + pointers: linked_list::Pointers<Waiter>, + + /// The waker for this task. + waker: Option<Waker>, + + /// The interest this waiter is waiting on. + interest: Interest, + + is_ready: bool, + + /// Should never be `!Unpin`. + _p: PhantomPinned, + } + + /// Future returned by `readiness()`. + struct Readiness<'a> { + scheduled_io: &'a ScheduledIo, + + state: State, + + /// Entry in the waiter `LinkedList`. + waiter: UnsafeCell<Waiter>, + } + + enum State { + Init, + Waiting, + Done, + } +} + +// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness. +// +// | reserved | generation | driver tick | readiness | +// |----------+------------+--------------+-----------| +// | 1 bit | 7 bits + 8 bits + 16 bits | + +const READINESS: bit::Pack = bit::Pack::least_significant(16); + +const TICK: bit::Pack = READINESS.then(8); + +const GENERATION: bit::Pack = TICK.then(7); + +#[test] +fn test_generations_assert_same() { + assert_eq!(super::GENERATION, GENERATION); +} + +// ===== impl ScheduledIo ===== + +impl Entry for ScheduledIo { + fn reset(&self) { + let state = self.readiness.load(Acquire); + + let generation = GENERATION.unpack(state); + let next = GENERATION.pack_lossy(generation + 1, 0); + + self.readiness.store(next, Release); + } +} + +impl Default for ScheduledIo { + fn default() -> ScheduledIo { + ScheduledIo { + readiness: AtomicUsize::new(0), + waiters: Mutex::new(Default::default()), + } + } +} + +impl ScheduledIo { + pub(crate) fn generation(&self) -> usize { + GENERATION.unpack(self.readiness.load(Acquire)) + } + + /// Invoked when the IO driver is shut down; forces this ScheduledIo into a + /// permanently ready state. + pub(super) fn shutdown(&self) { + self.wake0(Ready::ALL, true) + } + + /// Sets the readiness on this `ScheduledIo` by invoking the given closure on + /// the current value, returning the previous readiness value. + /// + /// # Arguments + /// - `token`: the token for this `ScheduledIo`. + /// - `tick`: whether setting the tick or trying to clear readiness for a + /// specific tick. + /// - `f`: a closure returning a new readiness value given the previous + /// readiness. + /// + /// # Returns + /// + /// If the given token's generation no longer matches the `ScheduledIo`'s + /// generation, then the corresponding IO resource has been removed and + /// replaced with a new resource. In that case, this method returns `Err`. + /// Otherwise, this returns the previous readiness. + pub(super) fn set_readiness( + &self, + token: Option<usize>, + tick: Tick, + f: impl Fn(Ready) -> Ready, + ) -> Result<(), ()> { + let mut current = self.readiness.load(Acquire); + + loop { + let current_generation = GENERATION.unpack(current); + + if let Some(token) = token { + // Check that the generation for this access is still the + // current one. + if GENERATION.unpack(token) != current_generation { + return Err(()); + } + } + + // Mask out the tick/generation bits so that the modifying + // function doesn't see them. + let current_readiness = Ready::from_usize(current); + let new = f(current_readiness); + + let packed = match tick { + Tick::Set(t) => TICK.pack(t as usize, new.as_usize()), + Tick::Clear(t) => { + if TICK.unpack(current) as u8 != t { + // Trying to clear readiness with an old event! + return Err(()); + } + + TICK.pack(t as usize, new.as_usize()) + } + }; + + let next = GENERATION.pack(current_generation, packed); + + match self + .readiness + .compare_exchange(current, next, AcqRel, Acquire) + { + Ok(_) => return Ok(()), + // we lost the race, retry! + Err(actual) => current = actual, + } + } + } + + /// Notifies all pending waiters that have registered interest in `ready`. + /// + /// There may be many waiters to notify. Waking the pending task **must** be + /// done from outside of the lock otherwise there is a potential for a + /// deadlock. + /// + /// A stack array of wakers is created and filled with wakers to notify, the + /// lock is released, and the wakers are notified. Because there may be more + /// than 32 wakers to notify, if the stack array fills up, the lock is + /// released, the array is cleared, and the iteration continues. + pub(super) fn wake(&self, ready: Ready) { + self.wake0(ready, false); + } + + fn wake0(&self, ready: Ready, shutdown: bool) { + let mut wakers = WakeList::new(); + + let mut waiters = self.waiters.lock(); + + waiters.is_shutdown |= shutdown; + + // check for AsyncRead slot + if ready.is_readable() { + if let Some(waker) = waiters.reader.take() { + wakers.push(waker); + } + } + + // check for AsyncWrite slot + if ready.is_writable() { + if let Some(waker) = waiters.writer.take() { + wakers.push(waker); + } + } + + #[cfg(feature = "net")] + 'outer: loop { + let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest)); + + while wakers.can_push() { + match iter.next() { + Some(waiter) => { + let waiter = unsafe { &mut *waiter.as_ptr() }; + + if let Some(waker) = waiter.waker.take() { + waiter.is_ready = true; + wakers.push(waker); + } + } + None => { + break 'outer; + } + } + } + + drop(waiters); + + wakers.wake_all(); + + // Acquire the lock again. + waiters = self.waiters.lock(); + } + + // Release the lock before notifying + drop(waiters); + + wakers.wake_all(); + } + + pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent { + let curr = self.readiness.load(Acquire); + + ReadyEvent { + tick: TICK.unpack(curr) as u8, + ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)), + } + } + + /// Polls for readiness events in a given direction. + /// + /// These are to support `AsyncRead` and `AsyncWrite` polling methods, + /// which cannot use the `async fn` version. This uses reserved reader + /// and writer slots. + pub(super) fn poll_readiness( + &self, + cx: &mut Context<'_>, + direction: Direction, + ) -> Poll<ReadyEvent> { + let curr = self.readiness.load(Acquire); + + let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); + + if ready.is_empty() { + // Update the task info + let mut waiters = self.waiters.lock(); + let slot = match direction { + Direction::Read => &mut waiters.reader, + Direction::Write => &mut waiters.writer, + }; + + // Avoid cloning the waker if one is already stored that matches the + // current task. + match slot { + Some(existing) => { + if !existing.will_wake(cx.waker()) { + *existing = cx.waker().clone(); + } + } + None => { + *slot = Some(cx.waker().clone()); + } + } + + // Try again, in case the readiness was changed while we were + // taking the waiters lock + let curr = self.readiness.load(Acquire); + let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); + if waiters.is_shutdown { + Poll::Ready(ReadyEvent { + tick: TICK.unpack(curr) as u8, + ready: direction.mask(), + }) + } else if ready.is_empty() { + Poll::Pending + } else { + Poll::Ready(ReadyEvent { + tick: TICK.unpack(curr) as u8, + ready, + }) + } + } else { + Poll::Ready(ReadyEvent { + tick: TICK.unpack(curr) as u8, + ready, + }) + } + } + + pub(crate) fn clear_readiness(&self, event: ReadyEvent) { + // This consumes the current readiness state **except** for closed + // states. Closed states are excluded because they are final states. + let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED; + + // result isn't important + let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr - mask_no_closed); + } + + pub(crate) fn clear_wakers(&self) { + let mut waiters = self.waiters.lock(); + waiters.reader.take(); + waiters.writer.take(); + } +} + +impl Drop for ScheduledIo { + fn drop(&mut self) { + self.wake(Ready::ALL); + } +} + +unsafe impl Send for ScheduledIo {} +unsafe impl Sync for ScheduledIo {} + +cfg_io_readiness! { + impl ScheduledIo { + /// An async version of `poll_readiness` which uses a linked list of wakers. + pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent { + self.readiness_fut(interest).await + } + + // This is in a separate function so that the borrow checker doesn't think + // we are borrowing the `UnsafeCell` possibly over await boundaries. + // + // Go figure. + fn readiness_fut(&self, interest: Interest) -> Readiness<'_> { + Readiness { + scheduled_io: self, + state: State::Init, + waiter: UnsafeCell::new(Waiter { + pointers: linked_list::Pointers::new(), + waker: None, + is_ready: false, + interest, + _p: PhantomPinned, + }), + } + } + } + + unsafe impl linked_list::Link for Waiter { + type Handle = NonNull<Waiter>; + type Target = Waiter; + + fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> { + *handle + } + + unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> { + ptr + } + + unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> { + NonNull::from(&mut target.as_mut().pointers) + } + } + + // ===== impl Readiness ===== + + impl Future for Readiness<'_> { + type Output = ReadyEvent; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + use std::sync::atomic::Ordering::SeqCst; + + let (scheduled_io, state, waiter) = unsafe { + let me = self.get_unchecked_mut(); + (&me.scheduled_io, &mut me.state, &me.waiter) + }; + + loop { + match *state { + State::Init => { + // Optimistically check existing readiness + let curr = scheduled_io.readiness.load(SeqCst); + let ready = Ready::from_usize(READINESS.unpack(curr)); + + // Safety: `waiter.interest` never changes + let interest = unsafe { (*waiter.get()).interest }; + let ready = ready.intersection(interest); + + if !ready.is_empty() { + // Currently ready! + let tick = TICK.unpack(curr) as u8; + *state = State::Done; + return Poll::Ready(ReadyEvent { tick, ready }); + } + + // Wasn't ready, take the lock (and check again while locked). + let mut waiters = scheduled_io.waiters.lock(); + + let curr = scheduled_io.readiness.load(SeqCst); + let mut ready = Ready::from_usize(READINESS.unpack(curr)); + + if waiters.is_shutdown { + ready = Ready::ALL; + } + + let ready = ready.intersection(interest); + + if !ready.is_empty() { + // Currently ready! + let tick = TICK.unpack(curr) as u8; + *state = State::Done; + return Poll::Ready(ReadyEvent { tick, ready }); + } + + // Not ready even after locked, insert into list... + + // Safety: called while locked + unsafe { + (*waiter.get()).waker = Some(cx.waker().clone()); + } + + // Insert the waiter into the linked list + // + // safety: pointers from `UnsafeCell` are never null. + waiters + .list + .push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); + *state = State::Waiting; + } + State::Waiting => { + // Currently in the "Waiting" state, implying the caller has + // a waiter stored in the waiter list (guarded by + // `notify.waiters`). In order to access the waker fields, + // we must hold the lock. + + let waiters = scheduled_io.waiters.lock(); + + // Safety: called while locked + let w = unsafe { &mut *waiter.get() }; + + if w.is_ready { + // Our waker has been notified. + *state = State::Done; + } else { + // Update the waker, if necessary. + if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { + w.waker = Some(cx.waker().clone()); + } + + return Poll::Pending; + } + + // Explicit drop of the lock to indicate the scope that the + // lock is held. Because holding the lock is required to + // ensure safe access to fields not held within the lock, it + // is helpful to visualize the scope of the critical + // section. + drop(waiters); + } + State::Done => { + let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8; + + // Safety: State::Done means it is no longer shared + let w = unsafe { &mut *waiter.get() }; + + return Poll::Ready(ReadyEvent { + tick, + ready: Ready::from_interest(w.interest), + }); + } + } + } + } + } + + impl Drop for Readiness<'_> { + fn drop(&mut self) { + let mut waiters = self.scheduled_io.waiters.lock(); + + // Safety: `waiter` is only ever stored in `waiters` + unsafe { + waiters + .list + .remove(NonNull::new_unchecked(self.waiter.get())) + }; + } + } + + unsafe impl Send for Readiness<'_> {} + unsafe impl Sync for Readiness<'_> {} +} |