//! Event loop that drives Tokio I/O resources. //! //! The reactor is the engine that drives asynchronous I/O resources (like TCP and //! UDP sockets). It is backed by [`mio`] and acts as a bridge between [`mio`] and //! [`futures`]. //! //! The crate provides: //! //! * [`Reactor`] is the main type of this crate. It performs the event loop logic. //! //! * [`Handle`] provides a reference to a reactor instance. //! //! * [`Registration`] and [`PollEvented`] allow third parties to implement I/O //! resources that are driven by the reactor. //! //! Application authors will not use this crate directly. Instead, they will use the //! `tokio` crate. Library authors should only depend on `tokio-reactor` if they //! are building a custom I/O resource. //! //! For more details, see [reactor module] documentation in the Tokio crate. //! //! [`mio`]: http://github.com/carllerche/mio //! [`futures`]: http://github.com/rust-lang-nursery/futures-rs //! [`Reactor`]: struct.Reactor.html //! [`Handle`]: struct.Handle.html //! [`Registration`]: struct.Registration.html //! [`PollEvented`]: struct.PollEvented.html //! [reactor module]: https://docs.rs/tokio/0.1/tokio/reactor/index.html #![doc(html_root_url = "https://docs.rs/tokio-reactor/0.1.3")] #![deny(missing_docs, warnings, missing_debug_implementations)] #[macro_use] extern crate futures; #[macro_use] extern crate log; extern crate mio; extern crate slab; extern crate tokio_executor; extern crate tokio_io; #[cfg(feature = "unstable-futures")] extern crate futures2; mod atomic_task; pub(crate) mod background; mod poll_evented; mod registration; // ===== Public re-exports ===== pub use self::background::{Background, Shutdown}; pub use self::registration::Registration; pub use self::poll_evented::PollEvented; // ===== Private imports ===== use atomic_task::AtomicTask; use tokio_executor::Enter; use tokio_executor::park::{Park, Unpark}; use std::{fmt, usize}; use std::io; use std::mem; use std::cell::RefCell; use std::sync::atomic::Ordering::{Relaxed, SeqCst}; use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT}; use std::sync::{Arc, Weak, RwLock}; use std::time::{Duration, Instant}; use log::Level; use mio::event::Evented; use slab::Slab; /// The core reactor, or event loop. /// /// The event loop is the main source of blocking in an application which drives /// all other I/O events and notifications happening. Each event loop can have /// multiple handles pointing to it, each of which can then be used to create /// various I/O objects to interact with the event loop in interesting ways. pub struct Reactor { /// Reuse the `mio::Events` value across calls to poll. events: mio::Events, /// State shared between the reactor and the handles. inner: Arc, _wakeup_registration: mio::Registration, } /// A reference to a reactor. /// /// A `Handle` is used for associating I/O objects with an event loop /// explicitly. Typically though you won't end up using a `Handle` that often /// and will instead use the default reactor for the execution context. /// /// By default, most components bind lazily to reactors. /// To get this behavior when manually passing a `Handle`, use `default()`. #[derive(Clone)] pub struct Handle { inner: Option, } /// Like `Handle`, but never `None`. #[derive(Clone)] struct HandlePriv { inner: Weak, } /// Return value from the `turn` method on `Reactor`. /// /// Currently this value doesn't actually provide any functionality, but it may /// in the future give insight into what happened during `turn`. #[derive(Debug)] pub struct Turn { _priv: (), } /// Error returned from `Handle::set_fallback`. #[derive(Clone, Debug)] pub struct SetFallbackError(()); #[deprecated(since = "0.1.2", note = "use SetFallbackError instead")] #[doc(hidden)] pub type SetDefaultError = SetFallbackError; #[test] fn test_handle_size() { use std::mem; assert_eq!(mem::size_of::(), mem::size_of::()); } struct Inner { /// The underlying system event queue. io: mio::Poll, /// ABA guard counter next_aba_guard: AtomicUsize, /// Dispatch slabs for I/O and futures events io_dispatch: RwLock>, /// Used to wake up the reactor from a call to `turn` wakeup: mio::SetReadiness } struct ScheduledIo { aba_guard: usize, readiness: AtomicUsize, reader: AtomicTask, writer: AtomicTask, } #[derive(Debug, Eq, PartialEq, Clone, Copy)] pub(crate) enum Direction { Read, Write, } /// The global fallback reactor. static HANDLE_FALLBACK: AtomicUsize = ATOMIC_USIZE_INIT; /// Tracks the reactor for the current execution context. thread_local!(static CURRENT_REACTOR: RefCell> = RefCell::new(None)); const TOKEN_SHIFT: usize = 22; // Kind of arbitrary, but this reserves some token space for later usage. const MAX_SOURCES: usize = (1 << TOKEN_SHIFT) - 1; const TOKEN_WAKEUP: mio::Token = mio::Token(MAX_SOURCES); fn _assert_kinds() { fn _assert() {} _assert::(); } /// A wakeup handle for a task, which may be either a futures 0.1 or 0.2 task #[derive(Debug, Clone)] pub(crate) enum Task { Futures1(futures::task::Task), #[cfg(feature = "unstable-futures")] Futures2(futures2::task::Waker), } // ===== impl Reactor ===== /// Set the default reactor for the duration of the closure /// /// # Panics /// /// This function panics if there already is a default reactor set. pub fn with_default(handle: &Handle, enter: &mut Enter, f: F) -> R where F: FnOnce(&mut Enter) -> R { // Ensure that the executor is removed from the thread-local context // when leaving the scope. This handles cases that involve panicking. struct Reset; impl Drop for Reset { fn drop(&mut self) { CURRENT_REACTOR.with(|current| { let mut current = current.borrow_mut(); *current = None; }); } } // This ensures the value for the current reactor gets reset even if there // is a panic. let _r = Reset; CURRENT_REACTOR.with(|current| { { let mut current = current.borrow_mut(); assert!(current.is_none(), "default Tokio reactor already set \ for execution context"); let handle = match handle.as_priv() { Some(handle) => handle, None => { panic!("`handle` does not reference a reactor"); } }; *current = Some(handle.clone()); } f(enter) }) } impl Reactor { /// Creates a new event loop, returning any error that happened during the /// creation. pub fn new() -> io::Result { let io = mio::Poll::new()?; let wakeup_pair = mio::Registration::new2(); io.register(&wakeup_pair.0, TOKEN_WAKEUP, mio::Ready::readable(), mio::PollOpt::level())?; Ok(Reactor { events: mio::Events::with_capacity(1024), _wakeup_registration: wakeup_pair.0, inner: Arc::new(Inner { io: io, next_aba_guard: AtomicUsize::new(0), io_dispatch: RwLock::new(Slab::with_capacity(1)), wakeup: wakeup_pair.1, }), }) } /// Returns a handle to this event loop which can be sent across threads /// and can be used as a proxy to the event loop itself. /// /// Handles are cloneable and clones always refer to the same event loop. /// This handle is typically passed into functions that create I/O objects /// to bind them to this event loop. pub fn handle(&self) -> Handle { Handle { inner: Some(HandlePriv { inner: Arc::downgrade(&self.inner), }), } } /// Configures the fallback handle to be returned from `Handle::default`. /// /// The `Handle::default()` function will by default lazily spin up a global /// thread and run a reactor on this global thread. This behavior is not /// always desirable in all applications, however, and sometimes a different /// fallback reactor is desired. /// /// This function will attempt to globally alter the return value of /// `Handle::default()` to return the `handle` specified rather than a /// lazily initialized global thread. If successful then all future calls to /// `Handle::default()` which would otherwise fall back to the global thread /// will instead return a clone of the handle specified. /// /// # Errors /// /// This function may not always succeed in configuring the fallback handle. /// If this function was previously called (or perhaps concurrently called /// on many threads) only the *first* invocation of this function will /// succeed. All other invocations will return an error. /// /// Additionally if the global reactor thread has already been initialized /// then this function will also return an error. (aka if `Handle::default` /// has been called previously in this program). pub fn set_fallback(&self) -> Result<(), SetFallbackError> { set_fallback(self.handle().into_priv().unwrap()) } /// Performs one iteration of the event loop, blocking on waiting for events /// for at most `max_wait` (forever if `None`). /// /// This method is the primary method of running this reactor and processing /// I/O events that occur. This method executes one iteration of an event /// loop, blocking at most once waiting for events to happen. /// /// If a `max_wait` is specified then the method should block no longer than /// the duration specified, but this shouldn't be used as a super-precise /// timer but rather a "ballpark approximation" /// /// # Return value /// /// This function returns an instance of `Turn` /// /// `Turn` as of today has no extra information with it and can be safely /// discarded. In the future `Turn` may contain information about what /// happened while this reactor blocked. /// /// # Errors /// /// This function may also return any I/O error which occurs when polling /// for readiness of I/O objects with the OS. This is quite unlikely to /// arise and typically mean that things have gone horribly wrong at that /// point. Currently this is primarily only known to happen for internal /// bugs to `tokio` itself. pub fn turn(&mut self, max_wait: Option) -> io::Result { self.poll(max_wait)?; Ok(Turn { _priv: () }) } /// Returns true if the reactor is currently idle. /// /// Idle is defined as all tasks that have been spawned have completed, /// either successfully or with an error. pub fn is_idle(&self) -> bool { self.inner.io_dispatch .read().unwrap() .is_empty() } /// Run this reactor on a background thread. /// /// This function takes ownership, spawns a new thread, and moves the /// reactor to this new thread. It then runs the reactor, driving all /// associated I/O resources, until the `Background` handle is dropped or /// explicitly shutdown. pub fn background(self) -> io::Result { Background::new(self) } fn poll(&mut self, max_wait: Option) -> io::Result<()> { // Block waiting for an event to happen, peeling out how many events // happened. match self.inner.io.poll(&mut self.events, max_wait) { Ok(_) => {} Err(e) => return Err(e), } let start = if log_enabled!(Level::Debug) { Some(Instant::now()) } else { None }; // Process all the events that came in, dispatching appropriately let mut events = 0; for event in self.events.iter() { events += 1; let token = event.token(); trace!("event {:?} {:?}", event.readiness(), event.token()); if token == TOKEN_WAKEUP { self.inner.wakeup.set_readiness(mio::Ready::empty()).unwrap(); } else { self.dispatch(token, event.readiness()); } } if let Some(start) = start { let dur = start.elapsed(); debug!("loop process - {} events, {}.{:03}s", events, dur.as_secs(), dur.subsec_nanos() / 1_000_000); } Ok(()) } fn dispatch(&self, token: mio::Token, ready: mio::Ready) { let aba_guard = token.0 & !MAX_SOURCES; let token = token.0 & MAX_SOURCES; let mut rd = None; let mut wr = None; // Create a scope to ensure that notifying the tasks stays out of the // lock's critical section. { let io_dispatch = self.inner.io_dispatch.read().unwrap(); let io = match io_dispatch.get(token) { Some(io) => io, None => return, }; if aba_guard != io.aba_guard { return; } io.readiness.fetch_or(ready.as_usize(), Relaxed); if ready.is_writable() || platform::is_hup(&ready) { wr = io.writer.take_to_notify(); } if !(ready & (!mio::Ready::writable())).is_empty() { rd = io.reader.take_to_notify(); } } if let Some(task) = rd { task.notify(); } if let Some(task) = wr { task.notify(); } } } impl Park for Reactor { type Unpark = Handle; type Error = io::Error; fn unpark(&self) -> Self::Unpark { self.handle() } fn park(&mut self) -> io::Result<()> { self.turn(None)?; Ok(()) } fn park_timeout(&mut self, duration: Duration) -> io::Result<()> { self.turn(Some(duration))?; Ok(()) } } impl fmt::Debug for Reactor { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "Reactor") } } // ===== impl Handle ===== impl Handle { /// Returns a handle to the current reactor. pub fn current() -> Handle { // TODO: Should this panic on error? HandlePriv::try_current() .map(|handle| Handle { inner: Some(handle), }) .unwrap_or(Handle { inner: Some(HandlePriv { inner: Weak::new(), }) }) } fn as_priv(&self) -> Option<&HandlePriv> { self.inner.as_ref() } fn into_priv(self) -> Option { self.inner } fn wakeup(&self) { if let Some(handle) = self.as_priv() { handle.wakeup(); } } } impl Unpark for Handle { fn unpark(&self) { if let Some(ref h) = self.inner { h.wakeup(); } } } impl Default for Handle { /// Returns a "default" handle, i.e., a handle that lazily binds to a reactor. fn default() -> Handle { Handle { inner: None } } } impl fmt::Debug for Handle { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "Handle") } } fn set_fallback(handle: HandlePriv) -> Result<(), SetFallbackError> { unsafe { let val = handle.into_usize(); match HANDLE_FALLBACK.compare_exchange(0, val, SeqCst, SeqCst) { Ok(_) => Ok(()), Err(_) => { drop(HandlePriv::from_usize(val)); Err(SetFallbackError(())) } } } } // ===== impl HandlePriv ===== impl HandlePriv { /// Try to get a handle to the current reactor. /// /// Returns `Err` if no handle is found. pub(crate) fn try_current() -> io::Result { CURRENT_REACTOR.with(|current| { match *current.borrow() { Some(ref handle) => Ok(handle.clone()), None => HandlePriv::fallback(), } }) } /// Returns a handle to the fallback reactor. fn fallback() -> io::Result { let mut fallback = HANDLE_FALLBACK.load(SeqCst); // If the fallback hasn't been previously initialized then let's spin // up a helper thread and try to initialize with that. If we can't // actually create a helper thread then we'll just return a "defunct" // handle which will return errors when I/O objects are attempted to be // associated. if fallback == 0 { let reactor = match Reactor::new() { Ok(reactor) => reactor, Err(_) => return Err(io::Error::new(io::ErrorKind::Other, "failed to create reactor")), }; // If we successfully set ourselves as the actual fallback then we // want to `forget` the helper thread to ensure that it persists // globally. If we fail to set ourselves as the fallback that means // that someone was racing with this call to `Handle::default`. // They ended up winning so we'll destroy our helper thread (which // shuts down the thread) and reload the fallback. if set_fallback(reactor.handle().into_priv().unwrap()).is_ok() { let ret = reactor.handle().into_priv().unwrap(); match reactor.background() { Ok(bg) => bg.forget(), // The global handle is fubar, but y'all probably got bigger // problems if a thread can't spawn. Err(_) => {} } return Ok(ret); } fallback = HANDLE_FALLBACK.load(SeqCst); } // At this point our fallback handle global was configured so we use // its value to reify a handle, clone it, and then forget our reified // handle as we don't actually have an owning reference to it. assert!(fallback != 0); let ret = unsafe { let handle = HandlePriv::from_usize(fallback); let ret = handle.clone(); // This prevents `handle` from being dropped and having the ref // count decremented. drop(handle.into_usize()); ret }; Ok(ret) } /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise /// makes the next call to `turn` return immediately. /// /// This method is intended to be used in situations where a notification /// needs to otherwise be sent to the main reactor. If the reactor is /// currently blocked inside of `turn` then it will wake up and soon return /// after this method has been called. If the reactor is not currently /// blocked in `turn`, then the next call to `turn` will not block and /// return immediately. fn wakeup(&self) { if let Some(inner) = self.inner() { inner.wakeup.set_readiness(mio::Ready::readable()).unwrap(); } } fn into_usize(self) -> usize { unsafe { mem::transmute::, usize>(self.inner) } } unsafe fn from_usize(val: usize) -> HandlePriv { let inner = mem::transmute::>(val);; HandlePriv { inner } } fn inner(&self) -> Option> { self.inner.upgrade() } } impl fmt::Debug for HandlePriv { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "HandlePriv") } } // ===== impl Inner ===== impl Inner { /// Register an I/O resource with the reactor. /// /// The registration token is returned. fn add_source(&self, source: &Evented) -> io::Result { // Get an ABA guard value let aba_guard = self.next_aba_guard.fetch_add(1 << TOKEN_SHIFT, Relaxed); let mut io_dispatch = self.io_dispatch.write().unwrap(); if io_dispatch.len() == MAX_SOURCES { return Err(io::Error::new(io::ErrorKind::Other, "reactor at max \ registered I/O resources")); } // Acquire a write lock let key = io_dispatch.insert(ScheduledIo { aba_guard, readiness: AtomicUsize::new(0), reader: AtomicTask::new(), writer: AtomicTask::new(), }); try!(self.io.register(source, mio::Token(aba_guard | key), mio::Ready::all(), mio::PollOpt::edge())); Ok(key) } /// Deregisters an I/O resource from the reactor. fn deregister_source(&self, source: &Evented) -> io::Result<()> { self.io.deregister(source) } fn drop_source(&self, token: usize) { debug!("dropping I/O source: {}", token); self.io_dispatch.write().unwrap().remove(token); } /// Registers interest in the I/O resource associated with `token`. fn register(&self, token: usize, dir: Direction, t: Task) { debug!("scheduling direction for: {}", token); let io_dispatch = self.io_dispatch.read().unwrap(); let sched = io_dispatch.get(token).unwrap(); let (task, ready) = match dir { Direction::Read => (&sched.reader, !mio::Ready::writable()), Direction::Write => (&sched.writer, mio::Ready::writable()), }; task.register_task(t); if sched.readiness.load(SeqCst) & ready.as_usize() != 0 { task.notify(); } } } impl Drop for Inner { fn drop(&mut self) { // When a reactor is dropped it needs to wake up all blocked tasks as // they'll never receive a notification, and all connected I/O objects // will start returning errors pretty quickly. let io = self.io_dispatch.read().unwrap(); for (_, io) in io.iter() { io.writer.notify(); io.reader.notify(); } } } impl Direction { fn mask(&self) -> mio::Ready { match *self { Direction::Read => { // Everything except writable is signaled through read. mio::Ready::all() - mio::Ready::writable() } Direction::Write => mio::Ready::writable() | platform::hup(), } } } impl Task { fn notify(&self) { match *self { Task::Futures1(ref task) => task.notify(), #[cfg(feature = "unstable-futures")] Task::Futures2(ref waker) => waker.wake(), } } } #[cfg(unix)] mod platform { use mio::Ready; use mio::unix::UnixReady; pub fn hup() -> Ready { UnixReady::hup().into() } pub fn is_hup(ready: &Ready) -> bool { UnixReady::from(*ready).is_hup() } } #[cfg(windows)] mod platform { use mio::Ready; pub fn hup() -> Ready { Ready::empty() } pub fn is_hup(_: &Ready) -> bool { false } } #[cfg(feature = "unstable-futures")] fn lift_async(old: futures::Async) -> futures2::Async { match old { futures::Async::Ready(x) => futures2::Async::Ready(x), futures::Async::NotReady => futures2::Async::Pending, } } #[cfg(feature = "unstable-futures")] fn lower_async(new: futures2::Async) -> futures::Async { match new { futures2::Async::Ready(x) => futures::Async::Ready(x), futures2::Async::Pending => futures::Async::NotReady, } }