diff options
Diffstat (limited to 'third_party/rust/tokio-reactor/src/lib.rs')
-rw-r--r-- | third_party/rust/tokio-reactor/src/lib.rs | 773 |
1 files changed, 773 insertions, 0 deletions
diff --git a/third_party/rust/tokio-reactor/src/lib.rs b/third_party/rust/tokio-reactor/src/lib.rs new file mode 100644 index 0000000000..381d77af03 --- /dev/null +++ b/third_party/rust/tokio-reactor/src/lib.rs @@ -0,0 +1,773 @@ +//! Event loop that drives Tokio I/O resources. +//! +//! The reactor is the engine that drives asynchronous I/O resources (like TCP and +//! UDP sockets). It is backed by [`mio`] and acts as a bridge between [`mio`] and +//! [`futures`]. +//! +//! The crate provides: +//! +//! * [`Reactor`] is the main type of this crate. It performs the event loop logic. +//! +//! * [`Handle`] provides a reference to a reactor instance. +//! +//! * [`Registration`] and [`PollEvented`] allow third parties to implement I/O +//! resources that are driven by the reactor. +//! +//! Application authors will not use this crate directly. Instead, they will use the +//! `tokio` crate. Library authors should only depend on `tokio-reactor` if they +//! are building a custom I/O resource. +//! +//! For more details, see [reactor module] documentation in the Tokio crate. +//! +//! [`mio`]: http://github.com/carllerche/mio +//! [`futures`]: http://github.com/rust-lang-nursery/futures-rs +//! [`Reactor`]: struct.Reactor.html +//! [`Handle`]: struct.Handle.html +//! [`Registration`]: struct.Registration.html +//! [`PollEvented`]: struct.PollEvented.html +//! [reactor module]: https://docs.rs/tokio/0.1/tokio/reactor/index.html + +#![doc(html_root_url = "https://docs.rs/tokio-reactor/0.1.3")] +#![deny(missing_docs, warnings, missing_debug_implementations)] + +#[macro_use] +extern crate futures; +#[macro_use] +extern crate log; +extern crate mio; +extern crate slab; +extern crate tokio_executor; +extern crate tokio_io; + +#[cfg(feature = "unstable-futures")] +extern crate futures2; + +mod atomic_task; +pub(crate) mod background; +mod poll_evented; +mod registration; + +// ===== Public re-exports ===== + +pub use self::background::{Background, Shutdown}; +pub use self::registration::Registration; +pub use self::poll_evented::PollEvented; + +// ===== Private imports ===== + +use atomic_task::AtomicTask; + +use tokio_executor::Enter; +use tokio_executor::park::{Park, Unpark}; + +use std::{fmt, usize}; +use std::io; +use std::mem; +use std::cell::RefCell; +use std::sync::atomic::Ordering::{Relaxed, SeqCst}; +use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT}; +use std::sync::{Arc, Weak, RwLock}; +use std::time::{Duration, Instant}; + +use log::Level; +use mio::event::Evented; +use slab::Slab; + +/// The core reactor, or event loop. +/// +/// The event loop is the main source of blocking in an application which drives +/// all other I/O events and notifications happening. Each event loop can have +/// multiple handles pointing to it, each of which can then be used to create +/// various I/O objects to interact with the event loop in interesting ways. +pub struct Reactor { + /// Reuse the `mio::Events` value across calls to poll. + events: mio::Events, + + /// State shared between the reactor and the handles. + inner: Arc<Inner>, + + _wakeup_registration: mio::Registration, +} + +/// A reference to a reactor. +/// +/// A `Handle` is used for associating I/O objects with an event loop +/// explicitly. Typically though you won't end up using a `Handle` that often +/// and will instead use the default reactor for the execution context. +/// +/// By default, most components bind lazily to reactors. +/// To get this behavior when manually passing a `Handle`, use `default()`. +#[derive(Clone)] +pub struct Handle { + inner: Option<HandlePriv>, +} + +/// Like `Handle`, but never `None`. +#[derive(Clone)] +struct HandlePriv { + inner: Weak<Inner>, +} + +/// Return value from the `turn` method on `Reactor`. +/// +/// Currently this value doesn't actually provide any functionality, but it may +/// in the future give insight into what happened during `turn`. +#[derive(Debug)] +pub struct Turn { + _priv: (), +} + +/// Error returned from `Handle::set_fallback`. +#[derive(Clone, Debug)] +pub struct SetFallbackError(()); + +#[deprecated(since = "0.1.2", note = "use SetFallbackError instead")] +#[doc(hidden)] +pub type SetDefaultError = SetFallbackError; + +#[test] +fn test_handle_size() { + use std::mem; + assert_eq!(mem::size_of::<Handle>(), mem::size_of::<HandlePriv>()); +} + +struct Inner { + /// The underlying system event queue. + io: mio::Poll, + + /// ABA guard counter + next_aba_guard: AtomicUsize, + + /// Dispatch slabs for I/O and futures events + io_dispatch: RwLock<Slab<ScheduledIo>>, + + /// Used to wake up the reactor from a call to `turn` + wakeup: mio::SetReadiness +} + +struct ScheduledIo { + aba_guard: usize, + readiness: AtomicUsize, + reader: AtomicTask, + writer: AtomicTask, +} + +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +pub(crate) enum Direction { + Read, + Write, +} + +/// The global fallback reactor. +static HANDLE_FALLBACK: AtomicUsize = ATOMIC_USIZE_INIT; + +/// Tracks the reactor for the current execution context. +thread_local!(static CURRENT_REACTOR: RefCell<Option<HandlePriv>> = RefCell::new(None)); + +const TOKEN_SHIFT: usize = 22; + +// Kind of arbitrary, but this reserves some token space for later usage. +const MAX_SOURCES: usize = (1 << TOKEN_SHIFT) - 1; +const TOKEN_WAKEUP: mio::Token = mio::Token(MAX_SOURCES); + +fn _assert_kinds() { + fn _assert<T: Send + Sync>() {} + + _assert::<Handle>(); +} + +/// A wakeup handle for a task, which may be either a futures 0.1 or 0.2 task +#[derive(Debug, Clone)] +pub(crate) enum Task { + Futures1(futures::task::Task), + #[cfg(feature = "unstable-futures")] + Futures2(futures2::task::Waker), +} + +// ===== impl Reactor ===== + +/// Set the default reactor for the duration of the closure +/// +/// # Panics +/// +/// This function panics if there already is a default reactor set. +pub fn with_default<F, R>(handle: &Handle, enter: &mut Enter, f: F) -> R +where F: FnOnce(&mut Enter) -> R +{ + // Ensure that the executor is removed from the thread-local context + // when leaving the scope. This handles cases that involve panicking. + struct Reset; + + impl Drop for Reset { + fn drop(&mut self) { + CURRENT_REACTOR.with(|current| { + let mut current = current.borrow_mut(); + *current = None; + }); + } + } + + // This ensures the value for the current reactor gets reset even if there + // is a panic. + let _r = Reset; + + CURRENT_REACTOR.with(|current| { + { + let mut current = current.borrow_mut(); + + assert!(current.is_none(), "default Tokio reactor already set \ + for execution context"); + + let handle = match handle.as_priv() { + Some(handle) => handle, + None => { + panic!("`handle` does not reference a reactor"); + } + }; + + *current = Some(handle.clone()); + } + + f(enter) + }) +} + +impl Reactor { + /// Creates a new event loop, returning any error that happened during the + /// creation. + pub fn new() -> io::Result<Reactor> { + let io = mio::Poll::new()?; + let wakeup_pair = mio::Registration::new2(); + + io.register(&wakeup_pair.0, + TOKEN_WAKEUP, + mio::Ready::readable(), + mio::PollOpt::level())?; + + Ok(Reactor { + events: mio::Events::with_capacity(1024), + _wakeup_registration: wakeup_pair.0, + inner: Arc::new(Inner { + io: io, + next_aba_guard: AtomicUsize::new(0), + io_dispatch: RwLock::new(Slab::with_capacity(1)), + wakeup: wakeup_pair.1, + }), + }) + } + + /// Returns a handle to this event loop which can be sent across threads + /// and can be used as a proxy to the event loop itself. + /// + /// Handles are cloneable and clones always refer to the same event loop. + /// This handle is typically passed into functions that create I/O objects + /// to bind them to this event loop. + pub fn handle(&self) -> Handle { + Handle { + inner: Some(HandlePriv { + inner: Arc::downgrade(&self.inner), + }), + } + } + + /// Configures the fallback handle to be returned from `Handle::default`. + /// + /// The `Handle::default()` function will by default lazily spin up a global + /// thread and run a reactor on this global thread. This behavior is not + /// always desirable in all applications, however, and sometimes a different + /// fallback reactor is desired. + /// + /// This function will attempt to globally alter the return value of + /// `Handle::default()` to return the `handle` specified rather than a + /// lazily initialized global thread. If successful then all future calls to + /// `Handle::default()` which would otherwise fall back to the global thread + /// will instead return a clone of the handle specified. + /// + /// # Errors + /// + /// This function may not always succeed in configuring the fallback handle. + /// If this function was previously called (or perhaps concurrently called + /// on many threads) only the *first* invocation of this function will + /// succeed. All other invocations will return an error. + /// + /// Additionally if the global reactor thread has already been initialized + /// then this function will also return an error. (aka if `Handle::default` + /// has been called previously in this program). + pub fn set_fallback(&self) -> Result<(), SetFallbackError> { + set_fallback(self.handle().into_priv().unwrap()) + } + + /// Performs one iteration of the event loop, blocking on waiting for events + /// for at most `max_wait` (forever if `None`). + /// + /// This method is the primary method of running this reactor and processing + /// I/O events that occur. This method executes one iteration of an event + /// loop, blocking at most once waiting for events to happen. + /// + /// If a `max_wait` is specified then the method should block no longer than + /// the duration specified, but this shouldn't be used as a super-precise + /// timer but rather a "ballpark approximation" + /// + /// # Return value + /// + /// This function returns an instance of `Turn` + /// + /// `Turn` as of today has no extra information with it and can be safely + /// discarded. In the future `Turn` may contain information about what + /// happened while this reactor blocked. + /// + /// # Errors + /// + /// This function may also return any I/O error which occurs when polling + /// for readiness of I/O objects with the OS. This is quite unlikely to + /// arise and typically mean that things have gone horribly wrong at that + /// point. Currently this is primarily only known to happen for internal + /// bugs to `tokio` itself. + pub fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<Turn> { + self.poll(max_wait)?; + Ok(Turn { _priv: () }) + } + + /// Returns true if the reactor is currently idle. + /// + /// Idle is defined as all tasks that have been spawned have completed, + /// either successfully or with an error. + pub fn is_idle(&self) -> bool { + self.inner.io_dispatch + .read().unwrap() + .is_empty() + } + + /// Run this reactor on a background thread. + /// + /// This function takes ownership, spawns a new thread, and moves the + /// reactor to this new thread. It then runs the reactor, driving all + /// associated I/O resources, until the `Background` handle is dropped or + /// explicitly shutdown. + pub fn background(self) -> io::Result<Background> { + Background::new(self) + } + + fn poll(&mut self, max_wait: Option<Duration>) -> io::Result<()> { + // Block waiting for an event to happen, peeling out how many events + // happened. + match self.inner.io.poll(&mut self.events, max_wait) { + Ok(_) => {} + Err(e) => return Err(e), + } + + let start = if log_enabled!(Level::Debug) { + Some(Instant::now()) + } else { + None + }; + + // Process all the events that came in, dispatching appropriately + let mut events = 0; + for event in self.events.iter() { + events += 1; + let token = event.token(); + trace!("event {:?} {:?}", event.readiness(), event.token()); + + if token == TOKEN_WAKEUP { + self.inner.wakeup.set_readiness(mio::Ready::empty()).unwrap(); + } else { + self.dispatch(token, event.readiness()); + } + } + + if let Some(start) = start { + let dur = start.elapsed(); + debug!("loop process - {} events, {}.{:03}s", + events, + dur.as_secs(), + dur.subsec_nanos() / 1_000_000); + } + + Ok(()) + } + + fn dispatch(&self, token: mio::Token, ready: mio::Ready) { + let aba_guard = token.0 & !MAX_SOURCES; + let token = token.0 & MAX_SOURCES; + + let mut rd = None; + let mut wr = None; + + // Create a scope to ensure that notifying the tasks stays out of the + // lock's critical section. + { + let io_dispatch = self.inner.io_dispatch.read().unwrap(); + + let io = match io_dispatch.get(token) { + Some(io) => io, + None => return, + }; + + if aba_guard != io.aba_guard { + return; + } + + io.readiness.fetch_or(ready.as_usize(), Relaxed); + + if ready.is_writable() || platform::is_hup(&ready) { + wr = io.writer.take_to_notify(); + } + + if !(ready & (!mio::Ready::writable())).is_empty() { + rd = io.reader.take_to_notify(); + } + } + + if let Some(task) = rd { + task.notify(); + } + + if let Some(task) = wr { + task.notify(); + } + } +} + +impl Park for Reactor { + type Unpark = Handle; + type Error = io::Error; + + fn unpark(&self) -> Self::Unpark { + self.handle() + } + + fn park(&mut self) -> io::Result<()> { + self.turn(None)?; + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> io::Result<()> { + self.turn(Some(duration))?; + Ok(()) + } +} + +impl fmt::Debug for Reactor { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Reactor") + } +} + +// ===== impl Handle ===== + +impl Handle { + /// Returns a handle to the current reactor. + pub fn current() -> Handle { + // TODO: Should this panic on error? + HandlePriv::try_current() + .map(|handle| Handle { + inner: Some(handle), + }) + .unwrap_or(Handle { + inner: Some(HandlePriv { + inner: Weak::new(), + }) + }) + } + + fn as_priv(&self) -> Option<&HandlePriv> { + self.inner.as_ref() + } + + fn into_priv(self) -> Option<HandlePriv> { + self.inner + } + + fn wakeup(&self) { + if let Some(handle) = self.as_priv() { + handle.wakeup(); + } + } +} + +impl Unpark for Handle { + fn unpark(&self) { + if let Some(ref h) = self.inner { + h.wakeup(); + } + } +} + +impl Default for Handle { + /// Returns a "default" handle, i.e., a handle that lazily binds to a reactor. + fn default() -> Handle { + Handle { inner: None } + } +} + +impl fmt::Debug for Handle { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Handle") + } +} + +fn set_fallback(handle: HandlePriv) -> Result<(), SetFallbackError> { + unsafe { + let val = handle.into_usize(); + match HANDLE_FALLBACK.compare_exchange(0, val, SeqCst, SeqCst) { + Ok(_) => Ok(()), + Err(_) => { + drop(HandlePriv::from_usize(val)); + Err(SetFallbackError(())) + } + } + } +} + +// ===== impl HandlePriv ===== + +impl HandlePriv { + /// Try to get a handle to the current reactor. + /// + /// Returns `Err` if no handle is found. + pub(crate) fn try_current() -> io::Result<HandlePriv> { + CURRENT_REACTOR.with(|current| { + match *current.borrow() { + Some(ref handle) => Ok(handle.clone()), + None => HandlePriv::fallback(), + } + }) + } + + /// Returns a handle to the fallback reactor. + fn fallback() -> io::Result<HandlePriv> { + let mut fallback = HANDLE_FALLBACK.load(SeqCst); + + // If the fallback hasn't been previously initialized then let's spin + // up a helper thread and try to initialize with that. If we can't + // actually create a helper thread then we'll just return a "defunct" + // handle which will return errors when I/O objects are attempted to be + // associated. + if fallback == 0 { + let reactor = match Reactor::new() { + Ok(reactor) => reactor, + Err(_) => return Err(io::Error::new(io::ErrorKind::Other, + "failed to create reactor")), + }; + + // If we successfully set ourselves as the actual fallback then we + // want to `forget` the helper thread to ensure that it persists + // globally. If we fail to set ourselves as the fallback that means + // that someone was racing with this call to `Handle::default`. + // They ended up winning so we'll destroy our helper thread (which + // shuts down the thread) and reload the fallback. + if set_fallback(reactor.handle().into_priv().unwrap()).is_ok() { + let ret = reactor.handle().into_priv().unwrap(); + + match reactor.background() { + Ok(bg) => bg.forget(), + // The global handle is fubar, but y'all probably got bigger + // problems if a thread can't spawn. + Err(_) => {} + } + + return Ok(ret); + } + + fallback = HANDLE_FALLBACK.load(SeqCst); + } + + // At this point our fallback handle global was configured so we use + // its value to reify a handle, clone it, and then forget our reified + // handle as we don't actually have an owning reference to it. + assert!(fallback != 0); + + let ret = unsafe { + let handle = HandlePriv::from_usize(fallback); + let ret = handle.clone(); + + // This prevents `handle` from being dropped and having the ref + // count decremented. + drop(handle.into_usize()); + + ret + }; + + Ok(ret) + } + + /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise + /// makes the next call to `turn` return immediately. + /// + /// This method is intended to be used in situations where a notification + /// needs to otherwise be sent to the main reactor. If the reactor is + /// currently blocked inside of `turn` then it will wake up and soon return + /// after this method has been called. If the reactor is not currently + /// blocked in `turn`, then the next call to `turn` will not block and + /// return immediately. + fn wakeup(&self) { + if let Some(inner) = self.inner() { + inner.wakeup.set_readiness(mio::Ready::readable()).unwrap(); + } + } + + fn into_usize(self) -> usize { + unsafe { + mem::transmute::<Weak<Inner>, usize>(self.inner) + } + } + + unsafe fn from_usize(val: usize) -> HandlePriv { + let inner = mem::transmute::<usize, Weak<Inner>>(val);; + HandlePriv { inner } + } + + fn inner(&self) -> Option<Arc<Inner>> { + self.inner.upgrade() + } +} + +impl fmt::Debug for HandlePriv { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "HandlePriv") + } +} + +// ===== impl Inner ===== + +impl Inner { + /// Register an I/O resource with the reactor. + /// + /// The registration token is returned. + fn add_source(&self, source: &Evented) + -> io::Result<usize> + { + // Get an ABA guard value + let aba_guard = self.next_aba_guard.fetch_add(1 << TOKEN_SHIFT, Relaxed); + + let mut io_dispatch = self.io_dispatch.write().unwrap(); + + if io_dispatch.len() == MAX_SOURCES { + return Err(io::Error::new(io::ErrorKind::Other, "reactor at max \ + registered I/O resources")); + } + + // Acquire a write lock + let key = io_dispatch.insert(ScheduledIo { + aba_guard, + readiness: AtomicUsize::new(0), + reader: AtomicTask::new(), + writer: AtomicTask::new(), + }); + + try!(self.io.register(source, + mio::Token(aba_guard | key), + mio::Ready::all(), + mio::PollOpt::edge())); + + Ok(key) + } + + /// Deregisters an I/O resource from the reactor. + fn deregister_source(&self, source: &Evented) -> io::Result<()> { + self.io.deregister(source) + } + + fn drop_source(&self, token: usize) { + debug!("dropping I/O source: {}", token); + self.io_dispatch.write().unwrap().remove(token); + } + + /// Registers interest in the I/O resource associated with `token`. + fn register(&self, token: usize, dir: Direction, t: Task) { + debug!("scheduling direction for: {}", token); + let io_dispatch = self.io_dispatch.read().unwrap(); + let sched = io_dispatch.get(token).unwrap(); + + let (task, ready) = match dir { + Direction::Read => (&sched.reader, !mio::Ready::writable()), + Direction::Write => (&sched.writer, mio::Ready::writable()), + }; + + task.register_task(t); + + if sched.readiness.load(SeqCst) & ready.as_usize() != 0 { + task.notify(); + } + } +} + +impl Drop for Inner { + fn drop(&mut self) { + // When a reactor is dropped it needs to wake up all blocked tasks as + // they'll never receive a notification, and all connected I/O objects + // will start returning errors pretty quickly. + let io = self.io_dispatch.read().unwrap(); + for (_, io) in io.iter() { + io.writer.notify(); + io.reader.notify(); + } + } +} + +impl Direction { + fn mask(&self) -> mio::Ready { + match *self { + Direction::Read => { + // Everything except writable is signaled through read. + mio::Ready::all() - mio::Ready::writable() + } + Direction::Write => mio::Ready::writable() | platform::hup(), + } + } +} + +impl Task { + fn notify(&self) { + match *self { + Task::Futures1(ref task) => task.notify(), + + #[cfg(feature = "unstable-futures")] + Task::Futures2(ref waker) => waker.wake(), + } + } +} + +#[cfg(unix)] +mod platform { + use mio::Ready; + use mio::unix::UnixReady; + + pub fn hup() -> Ready { + UnixReady::hup().into() + } + + pub fn is_hup(ready: &Ready) -> bool { + UnixReady::from(*ready).is_hup() + } +} + +#[cfg(windows)] +mod platform { + use mio::Ready; + + pub fn hup() -> Ready { + Ready::empty() + } + + pub fn is_hup(_: &Ready) -> bool { + false + } +} + +#[cfg(feature = "unstable-futures")] +fn lift_async<T>(old: futures::Async<T>) -> futures2::Async<T> { + match old { + futures::Async::Ready(x) => futures2::Async::Ready(x), + futures::Async::NotReady => futures2::Async::Pending, + } +} + +#[cfg(feature = "unstable-futures")] +fn lower_async<T>(new: futures2::Async<T>) -> futures::Async<T> { + match new { + futures2::Async::Ready(x) => futures::Async::Ready(x), + futures2::Async::Pending => futures::Async::NotReady, + } +} |