summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-reactor/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-reactor/src/lib.rs')
-rw-r--r--third_party/rust/tokio-reactor/src/lib.rs773
1 files changed, 773 insertions, 0 deletions
diff --git a/third_party/rust/tokio-reactor/src/lib.rs b/third_party/rust/tokio-reactor/src/lib.rs
new file mode 100644
index 0000000000..381d77af03
--- /dev/null
+++ b/third_party/rust/tokio-reactor/src/lib.rs
@@ -0,0 +1,773 @@
+//! Event loop that drives Tokio I/O resources.
+//!
+//! The reactor is the engine that drives asynchronous I/O resources (like TCP and
+//! UDP sockets). It is backed by [`mio`] and acts as a bridge between [`mio`] and
+//! [`futures`].
+//!
+//! The crate provides:
+//!
+//! * [`Reactor`] is the main type of this crate. It performs the event loop logic.
+//!
+//! * [`Handle`] provides a reference to a reactor instance.
+//!
+//! * [`Registration`] and [`PollEvented`] allow third parties to implement I/O
+//! resources that are driven by the reactor.
+//!
+//! Application authors will not use this crate directly. Instead, they will use the
+//! `tokio` crate. Library authors should only depend on `tokio-reactor` if they
+//! are building a custom I/O resource.
+//!
+//! For more details, see [reactor module] documentation in the Tokio crate.
+//!
+//! [`mio`]: http://github.com/carllerche/mio
+//! [`futures`]: http://github.com/rust-lang-nursery/futures-rs
+//! [`Reactor`]: struct.Reactor.html
+//! [`Handle`]: struct.Handle.html
+//! [`Registration`]: struct.Registration.html
+//! [`PollEvented`]: struct.PollEvented.html
+//! [reactor module]: https://docs.rs/tokio/0.1/tokio/reactor/index.html
+
+#![doc(html_root_url = "https://docs.rs/tokio-reactor/0.1.3")]
+#![deny(missing_docs, warnings, missing_debug_implementations)]
+
+#[macro_use]
+extern crate futures;
+#[macro_use]
+extern crate log;
+extern crate mio;
+extern crate slab;
+extern crate tokio_executor;
+extern crate tokio_io;
+
+#[cfg(feature = "unstable-futures")]
+extern crate futures2;
+
+mod atomic_task;
+pub(crate) mod background;
+mod poll_evented;
+mod registration;
+
+// ===== Public re-exports =====
+
+pub use self::background::{Background, Shutdown};
+pub use self::registration::Registration;
+pub use self::poll_evented::PollEvented;
+
+// ===== Private imports =====
+
+use atomic_task::AtomicTask;
+
+use tokio_executor::Enter;
+use tokio_executor::park::{Park, Unpark};
+
+use std::{fmt, usize};
+use std::io;
+use std::mem;
+use std::cell::RefCell;
+use std::sync::atomic::Ordering::{Relaxed, SeqCst};
+use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT};
+use std::sync::{Arc, Weak, RwLock};
+use std::time::{Duration, Instant};
+
+use log::Level;
+use mio::event::Evented;
+use slab::Slab;
+
+/// The core reactor, or event loop.
+///
+/// The event loop is the main source of blocking in an application which drives
+/// all other I/O events and notifications happening. Each event loop can have
+/// multiple handles pointing to it, each of which can then be used to create
+/// various I/O objects to interact with the event loop in interesting ways.
+pub struct Reactor {
+ /// Reuse the `mio::Events` value across calls to poll.
+ events: mio::Events,
+
+ /// State shared between the reactor and the handles.
+ inner: Arc<Inner>,
+
+ _wakeup_registration: mio::Registration,
+}
+
+/// A reference to a reactor.
+///
+/// A `Handle` is used for associating I/O objects with an event loop
+/// explicitly. Typically though you won't end up using a `Handle` that often
+/// and will instead use the default reactor for the execution context.
+///
+/// By default, most components bind lazily to reactors.
+/// To get this behavior when manually passing a `Handle`, use `default()`.
+#[derive(Clone)]
+pub struct Handle {
+ inner: Option<HandlePriv>,
+}
+
+/// Like `Handle`, but never `None`.
+#[derive(Clone)]
+struct HandlePriv {
+ inner: Weak<Inner>,
+}
+
+/// Return value from the `turn` method on `Reactor`.
+///
+/// Currently this value doesn't actually provide any functionality, but it may
+/// in the future give insight into what happened during `turn`.
+#[derive(Debug)]
+pub struct Turn {
+ _priv: (),
+}
+
+/// Error returned from `Handle::set_fallback`.
+#[derive(Clone, Debug)]
+pub struct SetFallbackError(());
+
+#[deprecated(since = "0.1.2", note = "use SetFallbackError instead")]
+#[doc(hidden)]
+pub type SetDefaultError = SetFallbackError;
+
+#[test]
+fn test_handle_size() {
+ use std::mem;
+ assert_eq!(mem::size_of::<Handle>(), mem::size_of::<HandlePriv>());
+}
+
+struct Inner {
+ /// The underlying system event queue.
+ io: mio::Poll,
+
+ /// ABA guard counter
+ next_aba_guard: AtomicUsize,
+
+ /// Dispatch slabs for I/O and futures events
+ io_dispatch: RwLock<Slab<ScheduledIo>>,
+
+ /// Used to wake up the reactor from a call to `turn`
+ wakeup: mio::SetReadiness
+}
+
+struct ScheduledIo {
+ aba_guard: usize,
+ readiness: AtomicUsize,
+ reader: AtomicTask,
+ writer: AtomicTask,
+}
+
+#[derive(Debug, Eq, PartialEq, Clone, Copy)]
+pub(crate) enum Direction {
+ Read,
+ Write,
+}
+
+/// The global fallback reactor.
+static HANDLE_FALLBACK: AtomicUsize = ATOMIC_USIZE_INIT;
+
+/// Tracks the reactor for the current execution context.
+thread_local!(static CURRENT_REACTOR: RefCell<Option<HandlePriv>> = RefCell::new(None));
+
+const TOKEN_SHIFT: usize = 22;
+
+// Kind of arbitrary, but this reserves some token space for later usage.
+const MAX_SOURCES: usize = (1 << TOKEN_SHIFT) - 1;
+const TOKEN_WAKEUP: mio::Token = mio::Token(MAX_SOURCES);
+
+fn _assert_kinds() {
+ fn _assert<T: Send + Sync>() {}
+
+ _assert::<Handle>();
+}
+
+/// A wakeup handle for a task, which may be either a futures 0.1 or 0.2 task
+#[derive(Debug, Clone)]
+pub(crate) enum Task {
+ Futures1(futures::task::Task),
+ #[cfg(feature = "unstable-futures")]
+ Futures2(futures2::task::Waker),
+}
+
+// ===== impl Reactor =====
+
+/// Set the default reactor for the duration of the closure
+///
+/// # Panics
+///
+/// This function panics if there already is a default reactor set.
+pub fn with_default<F, R>(handle: &Handle, enter: &mut Enter, f: F) -> R
+where F: FnOnce(&mut Enter) -> R
+{
+ // Ensure that the executor is removed from the thread-local context
+ // when leaving the scope. This handles cases that involve panicking.
+ struct Reset;
+
+ impl Drop for Reset {
+ fn drop(&mut self) {
+ CURRENT_REACTOR.with(|current| {
+ let mut current = current.borrow_mut();
+ *current = None;
+ });
+ }
+ }
+
+ // This ensures the value for the current reactor gets reset even if there
+ // is a panic.
+ let _r = Reset;
+
+ CURRENT_REACTOR.with(|current| {
+ {
+ let mut current = current.borrow_mut();
+
+ assert!(current.is_none(), "default Tokio reactor already set \
+ for execution context");
+
+ let handle = match handle.as_priv() {
+ Some(handle) => handle,
+ None => {
+ panic!("`handle` does not reference a reactor");
+ }
+ };
+
+ *current = Some(handle.clone());
+ }
+
+ f(enter)
+ })
+}
+
+impl Reactor {
+ /// Creates a new event loop, returning any error that happened during the
+ /// creation.
+ pub fn new() -> io::Result<Reactor> {
+ let io = mio::Poll::new()?;
+ let wakeup_pair = mio::Registration::new2();
+
+ io.register(&wakeup_pair.0,
+ TOKEN_WAKEUP,
+ mio::Ready::readable(),
+ mio::PollOpt::level())?;
+
+ Ok(Reactor {
+ events: mio::Events::with_capacity(1024),
+ _wakeup_registration: wakeup_pair.0,
+ inner: Arc::new(Inner {
+ io: io,
+ next_aba_guard: AtomicUsize::new(0),
+ io_dispatch: RwLock::new(Slab::with_capacity(1)),
+ wakeup: wakeup_pair.1,
+ }),
+ })
+ }
+
+ /// Returns a handle to this event loop which can be sent across threads
+ /// and can be used as a proxy to the event loop itself.
+ ///
+ /// Handles are cloneable and clones always refer to the same event loop.
+ /// This handle is typically passed into functions that create I/O objects
+ /// to bind them to this event loop.
+ pub fn handle(&self) -> Handle {
+ Handle {
+ inner: Some(HandlePriv {
+ inner: Arc::downgrade(&self.inner),
+ }),
+ }
+ }
+
+ /// Configures the fallback handle to be returned from `Handle::default`.
+ ///
+ /// The `Handle::default()` function will by default lazily spin up a global
+ /// thread and run a reactor on this global thread. This behavior is not
+ /// always desirable in all applications, however, and sometimes a different
+ /// fallback reactor is desired.
+ ///
+ /// This function will attempt to globally alter the return value of
+ /// `Handle::default()` to return the `handle` specified rather than a
+ /// lazily initialized global thread. If successful then all future calls to
+ /// `Handle::default()` which would otherwise fall back to the global thread
+ /// will instead return a clone of the handle specified.
+ ///
+ /// # Errors
+ ///
+ /// This function may not always succeed in configuring the fallback handle.
+ /// If this function was previously called (or perhaps concurrently called
+ /// on many threads) only the *first* invocation of this function will
+ /// succeed. All other invocations will return an error.
+ ///
+ /// Additionally if the global reactor thread has already been initialized
+ /// then this function will also return an error. (aka if `Handle::default`
+ /// has been called previously in this program).
+ pub fn set_fallback(&self) -> Result<(), SetFallbackError> {
+ set_fallback(self.handle().into_priv().unwrap())
+ }
+
+ /// Performs one iteration of the event loop, blocking on waiting for events
+ /// for at most `max_wait` (forever if `None`).
+ ///
+ /// This method is the primary method of running this reactor and processing
+ /// I/O events that occur. This method executes one iteration of an event
+ /// loop, blocking at most once waiting for events to happen.
+ ///
+ /// If a `max_wait` is specified then the method should block no longer than
+ /// the duration specified, but this shouldn't be used as a super-precise
+ /// timer but rather a "ballpark approximation"
+ ///
+ /// # Return value
+ ///
+ /// This function returns an instance of `Turn`
+ ///
+ /// `Turn` as of today has no extra information with it and can be safely
+ /// discarded. In the future `Turn` may contain information about what
+ /// happened while this reactor blocked.
+ ///
+ /// # Errors
+ ///
+ /// This function may also return any I/O error which occurs when polling
+ /// for readiness of I/O objects with the OS. This is quite unlikely to
+ /// arise and typically mean that things have gone horribly wrong at that
+ /// point. Currently this is primarily only known to happen for internal
+ /// bugs to `tokio` itself.
+ pub fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<Turn> {
+ self.poll(max_wait)?;
+ Ok(Turn { _priv: () })
+ }
+
+ /// Returns true if the reactor is currently idle.
+ ///
+ /// Idle is defined as all tasks that have been spawned have completed,
+ /// either successfully or with an error.
+ pub fn is_idle(&self) -> bool {
+ self.inner.io_dispatch
+ .read().unwrap()
+ .is_empty()
+ }
+
+ /// Run this reactor on a background thread.
+ ///
+ /// This function takes ownership, spawns a new thread, and moves the
+ /// reactor to this new thread. It then runs the reactor, driving all
+ /// associated I/O resources, until the `Background` handle is dropped or
+ /// explicitly shutdown.
+ pub fn background(self) -> io::Result<Background> {
+ Background::new(self)
+ }
+
+ fn poll(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
+ // Block waiting for an event to happen, peeling out how many events
+ // happened.
+ match self.inner.io.poll(&mut self.events, max_wait) {
+ Ok(_) => {}
+ Err(e) => return Err(e),
+ }
+
+ let start = if log_enabled!(Level::Debug) {
+ Some(Instant::now())
+ } else {
+ None
+ };
+
+ // Process all the events that came in, dispatching appropriately
+ let mut events = 0;
+ for event in self.events.iter() {
+ events += 1;
+ let token = event.token();
+ trace!("event {:?} {:?}", event.readiness(), event.token());
+
+ if token == TOKEN_WAKEUP {
+ self.inner.wakeup.set_readiness(mio::Ready::empty()).unwrap();
+ } else {
+ self.dispatch(token, event.readiness());
+ }
+ }
+
+ if let Some(start) = start {
+ let dur = start.elapsed();
+ debug!("loop process - {} events, {}.{:03}s",
+ events,
+ dur.as_secs(),
+ dur.subsec_nanos() / 1_000_000);
+ }
+
+ Ok(())
+ }
+
+ fn dispatch(&self, token: mio::Token, ready: mio::Ready) {
+ let aba_guard = token.0 & !MAX_SOURCES;
+ let token = token.0 & MAX_SOURCES;
+
+ let mut rd = None;
+ let mut wr = None;
+
+ // Create a scope to ensure that notifying the tasks stays out of the
+ // lock's critical section.
+ {
+ let io_dispatch = self.inner.io_dispatch.read().unwrap();
+
+ let io = match io_dispatch.get(token) {
+ Some(io) => io,
+ None => return,
+ };
+
+ if aba_guard != io.aba_guard {
+ return;
+ }
+
+ io.readiness.fetch_or(ready.as_usize(), Relaxed);
+
+ if ready.is_writable() || platform::is_hup(&ready) {
+ wr = io.writer.take_to_notify();
+ }
+
+ if !(ready & (!mio::Ready::writable())).is_empty() {
+ rd = io.reader.take_to_notify();
+ }
+ }
+
+ if let Some(task) = rd {
+ task.notify();
+ }
+
+ if let Some(task) = wr {
+ task.notify();
+ }
+ }
+}
+
+impl Park for Reactor {
+ type Unpark = Handle;
+ type Error = io::Error;
+
+ fn unpark(&self) -> Self::Unpark {
+ self.handle()
+ }
+
+ fn park(&mut self) -> io::Result<()> {
+ self.turn(None)?;
+ Ok(())
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> io::Result<()> {
+ self.turn(Some(duration))?;
+ Ok(())
+ }
+}
+
+impl fmt::Debug for Reactor {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "Reactor")
+ }
+}
+
+// ===== impl Handle =====
+
+impl Handle {
+ /// Returns a handle to the current reactor.
+ pub fn current() -> Handle {
+ // TODO: Should this panic on error?
+ HandlePriv::try_current()
+ .map(|handle| Handle {
+ inner: Some(handle),
+ })
+ .unwrap_or(Handle {
+ inner: Some(HandlePriv {
+ inner: Weak::new(),
+ })
+ })
+ }
+
+ fn as_priv(&self) -> Option<&HandlePriv> {
+ self.inner.as_ref()
+ }
+
+ fn into_priv(self) -> Option<HandlePriv> {
+ self.inner
+ }
+
+ fn wakeup(&self) {
+ if let Some(handle) = self.as_priv() {
+ handle.wakeup();
+ }
+ }
+}
+
+impl Unpark for Handle {
+ fn unpark(&self) {
+ if let Some(ref h) = self.inner {
+ h.wakeup();
+ }
+ }
+}
+
+impl Default for Handle {
+ /// Returns a "default" handle, i.e., a handle that lazily binds to a reactor.
+ fn default() -> Handle {
+ Handle { inner: None }
+ }
+}
+
+impl fmt::Debug for Handle {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "Handle")
+ }
+}
+
+fn set_fallback(handle: HandlePriv) -> Result<(), SetFallbackError> {
+ unsafe {
+ let val = handle.into_usize();
+ match HANDLE_FALLBACK.compare_exchange(0, val, SeqCst, SeqCst) {
+ Ok(_) => Ok(()),
+ Err(_) => {
+ drop(HandlePriv::from_usize(val));
+ Err(SetFallbackError(()))
+ }
+ }
+ }
+}
+
+// ===== impl HandlePriv =====
+
+impl HandlePriv {
+ /// Try to get a handle to the current reactor.
+ ///
+ /// Returns `Err` if no handle is found.
+ pub(crate) fn try_current() -> io::Result<HandlePriv> {
+ CURRENT_REACTOR.with(|current| {
+ match *current.borrow() {
+ Some(ref handle) => Ok(handle.clone()),
+ None => HandlePriv::fallback(),
+ }
+ })
+ }
+
+ /// Returns a handle to the fallback reactor.
+ fn fallback() -> io::Result<HandlePriv> {
+ let mut fallback = HANDLE_FALLBACK.load(SeqCst);
+
+ // If the fallback hasn't been previously initialized then let's spin
+ // up a helper thread and try to initialize with that. If we can't
+ // actually create a helper thread then we'll just return a "defunct"
+ // handle which will return errors when I/O objects are attempted to be
+ // associated.
+ if fallback == 0 {
+ let reactor = match Reactor::new() {
+ Ok(reactor) => reactor,
+ Err(_) => return Err(io::Error::new(io::ErrorKind::Other,
+ "failed to create reactor")),
+ };
+
+ // If we successfully set ourselves as the actual fallback then we
+ // want to `forget` the helper thread to ensure that it persists
+ // globally. If we fail to set ourselves as the fallback that means
+ // that someone was racing with this call to `Handle::default`.
+ // They ended up winning so we'll destroy our helper thread (which
+ // shuts down the thread) and reload the fallback.
+ if set_fallback(reactor.handle().into_priv().unwrap()).is_ok() {
+ let ret = reactor.handle().into_priv().unwrap();
+
+ match reactor.background() {
+ Ok(bg) => bg.forget(),
+ // The global handle is fubar, but y'all probably got bigger
+ // problems if a thread can't spawn.
+ Err(_) => {}
+ }
+
+ return Ok(ret);
+ }
+
+ fallback = HANDLE_FALLBACK.load(SeqCst);
+ }
+
+ // At this point our fallback handle global was configured so we use
+ // its value to reify a handle, clone it, and then forget our reified
+ // handle as we don't actually have an owning reference to it.
+ assert!(fallback != 0);
+
+ let ret = unsafe {
+ let handle = HandlePriv::from_usize(fallback);
+ let ret = handle.clone();
+
+ // This prevents `handle` from being dropped and having the ref
+ // count decremented.
+ drop(handle.into_usize());
+
+ ret
+ };
+
+ Ok(ret)
+ }
+
+ /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
+ /// makes the next call to `turn` return immediately.
+ ///
+ /// This method is intended to be used in situations where a notification
+ /// needs to otherwise be sent to the main reactor. If the reactor is
+ /// currently blocked inside of `turn` then it will wake up and soon return
+ /// after this method has been called. If the reactor is not currently
+ /// blocked in `turn`, then the next call to `turn` will not block and
+ /// return immediately.
+ fn wakeup(&self) {
+ if let Some(inner) = self.inner() {
+ inner.wakeup.set_readiness(mio::Ready::readable()).unwrap();
+ }
+ }
+
+ fn into_usize(self) -> usize {
+ unsafe {
+ mem::transmute::<Weak<Inner>, usize>(self.inner)
+ }
+ }
+
+ unsafe fn from_usize(val: usize) -> HandlePriv {
+ let inner = mem::transmute::<usize, Weak<Inner>>(val);;
+ HandlePriv { inner }
+ }
+
+ fn inner(&self) -> Option<Arc<Inner>> {
+ self.inner.upgrade()
+ }
+}
+
+impl fmt::Debug for HandlePriv {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "HandlePriv")
+ }
+}
+
+// ===== impl Inner =====
+
+impl Inner {
+ /// Register an I/O resource with the reactor.
+ ///
+ /// The registration token is returned.
+ fn add_source(&self, source: &Evented)
+ -> io::Result<usize>
+ {
+ // Get an ABA guard value
+ let aba_guard = self.next_aba_guard.fetch_add(1 << TOKEN_SHIFT, Relaxed);
+
+ let mut io_dispatch = self.io_dispatch.write().unwrap();
+
+ if io_dispatch.len() == MAX_SOURCES {
+ return Err(io::Error::new(io::ErrorKind::Other, "reactor at max \
+ registered I/O resources"));
+ }
+
+ // Acquire a write lock
+ let key = io_dispatch.insert(ScheduledIo {
+ aba_guard,
+ readiness: AtomicUsize::new(0),
+ reader: AtomicTask::new(),
+ writer: AtomicTask::new(),
+ });
+
+ try!(self.io.register(source,
+ mio::Token(aba_guard | key),
+ mio::Ready::all(),
+ mio::PollOpt::edge()));
+
+ Ok(key)
+ }
+
+ /// Deregisters an I/O resource from the reactor.
+ fn deregister_source(&self, source: &Evented) -> io::Result<()> {
+ self.io.deregister(source)
+ }
+
+ fn drop_source(&self, token: usize) {
+ debug!("dropping I/O source: {}", token);
+ self.io_dispatch.write().unwrap().remove(token);
+ }
+
+ /// Registers interest in the I/O resource associated with `token`.
+ fn register(&self, token: usize, dir: Direction, t: Task) {
+ debug!("scheduling direction for: {}", token);
+ let io_dispatch = self.io_dispatch.read().unwrap();
+ let sched = io_dispatch.get(token).unwrap();
+
+ let (task, ready) = match dir {
+ Direction::Read => (&sched.reader, !mio::Ready::writable()),
+ Direction::Write => (&sched.writer, mio::Ready::writable()),
+ };
+
+ task.register_task(t);
+
+ if sched.readiness.load(SeqCst) & ready.as_usize() != 0 {
+ task.notify();
+ }
+ }
+}
+
+impl Drop for Inner {
+ fn drop(&mut self) {
+ // When a reactor is dropped it needs to wake up all blocked tasks as
+ // they'll never receive a notification, and all connected I/O objects
+ // will start returning errors pretty quickly.
+ let io = self.io_dispatch.read().unwrap();
+ for (_, io) in io.iter() {
+ io.writer.notify();
+ io.reader.notify();
+ }
+ }
+}
+
+impl Direction {
+ fn mask(&self) -> mio::Ready {
+ match *self {
+ Direction::Read => {
+ // Everything except writable is signaled through read.
+ mio::Ready::all() - mio::Ready::writable()
+ }
+ Direction::Write => mio::Ready::writable() | platform::hup(),
+ }
+ }
+}
+
+impl Task {
+ fn notify(&self) {
+ match *self {
+ Task::Futures1(ref task) => task.notify(),
+
+ #[cfg(feature = "unstable-futures")]
+ Task::Futures2(ref waker) => waker.wake(),
+ }
+ }
+}
+
+#[cfg(unix)]
+mod platform {
+ use mio::Ready;
+ use mio::unix::UnixReady;
+
+ pub fn hup() -> Ready {
+ UnixReady::hup().into()
+ }
+
+ pub fn is_hup(ready: &Ready) -> bool {
+ UnixReady::from(*ready).is_hup()
+ }
+}
+
+#[cfg(windows)]
+mod platform {
+ use mio::Ready;
+
+ pub fn hup() -> Ready {
+ Ready::empty()
+ }
+
+ pub fn is_hup(_: &Ready) -> bool {
+ false
+ }
+}
+
+#[cfg(feature = "unstable-futures")]
+fn lift_async<T>(old: futures::Async<T>) -> futures2::Async<T> {
+ match old {
+ futures::Async::Ready(x) => futures2::Async::Ready(x),
+ futures::Async::NotReady => futures2::Async::Pending,
+ }
+}
+
+#[cfg(feature = "unstable-futures")]
+fn lower_async<T>(new: futures2::Async<T>) -> futures::Async<T> {
+ match new {
+ futures2::Async::Ready(x) => futures::Async::Ready(x),
+ futures2::Async::Pending => futures::Async::NotReady,
+ }
+}