diff options
Diffstat (limited to 'third_party/rust/tokio/src/io/driver/mod.rs')
-rw-r--r-- | third_party/rust/tokio/src/io/driver/mod.rs | 396 |
1 files changed, 396 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/io/driver/mod.rs b/third_party/rust/tokio/src/io/driver/mod.rs new file mode 100644 index 0000000000..d8535d9ab2 --- /dev/null +++ b/third_party/rust/tokio/src/io/driver/mod.rs @@ -0,0 +1,396 @@ +pub(crate) mod platform; + +mod scheduled_io; +pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests + +use crate::loom::sync::atomic::AtomicUsize; +use crate::park::{Park, Unpark}; +use crate::runtime::context; +use crate::util::slab::{Address, Slab}; + +use mio::event::Evented; +use std::fmt; +use std::io; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::{Arc, Weak}; +use std::task::Waker; +use std::time::Duration; + +/// I/O driver, backed by Mio +pub(crate) struct Driver { + /// Reuse the `mio::Events` value across calls to poll. + events: mio::Events, + + /// State shared between the reactor and the handles. + inner: Arc<Inner>, + + _wakeup_registration: mio::Registration, +} + +/// A reference to an I/O driver +#[derive(Clone)] +pub(crate) struct Handle { + inner: Weak<Inner>, +} + +pub(super) struct Inner { + /// The underlying system event queue. + io: mio::Poll, + + /// Dispatch slabs for I/O and futures events + pub(super) io_dispatch: Slab<ScheduledIo>, + + /// The number of sources in `io_dispatch`. + n_sources: AtomicUsize, + + /// Used to wake up the reactor from a call to `turn` + wakeup: mio::SetReadiness, +} + +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +pub(super) enum Direction { + Read, + Write, +} + +const TOKEN_WAKEUP: mio::Token = mio::Token(Address::NULL); + +fn _assert_kinds() { + fn _assert<T: Send + Sync>() {} + + _assert::<Handle>(); +} + +// ===== impl Driver ===== + +impl Driver { + /// Creates a new event loop, returning any error that happened during the + /// creation. + pub(crate) fn new() -> io::Result<Driver> { + let io = mio::Poll::new()?; + let wakeup_pair = mio::Registration::new2(); + + io.register( + &wakeup_pair.0, + TOKEN_WAKEUP, + mio::Ready::readable(), + mio::PollOpt::level(), + )?; + + Ok(Driver { + events: mio::Events::with_capacity(1024), + _wakeup_registration: wakeup_pair.0, + inner: Arc::new(Inner { + io, + io_dispatch: Slab::new(), + n_sources: AtomicUsize::new(0), + wakeup: wakeup_pair.1, + }), + }) + } + + /// Returns a handle to this event loop which can be sent across threads + /// and can be used as a proxy to the event loop itself. + /// + /// Handles are cloneable and clones always refer to the same event loop. + /// This handle is typically passed into functions that create I/O objects + /// to bind them to this event loop. + pub(crate) fn handle(&self) -> Handle { + Handle { + inner: Arc::downgrade(&self.inner), + } + } + + fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> { + // Block waiting for an event to happen, peeling out how many events + // happened. + match self.inner.io.poll(&mut self.events, max_wait) { + Ok(_) => {} + Err(e) => return Err(e), + } + + // Process all the events that came in, dispatching appropriately + + for event in self.events.iter() { + let token = event.token(); + + if token == TOKEN_WAKEUP { + self.inner + .wakeup + .set_readiness(mio::Ready::empty()) + .unwrap(); + } else { + self.dispatch(token, event.readiness()); + } + } + + Ok(()) + } + + fn dispatch(&self, token: mio::Token, ready: mio::Ready) { + let mut rd = None; + let mut wr = None; + + let address = Address::from_usize(token.0); + + let io = match self.inner.io_dispatch.get(address) { + Some(io) => io, + None => return, + }; + + if io + .set_readiness(address, |curr| curr | ready.as_usize()) + .is_err() + { + // token no longer valid! + return; + } + + if ready.is_writable() || platform::is_hup(ready) || platform::is_error(ready) { + wr = io.writer.take_waker(); + } + + if !(ready & (!mio::Ready::writable())).is_empty() { + rd = io.reader.take_waker(); + } + + if let Some(w) = rd { + w.wake(); + } + + if let Some(w) = wr { + w.wake(); + } + } +} + +impl Park for Driver { + type Unpark = Handle; + type Error = io::Error; + + fn unpark(&self) -> Self::Unpark { + self.handle() + } + + fn park(&mut self) -> io::Result<()> { + self.turn(None)?; + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> io::Result<()> { + self.turn(Some(duration))?; + Ok(()) + } +} + +impl fmt::Debug for Driver { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Driver") + } +} + +// ===== impl Handle ===== + +impl Handle { + /// Returns a handle to the current reactor + /// + /// # Panics + /// + /// This function panics if there is no current reactor set. + pub(super) fn current() -> Self { + context::io_handle() + .expect("there is no reactor running, must be called from the context of Tokio runtime") + } + + /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise + /// makes the next call to `turn` return immediately. + /// + /// This method is intended to be used in situations where a notification + /// needs to otherwise be sent to the main reactor. If the reactor is + /// currently blocked inside of `turn` then it will wake up and soon return + /// after this method has been called. If the reactor is not currently + /// blocked in `turn`, then the next call to `turn` will not block and + /// return immediately. + fn wakeup(&self) { + if let Some(inner) = self.inner() { + inner.wakeup.set_readiness(mio::Ready::readable()).unwrap(); + } + } + + pub(super) fn inner(&self) -> Option<Arc<Inner>> { + self.inner.upgrade() + } +} + +impl Unpark for Handle { + fn unpark(&self) { + self.wakeup(); + } +} + +impl fmt::Debug for Handle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Handle") + } +} + +// ===== impl Inner ===== + +impl Inner { + /// Registers an I/O resource with the reactor. + /// + /// The registration token is returned. + pub(super) fn add_source(&self, source: &dyn Evented) -> io::Result<Address> { + let address = self.io_dispatch.alloc().ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + "reactor at max registered I/O resources", + ) + })?; + + self.n_sources.fetch_add(1, SeqCst); + + self.io.register( + source, + mio::Token(address.to_usize()), + mio::Ready::all(), + mio::PollOpt::edge(), + )?; + + Ok(address) + } + + /// Deregisters an I/O resource from the reactor. + pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> { + self.io.deregister(source) + } + + pub(super) fn drop_source(&self, address: Address) { + self.io_dispatch.remove(address); + self.n_sources.fetch_sub(1, SeqCst); + } + + /// Registers interest in the I/O resource associated with `token`. + pub(super) fn register(&self, token: Address, dir: Direction, w: Waker) { + let sched = self + .io_dispatch + .get(token) + .unwrap_or_else(|| panic!("IO resource for token {:?} does not exist!", token)); + + let waker = match dir { + Direction::Read => &sched.reader, + Direction::Write => &sched.writer, + }; + + waker.register(w); + } +} + +impl Direction { + pub(super) fn mask(self) -> mio::Ready { + match self { + Direction::Read => { + // Everything except writable is signaled through read. + mio::Ready::all() - mio::Ready::writable() + } + Direction::Write => mio::Ready::writable() | platform::hup() | platform::error(), + } + } +} + +#[cfg(all(test, loom))] +mod tests { + use super::*; + use loom::thread; + + // No-op `Evented` impl just so we can have something to pass to `add_source`. + struct NotEvented; + + impl Evented for NotEvented { + fn register( + &self, + _: &mio::Poll, + _: mio::Token, + _: mio::Ready, + _: mio::PollOpt, + ) -> io::Result<()> { + Ok(()) + } + + fn reregister( + &self, + _: &mio::Poll, + _: mio::Token, + _: mio::Ready, + _: mio::PollOpt, + ) -> io::Result<()> { + Ok(()) + } + + fn deregister(&self, _: &mio::Poll) -> io::Result<()> { + Ok(()) + } + } + + #[test] + fn tokens_unique_when_dropped() { + loom::model(|| { + let reactor = Driver::new().unwrap(); + let inner = reactor.inner; + let inner2 = inner.clone(); + + let token_1 = inner.add_source(&NotEvented).unwrap(); + let thread = thread::spawn(move || { + inner2.drop_source(token_1); + }); + + let token_2 = inner.add_source(&NotEvented).unwrap(); + thread.join().unwrap(); + + assert!(token_1 != token_2); + }) + } + + #[test] + fn tokens_unique_when_dropped_on_full_page() { + loom::model(|| { + let reactor = Driver::new().unwrap(); + let inner = reactor.inner; + let inner2 = inner.clone(); + // add sources to fill up the first page so that the dropped index + // may be reused. + for _ in 0..31 { + inner.add_source(&NotEvented).unwrap(); + } + + let token_1 = inner.add_source(&NotEvented).unwrap(); + let thread = thread::spawn(move || { + inner2.drop_source(token_1); + }); + + let token_2 = inner.add_source(&NotEvented).unwrap(); + thread.join().unwrap(); + + assert!(token_1 != token_2); + }) + } + + #[test] + fn tokens_unique_concurrent_add() { + loom::model(|| { + let reactor = Driver::new().unwrap(); + let inner = reactor.inner; + let inner2 = inner.clone(); + + let thread = thread::spawn(move || { + let token_2 = inner2.add_source(&NotEvented).unwrap(); + token_2 + }); + + let token_1 = inner.add_source(&NotEvented).unwrap(); + let token_2 = thread.join().unwrap(); + + assert!(token_1 != token_2); + }) + } +} |