summaryrefslogtreecommitdiffstats
path: root/vendor/tokio/src/io/driver
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
commitdc0db358abe19481e475e10c32149b53370f1a1c (patch)
treeab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/src/io/driver
parentReleasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff)
downloadrustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz
rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/io/driver')
-rw-r--r--vendor/tokio/src/io/driver/interest.rs112
-rw-r--r--vendor/tokio/src/io/driver/mod.rs353
-rw-r--r--vendor/tokio/src/io/driver/platform.rs44
-rw-r--r--vendor/tokio/src/io/driver/ready.rs239
-rw-r--r--vendor/tokio/src/io/driver/registration.rs262
-rw-r--r--vendor/tokio/src/io/driver/scheduled_io.rs544
6 files changed, 0 insertions, 1554 deletions
diff --git a/vendor/tokio/src/io/driver/interest.rs b/vendor/tokio/src/io/driver/interest.rs
deleted file mode 100644
index 36951cf5a..000000000
--- a/vendor/tokio/src/io/driver/interest.rs
+++ /dev/null
@@ -1,112 +0,0 @@
-#![cfg_attr(not(feature = "net"), allow(dead_code, unreachable_pub))]
-
-use crate::io::driver::Ready;
-
-use std::fmt;
-use std::ops;
-
-/// Readiness event interest
-///
-/// Specifies the readiness events the caller is interested in when awaiting on
-/// I/O resource readiness states.
-#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
-#[derive(Clone, Copy, Eq, PartialEq)]
-pub struct Interest(mio::Interest);
-
-impl Interest {
- /// Interest in all readable events.
- ///
- /// Readable interest includes read-closed events.
- pub const READABLE: Interest = Interest(mio::Interest::READABLE);
-
- /// Interest in all writable events
- ///
- /// Writable interest includes write-closed events.
- pub const WRITABLE: Interest = Interest(mio::Interest::WRITABLE);
-
- /// Returns true if the value includes readable interest.
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::io::Interest;
- ///
- /// assert!(Interest::READABLE.is_readable());
- /// assert!(!Interest::WRITABLE.is_readable());
- ///
- /// let both = Interest::READABLE | Interest::WRITABLE;
- /// assert!(both.is_readable());
- /// ```
- pub const fn is_readable(self) -> bool {
- self.0.is_readable()
- }
-
- /// Returns true if the value includes writable interest.
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::io::Interest;
- ///
- /// assert!(!Interest::READABLE.is_writable());
- /// assert!(Interest::WRITABLE.is_writable());
- ///
- /// let both = Interest::READABLE | Interest::WRITABLE;
- /// assert!(both.is_writable());
- /// ```
- pub const fn is_writable(self) -> bool {
- self.0.is_writable()
- }
-
- /// Add together two `Interest` values.
- ///
- /// This function works from a `const` context.
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::io::Interest;
- ///
- /// const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
- ///
- /// assert!(BOTH.is_readable());
- /// assert!(BOTH.is_writable());
- pub const fn add(self, other: Interest) -> Interest {
- Interest(self.0.add(other.0))
- }
-
- // This function must be crate-private to avoid exposing a `mio` dependency.
- pub(crate) const fn to_mio(self) -> mio::Interest {
- self.0
- }
-
- pub(super) fn mask(self) -> Ready {
- match self {
- Interest::READABLE => Ready::READABLE | Ready::READ_CLOSED,
- Interest::WRITABLE => Ready::WRITABLE | Ready::WRITE_CLOSED,
- _ => Ready::EMPTY,
- }
- }
-}
-
-impl ops::BitOr for Interest {
- type Output = Self;
-
- #[inline]
- fn bitor(self, other: Self) -> Self {
- self.add(other)
- }
-}
-
-impl ops::BitOrAssign for Interest {
- #[inline]
- fn bitor_assign(&mut self, other: Self) {
- self.0 = (*self | other).0;
- }
-}
-
-impl fmt::Debug for Interest {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- self.0.fmt(fmt)
- }
-}
diff --git a/vendor/tokio/src/io/driver/mod.rs b/vendor/tokio/src/io/driver/mod.rs
deleted file mode 100644
index 3aa0cfbb2..000000000
--- a/vendor/tokio/src/io/driver/mod.rs
+++ /dev/null
@@ -1,353 +0,0 @@
-#![cfg_attr(not(feature = "rt"), allow(dead_code))]
-
-mod interest;
-#[allow(unreachable_pub)]
-pub use interest::Interest;
-
-mod ready;
-#[allow(unreachable_pub)]
-pub use ready::Ready;
-
-mod registration;
-pub(crate) use registration::Registration;
-
-mod scheduled_io;
-use scheduled_io::ScheduledIo;
-
-use crate::park::{Park, Unpark};
-use crate::util::slab::{self, Slab};
-use crate::{loom::sync::Mutex, util::bit};
-
-use std::fmt;
-use std::io;
-use std::sync::{Arc, Weak};
-use std::time::Duration;
-
-/// I/O driver, backed by Mio
-pub(crate) struct Driver {
- /// Tracks the number of times `turn` is called. It is safe for this to wrap
- /// as it is mostly used to determine when to call `compact()`
- tick: u8,
-
- /// Reuse the `mio::Events` value across calls to poll.
- events: Option<mio::Events>,
-
- /// Primary slab handle containing the state for each resource registered
- /// with this driver. During Drop this is moved into the Inner structure, so
- /// this is an Option to allow it to be vacated (until Drop this is always
- /// Some)
- resources: Option<Slab<ScheduledIo>>,
-
- /// The system event queue
- poll: mio::Poll,
-
- /// State shared between the reactor and the handles.
- inner: Arc<Inner>,
-}
-
-/// A reference to an I/O driver
-#[derive(Clone)]
-pub(crate) struct Handle {
- inner: Weak<Inner>,
-}
-
-pub(crate) struct ReadyEvent {
- tick: u8,
- pub(crate) ready: Ready,
-}
-
-pub(super) struct Inner {
- /// Primary slab handle containing the state for each resource registered
- /// with this driver.
- ///
- /// The ownership of this slab is moved into this structure during
- /// `Driver::drop`, so that `Inner::drop` can notify all outstanding handles
- /// without risking new ones being registered in the meantime.
- resources: Mutex<Option<Slab<ScheduledIo>>>,
-
- /// Registers I/O resources
- registry: mio::Registry,
-
- /// Allocates `ScheduledIo` handles when creating new resources.
- pub(super) io_dispatch: slab::Allocator<ScheduledIo>,
-
- /// Used to wake up the reactor from a call to `turn`
- waker: mio::Waker,
-}
-
-#[derive(Debug, Eq, PartialEq, Clone, Copy)]
-enum Direction {
- Read,
- Write,
-}
-
-enum Tick {
- Set(u8),
- Clear(u8),
-}
-
-// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
-// token.
-const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
-
-const ADDRESS: bit::Pack = bit::Pack::least_significant(24);
-
-// Packs the generation value in the `readiness` field.
-//
-// The generation prevents a race condition where a slab slot is reused for a
-// new socket while the I/O driver is about to apply a readiness event. The
-// generation value is checked when setting new readiness. If the generation do
-// not match, then the readiness event is discarded.
-const GENERATION: bit::Pack = ADDRESS.then(7);
-
-fn _assert_kinds() {
- fn _assert<T: Send + Sync>() {}
-
- _assert::<Handle>();
-}
-
-// ===== impl Driver =====
-
-impl Driver {
- /// Creates a new event loop, returning any error that happened during the
- /// creation.
- pub(crate) fn new() -> io::Result<Driver> {
- let poll = mio::Poll::new()?;
- let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
- let registry = poll.registry().try_clone()?;
-
- let slab = Slab::new();
- let allocator = slab.allocator();
-
- Ok(Driver {
- tick: 0,
- events: Some(mio::Events::with_capacity(1024)),
- poll,
- resources: Some(slab),
- inner: Arc::new(Inner {
- resources: Mutex::new(None),
- registry,
- io_dispatch: allocator,
- waker,
- }),
- })
- }
-
- /// Returns a handle to this event loop which can be sent across threads
- /// and can be used as a proxy to the event loop itself.
- ///
- /// Handles are cloneable and clones always refer to the same event loop.
- /// This handle is typically passed into functions that create I/O objects
- /// to bind them to this event loop.
- pub(crate) fn handle(&self) -> Handle {
- Handle {
- inner: Arc::downgrade(&self.inner),
- }
- }
-
- fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
- // How often to call `compact()` on the resource slab
- const COMPACT_INTERVAL: u8 = 255;
-
- self.tick = self.tick.wrapping_add(1);
-
- if self.tick == COMPACT_INTERVAL {
- self.resources.as_mut().unwrap().compact()
- }
-
- let mut events = self.events.take().expect("i/o driver event store missing");
-
- // Block waiting for an event to happen, peeling out how many events
- // happened.
- match self.poll.poll(&mut events, max_wait) {
- Ok(_) => {}
- Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
- Err(e) => return Err(e),
- }
-
- // Process all the events that came in, dispatching appropriately
- for event in events.iter() {
- let token = event.token();
-
- if token != TOKEN_WAKEUP {
- self.dispatch(token, Ready::from_mio(event));
- }
- }
-
- self.events = Some(events);
-
- Ok(())
- }
-
- fn dispatch(&mut self, token: mio::Token, ready: Ready) {
- let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));
-
- let resources = self.resources.as_mut().unwrap();
-
- let io = match resources.get(addr) {
- Some(io) => io,
- None => return,
- };
-
- let res = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| curr | ready);
-
- if res.is_err() {
- // token no longer valid!
- return;
- }
-
- io.wake(ready);
- }
-}
-
-impl Drop for Driver {
- fn drop(&mut self) {
- (*self.inner.resources.lock()) = self.resources.take();
- }
-}
-
-impl Drop for Inner {
- fn drop(&mut self) {
- let resources = self.resources.lock().take();
-
- if let Some(mut slab) = resources {
- slab.for_each(|io| {
- // If a task is waiting on the I/O resource, notify it. The task
- // will then attempt to use the I/O resource and fail due to the
- // driver being shutdown.
- io.shutdown();
- });
- }
- }
-}
-
-impl Park for Driver {
- type Unpark = Handle;
- type Error = io::Error;
-
- fn unpark(&self) -> Self::Unpark {
- self.handle()
- }
-
- fn park(&mut self) -> io::Result<()> {
- self.turn(None)?;
- Ok(())
- }
-
- fn park_timeout(&mut self, duration: Duration) -> io::Result<()> {
- self.turn(Some(duration))?;
- Ok(())
- }
-
- fn shutdown(&mut self) {}
-}
-
-impl fmt::Debug for Driver {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "Driver")
- }
-}
-
-// ===== impl Handle =====
-
-cfg_rt! {
- impl Handle {
- /// Returns a handle to the current reactor
- ///
- /// # Panics
- ///
- /// This function panics if there is no current reactor set and `rt` feature
- /// flag is not enabled.
- pub(super) fn current() -> Self {
- crate::runtime::context::io_handle().expect("A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO.")
- }
- }
-}
-
-cfg_not_rt! {
- impl Handle {
- /// Returns a handle to the current reactor
- ///
- /// # Panics
- ///
- /// This function panics if there is no current reactor set, or if the `rt`
- /// feature flag is not enabled.
- pub(super) fn current() -> Self {
- panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
- }
- }
-}
-
-impl Handle {
- /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
- /// makes the next call to `turn` return immediately.
- ///
- /// This method is intended to be used in situations where a notification
- /// needs to otherwise be sent to the main reactor. If the reactor is
- /// currently blocked inside of `turn` then it will wake up and soon return
- /// after this method has been called. If the reactor is not currently
- /// blocked in `turn`, then the next call to `turn` will not block and
- /// return immediately.
- fn wakeup(&self) {
- if let Some(inner) = self.inner() {
- inner.waker.wake().expect("failed to wake I/O driver");
- }
- }
-
- pub(super) fn inner(&self) -> Option<Arc<Inner>> {
- self.inner.upgrade()
- }
-}
-
-impl Unpark for Handle {
- fn unpark(&self) {
- self.wakeup();
- }
-}
-
-impl fmt::Debug for Handle {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "Handle")
- }
-}
-
-// ===== impl Inner =====
-
-impl Inner {
- /// Registers an I/O resource with the reactor for a given `mio::Ready` state.
- ///
- /// The registration token is returned.
- pub(super) fn add_source(
- &self,
- source: &mut impl mio::event::Source,
- interest: Interest,
- ) -> io::Result<slab::Ref<ScheduledIo>> {
- let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| {
- io::Error::new(
- io::ErrorKind::Other,
- "reactor at max registered I/O resources",
- )
- })?;
-
- let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));
-
- self.registry
- .register(source, mio::Token(token), interest.to_mio())?;
-
- Ok(shared)
- }
-
- /// Deregisters an I/O resource from the reactor.
- pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> {
- self.registry.deregister(source)
- }
-}
-
-impl Direction {
- pub(super) fn mask(self) -> Ready {
- match self {
- Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
- Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
- }
- }
-}
diff --git a/vendor/tokio/src/io/driver/platform.rs b/vendor/tokio/src/io/driver/platform.rs
deleted file mode 100644
index 6b27988ce..000000000
--- a/vendor/tokio/src/io/driver/platform.rs
+++ /dev/null
@@ -1,44 +0,0 @@
-pub(crate) use self::sys::*;
-
-#[cfg(unix)]
-mod sys {
- use mio::unix::UnixReady;
- use mio::Ready;
-
- pub(crate) fn hup() -> Ready {
- UnixReady::hup().into()
- }
-
- pub(crate) fn is_hup(ready: Ready) -> bool {
- UnixReady::from(ready).is_hup()
- }
-
- pub(crate) fn error() -> Ready {
- UnixReady::error().into()
- }
-
- pub(crate) fn is_error(ready: Ready) -> bool {
- UnixReady::from(ready).is_error()
- }
-}
-
-#[cfg(windows)]
-mod sys {
- use mio::Ready;
-
- pub(crate) fn hup() -> Ready {
- Ready::empty()
- }
-
- pub(crate) fn is_hup(_: Ready) -> bool {
- false
- }
-
- pub(crate) fn error() -> Ready {
- Ready::empty()
- }
-
- pub(crate) fn is_error(_: Ready) -> bool {
- false
- }
-}
diff --git a/vendor/tokio/src/io/driver/ready.rs b/vendor/tokio/src/io/driver/ready.rs
deleted file mode 100644
index 2ac01bdbe..000000000
--- a/vendor/tokio/src/io/driver/ready.rs
+++ /dev/null
@@ -1,239 +0,0 @@
-#![cfg_attr(not(feature = "net"), allow(unreachable_pub))]
-
-use std::fmt;
-use std::ops;
-
-const READABLE: usize = 0b0_01;
-const WRITABLE: usize = 0b0_10;
-const READ_CLOSED: usize = 0b0_0100;
-const WRITE_CLOSED: usize = 0b0_1000;
-
-/// Describes the readiness state of an I/O resources.
-///
-/// `Ready` tracks which operation an I/O resource is ready to perform.
-#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
-#[derive(Clone, Copy, PartialEq, PartialOrd)]
-pub struct Ready(usize);
-
-impl Ready {
- /// Returns the empty `Ready` set.
- pub const EMPTY: Ready = Ready(0);
-
- /// Returns a `Ready` representing readable readiness.
- pub const READABLE: Ready = Ready(READABLE);
-
- /// Returns a `Ready` representing writable readiness.
- pub const WRITABLE: Ready = Ready(WRITABLE);
-
- /// Returns a `Ready` representing read closed readiness.
- pub const READ_CLOSED: Ready = Ready(READ_CLOSED);
-
- /// Returns a `Ready` representing write closed readiness.
- pub const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED);
-
- /// Returns a `Ready` representing readiness for all operations.
- pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED);
-
- // Must remain crate-private to avoid adding a public dependency on Mio.
- pub(crate) fn from_mio(event: &mio::event::Event) -> Ready {
- let mut ready = Ready::EMPTY;
-
- if event.is_readable() {
- ready |= Ready::READABLE;
- }
-
- if event.is_writable() {
- ready |= Ready::WRITABLE;
- }
-
- if event.is_read_closed() {
- ready |= Ready::READ_CLOSED;
- }
-
- if event.is_write_closed() {
- ready |= Ready::WRITE_CLOSED;
- }
-
- ready
- }
-
- /// Returns true if `Ready` is the empty set
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::io::Ready;
- ///
- /// assert!(Ready::EMPTY.is_empty());
- /// assert!(!Ready::READABLE.is_empty());
- /// ```
- pub fn is_empty(self) -> bool {
- self == Ready::EMPTY
- }
-
- /// Returns `true` if the value includes `readable`
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::io::Ready;
- ///
- /// assert!(!Ready::EMPTY.is_readable());
- /// assert!(Ready::READABLE.is_readable());
- /// assert!(Ready::READ_CLOSED.is_readable());
- /// assert!(!Ready::WRITABLE.is_readable());
- /// ```
- pub fn is_readable(self) -> bool {
- self.contains(Ready::READABLE) || self.is_read_closed()
- }
-
- /// Returns `true` if the value includes writable `readiness`
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::io::Ready;
- ///
- /// assert!(!Ready::EMPTY.is_writable());
- /// assert!(!Ready::READABLE.is_writable());
- /// assert!(Ready::WRITABLE.is_writable());
- /// assert!(Ready::WRITE_CLOSED.is_writable());
- /// ```
- pub fn is_writable(self) -> bool {
- self.contains(Ready::WRITABLE) || self.is_write_closed()
- }
-
- /// Returns `true` if the value includes read-closed `readiness`
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::io::Ready;
- ///
- /// assert!(!Ready::EMPTY.is_read_closed());
- /// assert!(!Ready::READABLE.is_read_closed());
- /// assert!(Ready::READ_CLOSED.is_read_closed());
- /// ```
- pub fn is_read_closed(self) -> bool {
- self.contains(Ready::READ_CLOSED)
- }
-
- /// Returns `true` if the value includes write-closed `readiness`
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::io::Ready;
- ///
- /// assert!(!Ready::EMPTY.is_write_closed());
- /// assert!(!Ready::WRITABLE.is_write_closed());
- /// assert!(Ready::WRITE_CLOSED.is_write_closed());
- /// ```
- pub fn is_write_closed(self) -> bool {
- self.contains(Ready::WRITE_CLOSED)
- }
-
- /// Returns true if `self` is a superset of `other`.
- ///
- /// `other` may represent more than one readiness operations, in which case
- /// the function only returns true if `self` contains all readiness
- /// specified in `other`.
- pub(crate) fn contains<T: Into<Self>>(self, other: T) -> bool {
- let other = other.into();
- (self & other) == other
- }
-
- /// Create a `Ready` instance using the given `usize` representation.
- ///
- /// The `usize` representation must have been obtained from a call to
- /// `Readiness::as_usize`.
- ///
- /// This function is mainly provided to allow the caller to get a
- /// readiness value from an `AtomicUsize`.
- pub(crate) fn from_usize(val: usize) -> Ready {
- Ready(val & Ready::ALL.as_usize())
- }
-
- /// Returns a `usize` representation of the `Ready` value.
- ///
- /// This function is mainly provided to allow the caller to store a
- /// readiness value in an `AtomicUsize`.
- pub(crate) fn as_usize(self) -> usize {
- self.0
- }
-}
-
-cfg_io_readiness! {
- use crate::io::Interest;
-
- impl Ready {
- pub(crate) fn from_interest(interest: Interest) -> Ready {
- let mut ready = Ready::EMPTY;
-
- if interest.is_readable() {
- ready |= Ready::READABLE;
- ready |= Ready::READ_CLOSED;
- }
-
- if interest.is_writable() {
- ready |= Ready::WRITABLE;
- ready |= Ready::WRITE_CLOSED;
- }
-
- ready
- }
-
- pub(crate) fn intersection(self, interest: Interest) -> Ready {
- Ready(self.0 & Ready::from_interest(interest).0)
- }
-
- pub(crate) fn satisfies(self, interest: Interest) -> bool {
- self.0 & Ready::from_interest(interest).0 != 0
- }
- }
-}
-
-impl ops::BitOr<Ready> for Ready {
- type Output = Ready;
-
- #[inline]
- fn bitor(self, other: Ready) -> Ready {
- Ready(self.0 | other.0)
- }
-}
-
-impl ops::BitOrAssign<Ready> for Ready {
- #[inline]
- fn bitor_assign(&mut self, other: Ready) {
- self.0 |= other.0;
- }
-}
-
-impl ops::BitAnd<Ready> for Ready {
- type Output = Ready;
-
- #[inline]
- fn bitand(self, other: Ready) -> Ready {
- Ready(self.0 & other.0)
- }
-}
-
-impl ops::Sub<Ready> for Ready {
- type Output = Ready;
-
- #[inline]
- fn sub(self, other: Ready) -> Ready {
- Ready(self.0 & !other.0)
- }
-}
-
-impl fmt::Debug for Ready {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("Ready")
- .field("is_readable", &self.is_readable())
- .field("is_writable", &self.is_writable())
- .field("is_read_closed", &self.is_read_closed())
- .field("is_write_closed", &self.is_write_closed())
- .finish()
- }
-}
diff --git a/vendor/tokio/src/io/driver/registration.rs b/vendor/tokio/src/io/driver/registration.rs
deleted file mode 100644
index 7350be634..000000000
--- a/vendor/tokio/src/io/driver/registration.rs
+++ /dev/null
@@ -1,262 +0,0 @@
-#![cfg_attr(not(feature = "net"), allow(dead_code))]
-
-use crate::io::driver::{Direction, Handle, Interest, ReadyEvent, ScheduledIo};
-use crate::util::slab;
-
-use mio::event::Source;
-use std::io;
-use std::task::{Context, Poll};
-
-cfg_io_driver! {
- /// Associates an I/O resource with the reactor instance that drives it.
- ///
- /// A registration represents an I/O resource registered with a Reactor such
- /// that it will receive task notifications on readiness. This is the lowest
- /// level API for integrating with a reactor.
- ///
- /// The association between an I/O resource is made by calling
- /// [`new_with_interest_and_handle`].
- /// Once the association is established, it remains established until the
- /// registration instance is dropped.
- ///
- /// A registration instance represents two separate readiness streams. One
- /// for the read readiness and one for write readiness. These streams are
- /// independent and can be consumed from separate tasks.
- ///
- /// **Note**: while `Registration` is `Sync`, the caller must ensure that
- /// there are at most two tasks that use a registration instance
- /// concurrently. One task for [`poll_read_ready`] and one task for
- /// [`poll_write_ready`]. While violating this requirement is "safe" from a
- /// Rust memory safety point of view, it will result in unexpected behavior
- /// in the form of lost notifications and tasks hanging.
- ///
- /// ## Platform-specific events
- ///
- /// `Registration` also allows receiving platform-specific `mio::Ready`
- /// events. These events are included as part of the read readiness event
- /// stream. The write readiness event stream is only for `Ready::writable()`
- /// events.
- ///
- /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle
- /// [`poll_read_ready`]: method@Self::poll_read_ready`
- /// [`poll_write_ready`]: method@Self::poll_write_ready`
- #[derive(Debug)]
- pub(crate) struct Registration {
- /// Handle to the associated driver.
- handle: Handle,
-
- /// Reference to state stored by the driver.
- shared: slab::Ref<ScheduledIo>,
- }
-}
-
-unsafe impl Send for Registration {}
-unsafe impl Sync for Registration {}
-
-// ===== impl Registration =====
-
-impl Registration {
- /// Registers the I/O resource with the default reactor, for a specific
- /// `Interest`. `new_with_interest` should be used over `new` when you need
- /// control over the readiness state, such as when a file descriptor only
- /// allows reads. This does not add `hup` or `error` so if you are
- /// interested in those states, you will need to add them to the readiness
- /// state passed to this function.
- ///
- /// # Return
- ///
- /// - `Ok` if the registration happened successfully
- /// - `Err` if an error was encountered during registration
- pub(crate) fn new_with_interest_and_handle(
- io: &mut impl Source,
- interest: Interest,
- handle: Handle,
- ) -> io::Result<Registration> {
- let shared = if let Some(inner) = handle.inner() {
- inner.add_source(io, interest)?
- } else {
- return Err(io::Error::new(
- io::ErrorKind::Other,
- "failed to find event loop",
- ));
- };
-
- Ok(Registration { handle, shared })
- }
-
- /// Deregisters the I/O resource from the reactor it is associated with.
- ///
- /// This function must be called before the I/O resource associated with the
- /// registration is dropped.
- ///
- /// Note that deregistering does not guarantee that the I/O resource can be
- /// registered with a different reactor. Some I/O resource types can only be
- /// associated with a single reactor instance for their lifetime.
- ///
- /// # Return
- ///
- /// If the deregistration was successful, `Ok` is returned. Any calls to
- /// `Reactor::turn` that happen after a successful call to `deregister` will
- /// no longer result in notifications getting sent for this registration.
- ///
- /// `Err` is returned if an error is encountered.
- pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> {
- let inner = match self.handle.inner() {
- Some(inner) => inner,
- None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
- };
- inner.deregister_source(io)
- }
-
- pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
- self.shared.clear_readiness(event);
- }
-
- // Uses the poll path, requiring the caller to ensure mutual exclusion for
- // correctness. Only the last task to call this function is notified.
- pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
- self.poll_ready(cx, Direction::Read)
- }
-
- // Uses the poll path, requiring the caller to ensure mutual exclusion for
- // correctness. Only the last task to call this function is notified.
- pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
- self.poll_ready(cx, Direction::Write)
- }
-
- // Uses the poll path, requiring the caller to ensure mutual exclusion for
- // correctness. Only the last task to call this function is notified.
- pub(crate) fn poll_read_io<R>(
- &self,
- cx: &mut Context<'_>,
- f: impl FnMut() -> io::Result<R>,
- ) -> Poll<io::Result<R>> {
- self.poll_io(cx, Direction::Read, f)
- }
-
- // Uses the poll path, requiring the caller to ensure mutual exclusion for
- // correctness. Only the last task to call this function is notified.
- pub(crate) fn poll_write_io<R>(
- &self,
- cx: &mut Context<'_>,
- f: impl FnMut() -> io::Result<R>,
- ) -> Poll<io::Result<R>> {
- self.poll_io(cx, Direction::Write, f)
- }
-
- /// Polls for events on the I/O resource's `direction` readiness stream.
- ///
- /// If called with a task context, notify the task when a new event is
- /// received.
- fn poll_ready(
- &self,
- cx: &mut Context<'_>,
- direction: Direction,
- ) -> Poll<io::Result<ReadyEvent>> {
- // Keep track of task budget
- let coop = ready!(crate::coop::poll_proceed(cx));
- let ev = ready!(self.shared.poll_readiness(cx, direction));
-
- if self.handle.inner().is_none() {
- return Poll::Ready(Err(gone()));
- }
-
- coop.made_progress();
- Poll::Ready(Ok(ev))
- }
-
- fn poll_io<R>(
- &self,
- cx: &mut Context<'_>,
- direction: Direction,
- mut f: impl FnMut() -> io::Result<R>,
- ) -> Poll<io::Result<R>> {
- loop {
- let ev = ready!(self.poll_ready(cx, direction))?;
-
- match f() {
- Ok(ret) => {
- return Poll::Ready(Ok(ret));
- }
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.clear_readiness(ev);
- }
- Err(e) => return Poll::Ready(Err(e)),
- }
- }
- }
-
- pub(crate) fn try_io<R>(
- &self,
- interest: Interest,
- f: impl FnOnce() -> io::Result<R>,
- ) -> io::Result<R> {
- let ev = self.shared.ready_event(interest);
-
- // Don't attempt the operation if the resource is not ready.
- if ev.ready.is_empty() {
- return Err(io::ErrorKind::WouldBlock.into());
- }
-
- match f() {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.clear_readiness(ev);
- Err(io::ErrorKind::WouldBlock.into())
- }
- res => res,
- }
- }
-}
-
-impl Drop for Registration {
- fn drop(&mut self) {
- // It is possible for a cycle to be created between wakers stored in
- // `ScheduledIo` instances and `Arc<driver::Inner>`. To break this
- // cycle, wakers are cleared. This is an imperfect solution as it is
- // possible to store a `Registration` in a waker. In this case, the
- // cycle would remain.
- //
- // See tokio-rs/tokio#3481 for more details.
- self.shared.clear_wakers();
- }
-}
-
-fn gone() -> io::Error {
- io::Error::new(io::ErrorKind::Other, "IO driver has terminated")
-}
-
-cfg_io_readiness! {
- impl Registration {
- pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> {
- use std::future::Future;
- use std::pin::Pin;
-
- let fut = self.shared.readiness(interest);
- pin!(fut);
-
- crate::future::poll_fn(|cx| {
- if self.handle.inner().is_none() {
- return Poll::Ready(Err(io::Error::new(
- io::ErrorKind::Other,
- crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR
- )));
- }
-
- Pin::new(&mut fut).poll(cx).map(Ok)
- }).await
- }
-
- pub(crate) async fn async_io<R>(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> {
- loop {
- let event = self.readiness(interest).await?;
-
- match f() {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.clear_readiness(event);
- }
- x => return x,
- }
- }
- }
- }
-}
diff --git a/vendor/tokio/src/io/driver/scheduled_io.rs b/vendor/tokio/src/io/driver/scheduled_io.rs
deleted file mode 100644
index 517801079..000000000
--- a/vendor/tokio/src/io/driver/scheduled_io.rs
+++ /dev/null
@@ -1,544 +0,0 @@
-use super::{Interest, Ready, ReadyEvent, Tick};
-use crate::loom::sync::atomic::AtomicUsize;
-use crate::loom::sync::Mutex;
-use crate::util::bit;
-use crate::util::slab::Entry;
-
-use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
-use std::task::{Context, Poll, Waker};
-
-use super::Direction;
-
-cfg_io_readiness! {
- use crate::util::linked_list::{self, LinkedList};
-
- use std::cell::UnsafeCell;
- use std::future::Future;
- use std::marker::PhantomPinned;
- use std::pin::Pin;
- use std::ptr::NonNull;
-}
-
-/// Stored in the I/O driver resource slab.
-#[derive(Debug)]
-pub(crate) struct ScheduledIo {
- /// Packs the resource's readiness with the resource's generation.
- readiness: AtomicUsize,
-
- waiters: Mutex<Waiters>,
-}
-
-cfg_io_readiness! {
- type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
-}
-
-#[derive(Debug, Default)]
-struct Waiters {
- #[cfg(feature = "net")]
- /// List of all current waiters
- list: WaitList,
-
- /// Waker used for AsyncRead
- reader: Option<Waker>,
-
- /// Waker used for AsyncWrite
- writer: Option<Waker>,
-
- /// True if this ScheduledIo has been killed due to IO driver shutdown
- is_shutdown: bool,
-}
-
-cfg_io_readiness! {
- #[derive(Debug)]
- struct Waiter {
- pointers: linked_list::Pointers<Waiter>,
-
- /// The waker for this task
- waker: Option<Waker>,
-
- /// The interest this waiter is waiting on
- interest: Interest,
-
- is_ready: bool,
-
- /// Should never be `!Unpin`
- _p: PhantomPinned,
- }
-
- /// Future returned by `readiness()`
- struct Readiness<'a> {
- scheduled_io: &'a ScheduledIo,
-
- state: State,
-
- /// Entry in the waiter `LinkedList`.
- waiter: UnsafeCell<Waiter>,
- }
-
- enum State {
- Init,
- Waiting,
- Done,
- }
-}
-
-// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
-//
-// | reserved | generation | driver tick | readiness |
-// |----------+------------+--------------+-----------|
-// | 1 bit | 7 bits + 8 bits + 16 bits |
-
-const READINESS: bit::Pack = bit::Pack::least_significant(16);
-
-const TICK: bit::Pack = READINESS.then(8);
-
-const GENERATION: bit::Pack = TICK.then(7);
-
-#[test]
-fn test_generations_assert_same() {
- assert_eq!(super::GENERATION, GENERATION);
-}
-
-// ===== impl ScheduledIo =====
-
-impl Entry for ScheduledIo {
- fn reset(&self) {
- let state = self.readiness.load(Acquire);
-
- let generation = GENERATION.unpack(state);
- let next = GENERATION.pack_lossy(generation + 1, 0);
-
- self.readiness.store(next, Release);
- }
-}
-
-impl Default for ScheduledIo {
- fn default() -> ScheduledIo {
- ScheduledIo {
- readiness: AtomicUsize::new(0),
- waiters: Mutex::new(Default::default()),
- }
- }
-}
-
-impl ScheduledIo {
- pub(crate) fn generation(&self) -> usize {
- GENERATION.unpack(self.readiness.load(Acquire))
- }
-
- /// Invoked when the IO driver is shut down; forces this ScheduledIo into a
- /// permanently ready state.
- pub(super) fn shutdown(&self) {
- self.wake0(Ready::ALL, true)
- }
-
- /// Sets the readiness on this `ScheduledIo` by invoking the given closure on
- /// the current value, returning the previous readiness value.
- ///
- /// # Arguments
- /// - `token`: the token for this `ScheduledIo`.
- /// - `tick`: whether setting the tick or trying to clear readiness for a
- /// specific tick.
- /// - `f`: a closure returning a new readiness value given the previous
- /// readiness.
- ///
- /// # Returns
- ///
- /// If the given token's generation no longer matches the `ScheduledIo`'s
- /// generation, then the corresponding IO resource has been removed and
- /// replaced with a new resource. In that case, this method returns `Err`.
- /// Otherwise, this returns the previous readiness.
- pub(super) fn set_readiness(
- &self,
- token: Option<usize>,
- tick: Tick,
- f: impl Fn(Ready) -> Ready,
- ) -> Result<(), ()> {
- let mut current = self.readiness.load(Acquire);
-
- loop {
- let current_generation = GENERATION.unpack(current);
-
- if let Some(token) = token {
- // Check that the generation for this access is still the
- // current one.
- if GENERATION.unpack(token) != current_generation {
- return Err(());
- }
- }
-
- // Mask out the tick/generation bits so that the modifying
- // function doesn't see them.
- let current_readiness = Ready::from_usize(current);
- let new = f(current_readiness);
-
- let packed = match tick {
- Tick::Set(t) => TICK.pack(t as usize, new.as_usize()),
- Tick::Clear(t) => {
- if TICK.unpack(current) as u8 != t {
- // Trying to clear readiness with an old event!
- return Err(());
- }
-
- TICK.pack(t as usize, new.as_usize())
- }
- };
-
- let next = GENERATION.pack(current_generation, packed);
-
- match self
- .readiness
- .compare_exchange(current, next, AcqRel, Acquire)
- {
- Ok(_) => return Ok(()),
- // we lost the race, retry!
- Err(actual) => current = actual,
- }
- }
- }
-
- /// Notifies all pending waiters that have registered interest in `ready`.
- ///
- /// There may be many waiters to notify. Waking the pending task **must** be
- /// done from outside of the lock otherwise there is a potential for a
- /// deadlock.
- ///
- /// A stack array of wakers is created and filled with wakers to notify, the
- /// lock is released, and the wakers are notified. Because there may be more
- /// than 32 wakers to notify, if the stack array fills up, the lock is
- /// released, the array is cleared, and the iteration continues.
- pub(super) fn wake(&self, ready: Ready) {
- self.wake0(ready, false);
- }
-
- fn wake0(&self, ready: Ready, shutdown: bool) {
- const NUM_WAKERS: usize = 32;
-
- let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default();
- let mut curr = 0;
-
- let mut waiters = self.waiters.lock();
-
- waiters.is_shutdown |= shutdown;
-
- // check for AsyncRead slot
- if ready.is_readable() {
- if let Some(waker) = waiters.reader.take() {
- wakers[curr] = Some(waker);
- curr += 1;
- }
- }
-
- // check for AsyncWrite slot
- if ready.is_writable() {
- if let Some(waker) = waiters.writer.take() {
- wakers[curr] = Some(waker);
- curr += 1;
- }
- }
-
- #[cfg(feature = "net")]
- 'outer: loop {
- let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));
-
- while curr < NUM_WAKERS {
- match iter.next() {
- Some(waiter) => {
- let waiter = unsafe { &mut *waiter.as_ptr() };
-
- if let Some(waker) = waiter.waker.take() {
- waiter.is_ready = true;
- wakers[curr] = Some(waker);
- curr += 1;
- }
- }
- None => {
- break 'outer;
- }
- }
- }
-
- drop(waiters);
-
- for waker in wakers.iter_mut().take(curr) {
- waker.take().unwrap().wake();
- }
-
- curr = 0;
-
- // Acquire the lock again.
- waiters = self.waiters.lock();
- }
-
- // Release the lock before notifying
- drop(waiters);
-
- for waker in wakers.iter_mut().take(curr) {
- waker.take().unwrap().wake();
- }
- }
-
- pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
- let curr = self.readiness.load(Acquire);
-
- ReadyEvent {
- tick: TICK.unpack(curr) as u8,
- ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
- }
- }
-
- /// Poll version of checking readiness for a certain direction.
- ///
- /// These are to support `AsyncRead` and `AsyncWrite` polling methods,
- /// which cannot use the `async fn` version. This uses reserved reader
- /// and writer slots.
- pub(super) fn poll_readiness(
- &self,
- cx: &mut Context<'_>,
- direction: Direction,
- ) -> Poll<ReadyEvent> {
- let curr = self.readiness.load(Acquire);
-
- let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
-
- if ready.is_empty() {
- // Update the task info
- let mut waiters = self.waiters.lock();
- let slot = match direction {
- Direction::Read => &mut waiters.reader,
- Direction::Write => &mut waiters.writer,
- };
-
- // Avoid cloning the waker if one is already stored that matches the
- // current task.
- match slot {
- Some(existing) => {
- if !existing.will_wake(cx.waker()) {
- *existing = cx.waker().clone();
- }
- }
- None => {
- *slot = Some(cx.waker().clone());
- }
- }
-
- // Try again, in case the readiness was changed while we were
- // taking the waiters lock
- let curr = self.readiness.load(Acquire);
- let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
- if waiters.is_shutdown {
- Poll::Ready(ReadyEvent {
- tick: TICK.unpack(curr) as u8,
- ready: direction.mask(),
- })
- } else if ready.is_empty() {
- Poll::Pending
- } else {
- Poll::Ready(ReadyEvent {
- tick: TICK.unpack(curr) as u8,
- ready,
- })
- }
- } else {
- Poll::Ready(ReadyEvent {
- tick: TICK.unpack(curr) as u8,
- ready,
- })
- }
- }
-
- pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
- // This consumes the current readiness state **except** for closed
- // states. Closed states are excluded because they are final states.
- let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
-
- // result isn't important
- let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr - mask_no_closed);
- }
-
- pub(crate) fn clear_wakers(&self) {
- let mut waiters = self.waiters.lock();
- waiters.reader.take();
- waiters.writer.take();
- }
-}
-
-impl Drop for ScheduledIo {
- fn drop(&mut self) {
- self.wake(Ready::ALL);
- }
-}
-
-unsafe impl Send for ScheduledIo {}
-unsafe impl Sync for ScheduledIo {}
-
-cfg_io_readiness! {
- impl ScheduledIo {
- /// An async version of `poll_readiness` which uses a linked list of wakers
- pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
- self.readiness_fut(interest).await
- }
-
- // This is in a separate function so that the borrow checker doesn't think
- // we are borrowing the `UnsafeCell` possibly over await boundaries.
- //
- // Go figure.
- fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
- Readiness {
- scheduled_io: self,
- state: State::Init,
- waiter: UnsafeCell::new(Waiter {
- pointers: linked_list::Pointers::new(),
- waker: None,
- is_ready: false,
- interest,
- _p: PhantomPinned,
- }),
- }
- }
- }
-
- unsafe impl linked_list::Link for Waiter {
- type Handle = NonNull<Waiter>;
- type Target = Waiter;
-
- fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
- *handle
- }
-
- unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
- ptr
- }
-
- unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
- NonNull::from(&mut target.as_mut().pointers)
- }
- }
-
- // ===== impl Readiness =====
-
- impl Future for Readiness<'_> {
- type Output = ReadyEvent;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- use std::sync::atomic::Ordering::SeqCst;
-
- let (scheduled_io, state, waiter) = unsafe {
- let me = self.get_unchecked_mut();
- (&me.scheduled_io, &mut me.state, &me.waiter)
- };
-
- loop {
- match *state {
- State::Init => {
- // Optimistically check existing readiness
- let curr = scheduled_io.readiness.load(SeqCst);
- let ready = Ready::from_usize(READINESS.unpack(curr));
-
- // Safety: `waiter.interest` never changes
- let interest = unsafe { (*waiter.get()).interest };
- let ready = ready.intersection(interest);
-
- if !ready.is_empty() {
- // Currently ready!
- let tick = TICK.unpack(curr) as u8;
- *state = State::Done;
- return Poll::Ready(ReadyEvent { tick, ready });
- }
-
- // Wasn't ready, take the lock (and check again while locked).
- let mut waiters = scheduled_io.waiters.lock();
-
- let curr = scheduled_io.readiness.load(SeqCst);
- let mut ready = Ready::from_usize(READINESS.unpack(curr));
-
- if waiters.is_shutdown {
- ready = Ready::ALL;
- }
-
- let ready = ready.intersection(interest);
-
- if !ready.is_empty() {
- // Currently ready!
- let tick = TICK.unpack(curr) as u8;
- *state = State::Done;
- return Poll::Ready(ReadyEvent { tick, ready });
- }
-
- // Not ready even after locked, insert into list...
-
- // Safety: called while locked
- unsafe {
- (*waiter.get()).waker = Some(cx.waker().clone());
- }
-
- // Insert the waiter into the linked list
- //
- // safety: pointers from `UnsafeCell` are never null.
- waiters
- .list
- .push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
- *state = State::Waiting;
- }
- State::Waiting => {
- // Currently in the "Waiting" state, implying the caller has
- // a waiter stored in the waiter list (guarded by
- // `notify.waiters`). In order to access the waker fields,
- // we must hold the lock.
-
- let waiters = scheduled_io.waiters.lock();
-
- // Safety: called while locked
- let w = unsafe { &mut *waiter.get() };
-
- if w.is_ready {
- // Our waker has been notified.
- *state = State::Done;
- } else {
- // Update the waker, if necessary.
- if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
- w.waker = Some(cx.waker().clone());
- }
-
- return Poll::Pending;
- }
-
- // Explicit drop of the lock to indicate the scope that the
- // lock is held. Because holding the lock is required to
- // ensure safe access to fields not held within the lock, it
- // is helpful to visualize the scope of the critical
- // section.
- drop(waiters);
- }
- State::Done => {
- let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8;
-
- // Safety: State::Done means it is no longer shared
- let w = unsafe { &mut *waiter.get() };
-
- return Poll::Ready(ReadyEvent {
- tick,
- ready: Ready::from_interest(w.interest),
- });
- }
- }
- }
- }
- }
-
- impl Drop for Readiness<'_> {
- fn drop(&mut self) {
- let mut waiters = self.scheduled_io.waiters.lock();
-
- // Safety: `waiter` is only ever stored in `waiters`
- unsafe {
- waiters
- .list
- .remove(NonNull::new_unchecked(self.waiter.get()))
- };
- }
- }
-
- unsafe impl Send for Readiness<'_> {}
- unsafe impl Sync for Readiness<'_> {}
-}