diff options
Diffstat (limited to '')
-rw-r--r-- | third_party/rust/tokio-current-thread/src/lib.rs | 882 |
1 files changed, 882 insertions, 0 deletions
diff --git a/third_party/rust/tokio-current-thread/src/lib.rs b/third_party/rust/tokio-current-thread/src/lib.rs new file mode 100644 index 0000000000..53392c4bfa --- /dev/null +++ b/third_party/rust/tokio-current-thread/src/lib.rs @@ -0,0 +1,882 @@ +#![doc(html_root_url = "https://docs.rs/tokio-current-thread/0.1.7")] +#![deny(missing_docs, missing_debug_implementations)] + +//! A single-threaded executor which executes tasks on the same thread from which +//! they are spawned. +//! +//! > **Note:** This crate is **deprecated in tokio 0.2.x** and has been moved +//! > and refactored into various places in the [`tokio`] crate. The closest +//! replacement is to make use of [`tokio::task::LocalSet::block_on`] which +//! requires the [`rt-util` feature]. +//! +//! [`tokio`]: https://docs.rs/tokio/latest/tokio/index.html +//! [`tokio::task::LocalSet::block_on`]: https://docs.rs/tokio/latest/tokio/task/struct.LocalSet.html#method.block_on +//! [`rt-util` feature]: https://docs.rs/tokio/latest/tokio/index.html#feature-flags +//! +//! The crate provides: +//! +//! * [`CurrentThread`] is the main type of this crate. It executes tasks on the current thread. +//! The easiest way to start a new [`CurrentThread`] executor is to call +//! [`block_on_all`] with an initial task to seed the executor. +//! All tasks that are being managed by a [`CurrentThread`] executor are able to +//! spawn additional tasks by calling [`spawn`]. +//! +//! +//! Application authors will not use this crate directly. Instead, they will use the +//! `tokio` crate. Library authors should only depend on `tokio-current-thread` if they +//! are building a custom task executor. +//! +//! For more details, see [executor module] documentation in the Tokio crate. +//! +//! [`CurrentThread`]: struct.CurrentThread.html +//! [`spawn`]: fn.spawn.html +//! [`block_on_all`]: fn.block_on_all.html +//! [executor module]: https://docs.rs/tokio/0.1/tokio/executor/index.html + +extern crate futures; +extern crate tokio_executor; + +mod scheduler; + +use self::scheduler::Scheduler; + +use tokio_executor::park::{Park, ParkThread, Unpark}; +use tokio_executor::{Enter, SpawnError}; + +use futures::future::{ExecuteError, ExecuteErrorKind, Executor}; +use futures::{executor, Async, Future}; + +use std::cell::Cell; +use std::error::Error; +use std::fmt; +use std::rc::Rc; +use std::sync::{atomic, mpsc, Arc}; +use std::thread; +use std::time::{Duration, Instant}; + +/// Executes tasks on the current thread +pub struct CurrentThread<P: Park = ParkThread> { + /// Execute futures and receive unpark notifications. + scheduler: Scheduler<P::Unpark>, + + /// Current number of futures being executed. + /// + /// The LSB is used to indicate that the runtime is preparing to shut down. + /// Thus, to get the actual number of pending futures, `>>1`. + num_futures: Arc<atomic::AtomicUsize>, + + /// Thread park handle + park: P, + + /// Handle for spawning new futures from other threads + spawn_handle: Handle, + + /// Receiver for futures spawned from other threads + spawn_receiver: mpsc::Receiver<Box<dyn Future<Item = (), Error = ()> + Send + 'static>>, + + /// The thread-local ID assigned to this executor. + id: u64, +} + +/// Executes futures on the current thread. +/// +/// All futures executed using this executor will be executed on the current +/// thread. As such, `run` will wait for these futures to complete before +/// returning. +/// +/// For more details, see the [module level](index.html) documentation. +#[derive(Debug, Clone)] +pub struct TaskExecutor { + // Prevent the handle from moving across threads. + _p: ::std::marker::PhantomData<Rc<()>>, +} + +/// Returned by the `turn` function. +#[derive(Debug)] +pub struct Turn { + polled: bool, +} + +impl Turn { + /// `true` if any futures were polled at all and `false` otherwise. + pub fn has_polled(&self) -> bool { + self.polled + } +} + +/// A `CurrentThread` instance bound to a supplied execution context. +pub struct Entered<'a, P: Park + 'a> { + executor: &'a mut CurrentThread<P>, + enter: &'a mut Enter, +} + +/// Error returned by the `run` function. +#[derive(Debug)] +pub struct RunError { + _p: (), +} + +impl fmt::Display for RunError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.description()) + } +} + +impl Error for RunError { + fn description(&self) -> &str { + "Run error" + } +} + +/// Error returned by the `run_timeout` function. +#[derive(Debug)] +pub struct RunTimeoutError { + timeout: bool, +} + +impl fmt::Display for RunTimeoutError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.description()) + } +} + +impl Error for RunTimeoutError { + fn description(&self) -> &str { + if self.timeout { + "Run timeout error (timeout)" + } else { + "Run timeout error (not timeout)" + } + } +} + +/// Error returned by the `turn` function. +#[derive(Debug)] +pub struct TurnError { + _p: (), +} + +impl fmt::Display for TurnError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.description()) + } +} + +impl Error for TurnError { + fn description(&self) -> &str { + "Turn error" + } +} + +/// Error returned by the `block_on` function. +#[derive(Debug)] +pub struct BlockError<T> { + inner: Option<T>, +} + +impl<T> fmt::Display for BlockError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "Block error") + } +} + +impl<T: fmt::Debug> Error for BlockError<T> { + fn description(&self) -> &str { + "Block error" + } +} + +/// This is mostly split out to make the borrow checker happy. +struct Borrow<'a, U: 'a> { + id: u64, + scheduler: &'a mut Scheduler<U>, + num_futures: &'a atomic::AtomicUsize, +} + +trait SpawnLocal { + fn spawn_local( + &mut self, + future: Box<dyn Future<Item = (), Error = ()>>, + already_counted: bool, + ); +} + +struct CurrentRunner { + spawn: Cell<Option<*mut dyn SpawnLocal>>, + id: Cell<Option<u64>>, +} + +thread_local! { + /// Current thread's task runner. This is set in `TaskRunner::with` + static CURRENT: CurrentRunner = CurrentRunner { + spawn: Cell::new(None), + id: Cell::new(None), + } +} + +thread_local! { + /// Unique ID to assign to each new executor launched on this thread. + /// + /// The unique ID is used to determine if the currently running executor matches the one + /// referred to by a `Handle` so that direct task dispatch can be used. + static EXECUTOR_ID: Cell<u64> = Cell::new(0) +} + +/// Run the executor bootstrapping the execution with the provided future. +/// +/// This creates a new [`CurrentThread`] executor, spawns the provided future, +/// and blocks the current thread until the provided future and **all** +/// subsequently spawned futures complete. In other words: +/// +/// * If the provided bootstrap future does **not** spawn any additional tasks, +/// `block_on_all` returns once `future` completes. +/// * If the provided bootstrap future **does** spawn additional tasks, then +/// `block_on_all` returns once **all** spawned futures complete. +/// +/// See [module level][mod] documentation for more details. +/// +/// [`CurrentThread`]: struct.CurrentThread.html +/// [mod]: index.html +pub fn block_on_all<F>(future: F) -> Result<F::Item, F::Error> +where + F: Future, +{ + let mut current_thread = CurrentThread::new(); + + let ret = current_thread.block_on(future); + current_thread.run().unwrap(); + + ret.map_err(|e| e.into_inner().expect("unexpected execution error")) +} + +/// Executes a future on the current thread. +/// +/// The provided future must complete or be canceled before `run` will return. +/// +/// Unlike [`tokio::spawn`], this function will always spawn on a +/// `CurrentThread` executor and is able to spawn futures that are not `Send`. +/// +/// # Panics +/// +/// This function can only be invoked from the context of a `run` call; any +/// other use will result in a panic. +/// +/// [`tokio::spawn`]: ../fn.spawn.html +pub fn spawn<F>(future: F) +where + F: Future<Item = (), Error = ()> + 'static, +{ + TaskExecutor::current() + .spawn_local(Box::new(future)) + .unwrap(); +} + +// ===== impl CurrentThread ===== + +impl CurrentThread<ParkThread> { + /// Create a new instance of `CurrentThread`. + pub fn new() -> Self { + CurrentThread::new_with_park(ParkThread::new()) + } +} + +impl<P: Park> CurrentThread<P> { + /// Create a new instance of `CurrentThread` backed by the given park + /// handle. + pub fn new_with_park(park: P) -> Self { + let unpark = park.unpark(); + + let (spawn_sender, spawn_receiver) = mpsc::channel(); + let thread = thread::current().id(); + let id = EXECUTOR_ID.with(|idc| { + let id = idc.get(); + idc.set(id + 1); + id + }); + + let scheduler = Scheduler::new(unpark); + let notify = scheduler.notify(); + + let num_futures = Arc::new(atomic::AtomicUsize::new(0)); + + CurrentThread { + scheduler: scheduler, + num_futures: num_futures.clone(), + park, + id, + spawn_handle: Handle { + sender: spawn_sender, + num_futures: num_futures, + notify: notify, + shut_down: Cell::new(false), + thread: thread, + id, + }, + spawn_receiver: spawn_receiver, + } + } + + /// Returns `true` if the executor is currently idle. + /// + /// An idle executor is defined by not currently having any spawned tasks. + /// + /// Note that this method is inherently racy -- if a future is spawned from a remote `Handle`, + /// this method may return `true` even though there are more futures to be executed. + pub fn is_idle(&self) -> bool { + self.num_futures.load(atomic::Ordering::SeqCst) <= 1 + } + + /// Spawn the future on the executor. + /// + /// This internally queues the future to be executed once `run` is called. + pub fn spawn<F>(&mut self, future: F) -> &mut Self + where + F: Future<Item = (), Error = ()> + 'static, + { + self.borrow().spawn_local(Box::new(future), false); + self + } + + /// Synchronously waits for the provided `future` to complete. + /// + /// This function can be used to synchronously block the current thread + /// until the provided `future` has resolved either successfully or with an + /// error. The result of the future is then returned from this function + /// call. + /// + /// Note that this function will **also** execute any spawned futures on the + /// current thread, but will **not** block until these other spawned futures + /// have completed. + /// + /// The caller is responsible for ensuring that other spawned futures + /// complete execution. + pub fn block_on<F>(&mut self, future: F) -> Result<F::Item, BlockError<F::Error>> + where + F: Future, + { + let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`"); + self.enter(&mut enter).block_on(future) + } + + /// Run the executor to completion, blocking the thread until **all** + /// spawned futures have completed. + pub fn run(&mut self) -> Result<(), RunError> { + let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`"); + self.enter(&mut enter).run() + } + + /// Run the executor to completion, blocking the thread until all + /// spawned futures have completed **or** `duration` time has elapsed. + pub fn run_timeout(&mut self, duration: Duration) -> Result<(), RunTimeoutError> { + let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`"); + self.enter(&mut enter).run_timeout(duration) + } + + /// Perform a single iteration of the event loop. + /// + /// This function blocks the current thread even if the executor is idle. + pub fn turn(&mut self, duration: Option<Duration>) -> Result<Turn, TurnError> { + let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`"); + self.enter(&mut enter).turn(duration) + } + + /// Bind `CurrentThread` instance with an execution context. + pub fn enter<'a>(&'a mut self, enter: &'a mut Enter) -> Entered<'a, P> { + Entered { + executor: self, + enter, + } + } + + /// Returns a reference to the underlying `Park` instance. + pub fn get_park(&self) -> &P { + &self.park + } + + /// Returns a mutable reference to the underlying `Park` instance. + pub fn get_park_mut(&mut self) -> &mut P { + &mut self.park + } + + fn borrow(&mut self) -> Borrow<P::Unpark> { + Borrow { + id: self.id, + scheduler: &mut self.scheduler, + num_futures: &*self.num_futures, + } + } + + /// Get a new handle to spawn futures on the executor + /// + /// Different to the executor itself, the handle can be sent to different + /// threads and can be used to spawn futures on the executor. + pub fn handle(&self) -> Handle { + self.spawn_handle.clone() + } +} + +impl<P: Park> Drop for CurrentThread<P> { + fn drop(&mut self) { + // Signal to Handles that no more futures can be spawned by setting LSB. + // + // NOTE: this isn't technically necessary since the send on the mpsc will fail once the + // receiver is dropped, but it's useful to illustrate how clean shutdown will be + // implemented (e.g., by setting the LSB). + let pending = self.num_futures.fetch_add(1, atomic::Ordering::SeqCst); + + // TODO: We currently ignore any pending futures at the time we shut down. + // + // The "proper" fix for this is to have an explicit shutdown phase (`shutdown_on_idle`) + // which sets LSB (as above) do make Handle::spawn stop working, and then runs until + // num_futures.load() == 1. + let _ = pending; + } +} + +impl tokio_executor::Executor for CurrentThread { + fn spawn( + &mut self, + future: Box<dyn Future<Item = (), Error = ()> + Send>, + ) -> Result<(), SpawnError> { + self.borrow().spawn_local(future, false); + Ok(()) + } +} + +impl<T> tokio_executor::TypedExecutor<T> for CurrentThread +where + T: Future<Item = (), Error = ()> + 'static, +{ + fn spawn(&mut self, future: T) -> Result<(), SpawnError> { + self.borrow().spawn_local(Box::new(future), false); + Ok(()) + } +} + +impl<P: Park> fmt::Debug for CurrentThread<P> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("CurrentThread") + .field("scheduler", &self.scheduler) + .field( + "num_futures", + &self.num_futures.load(atomic::Ordering::SeqCst), + ) + .finish() + } +} + +// ===== impl Entered ===== + +impl<'a, P: Park> Entered<'a, P> { + /// Spawn the future on the executor. + /// + /// This internally queues the future to be executed once `run` is called. + pub fn spawn<F>(&mut self, future: F) -> &mut Self + where + F: Future<Item = (), Error = ()> + 'static, + { + self.executor.borrow().spawn_local(Box::new(future), false); + self + } + + /// Synchronously waits for the provided `future` to complete. + /// + /// This function can be used to synchronously block the current thread + /// until the provided `future` has resolved either successfully or with an + /// error. The result of the future is then returned from this function + /// call. + /// + /// Note that this function will **also** execute any spawned futures on the + /// current thread, but will **not** block until these other spawned futures + /// have completed. + /// + /// The caller is responsible for ensuring that other spawned futures + /// complete execution. + pub fn block_on<F>(&mut self, future: F) -> Result<F::Item, BlockError<F::Error>> + where + F: Future, + { + let mut future = executor::spawn(future); + let notify = self.executor.scheduler.notify(); + + loop { + let res = self + .executor + .borrow() + .enter(self.enter, || future.poll_future_notify(¬ify, 0)); + + match res { + Ok(Async::Ready(e)) => return Ok(e), + Err(e) => return Err(BlockError { inner: Some(e) }), + Ok(Async::NotReady) => {} + } + + self.tick(); + + if let Err(_) = self.executor.park.park() { + return Err(BlockError { inner: None }); + } + } + } + + /// Run the executor to completion, blocking the thread until **all** + /// spawned futures have completed. + pub fn run(&mut self) -> Result<(), RunError> { + self.run_timeout2(None).map_err(|_| RunError { _p: () }) + } + + /// Run the executor to completion, blocking the thread until all + /// spawned futures have completed **or** `duration` time has elapsed. + pub fn run_timeout(&mut self, duration: Duration) -> Result<(), RunTimeoutError> { + self.run_timeout2(Some(duration)) + } + + /// Perform a single iteration of the event loop. + /// + /// This function blocks the current thread even if the executor is idle. + pub fn turn(&mut self, duration: Option<Duration>) -> Result<Turn, TurnError> { + let res = if self.executor.scheduler.has_pending_futures() { + self.executor.park.park_timeout(Duration::from_millis(0)) + } else { + match duration { + Some(duration) => self.executor.park.park_timeout(duration), + None => self.executor.park.park(), + } + }; + + if res.is_err() { + return Err(TurnError { _p: () }); + } + + let polled = self.tick(); + + Ok(Turn { polled }) + } + + /// Returns a reference to the underlying `Park` instance. + pub fn get_park(&self) -> &P { + &self.executor.park + } + + /// Returns a mutable reference to the underlying `Park` instance. + pub fn get_park_mut(&mut self) -> &mut P { + &mut self.executor.park + } + + fn run_timeout2(&mut self, dur: Option<Duration>) -> Result<(), RunTimeoutError> { + if self.executor.is_idle() { + // Nothing to do + return Ok(()); + } + + let mut time = dur.map(|dur| (Instant::now() + dur, dur)); + + loop { + self.tick(); + + if self.executor.is_idle() { + return Ok(()); + } + + match time { + Some((until, rem)) => { + if let Err(_) = self.executor.park.park_timeout(rem) { + return Err(RunTimeoutError::new(false)); + } + + let now = Instant::now(); + + if now >= until { + return Err(RunTimeoutError::new(true)); + } + + time = Some((until, until - now)); + } + None => { + if let Err(_) = self.executor.park.park() { + return Err(RunTimeoutError::new(false)); + } + } + } + } + } + + /// Returns `true` if any futures were processed + fn tick(&mut self) -> bool { + // Spawn any futures that were spawned from other threads by manually + // looping over the receiver stream + + // FIXME: Slightly ugly but needed to make the borrow checker happy + let (mut borrow, spawn_receiver) = ( + Borrow { + id: self.executor.id, + scheduler: &mut self.executor.scheduler, + num_futures: &*self.executor.num_futures, + }, + &mut self.executor.spawn_receiver, + ); + + while let Ok(future) = spawn_receiver.try_recv() { + borrow.spawn_local(future, true); + } + + // After any pending futures were scheduled, do the actual tick + borrow + .scheduler + .tick(borrow.id, &mut *self.enter, borrow.num_futures) + } +} + +impl<'a, P: Park> fmt::Debug for Entered<'a, P> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Entered") + .field("executor", &self.executor) + .field("enter", &self.enter) + .finish() + } +} + +// ===== impl Handle ===== + +/// Handle to spawn a future on the corresponding `CurrentThread` instance +#[derive(Clone)] +pub struct Handle { + sender: mpsc::Sender<Box<dyn Future<Item = (), Error = ()> + Send + 'static>>, + num_futures: Arc<atomic::AtomicUsize>, + shut_down: Cell<bool>, + notify: executor::NotifyHandle, + thread: thread::ThreadId, + + /// The thread-local ID assigned to this Handle's executor. + id: u64, +} + +// Manual implementation because the Sender does not implement Debug +impl fmt::Debug for Handle { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Handle") + .field("shut_down", &self.shut_down.get()) + .finish() + } +} + +impl Handle { + /// Spawn a future onto the `CurrentThread` instance corresponding to this handle + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the `CurrentThread` + /// instance of the `Handle` does not exist anymore. + pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError> + where + F: Future<Item = (), Error = ()> + Send + 'static, + { + if thread::current().id() == self.thread { + let mut e = TaskExecutor::current(); + if e.id() == Some(self.id) { + return e.spawn_local(Box::new(future)); + } + } + + if self.shut_down.get() { + return Err(SpawnError::shutdown()); + } + + // NOTE: += 2 since LSB is the shutdown bit + let pending = self.num_futures.fetch_add(2, atomic::Ordering::SeqCst); + if pending % 2 == 1 { + // Bring the count back so we still know when the Runtime is idle. + self.num_futures.fetch_sub(2, atomic::Ordering::SeqCst); + + // Once the Runtime is shutting down, we know it won't come back. + self.shut_down.set(true); + + return Err(SpawnError::shutdown()); + } + + self.sender + .send(Box::new(future)) + .expect("CurrentThread does not exist anymore"); + // use 0 for the id, CurrentThread does not make use of it + self.notify.notify(0); + Ok(()) + } + + /// Provides a best effort **hint** to whether or not `spawn` will succeed. + /// + /// This function may return both false positives **and** false negatives. + /// If `status` returns `Ok`, then a call to `spawn` will *probably* + /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will + /// *probably* fail, but may succeed. + /// + /// This allows a caller to avoid creating the task if the call to `spawn` + /// has a high likelihood of failing. + pub fn status(&self) -> Result<(), SpawnError> { + if self.shut_down.get() { + return Err(SpawnError::shutdown()); + } + + Ok(()) + } +} + +// ===== impl TaskExecutor ===== + +impl TaskExecutor { + /// Returns an executor that executes futures on the current thread. + /// + /// The user of `TaskExecutor` must ensure that when a future is submitted, + /// that it is done within the context of a call to `run`. + /// + /// For more details, see the [module level](index.html) documentation. + pub fn current() -> TaskExecutor { + TaskExecutor { + _p: ::std::marker::PhantomData, + } + } + + /// Get the current executor's thread-local ID. + fn id(&self) -> Option<u64> { + CURRENT.with(|current| current.id.get()) + } + + /// Spawn a future onto the current `CurrentThread` instance. + pub fn spawn_local( + &mut self, + future: Box<dyn Future<Item = (), Error = ()>>, + ) -> Result<(), SpawnError> { + CURRENT.with(|current| match current.spawn.get() { + Some(spawn) => { + unsafe { (*spawn).spawn_local(future, false) }; + Ok(()) + } + None => Err(SpawnError::shutdown()), + }) + } +} + +impl tokio_executor::Executor for TaskExecutor { + fn spawn( + &mut self, + future: Box<dyn Future<Item = (), Error = ()> + Send>, + ) -> Result<(), SpawnError> { + self.spawn_local(future) + } +} + +impl<F> tokio_executor::TypedExecutor<F> for TaskExecutor +where + F: Future<Item = (), Error = ()> + 'static, +{ + fn spawn(&mut self, future: F) -> Result<(), SpawnError> { + self.spawn_local(Box::new(future)) + } +} + +impl<F> Executor<F> for TaskExecutor +where + F: Future<Item = (), Error = ()> + 'static, +{ + fn execute(&self, future: F) -> Result<(), ExecuteError<F>> { + CURRENT.with(|current| match current.spawn.get() { + Some(spawn) => { + unsafe { (*spawn).spawn_local(Box::new(future), false) }; + Ok(()) + } + None => Err(ExecuteError::new(ExecuteErrorKind::Shutdown, future)), + }) + } +} + +// ===== impl Borrow ===== + +impl<'a, U: Unpark> Borrow<'a, U> { + fn enter<F, R>(&mut self, _: &mut Enter, f: F) -> R + where + F: FnOnce() -> R, + { + CURRENT.with(|current| { + current.id.set(Some(self.id)); + current.set_spawn(self, || f()) + }) + } +} + +impl<'a, U: Unpark> SpawnLocal for Borrow<'a, U> { + fn spawn_local( + &mut self, + future: Box<dyn Future<Item = (), Error = ()>>, + already_counted: bool, + ) { + if !already_counted { + // NOTE: we have a borrow of the Runtime, so we know that it isn't shut down. + // NOTE: += 2 since LSB is the shutdown bit + self.num_futures.fetch_add(2, atomic::Ordering::SeqCst); + } + self.scheduler.schedule(future); + } +} + +// ===== impl CurrentRunner ===== + +impl CurrentRunner { + fn set_spawn<F, R>(&self, spawn: &mut dyn SpawnLocal, f: F) -> R + where + F: FnOnce() -> R, + { + struct Reset<'a>(&'a CurrentRunner); + + impl<'a> Drop for Reset<'a> { + fn drop(&mut self) { + self.0.spawn.set(None); + self.0.id.set(None); + } + } + + let _reset = Reset(self); + + let spawn = unsafe { hide_lt(spawn as *mut dyn SpawnLocal) }; + self.spawn.set(Some(spawn)); + + f() + } +} + +unsafe fn hide_lt<'a>(p: *mut (dyn SpawnLocal + 'a)) -> *mut (dyn SpawnLocal + 'static) { + use std::mem; + mem::transmute(p) +} + +// ===== impl RunTimeoutError ===== + +impl RunTimeoutError { + fn new(timeout: bool) -> Self { + RunTimeoutError { timeout } + } + + /// Returns `true` if the error was caused by the operation timing out. + pub fn is_timeout(&self) -> bool { + self.timeout + } +} + +impl From<tokio_executor::EnterError> for RunTimeoutError { + fn from(_: tokio_executor::EnterError) -> Self { + RunTimeoutError::new(false) + } +} + +// ===== impl BlockError ===== + +impl<T> BlockError<T> { + /// Returns the error yielded by the future being blocked on + pub fn into_inner(self) -> Option<T> { + self.inner + } +} + +impl<T> From<tokio_executor::EnterError> for BlockError<T> { + fn from(_: tokio_executor::EnterError) -> Self { + BlockError { inner: None } + } +} |