diff options
Diffstat (limited to '')
-rw-r--r-- | third_party/rust/tokio-current-thread/.cargo-checksum.json | 1 | ||||
-rw-r--r-- | third_party/rust/tokio-current-thread/CHANGELOG.md | 35 | ||||
-rw-r--r-- | third_party/rust/tokio-current-thread/Cargo.toml | 28 | ||||
-rw-r--r-- | third_party/rust/tokio-current-thread/LICENSE | 25 | ||||
-rw-r--r-- | third_party/rust/tokio-current-thread/README.md | 28 | ||||
-rw-r--r-- | third_party/rust/tokio-current-thread/src/lib.rs | 882 | ||||
-rw-r--r-- | third_party/rust/tokio-current-thread/src/scheduler.rs | 770 | ||||
-rw-r--r-- | third_party/rust/tokio-current-thread/tests/current_thread.rs | 835 |
8 files changed, 2604 insertions, 0 deletions
diff --git a/third_party/rust/tokio-current-thread/.cargo-checksum.json b/third_party/rust/tokio-current-thread/.cargo-checksum.json new file mode 100644 index 0000000000..45e8f26c3d --- /dev/null +++ b/third_party/rust/tokio-current-thread/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"CHANGELOG.md":"1884f80f8edf8b2a2f51526797ff60ec5245f0d6f12087673758783c1a07c887","Cargo.toml":"1eae85f7af5011dc8e81cae7f72f61392953db11e7e9d32ee997d9935422b829","LICENSE":"898b1ae9821e98daf8964c8d6c7f61641f5f5aa78ad500020771c0939ee0dea1","README.md":"070f1287dddae0c7af74de4ef744ef2f821d5ba90b5e4d798b6cb80f5b58624d","src/lib.rs":"db9c246beb52b7091669eee1912867ef523ce8d7dda12b5591ffc3b38090ab07","src/scheduler.rs":"ca05f871266e8c1ad48316f6fa6140c7353fe8d94c50b033a6f61a3daa490ccb","tests/current_thread.rs":"a4d366d9befcb845e111949868a5b807a46de7cf4d9cb32708fa397365fb304a"},"package":"b1de0e32a83f131e002238d7ccde18211c0a5397f60cbfffcb112868c2e0e20e"}
\ No newline at end of file diff --git a/third_party/rust/tokio-current-thread/CHANGELOG.md b/third_party/rust/tokio-current-thread/CHANGELOG.md new file mode 100644 index 0000000000..3433659601 --- /dev/null +++ b/third_party/rust/tokio-current-thread/CHANGELOG.md @@ -0,0 +1,35 @@ +# 0.1.7 (February 4, 2020) + +* Add `tokio 0.2.x` deprecation notice. + +# 0.1.6 (March 22, 2019) + +### Added +- implement `TypedExecutor` (#993). + +# 0.1.5 (March 1, 2019) + +### Fixed +- Documentation typos (#882). + +# 0.1.4 (November 21, 2018) + +* Fix shutdown on idle (#763). + +# 0.1.3 (September 27, 2018) + +* Fix minimal versions + +# 0.1.2 (September 26, 2018) + +* Implement `futures::Executor` for executor types (#563) +* Spawning performance improvements (#565) + +# 0.1.1 (August 6, 2018) + +* Implement `std::Error` for misc error types (#501) +* bugfix: Track tasks pending in spawn queue (#478) + +# 0.1.0 (June 13, 2018) + +* Extract `tokio::executor::current_thread` to a tokio-current-thread crate (#356) diff --git a/third_party/rust/tokio-current-thread/Cargo.toml b/third_party/rust/tokio-current-thread/Cargo.toml new file mode 100644 index 0000000000..eb6ab5d5fe --- /dev/null +++ b/third_party/rust/tokio-current-thread/Cargo.toml @@ -0,0 +1,28 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies +# +# If you believe there's an error in this file please file an +# issue against the rust-lang/cargo repository. If you're +# editing this file be aware that the upstream Cargo.toml +# will likely look very different (and much more reasonable) + +[package] +name = "tokio-current-thread" +version = "0.1.7" +authors = ["Carl Lerche <me@carllerche.com>"] +description = "Single threaded executor which manage many tasks concurrently on the current thread.\n" +homepage = "https://github.com/tokio-rs/tokio" +documentation = "https://docs.rs/tokio-current-thread/0.1.7/tokio_current_thread" +keywords = ["futures", "tokio"] +categories = ["concurrency", "asynchronous"] +license = "MIT" +repository = "https://github.com/tokio-rs/tokio" +[dependencies.futures] +version = "0.1.19" + +[dependencies.tokio-executor] +version = "0.1.7" diff --git a/third_party/rust/tokio-current-thread/LICENSE b/third_party/rust/tokio-current-thread/LICENSE new file mode 100644 index 0000000000..cdb28b4b56 --- /dev/null +++ b/third_party/rust/tokio-current-thread/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2019 Tokio Contributors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/third_party/rust/tokio-current-thread/README.md b/third_party/rust/tokio-current-thread/README.md new file mode 100644 index 0000000000..cb31acaea1 --- /dev/null +++ b/third_party/rust/tokio-current-thread/README.md @@ -0,0 +1,28 @@ +# tokio-current-thread + +Single threaded executor for Tokio. + +> **Note:** This crate is **deprecated in tokio 0.2.x** and has been moved and +> refactored into various places in the [`tokio`] crate. The closest replacement +> is to make use of [`tokio::task::LocalSet::block_on`] which requires the +> [`rt-util` feature]. + +[`tokio`]: https://docs.rs/tokio/latest/tokio/index.html +[`tokio::task::LocalSet::block_on`]: https://docs.rs/tokio/latest/tokio/task/struct.LocalSet.html#method.block_on +[`rt-util` feature]: https://docs.rs/tokio/latest/tokio/index.html#feature-flags + +[Documentation](https://docs.rs/tokio-current-thread/0.1.6/tokio_current_thread/) + +## Overview + +This crate provides the single threaded executor which execute many tasks concurrently. + +## License + +This project is licensed under the [MIT license](LICENSE). + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in Tokio by you, shall be licensed as MIT, without any additional +terms or conditions. diff --git a/third_party/rust/tokio-current-thread/src/lib.rs b/third_party/rust/tokio-current-thread/src/lib.rs new file mode 100644 index 0000000000..53392c4bfa --- /dev/null +++ b/third_party/rust/tokio-current-thread/src/lib.rs @@ -0,0 +1,882 @@ +#![doc(html_root_url = "https://docs.rs/tokio-current-thread/0.1.7")] +#![deny(missing_docs, missing_debug_implementations)] + +//! A single-threaded executor which executes tasks on the same thread from which +//! they are spawned. +//! +//! > **Note:** This crate is **deprecated in tokio 0.2.x** and has been moved +//! > and refactored into various places in the [`tokio`] crate. The closest +//! replacement is to make use of [`tokio::task::LocalSet::block_on`] which +//! requires the [`rt-util` feature]. +//! +//! [`tokio`]: https://docs.rs/tokio/latest/tokio/index.html +//! [`tokio::task::LocalSet::block_on`]: https://docs.rs/tokio/latest/tokio/task/struct.LocalSet.html#method.block_on +//! [`rt-util` feature]: https://docs.rs/tokio/latest/tokio/index.html#feature-flags +//! +//! The crate provides: +//! +//! * [`CurrentThread`] is the main type of this crate. It executes tasks on the current thread. +//! The easiest way to start a new [`CurrentThread`] executor is to call +//! [`block_on_all`] with an initial task to seed the executor. +//! All tasks that are being managed by a [`CurrentThread`] executor are able to +//! spawn additional tasks by calling [`spawn`]. +//! +//! +//! Application authors will not use this crate directly. Instead, they will use the +//! `tokio` crate. Library authors should only depend on `tokio-current-thread` if they +//! are building a custom task executor. +//! +//! For more details, see [executor module] documentation in the Tokio crate. +//! +//! [`CurrentThread`]: struct.CurrentThread.html +//! [`spawn`]: fn.spawn.html +//! [`block_on_all`]: fn.block_on_all.html +//! [executor module]: https://docs.rs/tokio/0.1/tokio/executor/index.html + +extern crate futures; +extern crate tokio_executor; + +mod scheduler; + +use self::scheduler::Scheduler; + +use tokio_executor::park::{Park, ParkThread, Unpark}; +use tokio_executor::{Enter, SpawnError}; + +use futures::future::{ExecuteError, ExecuteErrorKind, Executor}; +use futures::{executor, Async, Future}; + +use std::cell::Cell; +use std::error::Error; +use std::fmt; +use std::rc::Rc; +use std::sync::{atomic, mpsc, Arc}; +use std::thread; +use std::time::{Duration, Instant}; + +/// Executes tasks on the current thread +pub struct CurrentThread<P: Park = ParkThread> { + /// Execute futures and receive unpark notifications. + scheduler: Scheduler<P::Unpark>, + + /// Current number of futures being executed. + /// + /// The LSB is used to indicate that the runtime is preparing to shut down. + /// Thus, to get the actual number of pending futures, `>>1`. + num_futures: Arc<atomic::AtomicUsize>, + + /// Thread park handle + park: P, + + /// Handle for spawning new futures from other threads + spawn_handle: Handle, + + /// Receiver for futures spawned from other threads + spawn_receiver: mpsc::Receiver<Box<dyn Future<Item = (), Error = ()> + Send + 'static>>, + + /// The thread-local ID assigned to this executor. + id: u64, +} + +/// Executes futures on the current thread. +/// +/// All futures executed using this executor will be executed on the current +/// thread. As such, `run` will wait for these futures to complete before +/// returning. +/// +/// For more details, see the [module level](index.html) documentation. +#[derive(Debug, Clone)] +pub struct TaskExecutor { + // Prevent the handle from moving across threads. + _p: ::std::marker::PhantomData<Rc<()>>, +} + +/// Returned by the `turn` function. +#[derive(Debug)] +pub struct Turn { + polled: bool, +} + +impl Turn { + /// `true` if any futures were polled at all and `false` otherwise. + pub fn has_polled(&self) -> bool { + self.polled + } +} + +/// A `CurrentThread` instance bound to a supplied execution context. +pub struct Entered<'a, P: Park + 'a> { + executor: &'a mut CurrentThread<P>, + enter: &'a mut Enter, +} + +/// Error returned by the `run` function. +#[derive(Debug)] +pub struct RunError { + _p: (), +} + +impl fmt::Display for RunError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.description()) + } +} + +impl Error for RunError { + fn description(&self) -> &str { + "Run error" + } +} + +/// Error returned by the `run_timeout` function. +#[derive(Debug)] +pub struct RunTimeoutError { + timeout: bool, +} + +impl fmt::Display for RunTimeoutError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.description()) + } +} + +impl Error for RunTimeoutError { + fn description(&self) -> &str { + if self.timeout { + "Run timeout error (timeout)" + } else { + "Run timeout error (not timeout)" + } + } +} + +/// Error returned by the `turn` function. +#[derive(Debug)] +pub struct TurnError { + _p: (), +} + +impl fmt::Display for TurnError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.description()) + } +} + +impl Error for TurnError { + fn description(&self) -> &str { + "Turn error" + } +} + +/// Error returned by the `block_on` function. +#[derive(Debug)] +pub struct BlockError<T> { + inner: Option<T>, +} + +impl<T> fmt::Display for BlockError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "Block error") + } +} + +impl<T: fmt::Debug> Error for BlockError<T> { + fn description(&self) -> &str { + "Block error" + } +} + +/// This is mostly split out to make the borrow checker happy. +struct Borrow<'a, U: 'a> { + id: u64, + scheduler: &'a mut Scheduler<U>, + num_futures: &'a atomic::AtomicUsize, +} + +trait SpawnLocal { + fn spawn_local( + &mut self, + future: Box<dyn Future<Item = (), Error = ()>>, + already_counted: bool, + ); +} + +struct CurrentRunner { + spawn: Cell<Option<*mut dyn SpawnLocal>>, + id: Cell<Option<u64>>, +} + +thread_local! { + /// Current thread's task runner. This is set in `TaskRunner::with` + static CURRENT: CurrentRunner = CurrentRunner { + spawn: Cell::new(None), + id: Cell::new(None), + } +} + +thread_local! { + /// Unique ID to assign to each new executor launched on this thread. + /// + /// The unique ID is used to determine if the currently running executor matches the one + /// referred to by a `Handle` so that direct task dispatch can be used. + static EXECUTOR_ID: Cell<u64> = Cell::new(0) +} + +/// Run the executor bootstrapping the execution with the provided future. +/// +/// This creates a new [`CurrentThread`] executor, spawns the provided future, +/// and blocks the current thread until the provided future and **all** +/// subsequently spawned futures complete. In other words: +/// +/// * If the provided bootstrap future does **not** spawn any additional tasks, +/// `block_on_all` returns once `future` completes. +/// * If the provided bootstrap future **does** spawn additional tasks, then +/// `block_on_all` returns once **all** spawned futures complete. +/// +/// See [module level][mod] documentation for more details. +/// +/// [`CurrentThread`]: struct.CurrentThread.html +/// [mod]: index.html +pub fn block_on_all<F>(future: F) -> Result<F::Item, F::Error> +where + F: Future, +{ + let mut current_thread = CurrentThread::new(); + + let ret = current_thread.block_on(future); + current_thread.run().unwrap(); + + ret.map_err(|e| e.into_inner().expect("unexpected execution error")) +} + +/// Executes a future on the current thread. +/// +/// The provided future must complete or be canceled before `run` will return. +/// +/// Unlike [`tokio::spawn`], this function will always spawn on a +/// `CurrentThread` executor and is able to spawn futures that are not `Send`. +/// +/// # Panics +/// +/// This function can only be invoked from the context of a `run` call; any +/// other use will result in a panic. +/// +/// [`tokio::spawn`]: ../fn.spawn.html +pub fn spawn<F>(future: F) +where + F: Future<Item = (), Error = ()> + 'static, +{ + TaskExecutor::current() + .spawn_local(Box::new(future)) + .unwrap(); +} + +// ===== impl CurrentThread ===== + +impl CurrentThread<ParkThread> { + /// Create a new instance of `CurrentThread`. + pub fn new() -> Self { + CurrentThread::new_with_park(ParkThread::new()) + } +} + +impl<P: Park> CurrentThread<P> { + /// Create a new instance of `CurrentThread` backed by the given park + /// handle. + pub fn new_with_park(park: P) -> Self { + let unpark = park.unpark(); + + let (spawn_sender, spawn_receiver) = mpsc::channel(); + let thread = thread::current().id(); + let id = EXECUTOR_ID.with(|idc| { + let id = idc.get(); + idc.set(id + 1); + id + }); + + let scheduler = Scheduler::new(unpark); + let notify = scheduler.notify(); + + let num_futures = Arc::new(atomic::AtomicUsize::new(0)); + + CurrentThread { + scheduler: scheduler, + num_futures: num_futures.clone(), + park, + id, + spawn_handle: Handle { + sender: spawn_sender, + num_futures: num_futures, + notify: notify, + shut_down: Cell::new(false), + thread: thread, + id, + }, + spawn_receiver: spawn_receiver, + } + } + + /// Returns `true` if the executor is currently idle. + /// + /// An idle executor is defined by not currently having any spawned tasks. + /// + /// Note that this method is inherently racy -- if a future is spawned from a remote `Handle`, + /// this method may return `true` even though there are more futures to be executed. + pub fn is_idle(&self) -> bool { + self.num_futures.load(atomic::Ordering::SeqCst) <= 1 + } + + /// Spawn the future on the executor. + /// + /// This internally queues the future to be executed once `run` is called. + pub fn spawn<F>(&mut self, future: F) -> &mut Self + where + F: Future<Item = (), Error = ()> + 'static, + { + self.borrow().spawn_local(Box::new(future), false); + self + } + + /// Synchronously waits for the provided `future` to complete. + /// + /// This function can be used to synchronously block the current thread + /// until the provided `future` has resolved either successfully or with an + /// error. The result of the future is then returned from this function + /// call. + /// + /// Note that this function will **also** execute any spawned futures on the + /// current thread, but will **not** block until these other spawned futures + /// have completed. + /// + /// The caller is responsible for ensuring that other spawned futures + /// complete execution. + pub fn block_on<F>(&mut self, future: F) -> Result<F::Item, BlockError<F::Error>> + where + F: Future, + { + let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`"); + self.enter(&mut enter).block_on(future) + } + + /// Run the executor to completion, blocking the thread until **all** + /// spawned futures have completed. + pub fn run(&mut self) -> Result<(), RunError> { + let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`"); + self.enter(&mut enter).run() + } + + /// Run the executor to completion, blocking the thread until all + /// spawned futures have completed **or** `duration` time has elapsed. + pub fn run_timeout(&mut self, duration: Duration) -> Result<(), RunTimeoutError> { + let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`"); + self.enter(&mut enter).run_timeout(duration) + } + + /// Perform a single iteration of the event loop. + /// + /// This function blocks the current thread even if the executor is idle. + pub fn turn(&mut self, duration: Option<Duration>) -> Result<Turn, TurnError> { + let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`"); + self.enter(&mut enter).turn(duration) + } + + /// Bind `CurrentThread` instance with an execution context. + pub fn enter<'a>(&'a mut self, enter: &'a mut Enter) -> Entered<'a, P> { + Entered { + executor: self, + enter, + } + } + + /// Returns a reference to the underlying `Park` instance. + pub fn get_park(&self) -> &P { + &self.park + } + + /// Returns a mutable reference to the underlying `Park` instance. + pub fn get_park_mut(&mut self) -> &mut P { + &mut self.park + } + + fn borrow(&mut self) -> Borrow<P::Unpark> { + Borrow { + id: self.id, + scheduler: &mut self.scheduler, + num_futures: &*self.num_futures, + } + } + + /// Get a new handle to spawn futures on the executor + /// + /// Different to the executor itself, the handle can be sent to different + /// threads and can be used to spawn futures on the executor. + pub fn handle(&self) -> Handle { + self.spawn_handle.clone() + } +} + +impl<P: Park> Drop for CurrentThread<P> { + fn drop(&mut self) { + // Signal to Handles that no more futures can be spawned by setting LSB. + // + // NOTE: this isn't technically necessary since the send on the mpsc will fail once the + // receiver is dropped, but it's useful to illustrate how clean shutdown will be + // implemented (e.g., by setting the LSB). + let pending = self.num_futures.fetch_add(1, atomic::Ordering::SeqCst); + + // TODO: We currently ignore any pending futures at the time we shut down. + // + // The "proper" fix for this is to have an explicit shutdown phase (`shutdown_on_idle`) + // which sets LSB (as above) do make Handle::spawn stop working, and then runs until + // num_futures.load() == 1. + let _ = pending; + } +} + +impl tokio_executor::Executor for CurrentThread { + fn spawn( + &mut self, + future: Box<dyn Future<Item = (), Error = ()> + Send>, + ) -> Result<(), SpawnError> { + self.borrow().spawn_local(future, false); + Ok(()) + } +} + +impl<T> tokio_executor::TypedExecutor<T> for CurrentThread +where + T: Future<Item = (), Error = ()> + 'static, +{ + fn spawn(&mut self, future: T) -> Result<(), SpawnError> { + self.borrow().spawn_local(Box::new(future), false); + Ok(()) + } +} + +impl<P: Park> fmt::Debug for CurrentThread<P> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("CurrentThread") + .field("scheduler", &self.scheduler) + .field( + "num_futures", + &self.num_futures.load(atomic::Ordering::SeqCst), + ) + .finish() + } +} + +// ===== impl Entered ===== + +impl<'a, P: Park> Entered<'a, P> { + /// Spawn the future on the executor. + /// + /// This internally queues the future to be executed once `run` is called. + pub fn spawn<F>(&mut self, future: F) -> &mut Self + where + F: Future<Item = (), Error = ()> + 'static, + { + self.executor.borrow().spawn_local(Box::new(future), false); + self + } + + /// Synchronously waits for the provided `future` to complete. + /// + /// This function can be used to synchronously block the current thread + /// until the provided `future` has resolved either successfully or with an + /// error. The result of the future is then returned from this function + /// call. + /// + /// Note that this function will **also** execute any spawned futures on the + /// current thread, but will **not** block until these other spawned futures + /// have completed. + /// + /// The caller is responsible for ensuring that other spawned futures + /// complete execution. + pub fn block_on<F>(&mut self, future: F) -> Result<F::Item, BlockError<F::Error>> + where + F: Future, + { + let mut future = executor::spawn(future); + let notify = self.executor.scheduler.notify(); + + loop { + let res = self + .executor + .borrow() + .enter(self.enter, || future.poll_future_notify(¬ify, 0)); + + match res { + Ok(Async::Ready(e)) => return Ok(e), + Err(e) => return Err(BlockError { inner: Some(e) }), + Ok(Async::NotReady) => {} + } + + self.tick(); + + if let Err(_) = self.executor.park.park() { + return Err(BlockError { inner: None }); + } + } + } + + /// Run the executor to completion, blocking the thread until **all** + /// spawned futures have completed. + pub fn run(&mut self) -> Result<(), RunError> { + self.run_timeout2(None).map_err(|_| RunError { _p: () }) + } + + /// Run the executor to completion, blocking the thread until all + /// spawned futures have completed **or** `duration` time has elapsed. + pub fn run_timeout(&mut self, duration: Duration) -> Result<(), RunTimeoutError> { + self.run_timeout2(Some(duration)) + } + + /// Perform a single iteration of the event loop. + /// + /// This function blocks the current thread even if the executor is idle. + pub fn turn(&mut self, duration: Option<Duration>) -> Result<Turn, TurnError> { + let res = if self.executor.scheduler.has_pending_futures() { + self.executor.park.park_timeout(Duration::from_millis(0)) + } else { + match duration { + Some(duration) => self.executor.park.park_timeout(duration), + None => self.executor.park.park(), + } + }; + + if res.is_err() { + return Err(TurnError { _p: () }); + } + + let polled = self.tick(); + + Ok(Turn { polled }) + } + + /// Returns a reference to the underlying `Park` instance. + pub fn get_park(&self) -> &P { + &self.executor.park + } + + /// Returns a mutable reference to the underlying `Park` instance. + pub fn get_park_mut(&mut self) -> &mut P { + &mut self.executor.park + } + + fn run_timeout2(&mut self, dur: Option<Duration>) -> Result<(), RunTimeoutError> { + if self.executor.is_idle() { + // Nothing to do + return Ok(()); + } + + let mut time = dur.map(|dur| (Instant::now() + dur, dur)); + + loop { + self.tick(); + + if self.executor.is_idle() { + return Ok(()); + } + + match time { + Some((until, rem)) => { + if let Err(_) = self.executor.park.park_timeout(rem) { + return Err(RunTimeoutError::new(false)); + } + + let now = Instant::now(); + + if now >= until { + return Err(RunTimeoutError::new(true)); + } + + time = Some((until, until - now)); + } + None => { + if let Err(_) = self.executor.park.park() { + return Err(RunTimeoutError::new(false)); + } + } + } + } + } + + /// Returns `true` if any futures were processed + fn tick(&mut self) -> bool { + // Spawn any futures that were spawned from other threads by manually + // looping over the receiver stream + + // FIXME: Slightly ugly but needed to make the borrow checker happy + let (mut borrow, spawn_receiver) = ( + Borrow { + id: self.executor.id, + scheduler: &mut self.executor.scheduler, + num_futures: &*self.executor.num_futures, + }, + &mut self.executor.spawn_receiver, + ); + + while let Ok(future) = spawn_receiver.try_recv() { + borrow.spawn_local(future, true); + } + + // After any pending futures were scheduled, do the actual tick + borrow + .scheduler + .tick(borrow.id, &mut *self.enter, borrow.num_futures) + } +} + +impl<'a, P: Park> fmt::Debug for Entered<'a, P> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Entered") + .field("executor", &self.executor) + .field("enter", &self.enter) + .finish() + } +} + +// ===== impl Handle ===== + +/// Handle to spawn a future on the corresponding `CurrentThread` instance +#[derive(Clone)] +pub struct Handle { + sender: mpsc::Sender<Box<dyn Future<Item = (), Error = ()> + Send + 'static>>, + num_futures: Arc<atomic::AtomicUsize>, + shut_down: Cell<bool>, + notify: executor::NotifyHandle, + thread: thread::ThreadId, + + /// The thread-local ID assigned to this Handle's executor. + id: u64, +} + +// Manual implementation because the Sender does not implement Debug +impl fmt::Debug for Handle { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Handle") + .field("shut_down", &self.shut_down.get()) + .finish() + } +} + +impl Handle { + /// Spawn a future onto the `CurrentThread` instance corresponding to this handle + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the `CurrentThread` + /// instance of the `Handle` does not exist anymore. + pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError> + where + F: Future<Item = (), Error = ()> + Send + 'static, + { + if thread::current().id() == self.thread { + let mut e = TaskExecutor::current(); + if e.id() == Some(self.id) { + return e.spawn_local(Box::new(future)); + } + } + + if self.shut_down.get() { + return Err(SpawnError::shutdown()); + } + + // NOTE: += 2 since LSB is the shutdown bit + let pending = self.num_futures.fetch_add(2, atomic::Ordering::SeqCst); + if pending % 2 == 1 { + // Bring the count back so we still know when the Runtime is idle. + self.num_futures.fetch_sub(2, atomic::Ordering::SeqCst); + + // Once the Runtime is shutting down, we know it won't come back. + self.shut_down.set(true); + + return Err(SpawnError::shutdown()); + } + + self.sender + .send(Box::new(future)) + .expect("CurrentThread does not exist anymore"); + // use 0 for the id, CurrentThread does not make use of it + self.notify.notify(0); + Ok(()) + } + + /// Provides a best effort **hint** to whether or not `spawn` will succeed. + /// + /// This function may return both false positives **and** false negatives. + /// If `status` returns `Ok`, then a call to `spawn` will *probably* + /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will + /// *probably* fail, but may succeed. + /// + /// This allows a caller to avoid creating the task if the call to `spawn` + /// has a high likelihood of failing. + pub fn status(&self) -> Result<(), SpawnError> { + if self.shut_down.get() { + return Err(SpawnError::shutdown()); + } + + Ok(()) + } +} + +// ===== impl TaskExecutor ===== + +impl TaskExecutor { + /// Returns an executor that executes futures on the current thread. + /// + /// The user of `TaskExecutor` must ensure that when a future is submitted, + /// that it is done within the context of a call to `run`. + /// + /// For more details, see the [module level](index.html) documentation. + pub fn current() -> TaskExecutor { + TaskExecutor { + _p: ::std::marker::PhantomData, + } + } + + /// Get the current executor's thread-local ID. + fn id(&self) -> Option<u64> { + CURRENT.with(|current| current.id.get()) + } + + /// Spawn a future onto the current `CurrentThread` instance. + pub fn spawn_local( + &mut self, + future: Box<dyn Future<Item = (), Error = ()>>, + ) -> Result<(), SpawnError> { + CURRENT.with(|current| match current.spawn.get() { + Some(spawn) => { + unsafe { (*spawn).spawn_local(future, false) }; + Ok(()) + } + None => Err(SpawnError::shutdown()), + }) + } +} + +impl tokio_executor::Executor for TaskExecutor { + fn spawn( + &mut self, + future: Box<dyn Future<Item = (), Error = ()> + Send>, + ) -> Result<(), SpawnError> { + self.spawn_local(future) + } +} + +impl<F> tokio_executor::TypedExecutor<F> for TaskExecutor +where + F: Future<Item = (), Error = ()> + 'static, +{ + fn spawn(&mut self, future: F) -> Result<(), SpawnError> { + self.spawn_local(Box::new(future)) + } +} + +impl<F> Executor<F> for TaskExecutor +where + F: Future<Item = (), Error = ()> + 'static, +{ + fn execute(&self, future: F) -> Result<(), ExecuteError<F>> { + CURRENT.with(|current| match current.spawn.get() { + Some(spawn) => { + unsafe { (*spawn).spawn_local(Box::new(future), false) }; + Ok(()) + } + None => Err(ExecuteError::new(ExecuteErrorKind::Shutdown, future)), + }) + } +} + +// ===== impl Borrow ===== + +impl<'a, U: Unpark> Borrow<'a, U> { + fn enter<F, R>(&mut self, _: &mut Enter, f: F) -> R + where + F: FnOnce() -> R, + { + CURRENT.with(|current| { + current.id.set(Some(self.id)); + current.set_spawn(self, || f()) + }) + } +} + +impl<'a, U: Unpark> SpawnLocal for Borrow<'a, U> { + fn spawn_local( + &mut self, + future: Box<dyn Future<Item = (), Error = ()>>, + already_counted: bool, + ) { + if !already_counted { + // NOTE: we have a borrow of the Runtime, so we know that it isn't shut down. + // NOTE: += 2 since LSB is the shutdown bit + self.num_futures.fetch_add(2, atomic::Ordering::SeqCst); + } + self.scheduler.schedule(future); + } +} + +// ===== impl CurrentRunner ===== + +impl CurrentRunner { + fn set_spawn<F, R>(&self, spawn: &mut dyn SpawnLocal, f: F) -> R + where + F: FnOnce() -> R, + { + struct Reset<'a>(&'a CurrentRunner); + + impl<'a> Drop for Reset<'a> { + fn drop(&mut self) { + self.0.spawn.set(None); + self.0.id.set(None); + } + } + + let _reset = Reset(self); + + let spawn = unsafe { hide_lt(spawn as *mut dyn SpawnLocal) }; + self.spawn.set(Some(spawn)); + + f() + } +} + +unsafe fn hide_lt<'a>(p: *mut (dyn SpawnLocal + 'a)) -> *mut (dyn SpawnLocal + 'static) { + use std::mem; + mem::transmute(p) +} + +// ===== impl RunTimeoutError ===== + +impl RunTimeoutError { + fn new(timeout: bool) -> Self { + RunTimeoutError { timeout } + } + + /// Returns `true` if the error was caused by the operation timing out. + pub fn is_timeout(&self) -> bool { + self.timeout + } +} + +impl From<tokio_executor::EnterError> for RunTimeoutError { + fn from(_: tokio_executor::EnterError) -> Self { + RunTimeoutError::new(false) + } +} + +// ===== impl BlockError ===== + +impl<T> BlockError<T> { + /// Returns the error yielded by the future being blocked on + pub fn into_inner(self) -> Option<T> { + self.inner + } +} + +impl<T> From<tokio_executor::EnterError> for BlockError<T> { + fn from(_: tokio_executor::EnterError) -> Self { + BlockError { inner: None } + } +} diff --git a/third_party/rust/tokio-current-thread/src/scheduler.rs b/third_party/rust/tokio-current-thread/src/scheduler.rs new file mode 100644 index 0000000000..76b890eb95 --- /dev/null +++ b/third_party/rust/tokio-current-thread/src/scheduler.rs @@ -0,0 +1,770 @@ +use super::Borrow; +use tokio_executor::park::Unpark; +use tokio_executor::Enter; + +use futures::executor::{self, NotifyHandle, Spawn, UnsafeNotify}; +use futures::{Async, Future}; + +use std::cell::UnsafeCell; +use std::fmt::{self, Debug}; +use std::marker::PhantomData; +use std::mem; +use std::ptr; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst}; +use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize}; +use std::sync::{Arc, Weak}; +use std::thread; +use std::usize; + +/// A generic task-aware scheduler. +/// +/// This is used both by `FuturesUnordered` and the current-thread executor. +pub struct Scheduler<U> { + inner: Arc<Inner<U>>, + nodes: List<U>, +} + +pub struct Notify<'a, U: 'a>(&'a Arc<Node<U>>); + +// A linked-list of nodes +struct List<U> { + len: usize, + head: *const Node<U>, + tail: *const Node<U>, +} + +// Scheduler is implemented using two linked lists. The first linked list tracks +// all items managed by a `Scheduler`. This list is stored on the `Scheduler` +// struct and is **not** thread safe. The second linked list is an +// implementation of the intrusive MPSC queue algorithm described by +// 1024cores.net and is stored on `Inner`. This linked list can push items to +// the back concurrently but only one consumer may pop from the front. To +// enforce this requirement, all popping will be performed via fns on +// `Scheduler` that take `&mut self`. +// +// When a item is submitted to the set a node is allocated and inserted in +// both linked lists. This means that all insertion operations **must** be +// originated from `Scheduler` with `&mut self` The next call to `tick` will +// (eventually) see this node and call `poll` on the item. +// +// Nodes are wrapped in `Arc` cells which manage the lifetime of the node. +// However, `Arc` handles are sometimes cast to `*const Node` pointers. +// Specifically, when a node is stored in at least one of the two lists +// described above, this represents a logical `Arc` handle. This is how +// `Scheduler` maintains its reference to all nodes it manages. Each +// `NotifyHandle` instance is an `Arc<Node>` as well. +// +// When `Scheduler` drops, it clears the linked list of all nodes that it +// manages. When doing so, it must attempt to decrement the reference count (by +// dropping an Arc handle). However, it can **only** decrement the reference +// count if the node is not currently stored in the mpsc channel. If the node +// **is** "queued" in the mpsc channel, then the arc reference count cannot be +// decremented. Once the node is popped from the mpsc channel, then the final +// arc reference count can be decremented, thus freeing the node. + +struct Inner<U> { + // Thread unpark handle + unpark: U, + + // Tick number + tick_num: AtomicUsize, + + // Head/tail of the readiness queue + head_readiness: AtomicPtr<Node<U>>, + tail_readiness: UnsafeCell<*const Node<U>>, + + // Used as part of the mpsc queue algorithm + stub: Arc<Node<U>>, +} + +unsafe impl<U: Sync + Send> Send for Inner<U> {} +unsafe impl<U: Sync + Send> Sync for Inner<U> {} + +impl<U: Unpark> executor::Notify for Inner<U> { + fn notify(&self, _: usize) { + self.unpark.unpark(); + } +} + +struct Node<U> { + // The item + item: UnsafeCell<Option<Task>>, + + // The tick at which this node was notified + notified_at: AtomicUsize, + + // Next pointer for linked list tracking all active nodes + next_all: UnsafeCell<*const Node<U>>, + + // Previous node in linked list tracking all active nodes + prev_all: UnsafeCell<*const Node<U>>, + + // Next pointer in readiness queue + next_readiness: AtomicPtr<Node<U>>, + + // Whether or not this node is currently in the mpsc queue. + queued: AtomicBool, + + // Queue that we'll be enqueued to when notified + queue: Weak<Inner<U>>, +} + +/// Returned by `Inner::dequeue`, representing either a dequeue success (with +/// the dequeued node), an empty list, or an inconsistent state. +/// +/// The inconsistent state is described in more detail at [1024cores], but +/// roughly indicates that a node will be ready to dequeue sometime shortly in +/// the future and the caller should try again soon. +/// +/// [1024cores]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue +enum Dequeue<U> { + Data(*const Node<U>), + Empty, + Yield, + Inconsistent, +} + +/// Wraps a spawned boxed future +struct Task(Spawn<Box<dyn Future<Item = (), Error = ()>>>); + +/// A task that is scheduled. `turn` must be called +pub struct Scheduled<'a, U: 'a> { + task: &'a mut Task, + notify: &'a Notify<'a, U>, + done: &'a mut bool, +} + +impl<U> Scheduler<U> +where + U: Unpark, +{ + /// Constructs a new, empty `Scheduler` + /// + /// The returned `Scheduler` does not contain any items and, in this + /// state, `Scheduler::poll` will return `Ok(Async::Ready(None))`. + pub fn new(unpark: U) -> Self { + let stub = Arc::new(Node { + item: UnsafeCell::new(None), + notified_at: AtomicUsize::new(0), + next_all: UnsafeCell::new(ptr::null()), + prev_all: UnsafeCell::new(ptr::null()), + next_readiness: AtomicPtr::new(ptr::null_mut()), + queued: AtomicBool::new(true), + queue: Weak::new(), + }); + let stub_ptr = &*stub as *const Node<U>; + let inner = Arc::new(Inner { + unpark, + tick_num: AtomicUsize::new(0), + head_readiness: AtomicPtr::new(stub_ptr as *mut _), + tail_readiness: UnsafeCell::new(stub_ptr), + stub: stub, + }); + + Scheduler { + inner: inner, + nodes: List::new(), + } + } + + pub fn notify(&self) -> NotifyHandle { + self.inner.clone().into() + } + + pub fn schedule(&mut self, item: Box<dyn Future<Item = (), Error = ()>>) { + // Get the current scheduler tick + let tick_num = self.inner.tick_num.load(SeqCst); + + let node = Arc::new(Node { + item: UnsafeCell::new(Some(Task::new(item))), + notified_at: AtomicUsize::new(tick_num), + next_all: UnsafeCell::new(ptr::null_mut()), + prev_all: UnsafeCell::new(ptr::null_mut()), + next_readiness: AtomicPtr::new(ptr::null_mut()), + queued: AtomicBool::new(true), + queue: Arc::downgrade(&self.inner), + }); + + // Right now our node has a strong reference count of 1. We transfer + // ownership of this reference count to our internal linked list + // and we'll reclaim ownership through the `unlink` function below. + let ptr = self.nodes.push_back(node); + + // We'll need to get the item "into the system" to start tracking it, + // e.g. getting its unpark notifications going to us tracking which + // items are ready. To do that we unconditionally enqueue it for + // polling here. + self.inner.enqueue(ptr); + } + + /// Returns `true` if there are currently any pending futures + pub fn has_pending_futures(&mut self) -> bool { + // See function definition for why the unsafe is needed and + // correctly used here + unsafe { self.inner.has_pending_futures() } + } + + /// Advance the scheduler state, returning `true` if any futures were + /// processed. + /// + /// This function should be called whenever the caller is notified via a + /// wakeup. + pub fn tick(&mut self, eid: u64, enter: &mut Enter, num_futures: &AtomicUsize) -> bool { + let mut ret = false; + let tick = self.inner.tick_num.fetch_add(1, SeqCst).wrapping_add(1); + + loop { + let node = match unsafe { self.inner.dequeue(Some(tick)) } { + Dequeue::Empty => { + return ret; + } + Dequeue::Yield => { + self.inner.unpark.unpark(); + return ret; + } + Dequeue::Inconsistent => { + thread::yield_now(); + continue; + } + Dequeue::Data(node) => node, + }; + + ret = true; + + debug_assert!(node != self.inner.stub()); + + unsafe { + if (*(*node).item.get()).is_none() { + // The node has already been released. However, while it was + // being released, another thread notified it, which + // resulted in it getting pushed into the mpsc channel. + // + // In this case, we just decrement the ref count. + let node = ptr2arc(node); + assert!((*node.next_all.get()).is_null()); + assert!((*node.prev_all.get()).is_null()); + continue; + }; + + // We're going to need to be very careful if the `poll` + // function below panics. We need to (a) not leak memory and + // (b) ensure that we still don't have any use-after-frees. To + // manage this we do a few things: + // + // * This "bomb" here will call `release_node` if dropped + // abnormally. That way we'll be sure the memory management + // of the `node` is managed correctly. + // + // * We unlink the node from our internal queue to preemptively + // assume is is complete (will return Ready or panic), in + // which case we'll want to discard it regardless. + // + struct Bomb<'a, U: Unpark + 'a> { + borrow: &'a mut Borrow<'a, U>, + enter: &'a mut Enter, + node: Option<Arc<Node<U>>>, + } + + impl<'a, U: Unpark> Drop for Bomb<'a, U> { + fn drop(&mut self) { + if let Some(node) = self.node.take() { + self.borrow.enter(self.enter, || release_node(node)) + } + } + } + + let node = self.nodes.remove(node); + + let mut borrow = Borrow { + id: eid, + scheduler: self, + num_futures, + }; + + let mut bomb = Bomb { + node: Some(node), + enter: enter, + borrow: &mut borrow, + }; + + let mut done = false; + + // Now that the bomb holds the node, create a new scope. This + // scope ensures that the borrow will go out of scope before we + // mutate the node pointer in `bomb` again + { + let node = bomb.node.as_ref().unwrap(); + + // Get a reference to the inner future. We already ensured + // that the item `is_some`. + let item = (*node.item.get()).as_mut().unwrap(); + + // Unset queued flag... this must be done before + // polling. This ensures that the item gets + // rescheduled if it is notified **during** a call + // to `poll`. + let prev = (*node).queued.swap(false, SeqCst); + assert!(prev); + + // Poll the underlying item with the appropriate `notify` + // implementation. This is where a large bit of the unsafety + // starts to stem from internally. The `notify` instance itself + // is basically just our `Arc<Node>` and tracks the mpsc + // queue of ready items. + // + // Critically though `Node` won't actually access `Task`, the + // item, while it's floating around inside of `Task` + // instances. These structs will basically just use `T` to size + // the internal allocation, appropriately accessing fields and + // deallocating the node if need be. + let borrow = &mut *bomb.borrow; + let enter = &mut *bomb.enter; + let notify = Notify(bomb.node.as_ref().unwrap()); + + let mut scheduled = Scheduled { + task: item, + notify: ¬ify, + done: &mut done, + }; + + if borrow.enter(enter, || scheduled.tick()) { + // we have a borrow of the Runtime, so we know it's not shut down + borrow.num_futures.fetch_sub(2, SeqCst); + } + } + + if !done { + // The future is not done, push it back into the "all + // node" list. + let node = bomb.node.take().unwrap(); + bomb.borrow.scheduler.nodes.push_back(node); + } + } + } + } +} + +impl<'a, U: Unpark> Scheduled<'a, U> { + /// Polls the task, returns `true` if the task has completed. + pub fn tick(&mut self) -> bool { + // Tick the future + let ret = match self.task.0.poll_future_notify(self.notify, 0) { + Ok(Async::Ready(_)) | Err(_) => true, + Ok(Async::NotReady) => false, + }; + + *self.done = ret; + ret + } +} + +impl Task { + pub fn new(future: Box<dyn Future<Item = (), Error = ()> + 'static>) -> Self { + Task(executor::spawn(future)) + } +} + +impl fmt::Debug for Task { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Task").finish() + } +} + +fn release_node<U>(node: Arc<Node<U>>) { + // The item is done, try to reset the queued flag. This will prevent + // `notify` from doing any work in the item + let prev = node.queued.swap(true, SeqCst); + + // Drop the item, even if it hasn't finished yet. This is safe + // because we're dropping the item on the thread that owns + // `Scheduler`, which correctly tracks T's lifetimes and such. + unsafe { + drop((*node.item.get()).take()); + } + + // If the queued flag was previously set then it means that this node + // is still in our internal mpsc queue. We then transfer ownership + // of our reference count to the mpsc queue, and it'll come along and + // free it later, noticing that the item is `None`. + // + // If, however, the queued flag was *not* set then we're safe to + // release our reference count on the internal node. The queued flag + // was set above so all item `enqueue` operations will not actually + // enqueue the node, so our node will never see the mpsc queue again. + // The node itself will be deallocated once all reference counts have + // been dropped by the various owning tasks elsewhere. + if prev { + mem::forget(node); + } +} + +impl<U> Debug for Scheduler<U> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "Scheduler {{ ... }}") + } +} + +impl<U> Drop for Scheduler<U> { + fn drop(&mut self) { + // When a `Scheduler` is dropped we want to drop all items associated + // with it. At the same time though there may be tons of `Task` handles + // flying around which contain `Node` references inside them. We'll + // let those naturally get deallocated when the `Task` itself goes out + // of scope or gets notified. + while let Some(node) = self.nodes.pop_front() { + release_node(node); + } + + // Note that at this point we could still have a bunch of nodes in the + // mpsc queue. None of those nodes, however, have items associated + // with them so they're safe to destroy on any thread. At this point + // the `Scheduler` struct, the owner of the one strong reference + // to `Inner` will drop the strong reference. At that point + // whichever thread releases the strong refcount last (be it this + // thread or some other thread as part of an `upgrade`) will clear out + // the mpsc queue and free all remaining nodes. + // + // While that freeing operation isn't guaranteed to happen here, it's + // guaranteed to happen "promptly" as no more "blocking work" will + // happen while there's a strong refcount held. + } +} + +impl<U> Inner<U> { + /// The enqueue function from the 1024cores intrusive MPSC queue algorithm. + fn enqueue(&self, node: *const Node<U>) { + unsafe { + debug_assert!((*node).queued.load(Relaxed)); + + // This action does not require any coordination + (*node).next_readiness.store(ptr::null_mut(), Relaxed); + + // Note that these atomic orderings come from 1024cores + let node = node as *mut _; + let prev = self.head_readiness.swap(node, AcqRel); + (*prev).next_readiness.store(node, Release); + } + } + + /// Returns `true` if there are currently any pending futures + /// + /// See `dequeue` for an explanation why this function is unsafe. + unsafe fn has_pending_futures(&self) -> bool { + let tail = *self.tail_readiness.get(); + let next = (*tail).next_readiness.load(Acquire); + + if tail == self.stub() { + if next.is_null() { + return false; + } + } + + true + } + + /// The dequeue function from the 1024cores intrusive MPSC queue algorithm + /// + /// Note that this unsafe as it required mutual exclusion (only one thread + /// can call this) to be guaranteed elsewhere. + unsafe fn dequeue(&self, tick: Option<usize>) -> Dequeue<U> { + let mut tail = *self.tail_readiness.get(); + let mut next = (*tail).next_readiness.load(Acquire); + + if tail == self.stub() { + if next.is_null() { + return Dequeue::Empty; + } + + *self.tail_readiness.get() = next; + tail = next; + next = (*next).next_readiness.load(Acquire); + } + + if let Some(tick) = tick { + let actual = (*tail).notified_at.load(SeqCst); + + // Only dequeue if the node was not scheduled during the current + // tick. + if actual == tick { + // Only doing the check above **should** be enough in + // practice. However, technically there is a potential for + // deadlocking if there are `usize::MAX` ticks while the thread + // scheduling the task is frozen. + // + // If, for some reason, this is not enough, calling `unpark` + // here will resolve the issue. + return Dequeue::Yield; + } + } + + if !next.is_null() { + *self.tail_readiness.get() = next; + debug_assert!(tail != self.stub()); + return Dequeue::Data(tail); + } + + if self.head_readiness.load(Acquire) as *const _ != tail { + return Dequeue::Inconsistent; + } + + self.enqueue(self.stub()); + + next = (*tail).next_readiness.load(Acquire); + + if !next.is_null() { + *self.tail_readiness.get() = next; + return Dequeue::Data(tail); + } + + Dequeue::Inconsistent + } + + fn stub(&self) -> *const Node<U> { + &*self.stub + } +} + +impl<U> Drop for Inner<U> { + fn drop(&mut self) { + // Once we're in the destructor for `Inner` we need to clear out the + // mpsc queue of nodes if there's anything left in there. + // + // Note that each node has a strong reference count associated with it + // which is owned by the mpsc queue. All nodes should have had their + // items dropped already by the `Scheduler` destructor above, + // so we're just pulling out nodes and dropping their refcounts. + unsafe { + loop { + match self.dequeue(None) { + Dequeue::Empty => break, + Dequeue::Yield => unreachable!(), + Dequeue::Inconsistent => abort("inconsistent in drop"), + Dequeue::Data(ptr) => drop(ptr2arc(ptr)), + } + } + } + } +} + +impl<U> List<U> { + fn new() -> Self { + List { + len: 0, + head: ptr::null_mut(), + tail: ptr::null_mut(), + } + } + + /// Appends an element to the back of the list + fn push_back(&mut self, node: Arc<Node<U>>) -> *const Node<U> { + let ptr = arc2ptr(node); + + unsafe { + // Point to the current last node in the list + *(*ptr).prev_all.get() = self.tail; + *(*ptr).next_all.get() = ptr::null_mut(); + + if !self.tail.is_null() { + *(*self.tail).next_all.get() = ptr; + self.tail = ptr; + } else { + // This is the first node + self.tail = ptr; + self.head = ptr; + } + } + + self.len += 1; + + return ptr; + } + + /// Pop an element from the front of the list + fn pop_front(&mut self) -> Option<Arc<Node<U>>> { + if self.head.is_null() { + // The list is empty + return None; + } + + self.len -= 1; + + unsafe { + // Convert the ptr to Arc<_> + let node = ptr2arc(self.head); + + // Update the head pointer + self.head = *node.next_all.get(); + + // If the pointer is null, then the list is empty + if self.head.is_null() { + self.tail = ptr::null_mut(); + } else { + *(*self.head).prev_all.get() = ptr::null_mut(); + } + + Some(node) + } + } + + /// Remove a specific node + unsafe fn remove(&mut self, node: *const Node<U>) -> Arc<Node<U>> { + let node = ptr2arc(node); + let next = *node.next_all.get(); + let prev = *node.prev_all.get(); + *node.next_all.get() = ptr::null_mut(); + *node.prev_all.get() = ptr::null_mut(); + + if !next.is_null() { + *(*next).prev_all.get() = prev; + } else { + self.tail = prev; + } + + if !prev.is_null() { + *(*prev).next_all.get() = next; + } else { + self.head = next; + } + + self.len -= 1; + + return node; + } +} + +impl<'a, U> Clone for Notify<'a, U> { + fn clone(&self) -> Self { + Notify(self.0) + } +} + +impl<'a, U> fmt::Debug for Notify<'a, U> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Notify").finish() + } +} + +impl<'a, U: Unpark> From<Notify<'a, U>> for NotifyHandle { + fn from(handle: Notify<'a, U>) -> NotifyHandle { + unsafe { + let ptr = handle.0.clone(); + let ptr = mem::transmute::<Arc<Node<U>>, *mut ArcNode<U>>(ptr); + NotifyHandle::new(hide_lt(ptr)) + } + } +} + +struct ArcNode<U>(PhantomData<U>); + +// We should never touch `Task` on any thread other than the one owning +// `Scheduler`, so this should be a safe operation. +unsafe impl<U: Sync + Send> Send for ArcNode<U> {} +unsafe impl<U: Sync + Send> Sync for ArcNode<U> {} + +impl<U: Unpark> executor::Notify for ArcNode<U> { + fn notify(&self, _id: usize) { + unsafe { + let me: *const ArcNode<U> = self; + let me: *const *const ArcNode<U> = &me; + let me = me as *const Arc<Node<U>>; + Node::notify(&*me) + } + } +} + +unsafe impl<U: Unpark> UnsafeNotify for ArcNode<U> { + unsafe fn clone_raw(&self) -> NotifyHandle { + let me: *const ArcNode<U> = self; + let me: *const *const ArcNode<U> = &me; + let me = &*(me as *const Arc<Node<U>>); + Notify(me).into() + } + + unsafe fn drop_raw(&self) { + let mut me: *const ArcNode<U> = self; + let me = &mut me as *mut *const ArcNode<U> as *mut Arc<Node<U>>; + ptr::drop_in_place(me); + } +} + +unsafe fn hide_lt<U: Unpark>(p: *mut ArcNode<U>) -> *mut dyn UnsafeNotify { + mem::transmute(p as *mut dyn UnsafeNotify) +} + +impl<U: Unpark> Node<U> { + fn notify(me: &Arc<Node<U>>) { + let inner = match me.queue.upgrade() { + Some(inner) => inner, + None => return, + }; + + // It's our job to notify the node that it's ready to get polled, + // meaning that we need to enqueue it into the readiness queue. To + // do this we flag that we're ready to be queued, and if successful + // we then do the literal queueing operation, ensuring that we're + // only queued once. + // + // Once the node is inserted we be sure to notify the parent task, + // as it'll want to come along and pick up our node now. + // + // Note that we don't change the reference count of the node here, + // we're just enqueueing the raw pointer. The `Scheduler` + // implementation guarantees that if we set the `queued` flag true that + // there's a reference count held by the main `Scheduler` queue + // still. + let prev = me.queued.swap(true, SeqCst); + if !prev { + // Get the current scheduler tick + let tick_num = inner.tick_num.load(SeqCst); + me.notified_at.store(tick_num, SeqCst); + + inner.enqueue(&**me); + inner.unpark.unpark(); + } + } +} + +impl<U> Drop for Node<U> { + fn drop(&mut self) { + // Currently a `Node` is sent across all threads for any lifetime, + // regardless of `T`. This means that for memory safety we can't + // actually touch `T` at any time except when we have a reference to the + // `Scheduler` itself. + // + // Consequently it *should* be the case that we always drop items from + // the `Scheduler` instance, but this is a bomb in place to catch + // any bugs in that logic. + unsafe { + if (*self.item.get()).is_some() { + abort("item still here when dropping"); + } + } + } +} + +fn arc2ptr<T>(ptr: Arc<T>) -> *const T { + let addr = &*ptr as *const T; + mem::forget(ptr); + return addr; +} + +unsafe fn ptr2arc<T>(ptr: *const T) -> Arc<T> { + let anchor = mem::transmute::<usize, Arc<T>>(0x10); + let addr = &*anchor as *const T; + mem::forget(anchor); + let offset = addr as isize - 0x10; + mem::transmute::<isize, Arc<T>>(ptr as isize - offset) +} + +fn abort(s: &str) -> ! { + struct DoublePanic; + + impl Drop for DoublePanic { + fn drop(&mut self) { + panic!("panicking twice to abort the program"); + } + } + + let _bomb = DoublePanic; + panic!("{}", s); +} diff --git a/third_party/rust/tokio-current-thread/tests/current_thread.rs b/third_party/rust/tokio-current-thread/tests/current_thread.rs new file mode 100644 index 0000000000..dca0190e29 --- /dev/null +++ b/third_party/rust/tokio-current-thread/tests/current_thread.rs @@ -0,0 +1,835 @@ +extern crate futures; +extern crate tokio_current_thread; +extern crate tokio_executor; + +use tokio_current_thread::{block_on_all, CurrentThread}; + +use std::any::Any; +use std::cell::{Cell, RefCell}; +use std::rc::Rc; +use std::thread; +use std::time::Duration; + +use futures::future::{self, lazy}; +use futures::task; +// This is not actually unused --- we need this trait to be in scope for +// the tests that sue TaskExecutor::current().execute(). The compiler +// doesn't realise that. +#[allow(unused_imports)] +use futures::future::Executor as _futures_Executor; +use futures::prelude::*; +use futures::sync::oneshot; + +mod from_block_on_all { + use super::*; + fn test<F: Fn(Box<dyn Future<Item = (), Error = ()>>) + 'static>(spawn: F) { + let cnt = Rc::new(Cell::new(0)); + let c = cnt.clone(); + + let msg = tokio_current_thread::block_on_all(lazy(move || { + c.set(1 + c.get()); + + // Spawn! + spawn(Box::new(lazy(move || { + c.set(1 + c.get()); + Ok::<(), ()>(()) + }))); + + Ok::<_, ()>("hello") + })) + .unwrap(); + + assert_eq!(2, cnt.get()); + assert_eq!(msg, "hello"); + } + + #[test] + fn spawn() { + test(tokio_current_thread::spawn) + } + + #[test] + fn execute() { + test(|f| { + tokio_current_thread::TaskExecutor::current() + .execute(f) + .unwrap(); + }); + } +} + +#[test] +fn block_waits() { + let (tx, rx) = oneshot::channel(); + + thread::spawn(|| { + thread::sleep(Duration::from_millis(1000)); + tx.send(()).unwrap(); + }); + + let cnt = Rc::new(Cell::new(0)); + let cnt2 = cnt.clone(); + + block_on_all(rx.then(move |_| { + cnt.set(1 + cnt.get()); + Ok::<_, ()>(()) + })) + .unwrap(); + + assert_eq!(1, cnt2.get()); +} + +#[test] +fn spawn_many() { + const ITER: usize = 200; + + let cnt = Rc::new(Cell::new(0)); + let mut tokio_current_thread = CurrentThread::new(); + + for _ in 0..ITER { + let cnt = cnt.clone(); + tokio_current_thread.spawn(lazy(move || { + cnt.set(1 + cnt.get()); + Ok::<(), ()>(()) + })); + } + + tokio_current_thread.run().unwrap(); + + assert_eq!(cnt.get(), ITER); +} + +mod does_not_set_global_executor_by_default { + use super::*; + + fn test<F: Fn(Box<dyn Future<Item = (), Error = ()> + Send>) -> Result<(), E> + 'static, E>( + spawn: F, + ) { + block_on_all(lazy(|| { + spawn(Box::new(lazy(|| ok()))).unwrap_err(); + ok() + })) + .unwrap() + } + + #[test] + fn spawn() { + use tokio_executor::Executor; + test(|f| tokio_executor::DefaultExecutor::current().spawn(f)) + } + + #[test] + fn execute() { + test(|f| tokio_executor::DefaultExecutor::current().execute(f)) + } +} + +mod from_block_on_future { + use super::*; + + fn test<F: Fn(Box<dyn Future<Item = (), Error = ()>>)>(spawn: F) { + let cnt = Rc::new(Cell::new(0)); + + let mut tokio_current_thread = CurrentThread::new(); + + tokio_current_thread + .block_on(lazy(|| { + let cnt = cnt.clone(); + + spawn(Box::new(lazy(move || { + cnt.set(1 + cnt.get()); + Ok(()) + }))); + + Ok::<_, ()>(()) + })) + .unwrap(); + + tokio_current_thread.run().unwrap(); + + assert_eq!(1, cnt.get()); + } + + #[test] + fn spawn() { + test(tokio_current_thread::spawn); + } + + #[test] + fn execute() { + test(|f| { + tokio_current_thread::TaskExecutor::current() + .execute(f) + .unwrap(); + }); + } +} + +struct Never(Rc<()>); + +impl Future for Never { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + Ok(Async::NotReady) + } +} + +mod outstanding_tasks_are_dropped_when_executor_is_dropped { + use super::*; + + fn test<F, G>(spawn: F, dotspawn: G) + where + F: Fn(Box<dyn Future<Item = (), Error = ()>>) + 'static, + G: Fn(&mut CurrentThread, Box<dyn Future<Item = (), Error = ()>>), + { + let mut rc = Rc::new(()); + + let mut tokio_current_thread = CurrentThread::new(); + dotspawn(&mut tokio_current_thread, Box::new(Never(rc.clone()))); + + drop(tokio_current_thread); + + // Ensure the daemon is dropped + assert!(Rc::get_mut(&mut rc).is_some()); + + // Using the global spawn fn + + let mut rc = Rc::new(()); + + let mut tokio_current_thread = CurrentThread::new(); + + tokio_current_thread + .block_on(lazy(|| { + spawn(Box::new(Never(rc.clone()))); + Ok::<_, ()>(()) + })) + .unwrap(); + + drop(tokio_current_thread); + + // Ensure the daemon is dropped + assert!(Rc::get_mut(&mut rc).is_some()); + } + + #[test] + fn spawn() { + test(tokio_current_thread::spawn, |rt, f| { + rt.spawn(f); + }) + } + + #[test] + fn execute() { + test( + |f| { + tokio_current_thread::TaskExecutor::current() + .execute(f) + .unwrap(); + }, + // Note: `CurrentThread` doesn't currently implement + // `futures::Executor`, so we'll call `.spawn(...)` rather than + // `.execute(...)` for now. If `CurrentThread` is changed to + // implement Executor, change this to `.execute(...).unwrap()`. + |rt, f| { + rt.spawn(f); + }, + ); + } +} + +#[test] +#[should_panic] +fn nesting_run() { + block_on_all(lazy(|| { + block_on_all(lazy(|| ok())).unwrap(); + + ok() + })) + .unwrap(); +} + +mod run_in_future { + use super::*; + + #[test] + #[should_panic] + fn spawn() { + block_on_all(lazy(|| { + tokio_current_thread::spawn(lazy(|| { + block_on_all(lazy(|| ok())).unwrap(); + ok() + })); + ok() + })) + .unwrap(); + } + + #[test] + #[should_panic] + fn execute() { + block_on_all(lazy(|| { + tokio_current_thread::TaskExecutor::current() + .execute(lazy(|| { + block_on_all(lazy(|| ok())).unwrap(); + ok() + })) + .unwrap(); + ok() + })) + .unwrap(); + } +} + +#[test] +fn tick_on_infini_future() { + let num = Rc::new(Cell::new(0)); + + struct Infini { + num: Rc<Cell<usize>>, + } + + impl Future for Infini { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + self.num.set(1 + self.num.get()); + task::current().notify(); + Ok(Async::NotReady) + } + } + + CurrentThread::new() + .spawn(Infini { num: num.clone() }) + .turn(None) + .unwrap(); + + assert_eq!(1, num.get()); +} + +mod tasks_are_scheduled_fairly { + use super::*; + struct Spin { + state: Rc<RefCell<[i32; 2]>>, + idx: usize, + } + + impl Future for Spin { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + let mut state = self.state.borrow_mut(); + + if self.idx == 0 { + let diff = state[0] - state[1]; + + assert!(diff.abs() <= 1); + + if state[0] >= 50 { + return Ok(().into()); + } + } + + state[self.idx] += 1; + + if state[self.idx] >= 100 { + return Ok(().into()); + } + + task::current().notify(); + Ok(Async::NotReady) + } + } + + fn test<F: Fn(Spin)>(spawn: F) { + let state = Rc::new(RefCell::new([0, 0])); + + block_on_all(lazy(|| { + spawn(Spin { + state: state.clone(), + idx: 0, + }); + + spawn(Spin { + state: state, + idx: 1, + }); + + ok() + })) + .unwrap(); + } + + #[test] + fn spawn() { + test(tokio_current_thread::spawn) + } + + #[test] + fn execute() { + test(|f| { + tokio_current_thread::TaskExecutor::current() + .execute(f) + .unwrap(); + }) + } +} + +mod and_turn { + use super::*; + + fn test<F, G>(spawn: F, dotspawn: G) + where + F: Fn(Box<dyn Future<Item = (), Error = ()>>) + 'static, + G: Fn(&mut CurrentThread, Box<dyn Future<Item = (), Error = ()>>), + { + let cnt = Rc::new(Cell::new(0)); + let c = cnt.clone(); + + let mut tokio_current_thread = CurrentThread::new(); + + // Spawn a basic task to get the executor to turn + dotspawn(&mut tokio_current_thread, Box::new(lazy(move || Ok(())))); + + // Turn once... + tokio_current_thread.turn(None).unwrap(); + + dotspawn( + &mut tokio_current_thread, + Box::new(lazy(move || { + c.set(1 + c.get()); + + // Spawn! + spawn(Box::new(lazy(move || { + c.set(1 + c.get()); + Ok::<(), ()>(()) + }))); + + Ok(()) + })), + ); + + // This does not run the newly spawned thread + tokio_current_thread.turn(None).unwrap(); + assert_eq!(1, cnt.get()); + + // This runs the newly spawned thread + tokio_current_thread.turn(None).unwrap(); + assert_eq!(2, cnt.get()); + } + + #[test] + fn spawn() { + test(tokio_current_thread::spawn, |rt, f| { + rt.spawn(f); + }) + } + + #[test] + fn execute() { + test( + |f| { + tokio_current_thread::TaskExecutor::current() + .execute(f) + .unwrap(); + }, + // Note: `CurrentThread` doesn't currently implement + // `futures::Executor`, so we'll call `.spawn(...)` rather than + // `.execute(...)` for now. If `CurrentThread` is changed to + // implement Executor, change this to `.execute(...).unwrap()`. + |rt, f| { + rt.spawn(f); + }, + ); + } +} + +mod in_drop { + use super::*; + struct OnDrop<F: FnOnce()>(Option<F>); + + impl<F: FnOnce()> Drop for OnDrop<F> { + fn drop(&mut self) { + (self.0.take().unwrap())(); + } + } + + struct MyFuture { + _data: Box<dyn Any>, + } + + impl Future for MyFuture { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + Ok(().into()) + } + } + + fn test<F, G>(spawn: F, dotspawn: G) + where + F: Fn(Box<dyn Future<Item = (), Error = ()>>) + 'static, + G: Fn(&mut CurrentThread, Box<dyn Future<Item = (), Error = ()>>), + { + let mut tokio_current_thread = CurrentThread::new(); + + let (tx, rx) = oneshot::channel(); + + dotspawn( + &mut tokio_current_thread, + Box::new(MyFuture { + _data: Box::new(OnDrop(Some(move || { + spawn(Box::new(lazy(move || { + tx.send(()).unwrap(); + Ok(()) + }))); + }))), + }), + ); + + tokio_current_thread.block_on(rx).unwrap(); + tokio_current_thread.run().unwrap(); + } + + #[test] + fn spawn() { + test(tokio_current_thread::spawn, |rt, f| { + rt.spawn(f); + }) + } + + #[test] + fn execute() { + test( + |f| { + tokio_current_thread::TaskExecutor::current() + .execute(f) + .unwrap(); + }, + // Note: `CurrentThread` doesn't currently implement + // `futures::Executor`, so we'll call `.spawn(...)` rather than + // `.execute(...)` for now. If `CurrentThread` is changed to + // implement Executor, change this to `.execute(...).unwrap()`. + |rt, f| { + rt.spawn(f); + }, + ); + } +} + +#[test] +fn hammer_turn() { + use futures::sync::mpsc; + + const ITER: usize = 100; + const N: usize = 100; + const THREADS: usize = 4; + + for _ in 0..ITER { + let mut ths = vec![]; + + // Add some jitter + for _ in 0..THREADS { + let th = thread::spawn(|| { + let mut tokio_current_thread = CurrentThread::new(); + + let (tx, rx) = mpsc::unbounded(); + + tokio_current_thread.spawn({ + let cnt = Rc::new(Cell::new(0)); + let c = cnt.clone(); + + rx.for_each(move |_| { + c.set(1 + c.get()); + Ok(()) + }) + .map_err(|e| panic!("err={:?}", e)) + .map(move |v| { + assert_eq!(N, cnt.get()); + v + }) + }); + + thread::spawn(move || { + for _ in 0..N { + tx.unbounded_send(()).unwrap(); + thread::yield_now(); + } + }); + + while !tokio_current_thread.is_idle() { + tokio_current_thread.turn(None).unwrap(); + } + }); + + ths.push(th); + } + + for th in ths { + th.join().unwrap(); + } + } +} + +#[test] +fn turn_has_polled() { + let mut tokio_current_thread = CurrentThread::new(); + + // Spawn oneshot receiver + let (sender, receiver) = oneshot::channel::<()>(); + tokio_current_thread.spawn(receiver.then(|_| Ok(()))); + + // Turn once... + let res = tokio_current_thread + .turn(Some(Duration::from_millis(0))) + .unwrap(); + + // Should've polled the receiver once, but considered it not ready + assert!(res.has_polled()); + + // Turn another time + let res = tokio_current_thread + .turn(Some(Duration::from_millis(0))) + .unwrap(); + + // Should've polled nothing, the receiver is not ready yet + assert!(!res.has_polled()); + + // Make the receiver ready + sender.send(()).unwrap(); + + // Turn another time + let res = tokio_current_thread + .turn(Some(Duration::from_millis(0))) + .unwrap(); + + // Should've polled the receiver, it's ready now + assert!(res.has_polled()); + + // Now the executor should be empty + assert!(tokio_current_thread.is_idle()); + let res = tokio_current_thread + .turn(Some(Duration::from_millis(0))) + .unwrap(); + + // So should've polled nothing + assert!(!res.has_polled()); +} + +// Our own mock Park that is never really waiting and the only +// thing it does is to send, on request, something (once) to a oneshot +// channel +struct MyPark { + sender: Option<oneshot::Sender<()>>, + send_now: Rc<Cell<bool>>, +} + +struct MyUnpark; + +impl tokio_executor::park::Park for MyPark { + type Unpark = MyUnpark; + type Error = (); + + fn unpark(&self) -> Self::Unpark { + MyUnpark + } + + fn park(&mut self) -> Result<(), Self::Error> { + // If called twice with send_now, this will intentionally panic + if self.send_now.get() { + self.sender.take().unwrap().send(()).unwrap(); + } + + Ok(()) + } + + fn park_timeout(&mut self, _duration: Duration) -> Result<(), Self::Error> { + self.park() + } +} + +impl tokio_executor::park::Unpark for MyUnpark { + fn unpark(&self) {} +} + +#[test] +fn turn_fair() { + let send_now = Rc::new(Cell::new(false)); + + let (sender, receiver) = oneshot::channel::<()>(); + let (sender_2, receiver_2) = oneshot::channel::<()>(); + let (sender_3, receiver_3) = oneshot::channel::<()>(); + + let my_park = MyPark { + sender: Some(sender_3), + send_now: send_now.clone(), + }; + + let mut tokio_current_thread = CurrentThread::new_with_park(my_park); + + let receiver_1_done = Rc::new(Cell::new(false)); + let receiver_1_done_clone = receiver_1_done.clone(); + + // Once an item is received on the oneshot channel, it will immediately + // immediately make the second oneshot channel ready + tokio_current_thread.spawn(receiver.map_err(|_| unreachable!()).and_then(move |_| { + sender_2.send(()).unwrap(); + receiver_1_done_clone.set(true); + + Ok(()) + })); + + let receiver_2_done = Rc::new(Cell::new(false)); + let receiver_2_done_clone = receiver_2_done.clone(); + + tokio_current_thread.spawn(receiver_2.map_err(|_| unreachable!()).and_then(move |_| { + receiver_2_done_clone.set(true); + Ok(()) + })); + + // The third receiver is only woken up from our Park implementation, it simulates + // e.g. a socket that first has to be polled to know if it is ready now + let receiver_3_done = Rc::new(Cell::new(false)); + let receiver_3_done_clone = receiver_3_done.clone(); + + tokio_current_thread.spawn(receiver_3.map_err(|_| unreachable!()).and_then(move |_| { + receiver_3_done_clone.set(true); + Ok(()) + })); + + // First turn should've polled both and considered them not ready + let res = tokio_current_thread + .turn(Some(Duration::from_millis(0))) + .unwrap(); + assert!(res.has_polled()); + + // Next turn should've polled nothing + let res = tokio_current_thread + .turn(Some(Duration::from_millis(0))) + .unwrap(); + assert!(!res.has_polled()); + + assert!(!receiver_1_done.get()); + assert!(!receiver_2_done.get()); + assert!(!receiver_3_done.get()); + + // After this the receiver future will wake up the second receiver future, + // so there are pending futures again + sender.send(()).unwrap(); + + // Now the first receiver should be done, the second receiver should be ready + // to be polled again and the socket not yet + let res = tokio_current_thread.turn(None).unwrap(); + assert!(res.has_polled()); + + assert!(receiver_1_done.get()); + assert!(!receiver_2_done.get()); + assert!(!receiver_3_done.get()); + + // Now let our park implementation know that it should send something to sender 3 + send_now.set(true); + + // This should resolve the second receiver directly, but also poll the socket + // and read the packet from it. If it didn't do both here, we would handle + // futures that are woken up from the reactor and directly unfairly and would + // favour the ones that are woken up directly. + let res = tokio_current_thread.turn(None).unwrap(); + assert!(res.has_polled()); + + assert!(receiver_1_done.get()); + assert!(receiver_2_done.get()); + assert!(receiver_3_done.get()); + + // Don't send again + send_now.set(false); + + // Now we should be idle and turning should not poll anything + assert!(tokio_current_thread.is_idle()); + let res = tokio_current_thread.turn(None).unwrap(); + assert!(!res.has_polled()); +} + +#[test] +fn spawn_from_other_thread() { + let mut current_thread = CurrentThread::new(); + + let handle = current_thread.handle(); + let (sender, receiver) = oneshot::channel::<()>(); + + thread::spawn(move || { + handle + .spawn(lazy(move || { + sender.send(()).unwrap(); + Ok(()) + })) + .unwrap(); + }); + + let _ = current_thread.block_on(receiver).unwrap(); +} + +#[test] +fn spawn_from_other_thread_unpark() { + use std::sync::mpsc::channel as mpsc_channel; + + let mut current_thread = CurrentThread::new(); + + let handle = current_thread.handle(); + let (sender_1, receiver_1) = oneshot::channel::<()>(); + let (sender_2, receiver_2) = mpsc_channel::<()>(); + + thread::spawn(move || { + let _ = receiver_2.recv().unwrap(); + + handle + .spawn(lazy(move || { + sender_1.send(()).unwrap(); + Ok(()) + })) + .unwrap(); + }); + + // Ensure that unparking the executor works correctly. It will first + // check if there are new futures (there are none), then execute the + // lazy future below which will cause the future to be spawned from + // the other thread. Then the executor will park but should be woken + // up because *now* we have a new future to schedule + let _ = current_thread + .block_on( + lazy(move || { + sender_2.send(()).unwrap(); + Ok(()) + }) + .and_then(|_| receiver_1), + ) + .unwrap(); +} + +#[test] +fn spawn_from_executor_with_handle() { + let mut current_thread = CurrentThread::new(); + let handle = current_thread.handle(); + let (tx, rx) = oneshot::channel(); + + current_thread.spawn(lazy(move || { + handle + .spawn(lazy(move || { + tx.send(()).unwrap(); + Ok(()) + })) + .unwrap(); + Ok::<_, ()>(()) + })); + + current_thread.run(); + + rx.wait().unwrap(); +} + +fn ok() -> future::FutureResult<(), ()> { + future::ok(()) +} |