author    Daniel Baumann <daniel.baumann@progress-linux.org> 2024-05-30 03:57:31 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org> 2024-05-30 03:57:31 +0000
commit    dc0db358abe19481e475e10c32149b53370f1a1c (patch)
tree      ab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/src/runtime/io
parent    Releasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff)
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/runtime/io')
-rw-r--r--  vendor/tokio/src/runtime/io/metrics.rs       |  24
-rw-r--r--  vendor/tokio/src/runtime/io/mod.rs           | 356
-rw-r--r--  vendor/tokio/src/runtime/io/registration.rs  | 252
-rw-r--r--  vendor/tokio/src/runtime/io/scheduled_io.rs  | 558
4 files changed, 1190 insertions, 0 deletions
diff --git a/vendor/tokio/src/runtime/io/metrics.rs b/vendor/tokio/src/runtime/io/metrics.rs
new file mode 100644
index 000000000..ec341efe6
--- /dev/null
+++ b/vendor/tokio/src/runtime/io/metrics.rs
@@ -0,0 +1,24 @@
+//! This file contains mocks of the metrics types used in the I/O driver.
+//!
+//! The reason these mocks don't live in `src/runtime/mock.rs` is that
+//! they need to be available in the case when `net` is enabled but
+//! `rt` is not.
+
+cfg_not_rt_and_metrics_and_net! {
+ #[derive(Default)]
+ pub(crate) struct IoDriverMetrics {}
+
+ impl IoDriverMetrics {
+ pub(crate) fn incr_fd_count(&self) {}
+ pub(crate) fn dec_fd_count(&self) {}
+ pub(crate) fn incr_ready_count_by(&self, _amt: u64) {}
+ }
+}
+
+cfg_net! {
+ cfg_rt! {
+ cfg_metrics! {
+ pub(crate) use crate::runtime::IoDriverMetrics;
+ }
+ }
+}
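
The `cfg_not_rt_and_metrics_and_net!` / `cfg_net!` dance above keeps call sites free of feature gates by always exporting *some* `IoDriverMetrics` type. A minimal sketch of the same pattern using plain `#[cfg]` attributes in place of tokio's internal `cfg_*` macros (the `metrics` feature name here is a placeholder, not tokio's real gate):

```rust
// Illustrative sketch of the conditional-mock pattern; feature names
// are placeholders, not tokio's real configuration.

// The real implementation, only compiled when metrics are enabled.
#[cfg(feature = "metrics")]
mod metrics_impl {
    use std::sync::atomic::{AtomicU64, Ordering::Relaxed};

    #[derive(Default)]
    pub struct IoDriverMetrics {
        fd_count: AtomicU64,
    }

    impl IoDriverMetrics {
        pub fn incr_fd_count(&self) {
            self.fd_count.fetch_add(1, Relaxed);
        }
    }
}

// A zero-sized no-op stand-in with the same surface, compiled otherwise,
// so call sites need no cfg of their own.
#[cfg(not(feature = "metrics"))]
mod metrics_impl {
    #[derive(Default)]
    pub struct IoDriverMetrics {}

    impl IoDriverMetrics {
        pub fn incr_fd_count(&self) {} // no-op
    }
}

pub use metrics_impl::IoDriverMetrics;

fn main() {
    let m = IoDriverMetrics::default();
    m.incr_fd_count(); // compiles identically either way
}
```

Because the no-op methods are empty and the stand-in struct is zero-sized, the compiler removes the bookkeeping entirely when metrics are disabled.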
diff --git a/vendor/tokio/src/runtime/io/mod.rs b/vendor/tokio/src/runtime/io/mod.rs
new file mode 100644
index 000000000..2dd426f11
--- /dev/null
+++ b/vendor/tokio/src/runtime/io/mod.rs
@@ -0,0 +1,356 @@
+#![cfg_attr(not(all(feature = "rt", feature = "net")), allow(dead_code))]
+
+mod registration;
+pub(crate) use registration::Registration;
+
+mod scheduled_io;
+use scheduled_io::ScheduledIo;
+
+mod metrics;
+
+use crate::io::interest::Interest;
+use crate::io::ready::Ready;
+use crate::runtime::driver;
+use crate::util::slab::{self, Slab};
+use crate::{loom::sync::RwLock, util::bit};
+
+use metrics::IoDriverMetrics;
+
+use std::fmt;
+use std::io;
+use std::time::Duration;
+
+/// I/O driver, backed by Mio.
+pub(crate) struct Driver {
+ /// Tracks the number of times `turn` is called. It is safe for this to wrap
+ /// as it is mostly used to determine when to call `compact()`.
+ tick: u8,
+
+ /// True when an event with the signal token is received
+ signal_ready: bool,
+
+ /// Reuse the `mio::Events` value across calls to poll.
+ events: mio::Events,
+
+ /// Primary slab handle containing the state for each resource registered
+ /// with this driver.
+ resources: Slab<ScheduledIo>,
+
+ /// The system event queue.
+ poll: mio::Poll,
+}
+
+/// A reference to an I/O driver.
+pub(crate) struct Handle {
+ /// Registers I/O resources.
+ registry: mio::Registry,
+
+ /// Allocates `ScheduledIo` handles when creating new resources.
+ io_dispatch: RwLock<IoDispatcher>,
+
+ /// Used to wake up the reactor from a call to `turn`.
+ /// Not supported on Wasi due to lack of threading support.
+ #[cfg(not(tokio_wasi))]
+ waker: mio::Waker,
+
+ pub(crate) metrics: IoDriverMetrics,
+}
+
+#[derive(Debug)]
+pub(crate) struct ReadyEvent {
+ tick: u8,
+ pub(crate) ready: Ready,
+ is_shutdown: bool,
+}
+
+cfg_net_unix!(
+ impl ReadyEvent {
+ pub(crate) fn with_ready(&self, ready: Ready) -> Self {
+ Self {
+ ready,
+ tick: self.tick,
+ is_shutdown: self.is_shutdown,
+ }
+ }
+ }
+);
+
+struct IoDispatcher {
+ allocator: slab::Allocator<ScheduledIo>,
+ is_shutdown: bool,
+}
+
+#[derive(Debug, Eq, PartialEq, Clone, Copy)]
+enum Direction {
+ Read,
+ Write,
+}
+
+enum Tick {
+ Set(u8),
+ Clear(u8),
+}
+
+// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
+// token.
+const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
+const TOKEN_SIGNAL: mio::Token = mio::Token(1 + (1 << 31));
+
+const ADDRESS: bit::Pack = bit::Pack::least_significant(24);
+
+// Packs the generation value in the `readiness` field.
+//
+// The generation prevents a race condition where a slab slot is reused for a
+// new socket while the I/O driver is about to apply a readiness event. The
+// generation value is checked when setting new readiness. If the generation does
+// not match, then the readiness event is discarded.
+const GENERATION: bit::Pack = ADDRESS.then(7);
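
For illustration, here is a hypothetical re-implementation of the `Pack` semantics assumed above (`crate::util::bit` is internal to tokio, so this is a sketch of the idea, not the real type). It shows how a `mio::Token` can carry both a 24-bit slab address and a 7-bit generation, and how a stale generation is detected:

```rust
// Hypothetical re-implementation of `bit::Pack` (a sketch, not tokio's
// real `crate::util::bit`).
#[derive(Clone, Copy)]
struct Pack {
    shift: u32,
    width: u32,
}

impl Pack {
    /// A field occupying the `width` least-significant bits.
    const fn least_significant(width: u32) -> Pack {
        Pack { shift: 0, width }
    }

    /// A `width`-bit field directly above `self` in the word.
    const fn then(self, width: u32) -> Pack {
        Pack { shift: self.shift + self.width, width }
    }

    const fn mask(self) -> usize {
        ((1usize << self.width) - 1) << self.shift
    }

    /// Stores `value` into this field of `base`.
    const fn pack(self, value: usize, base: usize) -> usize {
        (base & !self.mask()) | ((value << self.shift) & self.mask())
    }

    const fn unpack(self, word: usize) -> usize {
        (word & self.mask()) >> self.shift
    }
}

const ADDRESS: Pack = Pack::least_significant(24);
const GENERATION: Pack = ADDRESS.then(7);

fn main() {
    // Token for slab slot 42 at generation 3, shaped like `add_source` builds it.
    let token = GENERATION.pack(3, ADDRESS.pack(42, 0));
    assert_eq!(ADDRESS.unpack(token), 42);
    assert_eq!(GENERATION.unpack(token), 3);

    // After the slot is reused the generation bumps to 4; the old token no
    // longer matches, so its readiness event is discarded.
    let current_generation = 4;
    assert_ne!(GENERATION.unpack(token), current_generation);
}
```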
+
+fn _assert_kinds() {
+ fn _assert<T: Send + Sync>() {}
+
+ _assert::<Handle>();
+}
+
+// ===== impl Driver =====
+
+impl Driver {
+ /// Creates a new event loop, returning any error that happened during the
+ /// creation.
+ pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
+ let poll = mio::Poll::new()?;
+ #[cfg(not(tokio_wasi))]
+ let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
+ let registry = poll.registry().try_clone()?;
+
+ let slab = Slab::new();
+ let allocator = slab.allocator();
+
+ let driver = Driver {
+ tick: 0,
+ signal_ready: false,
+ events: mio::Events::with_capacity(nevents),
+ poll,
+ resources: slab,
+ };
+
+ let handle = Handle {
+ registry,
+ io_dispatch: RwLock::new(IoDispatcher::new(allocator)),
+ #[cfg(not(tokio_wasi))]
+ waker,
+ metrics: IoDriverMetrics::default(),
+ };
+
+ Ok((driver, handle))
+ }
+
+ pub(crate) fn park(&mut self, rt_handle: &driver::Handle) {
+ let handle = rt_handle.io();
+ self.turn(handle, None);
+ }
+
+ pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
+ let handle = rt_handle.io();
+ self.turn(handle, Some(duration));
+ }
+
+ pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
+ let handle = rt_handle.io();
+
+ if handle.shutdown() {
+ self.resources.for_each(|io| {
+            // If a task is waiting on the I/O resource, notify it that the
+            // runtime is being shut down. Shutting down will clear all wakers.
+ io.shutdown();
+ });
+ }
+ }
+
+ fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
+ // How often to call `compact()` on the resource slab
+ const COMPACT_INTERVAL: u8 = 255;
+
+ self.tick = self.tick.wrapping_add(1);
+
+ if self.tick == COMPACT_INTERVAL {
+ self.resources.compact()
+ }
+
+ let events = &mut self.events;
+
+ // Block waiting for an event to happen, peeling out how many events
+ // happened.
+ match self.poll.poll(events, max_wait) {
+ Ok(_) => {}
+ Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
+ #[cfg(tokio_wasi)]
+ Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
+                // On wasm32-wasi this error occurs when polling without any
+                // subscriptions; just return from the park, as nothing could wake us up.
+ }
+ Err(e) => panic!("unexpected error when polling the I/O driver: {:?}", e),
+ }
+
+ // Process all the events that came in, dispatching appropriately
+ let mut ready_count = 0;
+ for event in events.iter() {
+ let token = event.token();
+
+ if token == TOKEN_WAKEUP {
+ // Nothing to do, the event is used to unblock the I/O driver
+ } else if token == TOKEN_SIGNAL {
+ self.signal_ready = true;
+ } else {
+ Self::dispatch(
+ &mut self.resources,
+ self.tick,
+ token,
+ Ready::from_mio(event),
+ );
+ ready_count += 1;
+ }
+ }
+
+ handle.metrics.incr_ready_count_by(ready_count);
+ }
+
+ fn dispatch(resources: &mut Slab<ScheduledIo>, tick: u8, token: mio::Token, ready: Ready) {
+ let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));
+
+ let io = match resources.get(addr) {
+ Some(io) => io,
+ None => return,
+ };
+
+ let res = io.set_readiness(Some(token.0), Tick::Set(tick), |curr| curr | ready);
+
+ if res.is_err() {
+ // token no longer valid!
+ return;
+ }
+
+ io.wake(ready);
+ }
+}
+
+impl fmt::Debug for Driver {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "Driver")
+ }
+}
+
+impl Handle {
+    /// Forces a reactor blocked in a call to `turn` to wake up, or otherwise
+ /// makes the next call to `turn` return immediately.
+ ///
+    /// This method is intended for situations where a notification needs to
+    /// be sent to the main reactor out-of-band. If the reactor is currently
+    /// blocked inside of `turn`, then it will wake up and return soon after
+    /// this method has been called. If the reactor is not currently blocked
+    /// in `turn`, then the next call to `turn` will not block and will
+    /// return immediately.
+ pub(crate) fn unpark(&self) {
+ #[cfg(not(tokio_wasi))]
+ self.waker.wake().expect("failed to wake I/O driver");
+ }
+
+    /// Registers an I/O resource with the reactor for the given `Interest`.
+ ///
+ /// The registration token is returned.
+ pub(super) fn add_source(
+ &self,
+ source: &mut impl mio::event::Source,
+ interest: Interest,
+ ) -> io::Result<slab::Ref<ScheduledIo>> {
+ let (address, shared) = self.allocate()?;
+
+ let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));
+
+ self.registry
+ .register(source, mio::Token(token), interest.to_mio())?;
+
+ self.metrics.incr_fd_count();
+
+ Ok(shared)
+ }
+
+ /// Deregisters an I/O resource from the reactor.
+ pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> {
+ self.registry.deregister(source)?;
+
+ self.metrics.dec_fd_count();
+
+ Ok(())
+ }
+
+    /// Shuts down the dispatcher.
+ fn shutdown(&self) -> bool {
+ let mut io = self.io_dispatch.write().unwrap();
+ if io.is_shutdown {
+ return false;
+ }
+ io.is_shutdown = true;
+ true
+ }
+
+ fn allocate(&self) -> io::Result<(slab::Address, slab::Ref<ScheduledIo>)> {
+ let io = self.io_dispatch.read().unwrap();
+ if io.is_shutdown {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
+ ));
+ }
+ io.allocator.allocate().ok_or_else(|| {
+ io::Error::new(
+ io::ErrorKind::Other,
+ "reactor at max registered I/O resources",
+ )
+ })
+ }
+}
+
+impl fmt::Debug for Handle {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "Handle")
+ }
+}
+
+// ===== impl IoDispatcher =====
+
+impl IoDispatcher {
+ fn new(allocator: slab::Allocator<ScheduledIo>) -> Self {
+ Self {
+ allocator,
+ is_shutdown: false,
+ }
+ }
+}
+
+impl Direction {
+ pub(super) fn mask(self) -> Ready {
+ match self {
+ Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
+ Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
+ }
+ }
+}
+
+// Signal handling
+cfg_signal_internal_and_unix! {
+ impl Handle {
+ pub(crate) fn register_signal_receiver(&self, receiver: &mut mio::net::UnixStream) -> io::Result<()> {
+ self.registry.register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?;
+ Ok(())
+ }
+ }
+
+ impl Driver {
+ pub(crate) fn consume_signal_ready(&mut self) -> bool {
+ let ret = self.signal_ready;
+ self.signal_ready = false;
+ ret
+ }
+ }
+}
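
The `TOKEN_WAKEUP` registration is what makes `Handle::unpark` work: a `mio::Waker` is registered with the same poll instance, so another thread can interrupt a blocked `poll.poll(..)` call. A freestanding sketch of that mechanism using mio's public API (mio 0.8-style; illustrative, not tokio's actual driver code):

```rust
use std::sync::Arc;
use std::time::Duration;

fn main() -> std::io::Result<()> {
    const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);

    let mut poll = mio::Poll::new()?;
    let mut events = mio::Events::with_capacity(1024);

    // The waker is registered with the same registry the poll uses, under
    // a token reserved for wakeups (tokio's `Handle` stores this waker).
    let waker = Arc::new(mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?);

    // Another thread can interrupt a blocked `poll` at any time; this is
    // exactly the role of `Handle::unpark`.
    let remote = Arc::clone(&waker);
    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_millis(100));
        remote.wake().expect("failed to wake I/O driver");
    });

    // Blocks until the waker fires or registered I/O becomes ready.
    poll.poll(&mut events, None)?;

    for event in events.iter() {
        if event.token() == TOKEN_WAKEUP {
            // Nothing to dispatch; the event only unblocks the loop.
            println!("unparked");
        }
    }
    Ok(())
}
```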
diff --git a/vendor/tokio/src/runtime/io/registration.rs b/vendor/tokio/src/runtime/io/registration.rs
new file mode 100644
index 000000000..341fa0539
--- /dev/null
+++ b/vendor/tokio/src/runtime/io/registration.rs
@@ -0,0 +1,252 @@
+#![cfg_attr(not(feature = "net"), allow(dead_code))]
+
+use crate::io::interest::Interest;
+use crate::runtime::io::{Direction, Handle, ReadyEvent, ScheduledIo};
+use crate::runtime::scheduler;
+use crate::util::slab;
+
+use mio::event::Source;
+use std::io;
+use std::task::{Context, Poll};
+
+cfg_io_driver! {
+ /// Associates an I/O resource with the reactor instance that drives it.
+ ///
+ /// A registration represents an I/O resource registered with a Reactor such
+ /// that it will receive task notifications on readiness. This is the lowest
+ /// level API for integrating with a reactor.
+ ///
+    /// The association between an I/O resource and a reactor is made by
+    /// calling [`new_with_interest_and_handle`].
+ /// Once the association is established, it remains established until the
+ /// registration instance is dropped.
+ ///
+    /// A registration instance represents two separate readiness streams: one
+    /// for read readiness and one for write readiness. These streams are
+ /// independent and can be consumed from separate tasks.
+ ///
+ /// **Note**: while `Registration` is `Sync`, the caller must ensure that
+ /// there are at most two tasks that use a registration instance
+ /// concurrently. One task for [`poll_read_ready`] and one task for
+ /// [`poll_write_ready`]. While violating this requirement is "safe" from a
+ /// Rust memory safety point of view, it will result in unexpected behavior
+ /// in the form of lost notifications and tasks hanging.
+ ///
+ /// ## Platform-specific events
+ ///
+    /// `Registration` also allows receiving platform-specific `Ready`
+    /// events. These events are included as part of the read readiness event
+ /// stream. The write readiness event stream is only for `Ready::writable()`
+ /// events.
+ ///
+ /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle
+    /// [`poll_read_ready`]: method@Self::poll_read_ready
+    /// [`poll_write_ready`]: method@Self::poll_write_ready
+ #[derive(Debug)]
+ pub(crate) struct Registration {
+ /// Handle to the associated runtime.
+ handle: scheduler::Handle,
+
+ /// Reference to state stored by the driver.
+ shared: slab::Ref<ScheduledIo>,
+ }
+}
+
+unsafe impl Send for Registration {}
+unsafe impl Sync for Registration {}
+
+// ===== impl Registration =====
+
+impl Registration {
+ /// Registers the I/O resource with the reactor for the provided handle, for
+    /// a specific `Interest`. This does not add `hup` or `error`, so if you are
+ /// interested in those states, you will need to add them to the readiness
+ /// state passed to this function.
+ ///
+ /// # Return
+ ///
+ /// - `Ok` if the registration happened successfully
+ /// - `Err` if an error was encountered during registration
+ #[track_caller]
+ pub(crate) fn new_with_interest_and_handle(
+ io: &mut impl Source,
+ interest: Interest,
+ handle: scheduler::Handle,
+ ) -> io::Result<Registration> {
+ let shared = handle.driver().io().add_source(io, interest)?;
+
+ Ok(Registration { handle, shared })
+ }
+
+ /// Deregisters the I/O resource from the reactor it is associated with.
+ ///
+ /// This function must be called before the I/O resource associated with the
+ /// registration is dropped.
+ ///
+ /// Note that deregistering does not guarantee that the I/O resource can be
+ /// registered with a different reactor. Some I/O resource types can only be
+ /// associated with a single reactor instance for their lifetime.
+ ///
+ /// # Return
+ ///
+ /// If the deregistration was successful, `Ok` is returned. Any calls to
+ /// `Reactor::turn` that happen after a successful call to `deregister` will
+ /// no longer result in notifications getting sent for this registration.
+ ///
+ /// `Err` is returned if an error is encountered.
+ pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> {
+ self.handle().deregister_source(io)
+ }
+
+ pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
+ self.shared.clear_readiness(event);
+ }
+
+ // Uses the poll path, requiring the caller to ensure mutual exclusion for
+ // correctness. Only the last task to call this function is notified.
+ pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
+ self.poll_ready(cx, Direction::Read)
+ }
+
+ // Uses the poll path, requiring the caller to ensure mutual exclusion for
+ // correctness. Only the last task to call this function is notified.
+ pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
+ self.poll_ready(cx, Direction::Write)
+ }
+
+ // Uses the poll path, requiring the caller to ensure mutual exclusion for
+ // correctness. Only the last task to call this function is notified.
+ #[cfg(not(tokio_wasi))]
+ pub(crate) fn poll_read_io<R>(
+ &self,
+ cx: &mut Context<'_>,
+ f: impl FnMut() -> io::Result<R>,
+ ) -> Poll<io::Result<R>> {
+ self.poll_io(cx, Direction::Read, f)
+ }
+
+ // Uses the poll path, requiring the caller to ensure mutual exclusion for
+ // correctness. Only the last task to call this function is notified.
+ pub(crate) fn poll_write_io<R>(
+ &self,
+ cx: &mut Context<'_>,
+ f: impl FnMut() -> io::Result<R>,
+ ) -> Poll<io::Result<R>> {
+ self.poll_io(cx, Direction::Write, f)
+ }
+
+ /// Polls for events on the I/O resource's `direction` readiness stream.
+ ///
+ /// If called with a task context, notify the task when a new event is
+ /// received.
+ fn poll_ready(
+ &self,
+ cx: &mut Context<'_>,
+ direction: Direction,
+ ) -> Poll<io::Result<ReadyEvent>> {
+ ready!(crate::trace::trace_leaf(cx));
+ // Keep track of task budget
+ let coop = ready!(crate::runtime::coop::poll_proceed(cx));
+ let ev = ready!(self.shared.poll_readiness(cx, direction));
+
+ if ev.is_shutdown {
+ return Poll::Ready(Err(gone()));
+ }
+
+ coop.made_progress();
+ Poll::Ready(Ok(ev))
+ }
+
+ fn poll_io<R>(
+ &self,
+ cx: &mut Context<'_>,
+ direction: Direction,
+ mut f: impl FnMut() -> io::Result<R>,
+ ) -> Poll<io::Result<R>> {
+ loop {
+ let ev = ready!(self.poll_ready(cx, direction))?;
+
+ match f() {
+ Ok(ret) => {
+ return Poll::Ready(Ok(ret));
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.clear_readiness(ev);
+ }
+ Err(e) => return Poll::Ready(Err(e)),
+ }
+ }
+ }
+
+ pub(crate) fn try_io<R>(
+ &self,
+ interest: Interest,
+ f: impl FnOnce() -> io::Result<R>,
+ ) -> io::Result<R> {
+ let ev = self.shared.ready_event(interest);
+
+ // Don't attempt the operation if the resource is not ready.
+ if ev.ready.is_empty() {
+ return Err(io::ErrorKind::WouldBlock.into());
+ }
+
+ match f() {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.clear_readiness(ev);
+ Err(io::ErrorKind::WouldBlock.into())
+ }
+ res => res,
+ }
+ }
+
+ fn handle(&self) -> &Handle {
+ self.handle.driver().io()
+ }
+}
+
+impl Drop for Registration {
+ fn drop(&mut self) {
+ // It is possible for a cycle to be created between wakers stored in
+ // `ScheduledIo` instances and `Arc<driver::Inner>`. To break this
+ // cycle, wakers are cleared. This is an imperfect solution as it is
+ // possible to store a `Registration` in a waker. In this case, the
+ // cycle would remain.
+ //
+ // See tokio-rs/tokio#3481 for more details.
+ self.shared.clear_wakers();
+ }
+}
+
+fn gone() -> io::Error {
+ io::Error::new(
+ io::ErrorKind::Other,
+ crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
+ )
+}
+
+cfg_io_readiness! {
+ impl Registration {
+ pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> {
+ let ev = self.shared.readiness(interest).await;
+
+ if ev.is_shutdown {
+ return Err(gone())
+ }
+
+ Ok(ev)
+ }
+
+ pub(crate) async fn async_io<R>(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> {
+ loop {
+ let event = self.readiness(interest).await?;
+
+ match f() {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.clear_readiness(event);
+ }
+ x => return x,
+ }
+ }
+ }
+ }
+}
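
The readiness, then try the syscall, then `clear_readiness` on `WouldBlock` loop implemented by `poll_io` and `async_io` above is the same contract tokio exposes publicly through `tokio::io::unix::AsyncFd`. A hedged sketch of that public analogue (Unix-only; requires tokio's `net` feature and a running runtime; error handling abbreviated):

```rust
use std::io::{self, Read};
use std::net::TcpStream;
use tokio::io::unix::AsyncFd;

async fn read_some(stream: TcpStream) -> io::Result<usize> {
    stream.set_nonblocking(true)?;
    // Registers the fd with the runtime's I/O driver (cf. `add_source`).
    let fd = AsyncFd::new(stream)?;

    let mut buf = [0u8; 1024];
    loop {
        // Suspends until the driver reports read readiness.
        let mut guard = fd.readable().await?;

        // `try_io` runs the closure; on `WouldBlock` it clears the stored
        // readiness (cf. `clear_readiness`) so the next await truly waits.
        match guard.try_io(|inner| inner.get_ref().read(&mut buf)) {
            Ok(result) => return result,
            Err(_would_block) => continue, // readiness was stale; retry
        }
    }
}
```

Without the clear-on-`WouldBlock` step, stored readiness would remain set and the loop would spin on a resource that is not actually ready.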
diff --git a/vendor/tokio/src/runtime/io/scheduled_io.rs b/vendor/tokio/src/runtime/io/scheduled_io.rs
new file mode 100644
index 000000000..197a4e0e2
--- /dev/null
+++ b/vendor/tokio/src/runtime/io/scheduled_io.rs
@@ -0,0 +1,558 @@
+use super::{ReadyEvent, Tick};
+use crate::io::interest::Interest;
+use crate::io::ready::Ready;
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::loom::sync::Mutex;
+use crate::util::bit;
+use crate::util::slab::Entry;
+use crate::util::WakeList;
+
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
+use std::task::{Context, Poll, Waker};
+
+use super::Direction;
+
+cfg_io_readiness! {
+ use crate::util::linked_list::{self, LinkedList};
+
+ use std::cell::UnsafeCell;
+ use std::future::Future;
+ use std::marker::PhantomPinned;
+ use std::pin::Pin;
+ use std::ptr::NonNull;
+}
+
+/// Stored in the I/O driver resource slab.
+#[derive(Debug)]
+pub(crate) struct ScheduledIo {
+ /// Packs the resource's readiness with the resource's generation.
+ readiness: AtomicUsize,
+
+ waiters: Mutex<Waiters>,
+}
+
+cfg_io_readiness! {
+ type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
+}
+
+#[derive(Debug, Default)]
+struct Waiters {
+ #[cfg(feature = "net")]
+ /// List of all current waiters.
+ list: WaitList,
+
+ /// Waker used for AsyncRead.
+ reader: Option<Waker>,
+
+ /// Waker used for AsyncWrite.
+ writer: Option<Waker>,
+}
+
+cfg_io_readiness! {
+ #[derive(Debug)]
+ struct Waiter {
+ pointers: linked_list::Pointers<Waiter>,
+
+ /// The waker for this task.
+ waker: Option<Waker>,
+
+ /// The interest this waiter is waiting on.
+ interest: Interest,
+
+ is_ready: bool,
+
+        /// Should never be `Unpin`.
+ _p: PhantomPinned,
+ }
+
+ generate_addr_of_methods! {
+ impl<> Waiter {
+ unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
+ &self.pointers
+ }
+ }
+ }
+
+ /// Future returned by `readiness()`.
+ struct Readiness<'a> {
+ scheduled_io: &'a ScheduledIo,
+
+ state: State,
+
+ /// Entry in the waiter `LinkedList`.
+ waiter: UnsafeCell<Waiter>,
+ }
+
+ enum State {
+ Init,
+ Waiting,
+ Done,
+ }
+}
+
+// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
+//
+// | shutdown | generation | driver tick | readiness |
+// |----------|------------|-------------|-----------|
+// |  1 bit   |   7 bits   |   8 bits    |  16 bits  |
+
+const READINESS: bit::Pack = bit::Pack::least_significant(16);
+
+const TICK: bit::Pack = READINESS.then(8);
+
+const GENERATION: bit::Pack = TICK.then(7);
+
+const SHUTDOWN: bit::Pack = GENERATION.then(1);
+
+#[test]
+fn test_generations_assert_same() {
+ assert_eq!(super::GENERATION, GENERATION);
+}
+
+// ===== impl ScheduledIo =====
+
+impl Entry for ScheduledIo {
+ fn reset(&self) {
+ let state = self.readiness.load(Acquire);
+
+ let generation = GENERATION.unpack(state);
+ let next = GENERATION.pack_lossy(generation + 1, 0);
+
+ self.readiness.store(next, Release);
+ }
+}
+
+impl Default for ScheduledIo {
+ fn default() -> ScheduledIo {
+ ScheduledIo {
+ readiness: AtomicUsize::new(0),
+ waiters: Mutex::new(Default::default()),
+ }
+ }
+}
+
+impl ScheduledIo {
+ pub(crate) fn generation(&self) -> usize {
+ GENERATION.unpack(self.readiness.load(Acquire))
+ }
+
+    /// Invoked when the I/O driver is shut down; forces this `ScheduledIo` into
+    /// a permanently shut-down state.
+ pub(super) fn shutdown(&self) {
+ let mask = SHUTDOWN.pack(1, 0);
+ self.readiness.fetch_or(mask, AcqRel);
+ self.wake(Ready::ALL);
+ }
+
+    /// Sets the readiness on this `ScheduledIo` by invoking the given closure on
+    /// the current value, storing the result as the new readiness value.
+ ///
+ /// # Arguments
+ /// - `token`: the token for this `ScheduledIo`.
+    /// - `tick`: whether to set the tick value or to clear readiness for a
+    ///   specific tick.
+ /// - `f`: a closure returning a new readiness value given the previous
+ /// readiness.
+ ///
+ /// # Returns
+ ///
+ /// If the given token's generation no longer matches the `ScheduledIo`'s
+ /// generation, then the corresponding IO resource has been removed and
+ /// replaced with a new resource. In that case, this method returns `Err`.
+    /// Otherwise, this method returns `Ok(())`.
+ pub(super) fn set_readiness(
+ &self,
+ token: Option<usize>,
+ tick: Tick,
+ f: impl Fn(Ready) -> Ready,
+ ) -> Result<(), ()> {
+ let mut current = self.readiness.load(Acquire);
+
+ loop {
+ let current_generation = GENERATION.unpack(current);
+
+ if let Some(token) = token {
+ // Check that the generation for this access is still the
+ // current one.
+ if GENERATION.unpack(token) != current_generation {
+ return Err(());
+ }
+ }
+
+ // Mask out the tick/generation bits so that the modifying
+ // function doesn't see them.
+ let current_readiness = Ready::from_usize(current);
+ let new = f(current_readiness);
+
+ let packed = match tick {
+ Tick::Set(t) => TICK.pack(t as usize, new.as_usize()),
+ Tick::Clear(t) => {
+ if TICK.unpack(current) as u8 != t {
+ // Trying to clear readiness with an old event!
+ return Err(());
+ }
+
+ TICK.pack(t as usize, new.as_usize())
+ }
+ };
+
+ let next = GENERATION.pack(current_generation, packed);
+
+ match self
+ .readiness
+ .compare_exchange(current, next, AcqRel, Acquire)
+ {
+ Ok(_) => return Ok(()),
+ // we lost the race, retry!
+ Err(actual) => current = actual,
+ }
+ }
+ }
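
`set_readiness` is a standard lock-free read-modify-write loop: load, transform, `compare_exchange`, and retry with the observed value on contention. Stripped of the generation and tick checks, the skeleton looks like this (`AtomicUsize::fetch_update` in std packages the same loop):

```rust
use std::sync::atomic::{
    AtomicUsize,
    Ordering::{AcqRel, Acquire},
};

/// Applies `f` to the current value until the CAS succeeds; returns the
/// previous value.
fn update(cell: &AtomicUsize, f: impl Fn(usize) -> usize) -> usize {
    let mut current = cell.load(Acquire);
    loop {
        let next = f(current);
        match cell.compare_exchange(current, next, AcqRel, Acquire) {
            Ok(prev) => return prev,
            // Lost the race: another thread changed the value. Retry with
            // the value it actually holds now.
            Err(actual) => current = actual,
        }
    }
}

fn main() {
    let readiness = AtomicUsize::new(0);
    // E.g. OR in a "readable" bit, as `dispatch` does on a mio event.
    update(&readiness, |curr| curr | 0b01);
    assert_eq!(readiness.load(Acquire), 0b01);
}
```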
+
+ /// Notifies all pending waiters that have registered interest in `ready`.
+ ///
+ /// There may be many waiters to notify. Waking the pending task **must** be
+ /// done from outside of the lock otherwise there is a potential for a
+ /// deadlock.
+ ///
+ /// A stack array of wakers is created and filled with wakers to notify, the
+ /// lock is released, and the wakers are notified. Because there may be more
+ /// than 32 wakers to notify, if the stack array fills up, the lock is
+ /// released, the array is cleared, and the iteration continues.
+ pub(super) fn wake(&self, ready: Ready) {
+ let mut wakers = WakeList::new();
+
+ let mut waiters = self.waiters.lock();
+
+ // check for AsyncRead slot
+ if ready.is_readable() {
+ if let Some(waker) = waiters.reader.take() {
+ wakers.push(waker);
+ }
+ }
+
+ // check for AsyncWrite slot
+ if ready.is_writable() {
+ if let Some(waker) = waiters.writer.take() {
+ wakers.push(waker);
+ }
+ }
+
+ #[cfg(feature = "net")]
+ 'outer: loop {
+ let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));
+
+ while wakers.can_push() {
+ match iter.next() {
+ Some(waiter) => {
+ let waiter = unsafe { &mut *waiter.as_ptr() };
+
+ if let Some(waker) = waiter.waker.take() {
+ waiter.is_ready = true;
+ wakers.push(waker);
+ }
+ }
+ None => {
+ break 'outer;
+ }
+ }
+ }
+
+ drop(waiters);
+
+ wakers.wake_all();
+
+ // Acquire the lock again.
+ waiters = self.waiters.lock();
+ }
+
+ // Release the lock before notifying
+ drop(waiters);
+
+ wakers.wake_all();
+ }
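
The discipline `wake` follows, collecting wakers while holding the lock and only calling `wake()` after releasing it, matters because `Waker::wake` can run arbitrary code, including code that takes the same lock again. A condensed sketch of the pattern, with a `Vec` standing in for `WakeList`:

```rust
use std::sync::Mutex;
use std::task::Waker;

#[derive(Default)]
struct Waiters {
    reader: Option<Waker>,
    writer: Option<Waker>,
}

fn wake_ready(waiters: &Mutex<Waiters>) {
    let mut pending: Vec<Waker> = Vec::with_capacity(2);

    {
        let mut guard = waiters.lock().unwrap();
        // Take the stored wakers, but do NOT call them yet.
        pending.extend(guard.reader.take());
        pending.extend(guard.writer.take());
    } // lock released here

    // `Waker::wake` can run arbitrary code, including code that locks
    // `waiters` again; calling it under the lock risks a deadlock.
    for waker in pending {
        waker.wake();
    }
}
```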
+
+ pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
+ let curr = self.readiness.load(Acquire);
+
+ ReadyEvent {
+ tick: TICK.unpack(curr) as u8,
+ ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
+ is_shutdown: SHUTDOWN.unpack(curr) != 0,
+ }
+ }
+
+ /// Polls for readiness events in a given direction.
+ ///
+ /// These are to support `AsyncRead` and `AsyncWrite` polling methods,
+ /// which cannot use the `async fn` version. This uses reserved reader
+ /// and writer slots.
+ pub(super) fn poll_readiness(
+ &self,
+ cx: &mut Context<'_>,
+ direction: Direction,
+ ) -> Poll<ReadyEvent> {
+ let curr = self.readiness.load(Acquire);
+
+ let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
+ let is_shutdown = SHUTDOWN.unpack(curr) != 0;
+
+ if ready.is_empty() && !is_shutdown {
+ // Update the task info
+ let mut waiters = self.waiters.lock();
+ let slot = match direction {
+ Direction::Read => &mut waiters.reader,
+ Direction::Write => &mut waiters.writer,
+ };
+
+ // Avoid cloning the waker if one is already stored that matches the
+ // current task.
+ match slot {
+ Some(existing) => {
+ if !existing.will_wake(cx.waker()) {
+ *existing = cx.waker().clone();
+ }
+ }
+ None => {
+ *slot = Some(cx.waker().clone());
+ }
+ }
+
+ // Try again, in case the readiness was changed while we were
+ // taking the waiters lock
+ let curr = self.readiness.load(Acquire);
+ let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
+ let is_shutdown = SHUTDOWN.unpack(curr) != 0;
+ if is_shutdown {
+ Poll::Ready(ReadyEvent {
+ tick: TICK.unpack(curr) as u8,
+ ready: direction.mask(),
+ is_shutdown,
+ })
+ } else if ready.is_empty() {
+ Poll::Pending
+ } else {
+ Poll::Ready(ReadyEvent {
+ tick: TICK.unpack(curr) as u8,
+ ready,
+ is_shutdown,
+ })
+ }
+ } else {
+ Poll::Ready(ReadyEvent {
+ tick: TICK.unpack(curr) as u8,
+ ready,
+ is_shutdown,
+ })
+ }
+ }
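
The shape of `poll_readiness` is the classic lost-wakeup defence: check readiness, store the waker under the lock, then check readiness *again* before returning `Pending`, because an event may have arrived between the first load and the waker store. Reduced to its skeleton (illustrative only):

```rust
use std::sync::atomic::{AtomicBool, Ordering::Acquire};
use std::sync::Mutex;
use std::task::{Context, Poll, Waker};

struct Flag {
    ready: AtomicBool,
    waker: Mutex<Option<Waker>>,
}

impl Flag {
    fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
        // Fast path: already ready, no lock needed.
        if self.ready.load(Acquire) {
            return Poll::Ready(());
        }

        // Slow path: publish our waker under the lock...
        *self.waker.lock().unwrap() = Some(cx.waker().clone());

        // ...then check AGAIN. The event may have fired between the first
        // load and the waker store; without this re-check the notification
        // would be missed and the task would hang forever.
        if self.ready.load(Acquire) {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}
```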
+
+ pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
+ // This consumes the current readiness state **except** for closed
+ // states. Closed states are excluded because they are final states.
+ let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
+
+ // result isn't important
+ let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr - mask_no_closed);
+ }
+
+ pub(crate) fn clear_wakers(&self) {
+ let mut waiters = self.waiters.lock();
+ waiters.reader.take();
+ waiters.writer.take();
+ }
+}
+
+impl Drop for ScheduledIo {
+ fn drop(&mut self) {
+ self.wake(Ready::ALL);
+ }
+}
+
+unsafe impl Send for ScheduledIo {}
+unsafe impl Sync for ScheduledIo {}
+
+cfg_io_readiness! {
+ impl ScheduledIo {
+ /// An async version of `poll_readiness` which uses a linked list of wakers.
+ pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
+ self.readiness_fut(interest).await
+ }
+
+ // This is in a separate function so that the borrow checker doesn't think
+ // we are borrowing the `UnsafeCell` possibly over await boundaries.
+ //
+ // Go figure.
+ fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
+ Readiness {
+ scheduled_io: self,
+ state: State::Init,
+ waiter: UnsafeCell::new(Waiter {
+ pointers: linked_list::Pointers::new(),
+ waker: None,
+ is_ready: false,
+ interest,
+ _p: PhantomPinned,
+ }),
+ }
+ }
+ }
+
+ unsafe impl linked_list::Link for Waiter {
+ type Handle = NonNull<Waiter>;
+ type Target = Waiter;
+
+ fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
+ *handle
+ }
+
+ unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
+ ptr
+ }
+
+ unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
+ Waiter::addr_of_pointers(target)
+ }
+ }
+
+ // ===== impl Readiness =====
+
+ impl Future for Readiness<'_> {
+ type Output = ReadyEvent;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ use std::sync::atomic::Ordering::SeqCst;
+
+ let (scheduled_io, state, waiter) = unsafe {
+ let me = self.get_unchecked_mut();
+ (&me.scheduled_io, &mut me.state, &me.waiter)
+ };
+
+ loop {
+ match *state {
+ State::Init => {
+ // Optimistically check existing readiness
+ let curr = scheduled_io.readiness.load(SeqCst);
+ let ready = Ready::from_usize(READINESS.unpack(curr));
+ let is_shutdown = SHUTDOWN.unpack(curr) != 0;
+
+ // Safety: `waiter.interest` never changes
+ let interest = unsafe { (*waiter.get()).interest };
+ let ready = ready.intersection(interest);
+
+ if !ready.is_empty() || is_shutdown {
+ // Currently ready!
+ let tick = TICK.unpack(curr) as u8;
+ *state = State::Done;
+ return Poll::Ready(ReadyEvent { tick, ready, is_shutdown });
+ }
+
+ // Wasn't ready, take the lock (and check again while locked).
+ let mut waiters = scheduled_io.waiters.lock();
+
+ let curr = scheduled_io.readiness.load(SeqCst);
+ let mut ready = Ready::from_usize(READINESS.unpack(curr));
+ let is_shutdown = SHUTDOWN.unpack(curr) != 0;
+
+ if is_shutdown {
+ ready = Ready::ALL;
+ }
+
+ let ready = ready.intersection(interest);
+
+ if !ready.is_empty() || is_shutdown {
+ // Currently ready!
+ let tick = TICK.unpack(curr) as u8;
+ *state = State::Done;
+ return Poll::Ready(ReadyEvent { tick, ready, is_shutdown });
+ }
+
+ // Not ready even after locked, insert into list...
+
+ // Safety: called while locked
+ unsafe {
+ (*waiter.get()).waker = Some(cx.waker().clone());
+ }
+
+ // Insert the waiter into the linked list
+ //
+ // safety: pointers from `UnsafeCell` are never null.
+ waiters
+ .list
+ .push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
+ *state = State::Waiting;
+ }
+ State::Waiting => {
+                    // Currently in the "Waiting" state, implying the caller has
+                    // a waiter stored in the waiter list (guarded by
+                    // `scheduled_io.waiters`). In order to access the waker
+                    // fields, we must hold the lock.
+
+ let waiters = scheduled_io.waiters.lock();
+
+ // Safety: called while locked
+ let w = unsafe { &mut *waiter.get() };
+
+ if w.is_ready {
+ // Our waker has been notified.
+ *state = State::Done;
+ } else {
+ // Update the waker, if necessary.
+ if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
+ w.waker = Some(cx.waker().clone());
+ }
+
+ return Poll::Pending;
+ }
+
+ // Explicit drop of the lock to indicate the scope that the
+ // lock is held. Because holding the lock is required to
+ // ensure safe access to fields not held within the lock, it
+ // is helpful to visualize the scope of the critical
+ // section.
+ drop(waiters);
+ }
+ State::Done => {
+ // Safety: State::Done means it is no longer shared
+ let w = unsafe { &mut *waiter.get() };
+
+ let curr = scheduled_io.readiness.load(Acquire);
+ let is_shutdown = SHUTDOWN.unpack(curr) != 0;
+
+ // The returned tick might be newer than the event
+ // which notified our waker. This is ok because the future
+ // still didn't return `Poll::Ready`.
+ let tick = TICK.unpack(curr) as u8;
+
+ // The readiness state could have been cleared in the meantime,
+ // but we allow the returned ready set to be empty.
+ let curr_ready = Ready::from_usize(READINESS.unpack(curr));
+ let ready = curr_ready.intersection(w.interest);
+
+ return Poll::Ready(ReadyEvent {
+ tick,
+ ready,
+ is_shutdown,
+ });
+ }
+ }
+ }
+ }
+ }
+
+ impl Drop for Readiness<'_> {
+ fn drop(&mut self) {
+ let mut waiters = self.scheduled_io.waiters.lock();
+
+ // Safety: `waiter` is only ever stored in `waiters`
+ unsafe {
+ waiters
+ .list
+ .remove(NonNull::new_unchecked(self.waiter.get()))
+ };
+ }
+ }
+
+ unsafe impl Send for Readiness<'_> {}
+ unsafe impl Sync for Readiness<'_> {}
+}