author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-28 14:29:10 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-28 14:29:10 +0000
commit     2aa4a82499d4becd2284cdb482213d541b8804dd (patch)
tree       b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/tokio/src/io/driver
parent     Initial commit. (diff)
Adding upstream version 86.0.1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/src/io/driver')
-rw-r--r--  third_party/rust/tokio/src/io/driver/mod.rs           396
-rw-r--r--  third_party/rust/tokio/src/io/driver/platform.rs       44
-rw-r--r--  third_party/rust/tokio/src/io/driver/scheduled_io.rs  141
3 files changed, 581 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/io/driver/mod.rs b/third_party/rust/tokio/src/io/driver/mod.rs
new file mode 100644
index 0000000000..d8535d9ab2
--- /dev/null
+++ b/third_party/rust/tokio/src/io/driver/mod.rs
@@ -0,0 +1,396 @@
+pub(crate) mod platform;
+
+mod scheduled_io;
+pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests
+
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::park::{Park, Unpark};
+use crate::runtime::context;
+use crate::util::slab::{Address, Slab};
+
+use mio::event::Evented;
+use std::fmt;
+use std::io;
+use std::sync::atomic::Ordering::SeqCst;
+use std::sync::{Arc, Weak};
+use std::task::Waker;
+use std::time::Duration;
+
+/// I/O driver, backed by Mio
+pub(crate) struct Driver {
+ /// Reuse the `mio::Events` value across calls to poll.
+ events: mio::Events,
+
+ /// State shared between the reactor and the handles.
+ inner: Arc<Inner>,
+
+ _wakeup_registration: mio::Registration,
+}
+
+/// A reference to an I/O driver
+#[derive(Clone)]
+pub(crate) struct Handle {
+ inner: Weak<Inner>,
+}
+
+pub(super) struct Inner {
+ /// The underlying system event queue.
+ io: mio::Poll,
+
+ /// Dispatch slabs for I/O and futures events
+ pub(super) io_dispatch: Slab<ScheduledIo>,
+
+ /// The number of sources in `io_dispatch`.
+ n_sources: AtomicUsize,
+
+ /// Used to wake up the reactor from a call to `turn`
+ wakeup: mio::SetReadiness,
+}
+
+#[derive(Debug, Eq, PartialEq, Clone, Copy)]
+pub(super) enum Direction {
+ Read,
+ Write,
+}
+
+const TOKEN_WAKEUP: mio::Token = mio::Token(Address::NULL);
+
+fn _assert_kinds() {
+ fn _assert<T: Send + Sync>() {}
+
+ _assert::<Handle>();
+}
+
+// ===== impl Driver =====
+
+impl Driver {
+ /// Creates a new event loop, returning any error that happened during the
+ /// creation.
+ pub(crate) fn new() -> io::Result<Driver> {
+ let io = mio::Poll::new()?;
+ let wakeup_pair = mio::Registration::new2();
+
+ io.register(
+ &wakeup_pair.0,
+ TOKEN_WAKEUP,
+ mio::Ready::readable(),
+ mio::PollOpt::level(),
+ )?;
+
+ Ok(Driver {
+ events: mio::Events::with_capacity(1024),
+ _wakeup_registration: wakeup_pair.0,
+ inner: Arc::new(Inner {
+ io,
+ io_dispatch: Slab::new(),
+ n_sources: AtomicUsize::new(0),
+ wakeup: wakeup_pair.1,
+ }),
+ })
+ }
+
+ /// Returns a handle to this event loop which can be sent across threads
+ /// and can be used as a proxy to the event loop itself.
+ ///
+ /// Handles are cloneable and clones always refer to the same event loop.
+ /// This handle is typically passed into functions that create I/O objects
+ /// to bind them to this event loop.
+ pub(crate) fn handle(&self) -> Handle {
+ Handle {
+ inner: Arc::downgrade(&self.inner),
+ }
+ }
+
+ fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
+ // Block waiting for an event to happen, peeling out how many events
+ // happened.
+ match self.inner.io.poll(&mut self.events, max_wait) {
+ Ok(_) => {}
+ Err(e) => return Err(e),
+ }
+
+ // Process all the events that came in, dispatching appropriately
+
+ for event in self.events.iter() {
+ let token = event.token();
+
+ if token == TOKEN_WAKEUP {
+ self.inner
+ .wakeup
+ .set_readiness(mio::Ready::empty())
+ .unwrap();
+ } else {
+ self.dispatch(token, event.readiness());
+ }
+ }
+
+ Ok(())
+ }
+
+ fn dispatch(&self, token: mio::Token, ready: mio::Ready) {
+ let mut rd = None;
+ let mut wr = None;
+
+ let address = Address::from_usize(token.0);
+
+ let io = match self.inner.io_dispatch.get(address) {
+ Some(io) => io,
+ None => return,
+ };
+
+ if io
+ .set_readiness(address, |curr| curr | ready.as_usize())
+ .is_err()
+ {
+ // token no longer valid!
+ return;
+ }
+
+ if ready.is_writable() || platform::is_hup(ready) || platform::is_error(ready) {
+ wr = io.writer.take_waker();
+ }
+
+ if !(ready & (!mio::Ready::writable())).is_empty() {
+ rd = io.reader.take_waker();
+ }
+
+ if let Some(w) = rd {
+ w.wake();
+ }
+
+ if let Some(w) = wr {
+ w.wake();
+ }
+ }
+}
+
+impl Park for Driver {
+ type Unpark = Handle;
+ type Error = io::Error;
+
+ fn unpark(&self) -> Self::Unpark {
+ self.handle()
+ }
+
+ fn park(&mut self) -> io::Result<()> {
+ self.turn(None)?;
+ Ok(())
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> io::Result<()> {
+ self.turn(Some(duration))?;
+ Ok(())
+ }
+}
+
+impl fmt::Debug for Driver {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "Driver")
+ }
+}
+
+// ===== impl Handle =====
+
+impl Handle {
+ /// Returns a handle to the current reactor
+ ///
+ /// # Panics
+ ///
+ /// This function panics if there is no current reactor set.
+ pub(super) fn current() -> Self {
+ context::io_handle()
+ .expect("there is no reactor running, must be called from the context of Tokio runtime")
+ }
+
+ /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
+ /// makes the next call to `turn` return immediately.
+ ///
+ /// This method is intended to be used in situations where a notification
+ /// needs to otherwise be sent to the main reactor. If the reactor is
+ /// currently blocked inside of `turn` then it will wake up and soon return
+ /// after this method has been called. If the reactor is not currently
+ /// blocked in `turn`, then the next call to `turn` will not block and
+ /// return immediately.
+ fn wakeup(&self) {
+ if let Some(inner) = self.inner() {
+ inner.wakeup.set_readiness(mio::Ready::readable()).unwrap();
+ }
+ }
+
+ pub(super) fn inner(&self) -> Option<Arc<Inner>> {
+ self.inner.upgrade()
+ }
+}
+
+impl Unpark for Handle {
+ fn unpark(&self) {
+ self.wakeup();
+ }
+}
+
+impl fmt::Debug for Handle {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "Handle")
+ }
+}
+
+// ===== impl Inner =====
+
+impl Inner {
+ /// Registers an I/O resource with the reactor.
+ ///
+ /// The registration token is returned.
+ pub(super) fn add_source(&self, source: &dyn Evented) -> io::Result<Address> {
+ let address = self.io_dispatch.alloc().ok_or_else(|| {
+ io::Error::new(
+ io::ErrorKind::Other,
+ "reactor at max registered I/O resources",
+ )
+ })?;
+
+ self.n_sources.fetch_add(1, SeqCst);
+
+ self.io.register(
+ source,
+ mio::Token(address.to_usize()),
+ mio::Ready::all(),
+ mio::PollOpt::edge(),
+ )?;
+
+ Ok(address)
+ }
+
+ /// Deregisters an I/O resource from the reactor.
+ pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
+ self.io.deregister(source)
+ }
+
+ pub(super) fn drop_source(&self, address: Address) {
+ self.io_dispatch.remove(address);
+ self.n_sources.fetch_sub(1, SeqCst);
+ }
+
+ /// Registers interest in the I/O resource associated with `token`.
+ pub(super) fn register(&self, token: Address, dir: Direction, w: Waker) {
+ let sched = self
+ .io_dispatch
+ .get(token)
+ .unwrap_or_else(|| panic!("IO resource for token {:?} does not exist!", token));
+
+ let waker = match dir {
+ Direction::Read => &sched.reader,
+ Direction::Write => &sched.writer,
+ };
+
+ waker.register(w);
+ }
+}
+
+impl Direction {
+ pub(super) fn mask(self) -> mio::Ready {
+ match self {
+ Direction::Read => {
+ // Everything except writable is signaled through read.
+ mio::Ready::all() - mio::Ready::writable()
+ }
+ Direction::Write => mio::Ready::writable() | platform::hup() | platform::error(),
+ }
+ }
+}
+
+#[cfg(all(test, loom))]
+mod tests {
+ use super::*;
+ use loom::thread;
+
+ // No-op `Evented` impl just so we can have something to pass to `add_source`.
+ struct NotEvented;
+
+ impl Evented for NotEvented {
+ fn register(
+ &self,
+ _: &mio::Poll,
+ _: mio::Token,
+ _: mio::Ready,
+ _: mio::PollOpt,
+ ) -> io::Result<()> {
+ Ok(())
+ }
+
+ fn reregister(
+ &self,
+ _: &mio::Poll,
+ _: mio::Token,
+ _: mio::Ready,
+ _: mio::PollOpt,
+ ) -> io::Result<()> {
+ Ok(())
+ }
+
+ fn deregister(&self, _: &mio::Poll) -> io::Result<()> {
+ Ok(())
+ }
+ }
+
+ #[test]
+ fn tokens_unique_when_dropped() {
+ loom::model(|| {
+ let reactor = Driver::new().unwrap();
+ let inner = reactor.inner;
+ let inner2 = inner.clone();
+
+ let token_1 = inner.add_source(&NotEvented).unwrap();
+ let thread = thread::spawn(move || {
+ inner2.drop_source(token_1);
+ });
+
+ let token_2 = inner.add_source(&NotEvented).unwrap();
+ thread.join().unwrap();
+
+ assert!(token_1 != token_2);
+ })
+ }
+
+ #[test]
+ fn tokens_unique_when_dropped_on_full_page() {
+ loom::model(|| {
+ let reactor = Driver::new().unwrap();
+ let inner = reactor.inner;
+ let inner2 = inner.clone();
+ // add sources to fill up the first page so that the dropped index
+ // may be reused.
+ for _ in 0..31 {
+ inner.add_source(&NotEvented).unwrap();
+ }
+
+ let token_1 = inner.add_source(&NotEvented).unwrap();
+ let thread = thread::spawn(move || {
+ inner2.drop_source(token_1);
+ });
+
+ let token_2 = inner.add_source(&NotEvented).unwrap();
+ thread.join().unwrap();
+
+ assert!(token_1 != token_2);
+ })
+ }
+
+ #[test]
+ fn tokens_unique_concurrent_add() {
+ loom::model(|| {
+ let reactor = Driver::new().unwrap();
+ let inner = reactor.inner;
+ let inner2 = inner.clone();
+
+ let thread = thread::spawn(move || {
+ let token_2 = inner2.add_source(&NotEvented).unwrap();
+ token_2
+ });
+
+ let token_1 = inner.add_source(&NotEvented).unwrap();
+ let token_2 = thread.join().unwrap();
+
+ assert!(token_1 != token_2);
+ })
+ }
+}
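
The dispatch rule in `Driver::dispatch` above is terse: write wakers are taken when the readiness includes writable, HUP, or error, and read wakers are taken for any readiness other than plain writable, so a hang-up wakes both sides. Below is a minimal standalone sketch of that rule with made-up bit values; it does not use mio's `Ready` type or tokio's internal types.

    // Standalone sketch of the dispatch rule, with illustrative bit values
    // (not mio's Ready constants and not tokio's internal types).
    const READABLE: usize = 0b0001;
    const WRITABLE: usize = 0b0010;
    const HUP: usize = 0b0100;
    const ERROR: usize = 0b1000;

    /// Returns (wake_reader, wake_writer) for a readiness bitset.
    fn should_wake(ready: usize) -> (bool, bool) {
        // Writers care about writable, hang-up, and error readiness.
        let wake_writer = ready & (WRITABLE | HUP | ERROR) != 0;
        // Everything except "writable only" is surfaced to the reader,
        // mirroring `!(ready & !mio::Ready::writable()).is_empty()` above.
        let wake_reader = ready & !WRITABLE != 0;
        (wake_reader, wake_writer)
    }

    fn main() {
        assert_eq!(should_wake(READABLE), (true, false));
        assert_eq!(should_wake(WRITABLE), (false, true));
        assert_eq!(should_wake(READABLE | WRITABLE), (true, true));
        // A hang-up wakes both sides, so neither a pending read nor a
        // pending write can miss the peer closing the connection.
        assert_eq!(should_wake(HUP), (true, true));
    }

Dispatching HUP and error to the writer as well as the reader lets a task blocked on a write learn about a broken connection instead of waiting for a writable event that may never come.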
diff --git a/third_party/rust/tokio/src/io/driver/platform.rs b/third_party/rust/tokio/src/io/driver/platform.rs
new file mode 100644
index 0000000000..6b27988ce6
--- /dev/null
+++ b/third_party/rust/tokio/src/io/driver/platform.rs
@@ -0,0 +1,44 @@
+pub(crate) use self::sys::*;
+
+#[cfg(unix)]
+mod sys {
+ use mio::unix::UnixReady;
+ use mio::Ready;
+
+ pub(crate) fn hup() -> Ready {
+ UnixReady::hup().into()
+ }
+
+ pub(crate) fn is_hup(ready: Ready) -> bool {
+ UnixReady::from(ready).is_hup()
+ }
+
+ pub(crate) fn error() -> Ready {
+ UnixReady::error().into()
+ }
+
+ pub(crate) fn is_error(ready: Ready) -> bool {
+ UnixReady::from(ready).is_error()
+ }
+}
+
+#[cfg(windows)]
+mod sys {
+ use mio::Ready;
+
+ pub(crate) fn hup() -> Ready {
+ Ready::empty()
+ }
+
+ pub(crate) fn is_hup(_: Ready) -> bool {
+ false
+ }
+
+ pub(crate) fn error() -> Ready {
+ Ready::empty()
+ }
+
+ pub(crate) fn is_error(_: Ready) -> bool {
+ false
+ }
+}
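
platform.rs exists so that `Direction::mask` and `Driver::dispatch` in mod.rs can mention HUP and error readiness without any `#[cfg]` branches of their own: on Unix the helpers delegate to `mio::unix::UnixReady`, while on Windows they collapse to an empty mask and a constant `false`. The sketch below shows the same per-target `sys` module pattern in a self-contained form, using made-up bit constants rather than mio's types.

    // Self-contained illustration of the per-target `sys` module pattern,
    // with made-up bit constants instead of mio's Ready/UnixReady types.
    pub use self::sys::*;

    #[cfg(unix)]
    mod sys {
        // Unix targets expose an extra hang-up readiness bit.
        pub const HUP: usize = 0b0100;

        pub fn is_hup(ready: usize) -> bool {
            ready & HUP != 0
        }
    }

    #[cfg(windows)]
    mod sys {
        // Windows has no HUP readiness: the mask is empty and the check is constant.
        pub const HUP: usize = 0;

        pub fn is_hup(_ready: usize) -> bool {
            false
        }
    }

    fn main() {
        // The call site is identical on every platform; only `sys` differs.
        println!("hup? {}", is_hup(0b0100));
    }

Keeping the `#[cfg]` noise inside one leaf module means `Direction::mask` can unconditionally write `writable() | platform::hup() | platform::error()` and still reduce to just `writable()` on Windows.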
diff --git a/third_party/rust/tokio/src/io/driver/scheduled_io.rs b/third_party/rust/tokio/src/io/driver/scheduled_io.rs
new file mode 100644
index 0000000000..7f6446e3f5
--- /dev/null
+++ b/third_party/rust/tokio/src/io/driver/scheduled_io.rs
@@ -0,0 +1,141 @@
+use crate::loom::future::AtomicWaker;
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::util::bit;
+use crate::util::slab::{Address, Entry, Generation};
+
+use std::sync::atomic::Ordering::{AcqRel, Acquire, SeqCst};
+
+#[derive(Debug)]
+pub(crate) struct ScheduledIo {
+ readiness: AtomicUsize,
+ pub(crate) reader: AtomicWaker,
+ pub(crate) writer: AtomicWaker,
+}
+
+const PACK: bit::Pack = bit::Pack::most_significant(Generation::WIDTH);
+
+impl Entry for ScheduledIo {
+ fn generation(&self) -> Generation {
+ unpack_generation(self.readiness.load(SeqCst))
+ }
+
+ fn reset(&self, generation: Generation) -> bool {
+ let mut current = self.readiness.load(Acquire);
+
+ loop {
+ if unpack_generation(current) != generation {
+ return false;
+ }
+
+ let next = PACK.pack(generation.next().to_usize(), 0);
+
+ match self
+ .readiness
+ .compare_exchange(current, next, AcqRel, Acquire)
+ {
+ Ok(_) => break,
+ Err(actual) => current = actual,
+ }
+ }
+
+ drop(self.reader.take_waker());
+ drop(self.writer.take_waker());
+
+ true
+ }
+}
+
+impl Default for ScheduledIo {
+ fn default() -> ScheduledIo {
+ ScheduledIo {
+ readiness: AtomicUsize::new(0),
+ reader: AtomicWaker::new(),
+ writer: AtomicWaker::new(),
+ }
+ }
+}
+
+impl ScheduledIo {
+ #[cfg(all(test, loom))]
+ /// Returns the current readiness value of this `ScheduledIo`, if the
+ /// provided `token` is still a valid access.
+ ///
+ /// # Returns
+ ///
+ /// If the given token's generation no longer matches the `ScheduledIo`'s
+ /// generation, then the corresponding IO resource has been removed and
+ /// replaced with a new resource. In that case, this method returns `None`.
+ /// Otherwise, this returns the current readiness.
+ pub(crate) fn get_readiness(&self, address: Address) -> Option<usize> {
+ let ready = self.readiness.load(Acquire);
+
+ if unpack_generation(ready) != address.generation() {
+ return None;
+ }
+
+ Some(ready & !PACK.mask())
+ }
+
+ /// Sets the readiness on this `ScheduledIo` by invoking the given closure on
+ /// the current value, returning the previous readiness value.
+ ///
+ /// # Arguments
+ /// - `token`: the token for this `ScheduledIo`.
+ /// - `f`: a closure returning a new readiness value given the previous
+ /// readiness.
+ ///
+ /// # Returns
+ ///
+ /// If the given token's generation no longer matches the `ScheduledIo`'s
+ /// generation, then the corresponding IO resource has been removed and
+ /// replaced with a new resource. In that case, this method returns `Err`.
+ /// Otherwise, this returns the previous readiness.
+ pub(crate) fn set_readiness(
+ &self,
+ address: Address,
+ f: impl Fn(usize) -> usize,
+ ) -> Result<usize, ()> {
+ let generation = address.generation();
+
+ let mut current = self.readiness.load(Acquire);
+
+ loop {
+ // Check that the generation for this access is still the current
+ // one.
+ if unpack_generation(current) != generation {
+ return Err(());
+ }
+ // Mask out the generation bits so that the modifying function
+ // doesn't see them.
+ let current_readiness = current & mio::Ready::all().as_usize();
+ let new = f(current_readiness);
+
+ debug_assert!(
+ new <= !PACK.max_value(),
+ "new readiness value would overwrite generation bits!"
+ );
+
+ match self.readiness.compare_exchange(
+ current,
+ PACK.pack(generation.to_usize(), new),
+ AcqRel,
+ Acquire,
+ ) {
+ Ok(_) => return Ok(current),
+ // we lost the race, retry!
+ Err(actual) => current = actual,
+ }
+ }
+ }
+}
+
+impl Drop for ScheduledIo {
+ fn drop(&mut self) {
+ self.writer.wake();
+ self.reader.wake();
+ }
+}
+
+fn unpack_generation(src: usize) -> Generation {
+ Generation::new(PACK.unpack(src))
+}
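
scheduled_io.rs packs a slab generation into the most significant bits of the `readiness` word (`bit::Pack::most_significant(Generation::WIDTH)`), so a single atomic carries both the readiness mask and a stamp that invalidates stale tokens once a slot is reset and reused. The following simplified, single-threaded model illustrates the packing and the stale-token check; the width, helper names, and return value are illustrative (the real `set_readiness` runs a compare-exchange loop and returns the previous readiness rather than the new word).

    // Simplified, single-threaded model of the generation packing; the
    // 7-bit width is an assumption for illustration, not tokio's value.
    const GEN_WIDTH: u32 = 7;
    const GEN_SHIFT: u32 = usize::BITS - GEN_WIDTH;
    const READY_MASK: usize = (1usize << GEN_SHIFT) - 1;

    fn pack(generation: usize, readiness: usize) -> usize {
        debug_assert!(readiness <= READY_MASK, "readiness would overwrite generation bits");
        (generation << GEN_SHIFT) | readiness
    }

    fn unpack_generation(word: usize) -> usize {
        word >> GEN_SHIFT
    }

    /// Applies `f` to the readiness bits if `token_gen` still matches the
    /// generation stored in `word`; a mismatch means the slot was reset and
    /// reused, so the stale token is rejected.
    fn set_readiness(word: usize, token_gen: usize, f: impl Fn(usize) -> usize) -> Result<usize, ()> {
        if unpack_generation(word) != token_gen {
            return Err(());
        }
        let current = word & READY_MASK;
        Ok(pack(token_gen, f(current)))
    }

    fn main() {
        let word = pack(3, 0b01);
        // A token from the same generation can OR in new readiness bits.
        assert_eq!(set_readiness(word, 3, |cur| cur | 0b10), Ok(pack(3, 0b11)));
        // A token left over from generation 2 is refused.
        assert_eq!(set_readiness(word, 2, |cur| cur | 0b10), Err(()));
    }

This is what lets `Driver::dispatch` safely ignore events for tokens whose slot has already been handed to a new I/O resource: the generation embedded in the address no longer matches, so `set_readiness` returns `Err` and the event is dropped.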