diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
commit | dc0db358abe19481e475e10c32149b53370f1a1c (patch) | |
tree | ab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/src/runtime/io | |
parent | Releasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff) | |
download | rustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip |
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/runtime/io')
-rw-r--r-- | vendor/tokio/src/runtime/io/metrics.rs | 24 | ||||
-rw-r--r-- | vendor/tokio/src/runtime/io/mod.rs | 356 | ||||
-rw-r--r-- | vendor/tokio/src/runtime/io/registration.rs | 252 | ||||
-rw-r--r-- | vendor/tokio/src/runtime/io/scheduled_io.rs | 558 |
4 files changed, 1190 insertions, 0 deletions
diff --git a/vendor/tokio/src/runtime/io/metrics.rs b/vendor/tokio/src/runtime/io/metrics.rs new file mode 100644 index 000000000..ec341efe6 --- /dev/null +++ b/vendor/tokio/src/runtime/io/metrics.rs @@ -0,0 +1,24 @@ +//! This file contains mocks of the metrics types used in the I/O driver. +//! +//! The reason these mocks don't live in `src/runtime/mock.rs` is because +//! these need to be available in the case when `net` is enabled but +//! `rt` is not. + +cfg_not_rt_and_metrics_and_net! { + #[derive(Default)] + pub(crate) struct IoDriverMetrics {} + + impl IoDriverMetrics { + pub(crate) fn incr_fd_count(&self) {} + pub(crate) fn dec_fd_count(&self) {} + pub(crate) fn incr_ready_count_by(&self, _amt: u64) {} + } +} + +cfg_net! { + cfg_rt! { + cfg_metrics! { + pub(crate) use crate::runtime::IoDriverMetrics; + } + } +} diff --git a/vendor/tokio/src/runtime/io/mod.rs b/vendor/tokio/src/runtime/io/mod.rs new file mode 100644 index 000000000..2dd426f11 --- /dev/null +++ b/vendor/tokio/src/runtime/io/mod.rs @@ -0,0 +1,356 @@ +#![cfg_attr(not(all(feature = "rt", feature = "net")), allow(dead_code))] + +mod registration; +pub(crate) use registration::Registration; + +mod scheduled_io; +use scheduled_io::ScheduledIo; + +mod metrics; + +use crate::io::interest::Interest; +use crate::io::ready::Ready; +use crate::runtime::driver; +use crate::util::slab::{self, Slab}; +use crate::{loom::sync::RwLock, util::bit}; + +use metrics::IoDriverMetrics; + +use std::fmt; +use std::io; +use std::time::Duration; + +/// I/O driver, backed by Mio. +pub(crate) struct Driver { + /// Tracks the number of times `turn` is called. It is safe for this to wrap + /// as it is mostly used to determine when to call `compact()`. + tick: u8, + + /// True when an event with the signal token is received + signal_ready: bool, + + /// Reuse the `mio::Events` value across calls to poll. + events: mio::Events, + + /// Primary slab handle containing the state for each resource registered + /// with this driver. + resources: Slab<ScheduledIo>, + + /// The system event queue. + poll: mio::Poll, +} + +/// A reference to an I/O driver. +pub(crate) struct Handle { + /// Registers I/O resources. + registry: mio::Registry, + + /// Allocates `ScheduledIo` handles when creating new resources. + io_dispatch: RwLock<IoDispatcher>, + + /// Used to wake up the reactor from a call to `turn`. + /// Not supported on Wasi due to lack of threading support. + #[cfg(not(tokio_wasi))] + waker: mio::Waker, + + pub(crate) metrics: IoDriverMetrics, +} + +#[derive(Debug)] +pub(crate) struct ReadyEvent { + tick: u8, + pub(crate) ready: Ready, + is_shutdown: bool, +} + +cfg_net_unix!( + impl ReadyEvent { + pub(crate) fn with_ready(&self, ready: Ready) -> Self { + Self { + ready, + tick: self.tick, + is_shutdown: self.is_shutdown, + } + } + } +); + +struct IoDispatcher { + allocator: slab::Allocator<ScheduledIo>, + is_shutdown: bool, +} + +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +enum Direction { + Read, + Write, +} + +enum Tick { + Set(u8), + Clear(u8), +} + +// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup +// token. +const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31); +const TOKEN_SIGNAL: mio::Token = mio::Token(1 + (1 << 31)); + +const ADDRESS: bit::Pack = bit::Pack::least_significant(24); + +// Packs the generation value in the `readiness` field. +// +// The generation prevents a race condition where a slab slot is reused for a +// new socket while the I/O driver is about to apply a readiness event. The +// generation value is checked when setting new readiness. If the generation do +// not match, then the readiness event is discarded. +const GENERATION: bit::Pack = ADDRESS.then(7); + +fn _assert_kinds() { + fn _assert<T: Send + Sync>() {} + + _assert::<Handle>(); +} + +// ===== impl Driver ===== + +impl Driver { + /// Creates a new event loop, returning any error that happened during the + /// creation. + pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> { + let poll = mio::Poll::new()?; + #[cfg(not(tokio_wasi))] + let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?; + let registry = poll.registry().try_clone()?; + + let slab = Slab::new(); + let allocator = slab.allocator(); + + let driver = Driver { + tick: 0, + signal_ready: false, + events: mio::Events::with_capacity(nevents), + poll, + resources: slab, + }; + + let handle = Handle { + registry, + io_dispatch: RwLock::new(IoDispatcher::new(allocator)), + #[cfg(not(tokio_wasi))] + waker, + metrics: IoDriverMetrics::default(), + }; + + Ok((driver, handle)) + } + + pub(crate) fn park(&mut self, rt_handle: &driver::Handle) { + let handle = rt_handle.io(); + self.turn(handle, None); + } + + pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) { + let handle = rt_handle.io(); + self.turn(handle, Some(duration)); + } + + pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) { + let handle = rt_handle.io(); + + if handle.shutdown() { + self.resources.for_each(|io| { + // If a task is waiting on the I/O resource, notify it that the + // runtime is being shutdown. And shutdown will clear all wakers. + io.shutdown(); + }); + } + } + + fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) { + // How often to call `compact()` on the resource slab + const COMPACT_INTERVAL: u8 = 255; + + self.tick = self.tick.wrapping_add(1); + + if self.tick == COMPACT_INTERVAL { + self.resources.compact() + } + + let events = &mut self.events; + + // Block waiting for an event to happen, peeling out how many events + // happened. + match self.poll.poll(events, max_wait) { + Ok(_) => {} + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + #[cfg(tokio_wasi)] + Err(e) if e.kind() == io::ErrorKind::InvalidInput => { + // In case of wasm32_wasi this error happens, when trying to poll without subscriptions + // just return from the park, as there would be nothing, which wakes us up. + } + Err(e) => panic!("unexpected error when polling the I/O driver: {:?}", e), + } + + // Process all the events that came in, dispatching appropriately + let mut ready_count = 0; + for event in events.iter() { + let token = event.token(); + + if token == TOKEN_WAKEUP { + // Nothing to do, the event is used to unblock the I/O driver + } else if token == TOKEN_SIGNAL { + self.signal_ready = true; + } else { + Self::dispatch( + &mut self.resources, + self.tick, + token, + Ready::from_mio(event), + ); + ready_count += 1; + } + } + + handle.metrics.incr_ready_count_by(ready_count); + } + + fn dispatch(resources: &mut Slab<ScheduledIo>, tick: u8, token: mio::Token, ready: Ready) { + let addr = slab::Address::from_usize(ADDRESS.unpack(token.0)); + + let io = match resources.get(addr) { + Some(io) => io, + None => return, + }; + + let res = io.set_readiness(Some(token.0), Tick::Set(tick), |curr| curr | ready); + + if res.is_err() { + // token no longer valid! + return; + } + + io.wake(ready); + } +} + +impl fmt::Debug for Driver { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Driver") + } +} + +impl Handle { + /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise + /// makes the next call to `turn` return immediately. + /// + /// This method is intended to be used in situations where a notification + /// needs to otherwise be sent to the main reactor. If the reactor is + /// currently blocked inside of `turn` then it will wake up and soon return + /// after this method has been called. If the reactor is not currently + /// blocked in `turn`, then the next call to `turn` will not block and + /// return immediately. + pub(crate) fn unpark(&self) { + #[cfg(not(tokio_wasi))] + self.waker.wake().expect("failed to wake I/O driver"); + } + + /// Registers an I/O resource with the reactor for a given `mio::Ready` state. + /// + /// The registration token is returned. + pub(super) fn add_source( + &self, + source: &mut impl mio::event::Source, + interest: Interest, + ) -> io::Result<slab::Ref<ScheduledIo>> { + let (address, shared) = self.allocate()?; + + let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0)); + + self.registry + .register(source, mio::Token(token), interest.to_mio())?; + + self.metrics.incr_fd_count(); + + Ok(shared) + } + + /// Deregisters an I/O resource from the reactor. + pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> { + self.registry.deregister(source)?; + + self.metrics.dec_fd_count(); + + Ok(()) + } + + /// shutdown the dispatcher. + fn shutdown(&self) -> bool { + let mut io = self.io_dispatch.write().unwrap(); + if io.is_shutdown { + return false; + } + io.is_shutdown = true; + true + } + + fn allocate(&self) -> io::Result<(slab::Address, slab::Ref<ScheduledIo>)> { + let io = self.io_dispatch.read().unwrap(); + if io.is_shutdown { + return Err(io::Error::new( + io::ErrorKind::Other, + crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR, + )); + } + io.allocator.allocate().ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + "reactor at max registered I/O resources", + ) + }) + } +} + +impl fmt::Debug for Handle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Handle") + } +} + +// ===== impl IoDispatcher ===== + +impl IoDispatcher { + fn new(allocator: slab::Allocator<ScheduledIo>) -> Self { + Self { + allocator, + is_shutdown: false, + } + } +} + +impl Direction { + pub(super) fn mask(self) -> Ready { + match self { + Direction::Read => Ready::READABLE | Ready::READ_CLOSED, + Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED, + } + } +} + +// Signal handling +cfg_signal_internal_and_unix! { + impl Handle { + pub(crate) fn register_signal_receiver(&self, receiver: &mut mio::net::UnixStream) -> io::Result<()> { + self.registry.register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?; + Ok(()) + } + } + + impl Driver { + pub(crate) fn consume_signal_ready(&mut self) -> bool { + let ret = self.signal_ready; + self.signal_ready = false; + ret + } + } +} diff --git a/vendor/tokio/src/runtime/io/registration.rs b/vendor/tokio/src/runtime/io/registration.rs new file mode 100644 index 000000000..341fa0539 --- /dev/null +++ b/vendor/tokio/src/runtime/io/registration.rs @@ -0,0 +1,252 @@ +#![cfg_attr(not(feature = "net"), allow(dead_code))] + +use crate::io::interest::Interest; +use crate::runtime::io::{Direction, Handle, ReadyEvent, ScheduledIo}; +use crate::runtime::scheduler; +use crate::util::slab; + +use mio::event::Source; +use std::io; +use std::task::{Context, Poll}; + +cfg_io_driver! { + /// Associates an I/O resource with the reactor instance that drives it. + /// + /// A registration represents an I/O resource registered with a Reactor such + /// that it will receive task notifications on readiness. This is the lowest + /// level API for integrating with a reactor. + /// + /// The association between an I/O resource is made by calling + /// [`new_with_interest_and_handle`]. + /// Once the association is established, it remains established until the + /// registration instance is dropped. + /// + /// A registration instance represents two separate readiness streams. One + /// for the read readiness and one for write readiness. These streams are + /// independent and can be consumed from separate tasks. + /// + /// **Note**: while `Registration` is `Sync`, the caller must ensure that + /// there are at most two tasks that use a registration instance + /// concurrently. One task for [`poll_read_ready`] and one task for + /// [`poll_write_ready`]. While violating this requirement is "safe" from a + /// Rust memory safety point of view, it will result in unexpected behavior + /// in the form of lost notifications and tasks hanging. + /// + /// ## Platform-specific events + /// + /// `Registration` also allows receiving platform-specific `mio::Ready` + /// events. These events are included as part of the read readiness event + /// stream. The write readiness event stream is only for `Ready::writable()` + /// events. + /// + /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle + /// [`poll_read_ready`]: method@Self::poll_read_ready` + /// [`poll_write_ready`]: method@Self::poll_write_ready` + #[derive(Debug)] + pub(crate) struct Registration { + /// Handle to the associated runtime. + handle: scheduler::Handle, + + /// Reference to state stored by the driver. + shared: slab::Ref<ScheduledIo>, + } +} + +unsafe impl Send for Registration {} +unsafe impl Sync for Registration {} + +// ===== impl Registration ===== + +impl Registration { + /// Registers the I/O resource with the reactor for the provided handle, for + /// a specific `Interest`. This does not add `hup` or `error` so if you are + /// interested in those states, you will need to add them to the readiness + /// state passed to this function. + /// + /// # Return + /// + /// - `Ok` if the registration happened successfully + /// - `Err` if an error was encountered during registration + #[track_caller] + pub(crate) fn new_with_interest_and_handle( + io: &mut impl Source, + interest: Interest, + handle: scheduler::Handle, + ) -> io::Result<Registration> { + let shared = handle.driver().io().add_source(io, interest)?; + + Ok(Registration { handle, shared }) + } + + /// Deregisters the I/O resource from the reactor it is associated with. + /// + /// This function must be called before the I/O resource associated with the + /// registration is dropped. + /// + /// Note that deregistering does not guarantee that the I/O resource can be + /// registered with a different reactor. Some I/O resource types can only be + /// associated with a single reactor instance for their lifetime. + /// + /// # Return + /// + /// If the deregistration was successful, `Ok` is returned. Any calls to + /// `Reactor::turn` that happen after a successful call to `deregister` will + /// no longer result in notifications getting sent for this registration. + /// + /// `Err` is returned if an error is encountered. + pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> { + self.handle().deregister_source(io) + } + + pub(crate) fn clear_readiness(&self, event: ReadyEvent) { + self.shared.clear_readiness(event); + } + + // Uses the poll path, requiring the caller to ensure mutual exclusion for + // correctness. Only the last task to call this function is notified. + pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> { + self.poll_ready(cx, Direction::Read) + } + + // Uses the poll path, requiring the caller to ensure mutual exclusion for + // correctness. Only the last task to call this function is notified. + pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> { + self.poll_ready(cx, Direction::Write) + } + + // Uses the poll path, requiring the caller to ensure mutual exclusion for + // correctness. Only the last task to call this function is notified. + #[cfg(not(tokio_wasi))] + pub(crate) fn poll_read_io<R>( + &self, + cx: &mut Context<'_>, + f: impl FnMut() -> io::Result<R>, + ) -> Poll<io::Result<R>> { + self.poll_io(cx, Direction::Read, f) + } + + // Uses the poll path, requiring the caller to ensure mutual exclusion for + // correctness. Only the last task to call this function is notified. + pub(crate) fn poll_write_io<R>( + &self, + cx: &mut Context<'_>, + f: impl FnMut() -> io::Result<R>, + ) -> Poll<io::Result<R>> { + self.poll_io(cx, Direction::Write, f) + } + + /// Polls for events on the I/O resource's `direction` readiness stream. + /// + /// If called with a task context, notify the task when a new event is + /// received. + fn poll_ready( + &self, + cx: &mut Context<'_>, + direction: Direction, + ) -> Poll<io::Result<ReadyEvent>> { + ready!(crate::trace::trace_leaf(cx)); + // Keep track of task budget + let coop = ready!(crate::runtime::coop::poll_proceed(cx)); + let ev = ready!(self.shared.poll_readiness(cx, direction)); + + if ev.is_shutdown { + return Poll::Ready(Err(gone())); + } + + coop.made_progress(); + Poll::Ready(Ok(ev)) + } + + fn poll_io<R>( + &self, + cx: &mut Context<'_>, + direction: Direction, + mut f: impl FnMut() -> io::Result<R>, + ) -> Poll<io::Result<R>> { + loop { + let ev = ready!(self.poll_ready(cx, direction))?; + + match f() { + Ok(ret) => { + return Poll::Ready(Ok(ret)); + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_readiness(ev); + } + Err(e) => return Poll::Ready(Err(e)), + } + } + } + + pub(crate) fn try_io<R>( + &self, + interest: Interest, + f: impl FnOnce() -> io::Result<R>, + ) -> io::Result<R> { + let ev = self.shared.ready_event(interest); + + // Don't attempt the operation if the resource is not ready. + if ev.ready.is_empty() { + return Err(io::ErrorKind::WouldBlock.into()); + } + + match f() { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_readiness(ev); + Err(io::ErrorKind::WouldBlock.into()) + } + res => res, + } + } + + fn handle(&self) -> &Handle { + self.handle.driver().io() + } +} + +impl Drop for Registration { + fn drop(&mut self) { + // It is possible for a cycle to be created between wakers stored in + // `ScheduledIo` instances and `Arc<driver::Inner>`. To break this + // cycle, wakers are cleared. This is an imperfect solution as it is + // possible to store a `Registration` in a waker. In this case, the + // cycle would remain. + // + // See tokio-rs/tokio#3481 for more details. + self.shared.clear_wakers(); + } +} + +fn gone() -> io::Error { + io::Error::new( + io::ErrorKind::Other, + crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR, + ) +} + +cfg_io_readiness! { + impl Registration { + pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> { + let ev = self.shared.readiness(interest).await; + + if ev.is_shutdown { + return Err(gone()) + } + + Ok(ev) + } + + pub(crate) async fn async_io<R>(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> { + loop { + let event = self.readiness(interest).await?; + + match f() { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_readiness(event); + } + x => return x, + } + } + } + } +} diff --git a/vendor/tokio/src/runtime/io/scheduled_io.rs b/vendor/tokio/src/runtime/io/scheduled_io.rs new file mode 100644 index 000000000..197a4e0e2 --- /dev/null +++ b/vendor/tokio/src/runtime/io/scheduled_io.rs @@ -0,0 +1,558 @@ +use super::{ReadyEvent, Tick}; +use crate::io::interest::Interest; +use crate::io::ready::Ready; +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::Mutex; +use crate::util::bit; +use crate::util::slab::Entry; +use crate::util::WakeList; + +use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; +use std::task::{Context, Poll, Waker}; + +use super::Direction; + +cfg_io_readiness! { + use crate::util::linked_list::{self, LinkedList}; + + use std::cell::UnsafeCell; + use std::future::Future; + use std::marker::PhantomPinned; + use std::pin::Pin; + use std::ptr::NonNull; +} + +/// Stored in the I/O driver resource slab. +#[derive(Debug)] +pub(crate) struct ScheduledIo { + /// Packs the resource's readiness with the resource's generation. + readiness: AtomicUsize, + + waiters: Mutex<Waiters>, +} + +cfg_io_readiness! { + type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>; +} + +#[derive(Debug, Default)] +struct Waiters { + #[cfg(feature = "net")] + /// List of all current waiters. + list: WaitList, + + /// Waker used for AsyncRead. + reader: Option<Waker>, + + /// Waker used for AsyncWrite. + writer: Option<Waker>, +} + +cfg_io_readiness! { + #[derive(Debug)] + struct Waiter { + pointers: linked_list::Pointers<Waiter>, + + /// The waker for this task. + waker: Option<Waker>, + + /// The interest this waiter is waiting on. + interest: Interest, + + is_ready: bool, + + /// Should never be `!Unpin`. + _p: PhantomPinned, + } + + generate_addr_of_methods! { + impl<> Waiter { + unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> { + &self.pointers + } + } + } + + /// Future returned by `readiness()`. + struct Readiness<'a> { + scheduled_io: &'a ScheduledIo, + + state: State, + + /// Entry in the waiter `LinkedList`. + waiter: UnsafeCell<Waiter>, + } + + enum State { + Init, + Waiting, + Done, + } +} + +// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness. +// +// | shutdown | generation | driver tick | readiness | +// |----------+------------+--------------+-----------| +// | 1 bit | 7 bits + 8 bits + 16 bits | + +const READINESS: bit::Pack = bit::Pack::least_significant(16); + +const TICK: bit::Pack = READINESS.then(8); + +const GENERATION: bit::Pack = TICK.then(7); + +const SHUTDOWN: bit::Pack = GENERATION.then(1); + +#[test] +fn test_generations_assert_same() { + assert_eq!(super::GENERATION, GENERATION); +} + +// ===== impl ScheduledIo ===== + +impl Entry for ScheduledIo { + fn reset(&self) { + let state = self.readiness.load(Acquire); + + let generation = GENERATION.unpack(state); + let next = GENERATION.pack_lossy(generation + 1, 0); + + self.readiness.store(next, Release); + } +} + +impl Default for ScheduledIo { + fn default() -> ScheduledIo { + ScheduledIo { + readiness: AtomicUsize::new(0), + waiters: Mutex::new(Default::default()), + } + } +} + +impl ScheduledIo { + pub(crate) fn generation(&self) -> usize { + GENERATION.unpack(self.readiness.load(Acquire)) + } + + /// Invoked when the IO driver is shut down; forces this ScheduledIo into a + /// permanently shutdown state. + pub(super) fn shutdown(&self) { + let mask = SHUTDOWN.pack(1, 0); + self.readiness.fetch_or(mask, AcqRel); + self.wake(Ready::ALL); + } + + /// Sets the readiness on this `ScheduledIo` by invoking the given closure on + /// the current value, returning the previous readiness value. + /// + /// # Arguments + /// - `token`: the token for this `ScheduledIo`. + /// - `tick`: whether setting the tick or trying to clear readiness for a + /// specific tick. + /// - `f`: a closure returning a new readiness value given the previous + /// readiness. + /// + /// # Returns + /// + /// If the given token's generation no longer matches the `ScheduledIo`'s + /// generation, then the corresponding IO resource has been removed and + /// replaced with a new resource. In that case, this method returns `Err`. + /// Otherwise, this returns the previous readiness. + pub(super) fn set_readiness( + &self, + token: Option<usize>, + tick: Tick, + f: impl Fn(Ready) -> Ready, + ) -> Result<(), ()> { + let mut current = self.readiness.load(Acquire); + + loop { + let current_generation = GENERATION.unpack(current); + + if let Some(token) = token { + // Check that the generation for this access is still the + // current one. + if GENERATION.unpack(token) != current_generation { + return Err(()); + } + } + + // Mask out the tick/generation bits so that the modifying + // function doesn't see them. + let current_readiness = Ready::from_usize(current); + let new = f(current_readiness); + + let packed = match tick { + Tick::Set(t) => TICK.pack(t as usize, new.as_usize()), + Tick::Clear(t) => { + if TICK.unpack(current) as u8 != t { + // Trying to clear readiness with an old event! + return Err(()); + } + + TICK.pack(t as usize, new.as_usize()) + } + }; + + let next = GENERATION.pack(current_generation, packed); + + match self + .readiness + .compare_exchange(current, next, AcqRel, Acquire) + { + Ok(_) => return Ok(()), + // we lost the race, retry! + Err(actual) => current = actual, + } + } + } + + /// Notifies all pending waiters that have registered interest in `ready`. + /// + /// There may be many waiters to notify. Waking the pending task **must** be + /// done from outside of the lock otherwise there is a potential for a + /// deadlock. + /// + /// A stack array of wakers is created and filled with wakers to notify, the + /// lock is released, and the wakers are notified. Because there may be more + /// than 32 wakers to notify, if the stack array fills up, the lock is + /// released, the array is cleared, and the iteration continues. + pub(super) fn wake(&self, ready: Ready) { + let mut wakers = WakeList::new(); + + let mut waiters = self.waiters.lock(); + + // check for AsyncRead slot + if ready.is_readable() { + if let Some(waker) = waiters.reader.take() { + wakers.push(waker); + } + } + + // check for AsyncWrite slot + if ready.is_writable() { + if let Some(waker) = waiters.writer.take() { + wakers.push(waker); + } + } + + #[cfg(feature = "net")] + 'outer: loop { + let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest)); + + while wakers.can_push() { + match iter.next() { + Some(waiter) => { + let waiter = unsafe { &mut *waiter.as_ptr() }; + + if let Some(waker) = waiter.waker.take() { + waiter.is_ready = true; + wakers.push(waker); + } + } + None => { + break 'outer; + } + } + } + + drop(waiters); + + wakers.wake_all(); + + // Acquire the lock again. + waiters = self.waiters.lock(); + } + + // Release the lock before notifying + drop(waiters); + + wakers.wake_all(); + } + + pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent { + let curr = self.readiness.load(Acquire); + + ReadyEvent { + tick: TICK.unpack(curr) as u8, + ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)), + is_shutdown: SHUTDOWN.unpack(curr) != 0, + } + } + + /// Polls for readiness events in a given direction. + /// + /// These are to support `AsyncRead` and `AsyncWrite` polling methods, + /// which cannot use the `async fn` version. This uses reserved reader + /// and writer slots. + pub(super) fn poll_readiness( + &self, + cx: &mut Context<'_>, + direction: Direction, + ) -> Poll<ReadyEvent> { + let curr = self.readiness.load(Acquire); + + let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); + let is_shutdown = SHUTDOWN.unpack(curr) != 0; + + if ready.is_empty() && !is_shutdown { + // Update the task info + let mut waiters = self.waiters.lock(); + let slot = match direction { + Direction::Read => &mut waiters.reader, + Direction::Write => &mut waiters.writer, + }; + + // Avoid cloning the waker if one is already stored that matches the + // current task. + match slot { + Some(existing) => { + if !existing.will_wake(cx.waker()) { + *existing = cx.waker().clone(); + } + } + None => { + *slot = Some(cx.waker().clone()); + } + } + + // Try again, in case the readiness was changed while we were + // taking the waiters lock + let curr = self.readiness.load(Acquire); + let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); + let is_shutdown = SHUTDOWN.unpack(curr) != 0; + if is_shutdown { + Poll::Ready(ReadyEvent { + tick: TICK.unpack(curr) as u8, + ready: direction.mask(), + is_shutdown, + }) + } else if ready.is_empty() { + Poll::Pending + } else { + Poll::Ready(ReadyEvent { + tick: TICK.unpack(curr) as u8, + ready, + is_shutdown, + }) + } + } else { + Poll::Ready(ReadyEvent { + tick: TICK.unpack(curr) as u8, + ready, + is_shutdown, + }) + } + } + + pub(crate) fn clear_readiness(&self, event: ReadyEvent) { + // This consumes the current readiness state **except** for closed + // states. Closed states are excluded because they are final states. + let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED; + + // result isn't important + let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr - mask_no_closed); + } + + pub(crate) fn clear_wakers(&self) { + let mut waiters = self.waiters.lock(); + waiters.reader.take(); + waiters.writer.take(); + } +} + +impl Drop for ScheduledIo { + fn drop(&mut self) { + self.wake(Ready::ALL); + } +} + +unsafe impl Send for ScheduledIo {} +unsafe impl Sync for ScheduledIo {} + +cfg_io_readiness! { + impl ScheduledIo { + /// An async version of `poll_readiness` which uses a linked list of wakers. + pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent { + self.readiness_fut(interest).await + } + + // This is in a separate function so that the borrow checker doesn't think + // we are borrowing the `UnsafeCell` possibly over await boundaries. + // + // Go figure. + fn readiness_fut(&self, interest: Interest) -> Readiness<'_> { + Readiness { + scheduled_io: self, + state: State::Init, + waiter: UnsafeCell::new(Waiter { + pointers: linked_list::Pointers::new(), + waker: None, + is_ready: false, + interest, + _p: PhantomPinned, + }), + } + } + } + + unsafe impl linked_list::Link for Waiter { + type Handle = NonNull<Waiter>; + type Target = Waiter; + + fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> { + *handle + } + + unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> { + ptr + } + + unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> { + Waiter::addr_of_pointers(target) + } + } + + // ===== impl Readiness ===== + + impl Future for Readiness<'_> { + type Output = ReadyEvent; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + use std::sync::atomic::Ordering::SeqCst; + + let (scheduled_io, state, waiter) = unsafe { + let me = self.get_unchecked_mut(); + (&me.scheduled_io, &mut me.state, &me.waiter) + }; + + loop { + match *state { + State::Init => { + // Optimistically check existing readiness + let curr = scheduled_io.readiness.load(SeqCst); + let ready = Ready::from_usize(READINESS.unpack(curr)); + let is_shutdown = SHUTDOWN.unpack(curr) != 0; + + // Safety: `waiter.interest` never changes + let interest = unsafe { (*waiter.get()).interest }; + let ready = ready.intersection(interest); + + if !ready.is_empty() || is_shutdown { + // Currently ready! + let tick = TICK.unpack(curr) as u8; + *state = State::Done; + return Poll::Ready(ReadyEvent { tick, ready, is_shutdown }); + } + + // Wasn't ready, take the lock (and check again while locked). + let mut waiters = scheduled_io.waiters.lock(); + + let curr = scheduled_io.readiness.load(SeqCst); + let mut ready = Ready::from_usize(READINESS.unpack(curr)); + let is_shutdown = SHUTDOWN.unpack(curr) != 0; + + if is_shutdown { + ready = Ready::ALL; + } + + let ready = ready.intersection(interest); + + if !ready.is_empty() || is_shutdown { + // Currently ready! + let tick = TICK.unpack(curr) as u8; + *state = State::Done; + return Poll::Ready(ReadyEvent { tick, ready, is_shutdown }); + } + + // Not ready even after locked, insert into list... + + // Safety: called while locked + unsafe { + (*waiter.get()).waker = Some(cx.waker().clone()); + } + + // Insert the waiter into the linked list + // + // safety: pointers from `UnsafeCell` are never null. + waiters + .list + .push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); + *state = State::Waiting; + } + State::Waiting => { + // Currently in the "Waiting" state, implying the caller has + // a waiter stored in the waiter list (guarded by + // `notify.waiters`). In order to access the waker fields, + // we must hold the lock. + + let waiters = scheduled_io.waiters.lock(); + + // Safety: called while locked + let w = unsafe { &mut *waiter.get() }; + + if w.is_ready { + // Our waker has been notified. + *state = State::Done; + } else { + // Update the waker, if necessary. + if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { + w.waker = Some(cx.waker().clone()); + } + + return Poll::Pending; + } + + // Explicit drop of the lock to indicate the scope that the + // lock is held. Because holding the lock is required to + // ensure safe access to fields not held within the lock, it + // is helpful to visualize the scope of the critical + // section. + drop(waiters); + } + State::Done => { + // Safety: State::Done means it is no longer shared + let w = unsafe { &mut *waiter.get() }; + + let curr = scheduled_io.readiness.load(Acquire); + let is_shutdown = SHUTDOWN.unpack(curr) != 0; + + // The returned tick might be newer than the event + // which notified our waker. This is ok because the future + // still didn't return `Poll::Ready`. + let tick = TICK.unpack(curr) as u8; + + // The readiness state could have been cleared in the meantime, + // but we allow the returned ready set to be empty. + let curr_ready = Ready::from_usize(READINESS.unpack(curr)); + let ready = curr_ready.intersection(w.interest); + + return Poll::Ready(ReadyEvent { + tick, + ready, + is_shutdown, + }); + } + } + } + } + } + + impl Drop for Readiness<'_> { + fn drop(&mut self) { + let mut waiters = self.scheduled_io.waiters.lock(); + + // Safety: `waiter` is only ever stored in `waiters` + unsafe { + waiters + .list + .remove(NonNull::new_unchecked(self.waiter.get())) + }; + } + } + + unsafe impl Send for Readiness<'_> {} + unsafe impl Sync for Readiness<'_> {} +} |