diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
commit | dc0db358abe19481e475e10c32149b53370f1a1c (patch) | |
tree | ab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/src/io/driver | |
parent | Releasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff) | |
download | rustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip |
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/io/driver')
-rw-r--r-- | vendor/tokio/src/io/driver/interest.rs | 112 | ||||
-rw-r--r-- | vendor/tokio/src/io/driver/mod.rs | 353 | ||||
-rw-r--r-- | vendor/tokio/src/io/driver/platform.rs | 44 | ||||
-rw-r--r-- | vendor/tokio/src/io/driver/ready.rs | 239 | ||||
-rw-r--r-- | vendor/tokio/src/io/driver/registration.rs | 262 | ||||
-rw-r--r-- | vendor/tokio/src/io/driver/scheduled_io.rs | 544 |
6 files changed, 0 insertions, 1554 deletions
diff --git a/vendor/tokio/src/io/driver/interest.rs b/vendor/tokio/src/io/driver/interest.rs deleted file mode 100644 index 36951cf5a..000000000 --- a/vendor/tokio/src/io/driver/interest.rs +++ /dev/null @@ -1,112 +0,0 @@ -#![cfg_attr(not(feature = "net"), allow(dead_code, unreachable_pub))] - -use crate::io::driver::Ready; - -use std::fmt; -use std::ops; - -/// Readiness event interest -/// -/// Specifies the readiness events the caller is interested in when awaiting on -/// I/O resource readiness states. -#[cfg_attr(docsrs, doc(cfg(feature = "net")))] -#[derive(Clone, Copy, Eq, PartialEq)] -pub struct Interest(mio::Interest); - -impl Interest { - /// Interest in all readable events. - /// - /// Readable interest includes read-closed events. - pub const READABLE: Interest = Interest(mio::Interest::READABLE); - - /// Interest in all writable events - /// - /// Writable interest includes write-closed events. - pub const WRITABLE: Interest = Interest(mio::Interest::WRITABLE); - - /// Returns true if the value includes readable interest. - /// - /// # Examples - /// - /// ``` - /// use tokio::io::Interest; - /// - /// assert!(Interest::READABLE.is_readable()); - /// assert!(!Interest::WRITABLE.is_readable()); - /// - /// let both = Interest::READABLE | Interest::WRITABLE; - /// assert!(both.is_readable()); - /// ``` - pub const fn is_readable(self) -> bool { - self.0.is_readable() - } - - /// Returns true if the value includes writable interest. - /// - /// # Examples - /// - /// ``` - /// use tokio::io::Interest; - /// - /// assert!(!Interest::READABLE.is_writable()); - /// assert!(Interest::WRITABLE.is_writable()); - /// - /// let both = Interest::READABLE | Interest::WRITABLE; - /// assert!(both.is_writable()); - /// ``` - pub const fn is_writable(self) -> bool { - self.0.is_writable() - } - - /// Add together two `Interest` values. - /// - /// This function works from a `const` context. - /// - /// # Examples - /// - /// ``` - /// use tokio::io::Interest; - /// - /// const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); - /// - /// assert!(BOTH.is_readable()); - /// assert!(BOTH.is_writable()); - pub const fn add(self, other: Interest) -> Interest { - Interest(self.0.add(other.0)) - } - - // This function must be crate-private to avoid exposing a `mio` dependency. - pub(crate) const fn to_mio(self) -> mio::Interest { - self.0 - } - - pub(super) fn mask(self) -> Ready { - match self { - Interest::READABLE => Ready::READABLE | Ready::READ_CLOSED, - Interest::WRITABLE => Ready::WRITABLE | Ready::WRITE_CLOSED, - _ => Ready::EMPTY, - } - } -} - -impl ops::BitOr for Interest { - type Output = Self; - - #[inline] - fn bitor(self, other: Self) -> Self { - self.add(other) - } -} - -impl ops::BitOrAssign for Interest { - #[inline] - fn bitor_assign(&mut self, other: Self) { - self.0 = (*self | other).0; - } -} - -impl fmt::Debug for Interest { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(fmt) - } -} diff --git a/vendor/tokio/src/io/driver/mod.rs b/vendor/tokio/src/io/driver/mod.rs deleted file mode 100644 index 3aa0cfbb2..000000000 --- a/vendor/tokio/src/io/driver/mod.rs +++ /dev/null @@ -1,353 +0,0 @@ -#![cfg_attr(not(feature = "rt"), allow(dead_code))] - -mod interest; -#[allow(unreachable_pub)] -pub use interest::Interest; - -mod ready; -#[allow(unreachable_pub)] -pub use ready::Ready; - -mod registration; -pub(crate) use registration::Registration; - -mod scheduled_io; -use scheduled_io::ScheduledIo; - -use crate::park::{Park, Unpark}; -use crate::util::slab::{self, Slab}; -use crate::{loom::sync::Mutex, util::bit}; - -use std::fmt; -use std::io; -use std::sync::{Arc, Weak}; -use std::time::Duration; - -/// I/O driver, backed by Mio -pub(crate) struct Driver { - /// Tracks the number of times `turn` is called. It is safe for this to wrap - /// as it is mostly used to determine when to call `compact()` - tick: u8, - - /// Reuse the `mio::Events` value across calls to poll. - events: Option<mio::Events>, - - /// Primary slab handle containing the state for each resource registered - /// with this driver. During Drop this is moved into the Inner structure, so - /// this is an Option to allow it to be vacated (until Drop this is always - /// Some) - resources: Option<Slab<ScheduledIo>>, - - /// The system event queue - poll: mio::Poll, - - /// State shared between the reactor and the handles. - inner: Arc<Inner>, -} - -/// A reference to an I/O driver -#[derive(Clone)] -pub(crate) struct Handle { - inner: Weak<Inner>, -} - -pub(crate) struct ReadyEvent { - tick: u8, - pub(crate) ready: Ready, -} - -pub(super) struct Inner { - /// Primary slab handle containing the state for each resource registered - /// with this driver. - /// - /// The ownership of this slab is moved into this structure during - /// `Driver::drop`, so that `Inner::drop` can notify all outstanding handles - /// without risking new ones being registered in the meantime. - resources: Mutex<Option<Slab<ScheduledIo>>>, - - /// Registers I/O resources - registry: mio::Registry, - - /// Allocates `ScheduledIo` handles when creating new resources. - pub(super) io_dispatch: slab::Allocator<ScheduledIo>, - - /// Used to wake up the reactor from a call to `turn` - waker: mio::Waker, -} - -#[derive(Debug, Eq, PartialEq, Clone, Copy)] -enum Direction { - Read, - Write, -} - -enum Tick { - Set(u8), - Clear(u8), -} - -// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup -// token. -const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31); - -const ADDRESS: bit::Pack = bit::Pack::least_significant(24); - -// Packs the generation value in the `readiness` field. -// -// The generation prevents a race condition where a slab slot is reused for a -// new socket while the I/O driver is about to apply a readiness event. The -// generation value is checked when setting new readiness. If the generation do -// not match, then the readiness event is discarded. -const GENERATION: bit::Pack = ADDRESS.then(7); - -fn _assert_kinds() { - fn _assert<T: Send + Sync>() {} - - _assert::<Handle>(); -} - -// ===== impl Driver ===== - -impl Driver { - /// Creates a new event loop, returning any error that happened during the - /// creation. - pub(crate) fn new() -> io::Result<Driver> { - let poll = mio::Poll::new()?; - let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?; - let registry = poll.registry().try_clone()?; - - let slab = Slab::new(); - let allocator = slab.allocator(); - - Ok(Driver { - tick: 0, - events: Some(mio::Events::with_capacity(1024)), - poll, - resources: Some(slab), - inner: Arc::new(Inner { - resources: Mutex::new(None), - registry, - io_dispatch: allocator, - waker, - }), - }) - } - - /// Returns a handle to this event loop which can be sent across threads - /// and can be used as a proxy to the event loop itself. - /// - /// Handles are cloneable and clones always refer to the same event loop. - /// This handle is typically passed into functions that create I/O objects - /// to bind them to this event loop. - pub(crate) fn handle(&self) -> Handle { - Handle { - inner: Arc::downgrade(&self.inner), - } - } - - fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> { - // How often to call `compact()` on the resource slab - const COMPACT_INTERVAL: u8 = 255; - - self.tick = self.tick.wrapping_add(1); - - if self.tick == COMPACT_INTERVAL { - self.resources.as_mut().unwrap().compact() - } - - let mut events = self.events.take().expect("i/o driver event store missing"); - - // Block waiting for an event to happen, peeling out how many events - // happened. - match self.poll.poll(&mut events, max_wait) { - Ok(_) => {} - Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} - Err(e) => return Err(e), - } - - // Process all the events that came in, dispatching appropriately - for event in events.iter() { - let token = event.token(); - - if token != TOKEN_WAKEUP { - self.dispatch(token, Ready::from_mio(event)); - } - } - - self.events = Some(events); - - Ok(()) - } - - fn dispatch(&mut self, token: mio::Token, ready: Ready) { - let addr = slab::Address::from_usize(ADDRESS.unpack(token.0)); - - let resources = self.resources.as_mut().unwrap(); - - let io = match resources.get(addr) { - Some(io) => io, - None => return, - }; - - let res = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| curr | ready); - - if res.is_err() { - // token no longer valid! - return; - } - - io.wake(ready); - } -} - -impl Drop for Driver { - fn drop(&mut self) { - (*self.inner.resources.lock()) = self.resources.take(); - } -} - -impl Drop for Inner { - fn drop(&mut self) { - let resources = self.resources.lock().take(); - - if let Some(mut slab) = resources { - slab.for_each(|io| { - // If a task is waiting on the I/O resource, notify it. The task - // will then attempt to use the I/O resource and fail due to the - // driver being shutdown. - io.shutdown(); - }); - } - } -} - -impl Park for Driver { - type Unpark = Handle; - type Error = io::Error; - - fn unpark(&self) -> Self::Unpark { - self.handle() - } - - fn park(&mut self) -> io::Result<()> { - self.turn(None)?; - Ok(()) - } - - fn park_timeout(&mut self, duration: Duration) -> io::Result<()> { - self.turn(Some(duration))?; - Ok(()) - } - - fn shutdown(&mut self) {} -} - -impl fmt::Debug for Driver { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Driver") - } -} - -// ===== impl Handle ===== - -cfg_rt! { - impl Handle { - /// Returns a handle to the current reactor - /// - /// # Panics - /// - /// This function panics if there is no current reactor set and `rt` feature - /// flag is not enabled. - pub(super) fn current() -> Self { - crate::runtime::context::io_handle().expect("A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO.") - } - } -} - -cfg_not_rt! { - impl Handle { - /// Returns a handle to the current reactor - /// - /// # Panics - /// - /// This function panics if there is no current reactor set, or if the `rt` - /// feature flag is not enabled. - pub(super) fn current() -> Self { - panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR) - } - } -} - -impl Handle { - /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise - /// makes the next call to `turn` return immediately. - /// - /// This method is intended to be used in situations where a notification - /// needs to otherwise be sent to the main reactor. If the reactor is - /// currently blocked inside of `turn` then it will wake up and soon return - /// after this method has been called. If the reactor is not currently - /// blocked in `turn`, then the next call to `turn` will not block and - /// return immediately. - fn wakeup(&self) { - if let Some(inner) = self.inner() { - inner.waker.wake().expect("failed to wake I/O driver"); - } - } - - pub(super) fn inner(&self) -> Option<Arc<Inner>> { - self.inner.upgrade() - } -} - -impl Unpark for Handle { - fn unpark(&self) { - self.wakeup(); - } -} - -impl fmt::Debug for Handle { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Handle") - } -} - -// ===== impl Inner ===== - -impl Inner { - /// Registers an I/O resource with the reactor for a given `mio::Ready` state. - /// - /// The registration token is returned. - pub(super) fn add_source( - &self, - source: &mut impl mio::event::Source, - interest: Interest, - ) -> io::Result<slab::Ref<ScheduledIo>> { - let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| { - io::Error::new( - io::ErrorKind::Other, - "reactor at max registered I/O resources", - ) - })?; - - let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0)); - - self.registry - .register(source, mio::Token(token), interest.to_mio())?; - - Ok(shared) - } - - /// Deregisters an I/O resource from the reactor. - pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> { - self.registry.deregister(source) - } -} - -impl Direction { - pub(super) fn mask(self) -> Ready { - match self { - Direction::Read => Ready::READABLE | Ready::READ_CLOSED, - Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED, - } - } -} diff --git a/vendor/tokio/src/io/driver/platform.rs b/vendor/tokio/src/io/driver/platform.rs deleted file mode 100644 index 6b27988ce..000000000 --- a/vendor/tokio/src/io/driver/platform.rs +++ /dev/null @@ -1,44 +0,0 @@ -pub(crate) use self::sys::*; - -#[cfg(unix)] -mod sys { - use mio::unix::UnixReady; - use mio::Ready; - - pub(crate) fn hup() -> Ready { - UnixReady::hup().into() - } - - pub(crate) fn is_hup(ready: Ready) -> bool { - UnixReady::from(ready).is_hup() - } - - pub(crate) fn error() -> Ready { - UnixReady::error().into() - } - - pub(crate) fn is_error(ready: Ready) -> bool { - UnixReady::from(ready).is_error() - } -} - -#[cfg(windows)] -mod sys { - use mio::Ready; - - pub(crate) fn hup() -> Ready { - Ready::empty() - } - - pub(crate) fn is_hup(_: Ready) -> bool { - false - } - - pub(crate) fn error() -> Ready { - Ready::empty() - } - - pub(crate) fn is_error(_: Ready) -> bool { - false - } -} diff --git a/vendor/tokio/src/io/driver/ready.rs b/vendor/tokio/src/io/driver/ready.rs deleted file mode 100644 index 2ac01bdbe..000000000 --- a/vendor/tokio/src/io/driver/ready.rs +++ /dev/null @@ -1,239 +0,0 @@ -#![cfg_attr(not(feature = "net"), allow(unreachable_pub))] - -use std::fmt; -use std::ops; - -const READABLE: usize = 0b0_01; -const WRITABLE: usize = 0b0_10; -const READ_CLOSED: usize = 0b0_0100; -const WRITE_CLOSED: usize = 0b0_1000; - -/// Describes the readiness state of an I/O resources. -/// -/// `Ready` tracks which operation an I/O resource is ready to perform. -#[cfg_attr(docsrs, doc(cfg(feature = "net")))] -#[derive(Clone, Copy, PartialEq, PartialOrd)] -pub struct Ready(usize); - -impl Ready { - /// Returns the empty `Ready` set. - pub const EMPTY: Ready = Ready(0); - - /// Returns a `Ready` representing readable readiness. - pub const READABLE: Ready = Ready(READABLE); - - /// Returns a `Ready` representing writable readiness. - pub const WRITABLE: Ready = Ready(WRITABLE); - - /// Returns a `Ready` representing read closed readiness. - pub const READ_CLOSED: Ready = Ready(READ_CLOSED); - - /// Returns a `Ready` representing write closed readiness. - pub const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED); - - /// Returns a `Ready` representing readiness for all operations. - pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED); - - // Must remain crate-private to avoid adding a public dependency on Mio. - pub(crate) fn from_mio(event: &mio::event::Event) -> Ready { - let mut ready = Ready::EMPTY; - - if event.is_readable() { - ready |= Ready::READABLE; - } - - if event.is_writable() { - ready |= Ready::WRITABLE; - } - - if event.is_read_closed() { - ready |= Ready::READ_CLOSED; - } - - if event.is_write_closed() { - ready |= Ready::WRITE_CLOSED; - } - - ready - } - - /// Returns true if `Ready` is the empty set - /// - /// # Examples - /// - /// ``` - /// use tokio::io::Ready; - /// - /// assert!(Ready::EMPTY.is_empty()); - /// assert!(!Ready::READABLE.is_empty()); - /// ``` - pub fn is_empty(self) -> bool { - self == Ready::EMPTY - } - - /// Returns `true` if the value includes `readable` - /// - /// # Examples - /// - /// ``` - /// use tokio::io::Ready; - /// - /// assert!(!Ready::EMPTY.is_readable()); - /// assert!(Ready::READABLE.is_readable()); - /// assert!(Ready::READ_CLOSED.is_readable()); - /// assert!(!Ready::WRITABLE.is_readable()); - /// ``` - pub fn is_readable(self) -> bool { - self.contains(Ready::READABLE) || self.is_read_closed() - } - - /// Returns `true` if the value includes writable `readiness` - /// - /// # Examples - /// - /// ``` - /// use tokio::io::Ready; - /// - /// assert!(!Ready::EMPTY.is_writable()); - /// assert!(!Ready::READABLE.is_writable()); - /// assert!(Ready::WRITABLE.is_writable()); - /// assert!(Ready::WRITE_CLOSED.is_writable()); - /// ``` - pub fn is_writable(self) -> bool { - self.contains(Ready::WRITABLE) || self.is_write_closed() - } - - /// Returns `true` if the value includes read-closed `readiness` - /// - /// # Examples - /// - /// ``` - /// use tokio::io::Ready; - /// - /// assert!(!Ready::EMPTY.is_read_closed()); - /// assert!(!Ready::READABLE.is_read_closed()); - /// assert!(Ready::READ_CLOSED.is_read_closed()); - /// ``` - pub fn is_read_closed(self) -> bool { - self.contains(Ready::READ_CLOSED) - } - - /// Returns `true` if the value includes write-closed `readiness` - /// - /// # Examples - /// - /// ``` - /// use tokio::io::Ready; - /// - /// assert!(!Ready::EMPTY.is_write_closed()); - /// assert!(!Ready::WRITABLE.is_write_closed()); - /// assert!(Ready::WRITE_CLOSED.is_write_closed()); - /// ``` - pub fn is_write_closed(self) -> bool { - self.contains(Ready::WRITE_CLOSED) - } - - /// Returns true if `self` is a superset of `other`. - /// - /// `other` may represent more than one readiness operations, in which case - /// the function only returns true if `self` contains all readiness - /// specified in `other`. - pub(crate) fn contains<T: Into<Self>>(self, other: T) -> bool { - let other = other.into(); - (self & other) == other - } - - /// Create a `Ready` instance using the given `usize` representation. - /// - /// The `usize` representation must have been obtained from a call to - /// `Readiness::as_usize`. - /// - /// This function is mainly provided to allow the caller to get a - /// readiness value from an `AtomicUsize`. - pub(crate) fn from_usize(val: usize) -> Ready { - Ready(val & Ready::ALL.as_usize()) - } - - /// Returns a `usize` representation of the `Ready` value. - /// - /// This function is mainly provided to allow the caller to store a - /// readiness value in an `AtomicUsize`. - pub(crate) fn as_usize(self) -> usize { - self.0 - } -} - -cfg_io_readiness! { - use crate::io::Interest; - - impl Ready { - pub(crate) fn from_interest(interest: Interest) -> Ready { - let mut ready = Ready::EMPTY; - - if interest.is_readable() { - ready |= Ready::READABLE; - ready |= Ready::READ_CLOSED; - } - - if interest.is_writable() { - ready |= Ready::WRITABLE; - ready |= Ready::WRITE_CLOSED; - } - - ready - } - - pub(crate) fn intersection(self, interest: Interest) -> Ready { - Ready(self.0 & Ready::from_interest(interest).0) - } - - pub(crate) fn satisfies(self, interest: Interest) -> bool { - self.0 & Ready::from_interest(interest).0 != 0 - } - } -} - -impl ops::BitOr<Ready> for Ready { - type Output = Ready; - - #[inline] - fn bitor(self, other: Ready) -> Ready { - Ready(self.0 | other.0) - } -} - -impl ops::BitOrAssign<Ready> for Ready { - #[inline] - fn bitor_assign(&mut self, other: Ready) { - self.0 |= other.0; - } -} - -impl ops::BitAnd<Ready> for Ready { - type Output = Ready; - - #[inline] - fn bitand(self, other: Ready) -> Ready { - Ready(self.0 & other.0) - } -} - -impl ops::Sub<Ready> for Ready { - type Output = Ready; - - #[inline] - fn sub(self, other: Ready) -> Ready { - Ready(self.0 & !other.0) - } -} - -impl fmt::Debug for Ready { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Ready") - .field("is_readable", &self.is_readable()) - .field("is_writable", &self.is_writable()) - .field("is_read_closed", &self.is_read_closed()) - .field("is_write_closed", &self.is_write_closed()) - .finish() - } -} diff --git a/vendor/tokio/src/io/driver/registration.rs b/vendor/tokio/src/io/driver/registration.rs deleted file mode 100644 index 7350be634..000000000 --- a/vendor/tokio/src/io/driver/registration.rs +++ /dev/null @@ -1,262 +0,0 @@ -#![cfg_attr(not(feature = "net"), allow(dead_code))] - -use crate::io::driver::{Direction, Handle, Interest, ReadyEvent, ScheduledIo}; -use crate::util::slab; - -use mio::event::Source; -use std::io; -use std::task::{Context, Poll}; - -cfg_io_driver! { - /// Associates an I/O resource with the reactor instance that drives it. - /// - /// A registration represents an I/O resource registered with a Reactor such - /// that it will receive task notifications on readiness. This is the lowest - /// level API for integrating with a reactor. - /// - /// The association between an I/O resource is made by calling - /// [`new_with_interest_and_handle`]. - /// Once the association is established, it remains established until the - /// registration instance is dropped. - /// - /// A registration instance represents two separate readiness streams. One - /// for the read readiness and one for write readiness. These streams are - /// independent and can be consumed from separate tasks. - /// - /// **Note**: while `Registration` is `Sync`, the caller must ensure that - /// there are at most two tasks that use a registration instance - /// concurrently. One task for [`poll_read_ready`] and one task for - /// [`poll_write_ready`]. While violating this requirement is "safe" from a - /// Rust memory safety point of view, it will result in unexpected behavior - /// in the form of lost notifications and tasks hanging. - /// - /// ## Platform-specific events - /// - /// `Registration` also allows receiving platform-specific `mio::Ready` - /// events. These events are included as part of the read readiness event - /// stream. The write readiness event stream is only for `Ready::writable()` - /// events. - /// - /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle - /// [`poll_read_ready`]: method@Self::poll_read_ready` - /// [`poll_write_ready`]: method@Self::poll_write_ready` - #[derive(Debug)] - pub(crate) struct Registration { - /// Handle to the associated driver. - handle: Handle, - - /// Reference to state stored by the driver. - shared: slab::Ref<ScheduledIo>, - } -} - -unsafe impl Send for Registration {} -unsafe impl Sync for Registration {} - -// ===== impl Registration ===== - -impl Registration { - /// Registers the I/O resource with the default reactor, for a specific - /// `Interest`. `new_with_interest` should be used over `new` when you need - /// control over the readiness state, such as when a file descriptor only - /// allows reads. This does not add `hup` or `error` so if you are - /// interested in those states, you will need to add them to the readiness - /// state passed to this function. - /// - /// # Return - /// - /// - `Ok` if the registration happened successfully - /// - `Err` if an error was encountered during registration - pub(crate) fn new_with_interest_and_handle( - io: &mut impl Source, - interest: Interest, - handle: Handle, - ) -> io::Result<Registration> { - let shared = if let Some(inner) = handle.inner() { - inner.add_source(io, interest)? - } else { - return Err(io::Error::new( - io::ErrorKind::Other, - "failed to find event loop", - )); - }; - - Ok(Registration { handle, shared }) - } - - /// Deregisters the I/O resource from the reactor it is associated with. - /// - /// This function must be called before the I/O resource associated with the - /// registration is dropped. - /// - /// Note that deregistering does not guarantee that the I/O resource can be - /// registered with a different reactor. Some I/O resource types can only be - /// associated with a single reactor instance for their lifetime. - /// - /// # Return - /// - /// If the deregistration was successful, `Ok` is returned. Any calls to - /// `Reactor::turn` that happen after a successful call to `deregister` will - /// no longer result in notifications getting sent for this registration. - /// - /// `Err` is returned if an error is encountered. - pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> { - let inner = match self.handle.inner() { - Some(inner) => inner, - None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), - }; - inner.deregister_source(io) - } - - pub(crate) fn clear_readiness(&self, event: ReadyEvent) { - self.shared.clear_readiness(event); - } - - // Uses the poll path, requiring the caller to ensure mutual exclusion for - // correctness. Only the last task to call this function is notified. - pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> { - self.poll_ready(cx, Direction::Read) - } - - // Uses the poll path, requiring the caller to ensure mutual exclusion for - // correctness. Only the last task to call this function is notified. - pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> { - self.poll_ready(cx, Direction::Write) - } - - // Uses the poll path, requiring the caller to ensure mutual exclusion for - // correctness. Only the last task to call this function is notified. - pub(crate) fn poll_read_io<R>( - &self, - cx: &mut Context<'_>, - f: impl FnMut() -> io::Result<R>, - ) -> Poll<io::Result<R>> { - self.poll_io(cx, Direction::Read, f) - } - - // Uses the poll path, requiring the caller to ensure mutual exclusion for - // correctness. Only the last task to call this function is notified. - pub(crate) fn poll_write_io<R>( - &self, - cx: &mut Context<'_>, - f: impl FnMut() -> io::Result<R>, - ) -> Poll<io::Result<R>> { - self.poll_io(cx, Direction::Write, f) - } - - /// Polls for events on the I/O resource's `direction` readiness stream. - /// - /// If called with a task context, notify the task when a new event is - /// received. - fn poll_ready( - &self, - cx: &mut Context<'_>, - direction: Direction, - ) -> Poll<io::Result<ReadyEvent>> { - // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); - let ev = ready!(self.shared.poll_readiness(cx, direction)); - - if self.handle.inner().is_none() { - return Poll::Ready(Err(gone())); - } - - coop.made_progress(); - Poll::Ready(Ok(ev)) - } - - fn poll_io<R>( - &self, - cx: &mut Context<'_>, - direction: Direction, - mut f: impl FnMut() -> io::Result<R>, - ) -> Poll<io::Result<R>> { - loop { - let ev = ready!(self.poll_ready(cx, direction))?; - - match f() { - Ok(ret) => { - return Poll::Ready(Ok(ret)); - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.clear_readiness(ev); - } - Err(e) => return Poll::Ready(Err(e)), - } - } - } - - pub(crate) fn try_io<R>( - &self, - interest: Interest, - f: impl FnOnce() -> io::Result<R>, - ) -> io::Result<R> { - let ev = self.shared.ready_event(interest); - - // Don't attempt the operation if the resource is not ready. - if ev.ready.is_empty() { - return Err(io::ErrorKind::WouldBlock.into()); - } - - match f() { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.clear_readiness(ev); - Err(io::ErrorKind::WouldBlock.into()) - } - res => res, - } - } -} - -impl Drop for Registration { - fn drop(&mut self) { - // It is possible for a cycle to be created between wakers stored in - // `ScheduledIo` instances and `Arc<driver::Inner>`. To break this - // cycle, wakers are cleared. This is an imperfect solution as it is - // possible to store a `Registration` in a waker. In this case, the - // cycle would remain. - // - // See tokio-rs/tokio#3481 for more details. - self.shared.clear_wakers(); - } -} - -fn gone() -> io::Error { - io::Error::new(io::ErrorKind::Other, "IO driver has terminated") -} - -cfg_io_readiness! { - impl Registration { - pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> { - use std::future::Future; - use std::pin::Pin; - - let fut = self.shared.readiness(interest); - pin!(fut); - - crate::future::poll_fn(|cx| { - if self.handle.inner().is_none() { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::Other, - crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR - ))); - } - - Pin::new(&mut fut).poll(cx).map(Ok) - }).await - } - - pub(crate) async fn async_io<R>(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> { - loop { - let event = self.readiness(interest).await?; - - match f() { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.clear_readiness(event); - } - x => return x, - } - } - } - } -} diff --git a/vendor/tokio/src/io/driver/scheduled_io.rs b/vendor/tokio/src/io/driver/scheduled_io.rs deleted file mode 100644 index 517801079..000000000 --- a/vendor/tokio/src/io/driver/scheduled_io.rs +++ /dev/null @@ -1,544 +0,0 @@ -use super::{Interest, Ready, ReadyEvent, Tick}; -use crate::loom::sync::atomic::AtomicUsize; -use crate::loom::sync::Mutex; -use crate::util::bit; -use crate::util::slab::Entry; - -use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; -use std::task::{Context, Poll, Waker}; - -use super::Direction; - -cfg_io_readiness! { - use crate::util::linked_list::{self, LinkedList}; - - use std::cell::UnsafeCell; - use std::future::Future; - use std::marker::PhantomPinned; - use std::pin::Pin; - use std::ptr::NonNull; -} - -/// Stored in the I/O driver resource slab. -#[derive(Debug)] -pub(crate) struct ScheduledIo { - /// Packs the resource's readiness with the resource's generation. - readiness: AtomicUsize, - - waiters: Mutex<Waiters>, -} - -cfg_io_readiness! { - type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>; -} - -#[derive(Debug, Default)] -struct Waiters { - #[cfg(feature = "net")] - /// List of all current waiters - list: WaitList, - - /// Waker used for AsyncRead - reader: Option<Waker>, - - /// Waker used for AsyncWrite - writer: Option<Waker>, - - /// True if this ScheduledIo has been killed due to IO driver shutdown - is_shutdown: bool, -} - -cfg_io_readiness! { - #[derive(Debug)] - struct Waiter { - pointers: linked_list::Pointers<Waiter>, - - /// The waker for this task - waker: Option<Waker>, - - /// The interest this waiter is waiting on - interest: Interest, - - is_ready: bool, - - /// Should never be `!Unpin` - _p: PhantomPinned, - } - - /// Future returned by `readiness()` - struct Readiness<'a> { - scheduled_io: &'a ScheduledIo, - - state: State, - - /// Entry in the waiter `LinkedList`. - waiter: UnsafeCell<Waiter>, - } - - enum State { - Init, - Waiting, - Done, - } -} - -// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness. -// -// | reserved | generation | driver tick | readiness | -// |----------+------------+--------------+-----------| -// | 1 bit | 7 bits + 8 bits + 16 bits | - -const READINESS: bit::Pack = bit::Pack::least_significant(16); - -const TICK: bit::Pack = READINESS.then(8); - -const GENERATION: bit::Pack = TICK.then(7); - -#[test] -fn test_generations_assert_same() { - assert_eq!(super::GENERATION, GENERATION); -} - -// ===== impl ScheduledIo ===== - -impl Entry for ScheduledIo { - fn reset(&self) { - let state = self.readiness.load(Acquire); - - let generation = GENERATION.unpack(state); - let next = GENERATION.pack_lossy(generation + 1, 0); - - self.readiness.store(next, Release); - } -} - -impl Default for ScheduledIo { - fn default() -> ScheduledIo { - ScheduledIo { - readiness: AtomicUsize::new(0), - waiters: Mutex::new(Default::default()), - } - } -} - -impl ScheduledIo { - pub(crate) fn generation(&self) -> usize { - GENERATION.unpack(self.readiness.load(Acquire)) - } - - /// Invoked when the IO driver is shut down; forces this ScheduledIo into a - /// permanently ready state. - pub(super) fn shutdown(&self) { - self.wake0(Ready::ALL, true) - } - - /// Sets the readiness on this `ScheduledIo` by invoking the given closure on - /// the current value, returning the previous readiness value. - /// - /// # Arguments - /// - `token`: the token for this `ScheduledIo`. - /// - `tick`: whether setting the tick or trying to clear readiness for a - /// specific tick. - /// - `f`: a closure returning a new readiness value given the previous - /// readiness. - /// - /// # Returns - /// - /// If the given token's generation no longer matches the `ScheduledIo`'s - /// generation, then the corresponding IO resource has been removed and - /// replaced with a new resource. In that case, this method returns `Err`. - /// Otherwise, this returns the previous readiness. - pub(super) fn set_readiness( - &self, - token: Option<usize>, - tick: Tick, - f: impl Fn(Ready) -> Ready, - ) -> Result<(), ()> { - let mut current = self.readiness.load(Acquire); - - loop { - let current_generation = GENERATION.unpack(current); - - if let Some(token) = token { - // Check that the generation for this access is still the - // current one. - if GENERATION.unpack(token) != current_generation { - return Err(()); - } - } - - // Mask out the tick/generation bits so that the modifying - // function doesn't see them. - let current_readiness = Ready::from_usize(current); - let new = f(current_readiness); - - let packed = match tick { - Tick::Set(t) => TICK.pack(t as usize, new.as_usize()), - Tick::Clear(t) => { - if TICK.unpack(current) as u8 != t { - // Trying to clear readiness with an old event! - return Err(()); - } - - TICK.pack(t as usize, new.as_usize()) - } - }; - - let next = GENERATION.pack(current_generation, packed); - - match self - .readiness - .compare_exchange(current, next, AcqRel, Acquire) - { - Ok(_) => return Ok(()), - // we lost the race, retry! - Err(actual) => current = actual, - } - } - } - - /// Notifies all pending waiters that have registered interest in `ready`. - /// - /// There may be many waiters to notify. Waking the pending task **must** be - /// done from outside of the lock otherwise there is a potential for a - /// deadlock. - /// - /// A stack array of wakers is created and filled with wakers to notify, the - /// lock is released, and the wakers are notified. Because there may be more - /// than 32 wakers to notify, if the stack array fills up, the lock is - /// released, the array is cleared, and the iteration continues. - pub(super) fn wake(&self, ready: Ready) { - self.wake0(ready, false); - } - - fn wake0(&self, ready: Ready, shutdown: bool) { - const NUM_WAKERS: usize = 32; - - let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default(); - let mut curr = 0; - - let mut waiters = self.waiters.lock(); - - waiters.is_shutdown |= shutdown; - - // check for AsyncRead slot - if ready.is_readable() { - if let Some(waker) = waiters.reader.take() { - wakers[curr] = Some(waker); - curr += 1; - } - } - - // check for AsyncWrite slot - if ready.is_writable() { - if let Some(waker) = waiters.writer.take() { - wakers[curr] = Some(waker); - curr += 1; - } - } - - #[cfg(feature = "net")] - 'outer: loop { - let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest)); - - while curr < NUM_WAKERS { - match iter.next() { - Some(waiter) => { - let waiter = unsafe { &mut *waiter.as_ptr() }; - - if let Some(waker) = waiter.waker.take() { - waiter.is_ready = true; - wakers[curr] = Some(waker); - curr += 1; - } - } - None => { - break 'outer; - } - } - } - - drop(waiters); - - for waker in wakers.iter_mut().take(curr) { - waker.take().unwrap().wake(); - } - - curr = 0; - - // Acquire the lock again. - waiters = self.waiters.lock(); - } - - // Release the lock before notifying - drop(waiters); - - for waker in wakers.iter_mut().take(curr) { - waker.take().unwrap().wake(); - } - } - - pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent { - let curr = self.readiness.load(Acquire); - - ReadyEvent { - tick: TICK.unpack(curr) as u8, - ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)), - } - } - - /// Poll version of checking readiness for a certain direction. - /// - /// These are to support `AsyncRead` and `AsyncWrite` polling methods, - /// which cannot use the `async fn` version. This uses reserved reader - /// and writer slots. - pub(super) fn poll_readiness( - &self, - cx: &mut Context<'_>, - direction: Direction, - ) -> Poll<ReadyEvent> { - let curr = self.readiness.load(Acquire); - - let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); - - if ready.is_empty() { - // Update the task info - let mut waiters = self.waiters.lock(); - let slot = match direction { - Direction::Read => &mut waiters.reader, - Direction::Write => &mut waiters.writer, - }; - - // Avoid cloning the waker if one is already stored that matches the - // current task. - match slot { - Some(existing) => { - if !existing.will_wake(cx.waker()) { - *existing = cx.waker().clone(); - } - } - None => { - *slot = Some(cx.waker().clone()); - } - } - - // Try again, in case the readiness was changed while we were - // taking the waiters lock - let curr = self.readiness.load(Acquire); - let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); - if waiters.is_shutdown { - Poll::Ready(ReadyEvent { - tick: TICK.unpack(curr) as u8, - ready: direction.mask(), - }) - } else if ready.is_empty() { - Poll::Pending - } else { - Poll::Ready(ReadyEvent { - tick: TICK.unpack(curr) as u8, - ready, - }) - } - } else { - Poll::Ready(ReadyEvent { - tick: TICK.unpack(curr) as u8, - ready, - }) - } - } - - pub(crate) fn clear_readiness(&self, event: ReadyEvent) { - // This consumes the current readiness state **except** for closed - // states. Closed states are excluded because they are final states. - let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED; - - // result isn't important - let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr - mask_no_closed); - } - - pub(crate) fn clear_wakers(&self) { - let mut waiters = self.waiters.lock(); - waiters.reader.take(); - waiters.writer.take(); - } -} - -impl Drop for ScheduledIo { - fn drop(&mut self) { - self.wake(Ready::ALL); - } -} - -unsafe impl Send for ScheduledIo {} -unsafe impl Sync for ScheduledIo {} - -cfg_io_readiness! { - impl ScheduledIo { - /// An async version of `poll_readiness` which uses a linked list of wakers - pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent { - self.readiness_fut(interest).await - } - - // This is in a separate function so that the borrow checker doesn't think - // we are borrowing the `UnsafeCell` possibly over await boundaries. - // - // Go figure. - fn readiness_fut(&self, interest: Interest) -> Readiness<'_> { - Readiness { - scheduled_io: self, - state: State::Init, - waiter: UnsafeCell::new(Waiter { - pointers: linked_list::Pointers::new(), - waker: None, - is_ready: false, - interest, - _p: PhantomPinned, - }), - } - } - } - - unsafe impl linked_list::Link for Waiter { - type Handle = NonNull<Waiter>; - type Target = Waiter; - - fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> { - *handle - } - - unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> { - ptr - } - - unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> { - NonNull::from(&mut target.as_mut().pointers) - } - } - - // ===== impl Readiness ===== - - impl Future for Readiness<'_> { - type Output = ReadyEvent; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - use std::sync::atomic::Ordering::SeqCst; - - let (scheduled_io, state, waiter) = unsafe { - let me = self.get_unchecked_mut(); - (&me.scheduled_io, &mut me.state, &me.waiter) - }; - - loop { - match *state { - State::Init => { - // Optimistically check existing readiness - let curr = scheduled_io.readiness.load(SeqCst); - let ready = Ready::from_usize(READINESS.unpack(curr)); - - // Safety: `waiter.interest` never changes - let interest = unsafe { (*waiter.get()).interest }; - let ready = ready.intersection(interest); - - if !ready.is_empty() { - // Currently ready! - let tick = TICK.unpack(curr) as u8; - *state = State::Done; - return Poll::Ready(ReadyEvent { tick, ready }); - } - - // Wasn't ready, take the lock (and check again while locked). - let mut waiters = scheduled_io.waiters.lock(); - - let curr = scheduled_io.readiness.load(SeqCst); - let mut ready = Ready::from_usize(READINESS.unpack(curr)); - - if waiters.is_shutdown { - ready = Ready::ALL; - } - - let ready = ready.intersection(interest); - - if !ready.is_empty() { - // Currently ready! - let tick = TICK.unpack(curr) as u8; - *state = State::Done; - return Poll::Ready(ReadyEvent { tick, ready }); - } - - // Not ready even after locked, insert into list... - - // Safety: called while locked - unsafe { - (*waiter.get()).waker = Some(cx.waker().clone()); - } - - // Insert the waiter into the linked list - // - // safety: pointers from `UnsafeCell` are never null. - waiters - .list - .push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); - *state = State::Waiting; - } - State::Waiting => { - // Currently in the "Waiting" state, implying the caller has - // a waiter stored in the waiter list (guarded by - // `notify.waiters`). In order to access the waker fields, - // we must hold the lock. - - let waiters = scheduled_io.waiters.lock(); - - // Safety: called while locked - let w = unsafe { &mut *waiter.get() }; - - if w.is_ready { - // Our waker has been notified. - *state = State::Done; - } else { - // Update the waker, if necessary. - if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { - w.waker = Some(cx.waker().clone()); - } - - return Poll::Pending; - } - - // Explicit drop of the lock to indicate the scope that the - // lock is held. Because holding the lock is required to - // ensure safe access to fields not held within the lock, it - // is helpful to visualize the scope of the critical - // section. - drop(waiters); - } - State::Done => { - let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8; - - // Safety: State::Done means it is no longer shared - let w = unsafe { &mut *waiter.get() }; - - return Poll::Ready(ReadyEvent { - tick, - ready: Ready::from_interest(w.interest), - }); - } - } - } - } - } - - impl Drop for Readiness<'_> { - fn drop(&mut self) { - let mut waiters = self.scheduled_io.waiters.lock(); - - // Safety: `waiter` is only ever stored in `waiters` - unsafe { - waiters - .list - .remove(NonNull::new_unchecked(self.waiter.get())) - }; - } - } - - unsafe impl Send for Readiness<'_> {} - unsafe impl Sync for Readiness<'_> {} -} |