summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-current-thread
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-current-thread')
-rw-r--r--third_party/rust/tokio-current-thread/.cargo-checksum.json1
-rw-r--r--third_party/rust/tokio-current-thread/CHANGELOG.md31
-rw-r--r--third_party/rust/tokio-current-thread/Cargo.toml28
-rw-r--r--third_party/rust/tokio-current-thread/LICENSE25
-rw-r--r--third_party/rust/tokio-current-thread/README.md19
-rw-r--r--third_party/rust/tokio-current-thread/src/lib.rs866
-rw-r--r--third_party/rust/tokio-current-thread/src/scheduler.rs770
-rw-r--r--third_party/rust/tokio-current-thread/tests/current_thread.rs837
8 files changed, 2577 insertions, 0 deletions
diff --git a/third_party/rust/tokio-current-thread/.cargo-checksum.json b/third_party/rust/tokio-current-thread/.cargo-checksum.json
new file mode 100644
index 0000000000..e6b56ef2da
--- /dev/null
+++ b/third_party/rust/tokio-current-thread/.cargo-checksum.json
@@ -0,0 +1 @@
+{"files":{"CHANGELOG.md":"58905ba9ae83fd4733ab85fd56be9d7f32387be23b3feedf00e26082c5a3b15d","Cargo.toml":"a79ea6c03cf55f2880c59ac0bd56ede9ce8697cfdbca924fe8bff565cc5968c7","LICENSE":"898b1ae9821e98daf8964c8d6c7f61641f5f5aa78ad500020771c0939ee0dea1","README.md":"c59e1859e0f32ebb4e4b09c5b9fa5d780d035027a7e4964bde745655230843c7","src/lib.rs":"811d51ea3a1a33ff463826f74ee6e9231e168eebbe7c9b1787f78aeb2e166442","src/scheduler.rs":"2b69578151e24209408212efc8bfbf16a0dc46dfa2addcbf45074f92c16c05f7","tests/current_thread.rs":"8e9a12abec30997fe72cef2f5d860c5fb50c30a61549ab6b964f8112752d1d06"},"package":"d16217cad7f1b840c5a97dfb3c43b0c871fef423a6e8d2118c604e843662a443"} \ No newline at end of file
diff --git a/third_party/rust/tokio-current-thread/CHANGELOG.md b/third_party/rust/tokio-current-thread/CHANGELOG.md
new file mode 100644
index 0000000000..deb764d4a8
--- /dev/null
+++ b/third_party/rust/tokio-current-thread/CHANGELOG.md
@@ -0,0 +1,31 @@
+# 0.1.6 (March 22, 2019)
+
+### Added
+- implement `TypedExecutor` (#993).
+
+# 0.1.5 (March 1, 2019)
+
+### Fixed
+- Documentation typos (#882).
+
+# 0.1.4 (November 21, 2018)
+
+* Fix shutdown on idle (#763).
+
+# 0.1.3 (September 27, 2018)
+
+* Fix minimal versions
+
+# 0.1.2 (September 26, 2018)
+
+* Implement `futures::Executor` for executor types (#563)
+* Spawning performance improvements (#565)
+
+# 0.1.1 (August 6, 2018)
+
+* Implement `std::Error` for misc error types (#501)
+* bugfix: Track tasks pending in spawn queue (#478)
+
+# 0.1.0 (June 13, 2018)
+
+* Extract `tokio::executor::current_thread` to a tokio-current-thread crate (#356)
diff --git a/third_party/rust/tokio-current-thread/Cargo.toml b/third_party/rust/tokio-current-thread/Cargo.toml
new file mode 100644
index 0000000000..9a2b6deedc
--- /dev/null
+++ b/third_party/rust/tokio-current-thread/Cargo.toml
@@ -0,0 +1,28 @@
+# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
+#
+# When uploading crates to the registry Cargo will automatically
+# "normalize" Cargo.toml files for maximal compatibility
+# with all versions of Cargo and also rewrite `path` dependencies
+# to registry (e.g. crates.io) dependencies
+#
+# If you believe there's an error in this file please file an
+# issue against the rust-lang/cargo repository. If you're
+# editing this file be aware that the upstream Cargo.toml
+# will likely look very different (and much more reasonable)
+
+[package]
+name = "tokio-current-thread"
+version = "0.1.6"
+authors = ["Carl Lerche <me@carllerche.com>"]
+description = "Single threaded executor which manage many tasks concurrently on the current thread.\n"
+homepage = "https://github.com/tokio-rs/tokio"
+documentation = "https://docs.rs/tokio-current-thread/0.1.6/tokio_current_thread"
+keywords = ["futures", "tokio"]
+categories = ["concurrency", "asynchronous"]
+license = "MIT"
+repository = "https://github.com/tokio-rs/tokio"
+[dependencies.futures]
+version = "0.1.19"
+
+[dependencies.tokio-executor]
+version = "0.1.7"
diff --git a/third_party/rust/tokio-current-thread/LICENSE b/third_party/rust/tokio-current-thread/LICENSE
new file mode 100644
index 0000000000..cdb28b4b56
--- /dev/null
+++ b/third_party/rust/tokio-current-thread/LICENSE
@@ -0,0 +1,25 @@
+Copyright (c) 2019 Tokio Contributors
+
+Permission is hereby granted, free of charge, to any
+person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the
+Software without restriction, including without
+limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software
+is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/third_party/rust/tokio-current-thread/README.md b/third_party/rust/tokio-current-thread/README.md
new file mode 100644
index 0000000000..007b84baed
--- /dev/null
+++ b/third_party/rust/tokio-current-thread/README.md
@@ -0,0 +1,19 @@
+# tokio-current-thread
+
+Single threaded executor for Tokio.
+
+[Documentation](https://docs.rs/tokio-current-thread/0.1.6/tokio_current_thread/)
+
+## Overview
+
+This crate provides the single threaded executor which execute many tasks concurrently.
+
+## License
+
+This project is licensed under the [MIT license](LICENSE).
+
+### Contribution
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in Tokio by you, shall be licensed as MIT, without any additional
+terms or conditions.
diff --git a/third_party/rust/tokio-current-thread/src/lib.rs b/third_party/rust/tokio-current-thread/src/lib.rs
new file mode 100644
index 0000000000..8dc3ebd144
--- /dev/null
+++ b/third_party/rust/tokio-current-thread/src/lib.rs
@@ -0,0 +1,866 @@
+#![doc(html_root_url = "https://docs.rs/tokio-current-thread/0.1.6")]
+#![deny(warnings, missing_docs, missing_debug_implementations)]
+
+//! A single-threaded executor which executes tasks on the same thread from which
+//! they are spawned.
+//!
+//!
+//! The crate provides:
+//!
+//! * [`CurrentThread`] is the main type of this crate. It executes tasks on the current thread.
+//! The easiest way to start a new [`CurrentThread`] executor is to call
+//! [`block_on_all`] with an initial task to seed the executor.
+//! All tasks that are being managed by a [`CurrentThread`] executor are able to
+//! spawn additional tasks by calling [`spawn`].
+//!
+//!
+//! Application authors will not use this crate directly. Instead, they will use the
+//! `tokio` crate. Library authors should only depend on `tokio-current-thread` if they
+//! are building a custom task executor.
+//!
+//! For more details, see [executor module] documentation in the Tokio crate.
+//!
+//! [`CurrentThread`]: struct.CurrentThread.html
+//! [`spawn`]: fn.spawn.html
+//! [`block_on_all`]: fn.block_on_all.html
+//! [executor module]: https://docs.rs/tokio/0.1/tokio/executor/index.html
+
+extern crate futures;
+extern crate tokio_executor;
+
+mod scheduler;
+
+use self::scheduler::Scheduler;
+
+use tokio_executor::park::{Park, ParkThread, Unpark};
+use tokio_executor::{Enter, SpawnError};
+
+use futures::future::{ExecuteError, ExecuteErrorKind, Executor};
+use futures::{executor, Async, Future};
+
+use std::cell::Cell;
+use std::error::Error;
+use std::fmt;
+use std::rc::Rc;
+use std::sync::{atomic, mpsc, Arc};
+use std::thread;
+use std::time::{Duration, Instant};
+
+/// Executes tasks on the current thread
+pub struct CurrentThread<P: Park = ParkThread> {
+ /// Execute futures and receive unpark notifications.
+ scheduler: Scheduler<P::Unpark>,
+
+ /// Current number of futures being executed.
+ ///
+ /// The LSB is used to indicate that the runtime is preparing to shut down.
+ /// Thus, to get the actual number of pending futures, `>>1`.
+ num_futures: Arc<atomic::AtomicUsize>,
+
+ /// Thread park handle
+ park: P,
+
+ /// Handle for spawning new futures from other threads
+ spawn_handle: Handle,
+
+ /// Receiver for futures spawned from other threads
+ spawn_receiver: mpsc::Receiver<Box<Future<Item = (), Error = ()> + Send + 'static>>,
+
+ /// The thread-local ID assigned to this executor.
+ id: u64,
+}
+
+/// Executes futures on the current thread.
+///
+/// All futures executed using this executor will be executed on the current
+/// thread. As such, `run` will wait for these futures to complete before
+/// returning.
+///
+/// For more details, see the [module level](index.html) documentation.
+#[derive(Debug, Clone)]
+pub struct TaskExecutor {
+ // Prevent the handle from moving across threads.
+ _p: ::std::marker::PhantomData<Rc<()>>,
+}
+
+/// Returned by the `turn` function.
+#[derive(Debug)]
+pub struct Turn {
+ polled: bool,
+}
+
+impl Turn {
+ /// `true` if any futures were polled at all and `false` otherwise.
+ pub fn has_polled(&self) -> bool {
+ self.polled
+ }
+}
+
+/// A `CurrentThread` instance bound to a supplied execution context.
+pub struct Entered<'a, P: Park + 'a> {
+ executor: &'a mut CurrentThread<P>,
+ enter: &'a mut Enter,
+}
+
+/// Error returned by the `run` function.
+#[derive(Debug)]
+pub struct RunError {
+ _p: (),
+}
+
+impl fmt::Display for RunError {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "{}", self.description())
+ }
+}
+
+impl Error for RunError {
+ fn description(&self) -> &str {
+ "Run error"
+ }
+}
+
+/// Error returned by the `run_timeout` function.
+#[derive(Debug)]
+pub struct RunTimeoutError {
+ timeout: bool,
+}
+
+impl fmt::Display for RunTimeoutError {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "{}", self.description())
+ }
+}
+
+impl Error for RunTimeoutError {
+ fn description(&self) -> &str {
+ if self.timeout {
+ "Run timeout error (timeout)"
+ } else {
+ "Run timeout error (not timeout)"
+ }
+ }
+}
+
+/// Error returned by the `turn` function.
+#[derive(Debug)]
+pub struct TurnError {
+ _p: (),
+}
+
+impl fmt::Display for TurnError {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "{}", self.description())
+ }
+}
+
+impl Error for TurnError {
+ fn description(&self) -> &str {
+ "Turn error"
+ }
+}
+
+/// Error returned by the `block_on` function.
+#[derive(Debug)]
+pub struct BlockError<T> {
+ inner: Option<T>,
+}
+
+impl<T> fmt::Display for BlockError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "Block error")
+ }
+}
+
+impl<T: fmt::Debug> Error for BlockError<T> {
+ fn description(&self) -> &str {
+ "Block error"
+ }
+}
+
+/// This is mostly split out to make the borrow checker happy.
+struct Borrow<'a, U: 'a> {
+ id: u64,
+ scheduler: &'a mut Scheduler<U>,
+ num_futures: &'a atomic::AtomicUsize,
+}
+
+trait SpawnLocal {
+ fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>, already_counted: bool);
+}
+
+struct CurrentRunner {
+ spawn: Cell<Option<*mut SpawnLocal>>,
+ id: Cell<Option<u64>>,
+}
+
+thread_local! {
+ /// Current thread's task runner. This is set in `TaskRunner::with`
+ static CURRENT: CurrentRunner = CurrentRunner {
+ spawn: Cell::new(None),
+ id: Cell::new(None),
+ }
+}
+
+thread_local! {
+ /// Unique ID to assign to each new executor launched on this thread.
+ ///
+ /// The unique ID is used to determine if the currently running executor matches the one
+ /// referred to by a `Handle` so that direct task dispatch can be used.
+ static EXECUTOR_ID: Cell<u64> = Cell::new(0)
+}
+
+/// Run the executor bootstrapping the execution with the provided future.
+///
+/// This creates a new [`CurrentThread`] executor, spawns the provided future,
+/// and blocks the current thread until the provided future and **all**
+/// subsequently spawned futures complete. In other words:
+///
+/// * If the provided bootstrap future does **not** spawn any additional tasks,
+/// `block_on_all` returns once `future` completes.
+/// * If the provided bootstrap future **does** spawn additional tasks, then
+/// `block_on_all` returns once **all** spawned futures complete.
+///
+/// See [module level][mod] documentation for more details.
+///
+/// [`CurrentThread`]: struct.CurrentThread.html
+/// [mod]: index.html
+pub fn block_on_all<F>(future: F) -> Result<F::Item, F::Error>
+where
+ F: Future,
+{
+ let mut current_thread = CurrentThread::new();
+
+ let ret = current_thread.block_on(future);
+ current_thread.run().unwrap();
+
+ ret.map_err(|e| e.into_inner().expect("unexpected execution error"))
+}
+
+/// Executes a future on the current thread.
+///
+/// The provided future must complete or be canceled before `run` will return.
+///
+/// Unlike [`tokio::spawn`], this function will always spawn on a
+/// `CurrentThread` executor and is able to spawn futures that are not `Send`.
+///
+/// # Panics
+///
+/// This function can only be invoked from the context of a `run` call; any
+/// other use will result in a panic.
+///
+/// [`tokio::spawn`]: ../fn.spawn.html
+pub fn spawn<F>(future: F)
+where
+ F: Future<Item = (), Error = ()> + 'static,
+{
+ TaskExecutor::current()
+ .spawn_local(Box::new(future))
+ .unwrap();
+}
+
+// ===== impl CurrentThread =====
+
+impl CurrentThread<ParkThread> {
+ /// Create a new instance of `CurrentThread`.
+ pub fn new() -> Self {
+ CurrentThread::new_with_park(ParkThread::new())
+ }
+}
+
+impl<P: Park> CurrentThread<P> {
+ /// Create a new instance of `CurrentThread` backed by the given park
+ /// handle.
+ pub fn new_with_park(park: P) -> Self {
+ let unpark = park.unpark();
+
+ let (spawn_sender, spawn_receiver) = mpsc::channel();
+ let thread = thread::current().id();
+ let id = EXECUTOR_ID.with(|idc| {
+ let id = idc.get();
+ idc.set(id + 1);
+ id
+ });
+
+ let scheduler = Scheduler::new(unpark);
+ let notify = scheduler.notify();
+
+ let num_futures = Arc::new(atomic::AtomicUsize::new(0));
+
+ CurrentThread {
+ scheduler: scheduler,
+ num_futures: num_futures.clone(),
+ park,
+ id,
+ spawn_handle: Handle {
+ sender: spawn_sender,
+ num_futures: num_futures,
+ notify: notify,
+ shut_down: Cell::new(false),
+ thread: thread,
+ id,
+ },
+ spawn_receiver: spawn_receiver,
+ }
+ }
+
+ /// Returns `true` if the executor is currently idle.
+ ///
+ /// An idle executor is defined by not currently having any spawned tasks.
+ ///
+ /// Note that this method is inherently racy -- if a future is spawned from a remote `Handle`,
+ /// this method may return `true` even though there are more futures to be executed.
+ pub fn is_idle(&self) -> bool {
+ self.num_futures.load(atomic::Ordering::SeqCst) <= 1
+ }
+
+ /// Spawn the future on the executor.
+ ///
+ /// This internally queues the future to be executed once `run` is called.
+ pub fn spawn<F>(&mut self, future: F) -> &mut Self
+ where
+ F: Future<Item = (), Error = ()> + 'static,
+ {
+ self.borrow().spawn_local(Box::new(future), false);
+ self
+ }
+
+ /// Synchronously waits for the provided `future` to complete.
+ ///
+ /// This function can be used to synchronously block the current thread
+ /// until the provided `future` has resolved either successfully or with an
+ /// error. The result of the future is then returned from this function
+ /// call.
+ ///
+ /// Note that this function will **also** execute any spawned futures on the
+ /// current thread, but will **not** block until these other spawned futures
+ /// have completed.
+ ///
+ /// The caller is responsible for ensuring that other spawned futures
+ /// complete execution.
+ pub fn block_on<F>(&mut self, future: F) -> Result<F::Item, BlockError<F::Error>>
+ where
+ F: Future,
+ {
+ let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`");
+ self.enter(&mut enter).block_on(future)
+ }
+
+ /// Run the executor to completion, blocking the thread until **all**
+ /// spawned futures have completed.
+ pub fn run(&mut self) -> Result<(), RunError> {
+ let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`");
+ self.enter(&mut enter).run()
+ }
+
+ /// Run the executor to completion, blocking the thread until all
+ /// spawned futures have completed **or** `duration` time has elapsed.
+ pub fn run_timeout(&mut self, duration: Duration) -> Result<(), RunTimeoutError> {
+ let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`");
+ self.enter(&mut enter).run_timeout(duration)
+ }
+
+ /// Perform a single iteration of the event loop.
+ ///
+ /// This function blocks the current thread even if the executor is idle.
+ pub fn turn(&mut self, duration: Option<Duration>) -> Result<Turn, TurnError> {
+ let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`");
+ self.enter(&mut enter).turn(duration)
+ }
+
+ /// Bind `CurrentThread` instance with an execution context.
+ pub fn enter<'a>(&'a mut self, enter: &'a mut Enter) -> Entered<'a, P> {
+ Entered {
+ executor: self,
+ enter,
+ }
+ }
+
+ /// Returns a reference to the underlying `Park` instance.
+ pub fn get_park(&self) -> &P {
+ &self.park
+ }
+
+ /// Returns a mutable reference to the underlying `Park` instance.
+ pub fn get_park_mut(&mut self) -> &mut P {
+ &mut self.park
+ }
+
+ fn borrow(&mut self) -> Borrow<P::Unpark> {
+ Borrow {
+ id: self.id,
+ scheduler: &mut self.scheduler,
+ num_futures: &*self.num_futures,
+ }
+ }
+
+ /// Get a new handle to spawn futures on the executor
+ ///
+ /// Different to the executor itself, the handle can be sent to different
+ /// threads and can be used to spawn futures on the executor.
+ pub fn handle(&self) -> Handle {
+ self.spawn_handle.clone()
+ }
+}
+
+impl<P: Park> Drop for CurrentThread<P> {
+ fn drop(&mut self) {
+ // Signal to Handles that no more futures can be spawned by setting LSB.
+ //
+ // NOTE: this isn't technically necessary since the send on the mpsc will fail once the
+ // receiver is dropped, but it's useful to illustrate how clean shutdown will be
+ // implemented (e.g., by setting the LSB).
+ let pending = self.num_futures.fetch_add(1, atomic::Ordering::SeqCst);
+
+ // TODO: We currently ignore any pending futures at the time we shut down.
+ //
+ // The "proper" fix for this is to have an explicit shutdown phase (`shutdown_on_idle`)
+ // which sets LSB (as above) do make Handle::spawn stop working, and then runs until
+ // num_futures.load() == 1.
+ let _ = pending;
+ }
+}
+
+impl tokio_executor::Executor for CurrentThread {
+ fn spawn(
+ &mut self,
+ future: Box<Future<Item = (), Error = ()> + Send>,
+ ) -> Result<(), SpawnError> {
+ self.borrow().spawn_local(future, false);
+ Ok(())
+ }
+}
+
+impl<T> tokio_executor::TypedExecutor<T> for CurrentThread
+where
+ T: Future<Item = (), Error = ()> + 'static,
+{
+ fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
+ self.borrow().spawn_local(Box::new(future), false);
+ Ok(())
+ }
+}
+
+impl<P: Park> fmt::Debug for CurrentThread<P> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("CurrentThread")
+ .field("scheduler", &self.scheduler)
+ .field(
+ "num_futures",
+ &self.num_futures.load(atomic::Ordering::SeqCst),
+ )
+ .finish()
+ }
+}
+
+// ===== impl Entered =====
+
+impl<'a, P: Park> Entered<'a, P> {
+ /// Spawn the future on the executor.
+ ///
+ /// This internally queues the future to be executed once `run` is called.
+ pub fn spawn<F>(&mut self, future: F) -> &mut Self
+ where
+ F: Future<Item = (), Error = ()> + 'static,
+ {
+ self.executor.borrow().spawn_local(Box::new(future), false);
+ self
+ }
+
+ /// Synchronously waits for the provided `future` to complete.
+ ///
+ /// This function can be used to synchronously block the current thread
+ /// until the provided `future` has resolved either successfully or with an
+ /// error. The result of the future is then returned from this function
+ /// call.
+ ///
+ /// Note that this function will **also** execute any spawned futures on the
+ /// current thread, but will **not** block until these other spawned futures
+ /// have completed.
+ ///
+ /// The caller is responsible for ensuring that other spawned futures
+ /// complete execution.
+ pub fn block_on<F>(&mut self, future: F) -> Result<F::Item, BlockError<F::Error>>
+ where
+ F: Future,
+ {
+ let mut future = executor::spawn(future);
+ let notify = self.executor.scheduler.notify();
+
+ loop {
+ let res = self
+ .executor
+ .borrow()
+ .enter(self.enter, || future.poll_future_notify(&notify, 0));
+
+ match res {
+ Ok(Async::Ready(e)) => return Ok(e),
+ Err(e) => return Err(BlockError { inner: Some(e) }),
+ Ok(Async::NotReady) => {}
+ }
+
+ self.tick();
+
+ if let Err(_) = self.executor.park.park() {
+ return Err(BlockError { inner: None });
+ }
+ }
+ }
+
+ /// Run the executor to completion, blocking the thread until **all**
+ /// spawned futures have completed.
+ pub fn run(&mut self) -> Result<(), RunError> {
+ self.run_timeout2(None).map_err(|_| RunError { _p: () })
+ }
+
+ /// Run the executor to completion, blocking the thread until all
+ /// spawned futures have completed **or** `duration` time has elapsed.
+ pub fn run_timeout(&mut self, duration: Duration) -> Result<(), RunTimeoutError> {
+ self.run_timeout2(Some(duration))
+ }
+
+ /// Perform a single iteration of the event loop.
+ ///
+ /// This function blocks the current thread even if the executor is idle.
+ pub fn turn(&mut self, duration: Option<Duration>) -> Result<Turn, TurnError> {
+ let res = if self.executor.scheduler.has_pending_futures() {
+ self.executor.park.park_timeout(Duration::from_millis(0))
+ } else {
+ match duration {
+ Some(duration) => self.executor.park.park_timeout(duration),
+ None => self.executor.park.park(),
+ }
+ };
+
+ if res.is_err() {
+ return Err(TurnError { _p: () });
+ }
+
+ let polled = self.tick();
+
+ Ok(Turn { polled })
+ }
+
+ /// Returns a reference to the underlying `Park` instance.
+ pub fn get_park(&self) -> &P {
+ &self.executor.park
+ }
+
+ /// Returns a mutable reference to the underlying `Park` instance.
+ pub fn get_park_mut(&mut self) -> &mut P {
+ &mut self.executor.park
+ }
+
+ fn run_timeout2(&mut self, dur: Option<Duration>) -> Result<(), RunTimeoutError> {
+ if self.executor.is_idle() {
+ // Nothing to do
+ return Ok(());
+ }
+
+ let mut time = dur.map(|dur| (Instant::now() + dur, dur));
+
+ loop {
+ self.tick();
+
+ if self.executor.is_idle() {
+ return Ok(());
+ }
+
+ match time {
+ Some((until, rem)) => {
+ if let Err(_) = self.executor.park.park_timeout(rem) {
+ return Err(RunTimeoutError::new(false));
+ }
+
+ let now = Instant::now();
+
+ if now >= until {
+ return Err(RunTimeoutError::new(true));
+ }
+
+ time = Some((until, until - now));
+ }
+ None => {
+ if let Err(_) = self.executor.park.park() {
+ return Err(RunTimeoutError::new(false));
+ }
+ }
+ }
+ }
+ }
+
+ /// Returns `true` if any futures were processed
+ fn tick(&mut self) -> bool {
+ // Spawn any futures that were spawned from other threads by manually
+ // looping over the receiver stream
+
+ // FIXME: Slightly ugly but needed to make the borrow checker happy
+ let (mut borrow, spawn_receiver) = (
+ Borrow {
+ id: self.executor.id,
+ scheduler: &mut self.executor.scheduler,
+ num_futures: &*self.executor.num_futures,
+ },
+ &mut self.executor.spawn_receiver,
+ );
+
+ while let Ok(future) = spawn_receiver.try_recv() {
+ borrow.spawn_local(future, true);
+ }
+
+ // After any pending futures were scheduled, do the actual tick
+ borrow
+ .scheduler
+ .tick(borrow.id, &mut *self.enter, borrow.num_futures)
+ }
+}
+
+impl<'a, P: Park> fmt::Debug for Entered<'a, P> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Entered")
+ .field("executor", &self.executor)
+ .field("enter", &self.enter)
+ .finish()
+ }
+}
+
+// ===== impl Handle =====
+
+/// Handle to spawn a future on the corresponding `CurrentThread` instance
+#[derive(Clone)]
+pub struct Handle {
+ sender: mpsc::Sender<Box<Future<Item = (), Error = ()> + Send + 'static>>,
+ num_futures: Arc<atomic::AtomicUsize>,
+ shut_down: Cell<bool>,
+ notify: executor::NotifyHandle,
+ thread: thread::ThreadId,
+
+ /// The thread-local ID assigned to this Handle's executor.
+ id: u64,
+}
+
+// Manual implementation because the Sender does not implement Debug
+impl fmt::Debug for Handle {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Handle")
+ .field("shut_down", &self.shut_down.get())
+ .finish()
+ }
+}
+
+impl Handle {
+ /// Spawn a future onto the `CurrentThread` instance corresponding to this handle
+ ///
+ /// # Panics
+ ///
+ /// This function panics if the spawn fails. Failure occurs if the `CurrentThread`
+ /// instance of the `Handle` does not exist anymore.
+ pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError>
+ where
+ F: Future<Item = (), Error = ()> + Send + 'static,
+ {
+ if thread::current().id() == self.thread {
+ let mut e = TaskExecutor::current();
+ if e.id() == Some(self.id) {
+ return e.spawn_local(Box::new(future));
+ }
+ }
+
+ if self.shut_down.get() {
+ return Err(SpawnError::shutdown());
+ }
+
+ // NOTE: += 2 since LSB is the shutdown bit
+ let pending = self.num_futures.fetch_add(2, atomic::Ordering::SeqCst);
+ if pending % 2 == 1 {
+ // Bring the count back so we still know when the Runtime is idle.
+ self.num_futures.fetch_sub(2, atomic::Ordering::SeqCst);
+
+ // Once the Runtime is shutting down, we know it won't come back.
+ self.shut_down.set(true);
+
+ return Err(SpawnError::shutdown());
+ }
+
+ self.sender
+ .send(Box::new(future))
+ .expect("CurrentThread does not exist anymore");
+ // use 0 for the id, CurrentThread does not make use of it
+ self.notify.notify(0);
+ Ok(())
+ }
+
+ /// Provides a best effort **hint** to whether or not `spawn` will succeed.
+ ///
+ /// This function may return both false positives **and** false negatives.
+ /// If `status` returns `Ok`, then a call to `spawn` will *probably*
+ /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will
+ /// *probably* fail, but may succeed.
+ ///
+ /// This allows a caller to avoid creating the task if the call to `spawn`
+ /// has a high likelihood of failing.
+ pub fn status(&self) -> Result<(), SpawnError> {
+ if self.shut_down.get() {
+ return Err(SpawnError::shutdown());
+ }
+
+ Ok(())
+ }
+}
+
+// ===== impl TaskExecutor =====
+
+impl TaskExecutor {
+ /// Returns an executor that executes futures on the current thread.
+ ///
+ /// The user of `TaskExecutor` must ensure that when a future is submitted,
+ /// that it is done within the context of a call to `run`.
+ ///
+ /// For more details, see the [module level](index.html) documentation.
+ pub fn current() -> TaskExecutor {
+ TaskExecutor {
+ _p: ::std::marker::PhantomData,
+ }
+ }
+
+ /// Get the current executor's thread-local ID.
+ fn id(&self) -> Option<u64> {
+ CURRENT.with(|current| current.id.get())
+ }
+
+ /// Spawn a future onto the current `CurrentThread` instance.
+ pub fn spawn_local(
+ &mut self,
+ future: Box<Future<Item = (), Error = ()>>,
+ ) -> Result<(), SpawnError> {
+ CURRENT.with(|current| match current.spawn.get() {
+ Some(spawn) => {
+ unsafe { (*spawn).spawn_local(future, false) };
+ Ok(())
+ }
+ None => Err(SpawnError::shutdown()),
+ })
+ }
+}
+
+impl tokio_executor::Executor for TaskExecutor {
+ fn spawn(
+ &mut self,
+ future: Box<Future<Item = (), Error = ()> + Send>,
+ ) -> Result<(), SpawnError> {
+ self.spawn_local(future)
+ }
+}
+
+impl<F> tokio_executor::TypedExecutor<F> for TaskExecutor
+where
+ F: Future<Item = (), Error = ()> + 'static,
+{
+ fn spawn(&mut self, future: F) -> Result<(), SpawnError> {
+ self.spawn_local(Box::new(future))
+ }
+}
+
+impl<F> Executor<F> for TaskExecutor
+where
+ F: Future<Item = (), Error = ()> + 'static,
+{
+ fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
+ CURRENT.with(|current| match current.spawn.get() {
+ Some(spawn) => {
+ unsafe { (*spawn).spawn_local(Box::new(future), false) };
+ Ok(())
+ }
+ None => Err(ExecuteError::new(ExecuteErrorKind::Shutdown, future)),
+ })
+ }
+}
+
+// ===== impl Borrow =====
+
+impl<'a, U: Unpark> Borrow<'a, U> {
+ fn enter<F, R>(&mut self, _: &mut Enter, f: F) -> R
+ where
+ F: FnOnce() -> R,
+ {
+ CURRENT.with(|current| {
+ current.id.set(Some(self.id));
+ current.set_spawn(self, || f())
+ })
+ }
+}
+
+impl<'a, U: Unpark> SpawnLocal for Borrow<'a, U> {
+ fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>, already_counted: bool) {
+ if !already_counted {
+ // NOTE: we have a borrow of the Runtime, so we know that it isn't shut down.
+ // NOTE: += 2 since LSB is the shutdown bit
+ self.num_futures.fetch_add(2, atomic::Ordering::SeqCst);
+ }
+ self.scheduler.schedule(future);
+ }
+}
+
+// ===== impl CurrentRunner =====
+
+impl CurrentRunner {
+ fn set_spawn<F, R>(&self, spawn: &mut SpawnLocal, f: F) -> R
+ where
+ F: FnOnce() -> R,
+ {
+ struct Reset<'a>(&'a CurrentRunner);
+
+ impl<'a> Drop for Reset<'a> {
+ fn drop(&mut self) {
+ self.0.spawn.set(None);
+ self.0.id.set(None);
+ }
+ }
+
+ let _reset = Reset(self);
+
+ let spawn = unsafe { hide_lt(spawn as *mut SpawnLocal) };
+ self.spawn.set(Some(spawn));
+
+ f()
+ }
+}
+
+unsafe fn hide_lt<'a>(p: *mut (SpawnLocal + 'a)) -> *mut (SpawnLocal + 'static) {
+ use std::mem;
+ mem::transmute(p)
+}
+
+// ===== impl RunTimeoutError =====
+
+impl RunTimeoutError {
+ fn new(timeout: bool) -> Self {
+ RunTimeoutError { timeout }
+ }
+
+ /// Returns `true` if the error was caused by the operation timing out.
+ pub fn is_timeout(&self) -> bool {
+ self.timeout
+ }
+}
+
+impl From<tokio_executor::EnterError> for RunTimeoutError {
+ fn from(_: tokio_executor::EnterError) -> Self {
+ RunTimeoutError::new(false)
+ }
+}
+
+// ===== impl BlockError =====
+
+impl<T> BlockError<T> {
+ /// Returns the error yielded by the future being blocked on
+ pub fn into_inner(self) -> Option<T> {
+ self.inner
+ }
+}
+
+impl<T> From<tokio_executor::EnterError> for BlockError<T> {
+ fn from(_: tokio_executor::EnterError) -> Self {
+ BlockError { inner: None }
+ }
+}
diff --git a/third_party/rust/tokio-current-thread/src/scheduler.rs b/third_party/rust/tokio-current-thread/src/scheduler.rs
new file mode 100644
index 0000000000..c814b60f71
--- /dev/null
+++ b/third_party/rust/tokio-current-thread/src/scheduler.rs
@@ -0,0 +1,770 @@
+use super::Borrow;
+use tokio_executor::park::Unpark;
+use tokio_executor::Enter;
+
+use futures::executor::{self, NotifyHandle, Spawn, UnsafeNotify};
+use futures::{Async, Future};
+
+use std::cell::UnsafeCell;
+use std::fmt::{self, Debug};
+use std::marker::PhantomData;
+use std::mem;
+use std::ptr;
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst};
+use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize};
+use std::sync::{Arc, Weak};
+use std::thread;
+use std::usize;
+
+/// A generic task-aware scheduler.
+///
+/// This is used both by `FuturesUnordered` and the current-thread executor.
+pub struct Scheduler<U> {
+ inner: Arc<Inner<U>>,
+ nodes: List<U>,
+}
+
+pub struct Notify<'a, U: 'a>(&'a Arc<Node<U>>);
+
+// A linked-list of nodes
+struct List<U> {
+ len: usize,
+ head: *const Node<U>,
+ tail: *const Node<U>,
+}
+
+// Scheduler is implemented using two linked lists. The first linked list tracks
+// all items managed by a `Scheduler`. This list is stored on the `Scheduler`
+// struct and is **not** thread safe. The second linked list is an
+// implementation of the intrusive MPSC queue algorithm described by
+// 1024cores.net and is stored on `Inner`. This linked list can push items to
+// the back concurrently but only one consumer may pop from the front. To
+// enforce this requirement, all popping will be performed via fns on
+// `Scheduler` that take `&mut self`.
+//
+// When a item is submitted to the set a node is allocated and inserted in
+// both linked lists. This means that all insertion operations **must** be
+// originated from `Scheduler` with `&mut self` The next call to `tick` will
+// (eventually) see this node and call `poll` on the item.
+//
+// Nodes are wrapped in `Arc` cells which manage the lifetime of the node.
+// However, `Arc` handles are sometimes cast to `*const Node` pointers.
+// Specifically, when a node is stored in at least one of the two lists
+// described above, this represents a logical `Arc` handle. This is how
+// `Scheduler` maintains its reference to all nodes it manages. Each
+// `NotifyHandle` instance is an `Arc<Node>` as well.
+//
+// When `Scheduler` drops, it clears the linked list of all nodes that it
+// manages. When doing so, it must attempt to decrement the reference count (by
+// dropping an Arc handle). However, it can **only** decrement the reference
+// count if the node is not currently stored in the mpsc channel. If the node
+// **is** "queued" in the mpsc channel, then the arc reference count cannot be
+// decremented. Once the node is popped from the mpsc channel, then the final
+// arc reference count can be decremented, thus freeing the node.
+
+struct Inner<U> {
+ // Thread unpark handle
+ unpark: U,
+
+ // Tick number
+ tick_num: AtomicUsize,
+
+ // Head/tail of the readiness queue
+ head_readiness: AtomicPtr<Node<U>>,
+ tail_readiness: UnsafeCell<*const Node<U>>,
+
+ // Used as part of the mpsc queue algorithm
+ stub: Arc<Node<U>>,
+}
+
+unsafe impl<U: Sync + Send> Send for Inner<U> {}
+unsafe impl<U: Sync + Send> Sync for Inner<U> {}
+
+impl<U: Unpark> executor::Notify for Inner<U> {
+ fn notify(&self, _: usize) {
+ self.unpark.unpark();
+ }
+}
+
+struct Node<U> {
+ // The item
+ item: UnsafeCell<Option<Task>>,
+
+ // The tick at which this node was notified
+ notified_at: AtomicUsize,
+
+ // Next pointer for linked list tracking all active nodes
+ next_all: UnsafeCell<*const Node<U>>,
+
+ // Previous node in linked list tracking all active nodes
+ prev_all: UnsafeCell<*const Node<U>>,
+
+ // Next pointer in readiness queue
+ next_readiness: AtomicPtr<Node<U>>,
+
+ // Whether or not this node is currently in the mpsc queue.
+ queued: AtomicBool,
+
+ // Queue that we'll be enqueued to when notified
+ queue: Weak<Inner<U>>,
+}
+
+/// Returned by `Inner::dequeue`, representing either a dequeue success (with
+/// the dequeued node), an empty list, or an inconsistent state.
+///
+/// The inconsistent state is described in more detail at [1024cores], but
+/// roughly indicates that a node will be ready to dequeue sometime shortly in
+/// the future and the caller should try again soon.
+///
+/// [1024cores]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
+enum Dequeue<U> {
+ Data(*const Node<U>),
+ Empty,
+ Yield,
+ Inconsistent,
+}
+
+/// Wraps a spawned boxed future
+struct Task(Spawn<Box<Future<Item = (), Error = ()>>>);
+
+/// A task that is scheduled. `turn` must be called
+pub struct Scheduled<'a, U: 'a> {
+ task: &'a mut Task,
+ notify: &'a Notify<'a, U>,
+ done: &'a mut bool,
+}
+
+impl<U> Scheduler<U>
+where
+ U: Unpark,
+{
+ /// Constructs a new, empty `Scheduler`
+ ///
+ /// The returned `Scheduler` does not contain any items and, in this
+ /// state, `Scheduler::poll` will return `Ok(Async::Ready(None))`.
+ pub fn new(unpark: U) -> Self {
+ let stub = Arc::new(Node {
+ item: UnsafeCell::new(None),
+ notified_at: AtomicUsize::new(0),
+ next_all: UnsafeCell::new(ptr::null()),
+ prev_all: UnsafeCell::new(ptr::null()),
+ next_readiness: AtomicPtr::new(ptr::null_mut()),
+ queued: AtomicBool::new(true),
+ queue: Weak::new(),
+ });
+ let stub_ptr = &*stub as *const Node<U>;
+ let inner = Arc::new(Inner {
+ unpark,
+ tick_num: AtomicUsize::new(0),
+ head_readiness: AtomicPtr::new(stub_ptr as *mut _),
+ tail_readiness: UnsafeCell::new(stub_ptr),
+ stub: stub,
+ });
+
+ Scheduler {
+ inner: inner,
+ nodes: List::new(),
+ }
+ }
+
+ pub fn notify(&self) -> NotifyHandle {
+ self.inner.clone().into()
+ }
+
+ pub fn schedule(&mut self, item: Box<Future<Item = (), Error = ()>>) {
+ // Get the current scheduler tick
+ let tick_num = self.inner.tick_num.load(SeqCst);
+
+ let node = Arc::new(Node {
+ item: UnsafeCell::new(Some(Task::new(item))),
+ notified_at: AtomicUsize::new(tick_num),
+ next_all: UnsafeCell::new(ptr::null_mut()),
+ prev_all: UnsafeCell::new(ptr::null_mut()),
+ next_readiness: AtomicPtr::new(ptr::null_mut()),
+ queued: AtomicBool::new(true),
+ queue: Arc::downgrade(&self.inner),
+ });
+
+ // Right now our node has a strong reference count of 1. We transfer
+ // ownership of this reference count to our internal linked list
+ // and we'll reclaim ownership through the `unlink` function below.
+ let ptr = self.nodes.push_back(node);
+
+ // We'll need to get the item "into the system" to start tracking it,
+ // e.g. getting its unpark notifications going to us tracking which
+ // items are ready. To do that we unconditionally enqueue it for
+ // polling here.
+ self.inner.enqueue(ptr);
+ }
+
+ /// Returns `true` if there are currently any pending futures
+ pub fn has_pending_futures(&mut self) -> bool {
+ // See function definition for why the unsafe is needed and
+ // correctly used here
+ unsafe { self.inner.has_pending_futures() }
+ }
+
+ /// Advance the scheduler state, returning `true` if any futures were
+ /// processed.
+ ///
+ /// This function should be called whenever the caller is notified via a
+ /// wakeup.
+ pub fn tick(&mut self, eid: u64, enter: &mut Enter, num_futures: &AtomicUsize) -> bool {
+ let mut ret = false;
+ let tick = self.inner.tick_num.fetch_add(1, SeqCst).wrapping_add(1);
+
+ loop {
+ let node = match unsafe { self.inner.dequeue(Some(tick)) } {
+ Dequeue::Empty => {
+ return ret;
+ }
+ Dequeue::Yield => {
+ self.inner.unpark.unpark();
+ return ret;
+ }
+ Dequeue::Inconsistent => {
+ thread::yield_now();
+ continue;
+ }
+ Dequeue::Data(node) => node,
+ };
+
+ ret = true;
+
+ debug_assert!(node != self.inner.stub());
+
+ unsafe {
+ if (*(*node).item.get()).is_none() {
+ // The node has already been released. However, while it was
+ // being released, another thread notified it, which
+ // resulted in it getting pushed into the mpsc channel.
+ //
+ // In this case, we just decrement the ref count.
+ let node = ptr2arc(node);
+ assert!((*node.next_all.get()).is_null());
+ assert!((*node.prev_all.get()).is_null());
+ continue;
+ };
+
+ // We're going to need to be very careful if the `poll`
+ // function below panics. We need to (a) not leak memory and
+ // (b) ensure that we still don't have any use-after-frees. To
+ // manage this we do a few things:
+ //
+ // * This "bomb" here will call `release_node` if dropped
+ // abnormally. That way we'll be sure the memory management
+ // of the `node` is managed correctly.
+ //
+ // * We unlink the node from our internal queue to preemptively
+ // assume is is complete (will return Ready or panic), in
+ // which case we'll want to discard it regardless.
+ //
+ struct Bomb<'a, U: Unpark + 'a> {
+ borrow: &'a mut Borrow<'a, U>,
+ enter: &'a mut Enter,
+ node: Option<Arc<Node<U>>>,
+ }
+
+ impl<'a, U: Unpark> Drop for Bomb<'a, U> {
+ fn drop(&mut self) {
+ if let Some(node) = self.node.take() {
+ self.borrow.enter(self.enter, || release_node(node))
+ }
+ }
+ }
+
+ let node = self.nodes.remove(node);
+
+ let mut borrow = Borrow {
+ id: eid,
+ scheduler: self,
+ num_futures,
+ };
+
+ let mut bomb = Bomb {
+ node: Some(node),
+ enter: enter,
+ borrow: &mut borrow,
+ };
+
+ let mut done = false;
+
+ // Now that the bomb holds the node, create a new scope. This
+ // scope ensures that the borrow will go out of scope before we
+ // mutate the node pointer in `bomb` again
+ {
+ let node = bomb.node.as_ref().unwrap();
+
+ // Get a reference to the inner future. We already ensured
+ // that the item `is_some`.
+ let item = (*node.item.get()).as_mut().unwrap();
+
+ // Unset queued flag... this must be done before
+ // polling. This ensures that the item gets
+ // rescheduled if it is notified **during** a call
+ // to `poll`.
+ let prev = (*node).queued.swap(false, SeqCst);
+ assert!(prev);
+
+ // Poll the underlying item with the appropriate `notify`
+ // implementation. This is where a large bit of the unsafety
+ // starts to stem from internally. The `notify` instance itself
+ // is basically just our `Arc<Node>` and tracks the mpsc
+ // queue of ready items.
+ //
+ // Critically though `Node` won't actually access `Task`, the
+ // item, while it's floating around inside of `Task`
+ // instances. These structs will basically just use `T` to size
+ // the internal allocation, appropriately accessing fields and
+ // deallocating the node if need be.
+ let borrow = &mut *bomb.borrow;
+ let enter = &mut *bomb.enter;
+ let notify = Notify(bomb.node.as_ref().unwrap());
+
+ let mut scheduled = Scheduled {
+ task: item,
+ notify: &notify,
+ done: &mut done,
+ };
+
+ if borrow.enter(enter, || scheduled.tick()) {
+ // we have a borrow of the Runtime, so we know it's not shut down
+ borrow.num_futures.fetch_sub(2, SeqCst);
+ }
+ }
+
+ if !done {
+ // The future is not done, push it back into the "all
+ // node" list.
+ let node = bomb.node.take().unwrap();
+ bomb.borrow.scheduler.nodes.push_back(node);
+ }
+ }
+ }
+ }
+}
+
+impl<'a, U: Unpark> Scheduled<'a, U> {
+ /// Polls the task, returns `true` if the task has completed.
+ pub fn tick(&mut self) -> bool {
+ // Tick the future
+ let ret = match self.task.0.poll_future_notify(self.notify, 0) {
+ Ok(Async::Ready(_)) | Err(_) => true,
+ Ok(Async::NotReady) => false,
+ };
+
+ *self.done = ret;
+ ret
+ }
+}
+
+impl Task {
+ pub fn new(future: Box<Future<Item = (), Error = ()> + 'static>) -> Self {
+ Task(executor::spawn(future))
+ }
+}
+
+impl fmt::Debug for Task {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Task").finish()
+ }
+}
+
+fn release_node<U>(node: Arc<Node<U>>) {
+ // The item is done, try to reset the queued flag. This will prevent
+ // `notify` from doing any work in the item
+ let prev = node.queued.swap(true, SeqCst);
+
+ // Drop the item, even if it hasn't finished yet. This is safe
+ // because we're dropping the item on the thread that owns
+ // `Scheduler`, which correctly tracks T's lifetimes and such.
+ unsafe {
+ drop((*node.item.get()).take());
+ }
+
+ // If the queued flag was previously set then it means that this node
+ // is still in our internal mpsc queue. We then transfer ownership
+ // of our reference count to the mpsc queue, and it'll come along and
+ // free it later, noticing that the item is `None`.
+ //
+ // If, however, the queued flag was *not* set then we're safe to
+ // release our reference count on the internal node. The queued flag
+ // was set above so all item `enqueue` operations will not actually
+ // enqueue the node, so our node will never see the mpsc queue again.
+ // The node itself will be deallocated once all reference counts have
+ // been dropped by the various owning tasks elsewhere.
+ if prev {
+ mem::forget(node);
+ }
+}
+
+impl<U> Debug for Scheduler<U> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "Scheduler {{ ... }}")
+ }
+}
+
+impl<U> Drop for Scheduler<U> {
+ fn drop(&mut self) {
+ // When a `Scheduler` is dropped we want to drop all items associated
+ // with it. At the same time though there may be tons of `Task` handles
+ // flying around which contain `Node` references inside them. We'll
+ // let those naturally get deallocated when the `Task` itself goes out
+ // of scope or gets notified.
+ while let Some(node) = self.nodes.pop_front() {
+ release_node(node);
+ }
+
+ // Note that at this point we could still have a bunch of nodes in the
+ // mpsc queue. None of those nodes, however, have items associated
+ // with them so they're safe to destroy on any thread. At this point
+ // the `Scheduler` struct, the owner of the one strong reference
+ // to `Inner` will drop the strong reference. At that point
+ // whichever thread releases the strong refcount last (be it this
+ // thread or some other thread as part of an `upgrade`) will clear out
+ // the mpsc queue and free all remaining nodes.
+ //
+ // While that freeing operation isn't guaranteed to happen here, it's
+ // guaranteed to happen "promptly" as no more "blocking work" will
+ // happen while there's a strong refcount held.
+ }
+}
+
+impl<U> Inner<U> {
+ /// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
+ fn enqueue(&self, node: *const Node<U>) {
+ unsafe {
+ debug_assert!((*node).queued.load(Relaxed));
+
+ // This action does not require any coordination
+ (*node).next_readiness.store(ptr::null_mut(), Relaxed);
+
+ // Note that these atomic orderings come from 1024cores
+ let node = node as *mut _;
+ let prev = self.head_readiness.swap(node, AcqRel);
+ (*prev).next_readiness.store(node, Release);
+ }
+ }
+
+ /// Returns `true` if there are currently any pending futures
+ ///
+ /// See `dequeue` for an explanation why this function is unsafe.
+ unsafe fn has_pending_futures(&self) -> bool {
+ let tail = *self.tail_readiness.get();
+ let next = (*tail).next_readiness.load(Acquire);
+
+ if tail == self.stub() {
+ if next.is_null() {
+ return false;
+ }
+ }
+
+ true
+ }
+
+ /// The dequeue function from the 1024cores intrusive MPSC queue algorithm
+ ///
+ /// Note that this unsafe as it required mutual exclusion (only one thread
+ /// can call this) to be guaranteed elsewhere.
+ unsafe fn dequeue(&self, tick: Option<usize>) -> Dequeue<U> {
+ let mut tail = *self.tail_readiness.get();
+ let mut next = (*tail).next_readiness.load(Acquire);
+
+ if tail == self.stub() {
+ if next.is_null() {
+ return Dequeue::Empty;
+ }
+
+ *self.tail_readiness.get() = next;
+ tail = next;
+ next = (*next).next_readiness.load(Acquire);
+ }
+
+ if let Some(tick) = tick {
+ let actual = (*tail).notified_at.load(SeqCst);
+
+ // Only dequeue if the node was not scheduled during the current
+ // tick.
+ if actual == tick {
+ // Only doing the check above **should** be enough in
+ // practice. However, technically there is a potential for
+ // deadlocking if there are `usize::MAX` ticks while the thread
+ // scheduling the task is frozen.
+ //
+ // If, for some reason, this is not enough, calling `unpark`
+ // here will resolve the issue.
+ return Dequeue::Yield;
+ }
+ }
+
+ if !next.is_null() {
+ *self.tail_readiness.get() = next;
+ debug_assert!(tail != self.stub());
+ return Dequeue::Data(tail);
+ }
+
+ if self.head_readiness.load(Acquire) as *const _ != tail {
+ return Dequeue::Inconsistent;
+ }
+
+ self.enqueue(self.stub());
+
+ next = (*tail).next_readiness.load(Acquire);
+
+ if !next.is_null() {
+ *self.tail_readiness.get() = next;
+ return Dequeue::Data(tail);
+ }
+
+ Dequeue::Inconsistent
+ }
+
+ fn stub(&self) -> *const Node<U> {
+ &*self.stub
+ }
+}
+
+impl<U> Drop for Inner<U> {
+ fn drop(&mut self) {
+ // Once we're in the destructor for `Inner` we need to clear out the
+ // mpsc queue of nodes if there's anything left in there.
+ //
+ // Note that each node has a strong reference count associated with it
+ // which is owned by the mpsc queue. All nodes should have had their
+ // items dropped already by the `Scheduler` destructor above,
+ // so we're just pulling out nodes and dropping their refcounts.
+ unsafe {
+ loop {
+ match self.dequeue(None) {
+ Dequeue::Empty => break,
+ Dequeue::Yield => unreachable!(),
+ Dequeue::Inconsistent => abort("inconsistent in drop"),
+ Dequeue::Data(ptr) => drop(ptr2arc(ptr)),
+ }
+ }
+ }
+ }
+}
+
+impl<U> List<U> {
+ fn new() -> Self {
+ List {
+ len: 0,
+ head: ptr::null_mut(),
+ tail: ptr::null_mut(),
+ }
+ }
+
+ /// Appends an element to the back of the list
+ fn push_back(&mut self, node: Arc<Node<U>>) -> *const Node<U> {
+ let ptr = arc2ptr(node);
+
+ unsafe {
+ // Point to the current last node in the list
+ *(*ptr).prev_all.get() = self.tail;
+ *(*ptr).next_all.get() = ptr::null_mut();
+
+ if !self.tail.is_null() {
+ *(*self.tail).next_all.get() = ptr;
+ self.tail = ptr;
+ } else {
+ // This is the first node
+ self.tail = ptr;
+ self.head = ptr;
+ }
+ }
+
+ self.len += 1;
+
+ return ptr;
+ }
+
+ /// Pop an element from the front of the list
+ fn pop_front(&mut self) -> Option<Arc<Node<U>>> {
+ if self.head.is_null() {
+ // The list is empty
+ return None;
+ }
+
+ self.len -= 1;
+
+ unsafe {
+ // Convert the ptr to Arc<_>
+ let node = ptr2arc(self.head);
+
+ // Update the head pointer
+ self.head = *node.next_all.get();
+
+ // If the pointer is null, then the list is empty
+ if self.head.is_null() {
+ self.tail = ptr::null_mut();
+ } else {
+ *(*self.head).prev_all.get() = ptr::null_mut();
+ }
+
+ Some(node)
+ }
+ }
+
+ /// Remove a specific node
+ unsafe fn remove(&mut self, node: *const Node<U>) -> Arc<Node<U>> {
+ let node = ptr2arc(node);
+ let next = *node.next_all.get();
+ let prev = *node.prev_all.get();
+ *node.next_all.get() = ptr::null_mut();
+ *node.prev_all.get() = ptr::null_mut();
+
+ if !next.is_null() {
+ *(*next).prev_all.get() = prev;
+ } else {
+ self.tail = prev;
+ }
+
+ if !prev.is_null() {
+ *(*prev).next_all.get() = next;
+ } else {
+ self.head = next;
+ }
+
+ self.len -= 1;
+
+ return node;
+ }
+}
+
+impl<'a, U> Clone for Notify<'a, U> {
+ fn clone(&self) -> Self {
+ Notify(self.0)
+ }
+}
+
+impl<'a, U> fmt::Debug for Notify<'a, U> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Notify").finish()
+ }
+}
+
+impl<'a, U: Unpark> From<Notify<'a, U>> for NotifyHandle {
+ fn from(handle: Notify<'a, U>) -> NotifyHandle {
+ unsafe {
+ let ptr = handle.0.clone();
+ let ptr = mem::transmute::<Arc<Node<U>>, *mut ArcNode<U>>(ptr);
+ NotifyHandle::new(hide_lt(ptr))
+ }
+ }
+}
+
+struct ArcNode<U>(PhantomData<U>);
+
+// We should never touch `Task` on any thread other than the one owning
+// `Scheduler`, so this should be a safe operation.
+unsafe impl<U: Sync + Send> Send for ArcNode<U> {}
+unsafe impl<U: Sync + Send> Sync for ArcNode<U> {}
+
+impl<U: Unpark> executor::Notify for ArcNode<U> {
+ fn notify(&self, _id: usize) {
+ unsafe {
+ let me: *const ArcNode<U> = self;
+ let me: *const *const ArcNode<U> = &me;
+ let me = me as *const Arc<Node<U>>;
+ Node::notify(&*me)
+ }
+ }
+}
+
+unsafe impl<U: Unpark> UnsafeNotify for ArcNode<U> {
+ unsafe fn clone_raw(&self) -> NotifyHandle {
+ let me: *const ArcNode<U> = self;
+ let me: *const *const ArcNode<U> = &me;
+ let me = &*(me as *const Arc<Node<U>>);
+ Notify(me).into()
+ }
+
+ unsafe fn drop_raw(&self) {
+ let mut me: *const ArcNode<U> = self;
+ let me = &mut me as *mut *const ArcNode<U> as *mut Arc<Node<U>>;
+ ptr::drop_in_place(me);
+ }
+}
+
+unsafe fn hide_lt<U: Unpark>(p: *mut ArcNode<U>) -> *mut UnsafeNotify {
+ mem::transmute(p as *mut UnsafeNotify)
+}
+
+impl<U: Unpark> Node<U> {
+ fn notify(me: &Arc<Node<U>>) {
+ let inner = match me.queue.upgrade() {
+ Some(inner) => inner,
+ None => return,
+ };
+
+ // It's our job to notify the node that it's ready to get polled,
+ // meaning that we need to enqueue it into the readiness queue. To
+ // do this we flag that we're ready to be queued, and if successful
+ // we then do the literal queueing operation, ensuring that we're
+ // only queued once.
+ //
+ // Once the node is inserted we be sure to notify the parent task,
+ // as it'll want to come along and pick up our node now.
+ //
+ // Note that we don't change the reference count of the node here,
+ // we're just enqueueing the raw pointer. The `Scheduler`
+ // implementation guarantees that if we set the `queued` flag true that
+ // there's a reference count held by the main `Scheduler` queue
+ // still.
+ let prev = me.queued.swap(true, SeqCst);
+ if !prev {
+ // Get the current scheduler tick
+ let tick_num = inner.tick_num.load(SeqCst);
+ me.notified_at.store(tick_num, SeqCst);
+
+ inner.enqueue(&**me);
+ inner.unpark.unpark();
+ }
+ }
+}
+
+impl<U> Drop for Node<U> {
+ fn drop(&mut self) {
+ // Currently a `Node` is sent across all threads for any lifetime,
+ // regardless of `T`. This means that for memory safety we can't
+ // actually touch `T` at any time except when we have a reference to the
+ // `Scheduler` itself.
+ //
+ // Consequently it *should* be the case that we always drop items from
+ // the `Scheduler` instance, but this is a bomb in place to catch
+ // any bugs in that logic.
+ unsafe {
+ if (*self.item.get()).is_some() {
+ abort("item still here when dropping");
+ }
+ }
+ }
+}
+
+fn arc2ptr<T>(ptr: Arc<T>) -> *const T {
+ let addr = &*ptr as *const T;
+ mem::forget(ptr);
+ return addr;
+}
+
+unsafe fn ptr2arc<T>(ptr: *const T) -> Arc<T> {
+ let anchor = mem::transmute::<usize, Arc<T>>(0x10);
+ let addr = &*anchor as *const T;
+ mem::forget(anchor);
+ let offset = addr as isize - 0x10;
+ mem::transmute::<isize, Arc<T>>(ptr as isize - offset)
+}
+
+fn abort(s: &str) -> ! {
+ struct DoublePanic;
+
+ impl Drop for DoublePanic {
+ fn drop(&mut self) {
+ panic!("panicking twice to abort the program");
+ }
+ }
+
+ let _bomb = DoublePanic;
+ panic!("{}", s);
+}
diff --git a/third_party/rust/tokio-current-thread/tests/current_thread.rs b/third_party/rust/tokio-current-thread/tests/current_thread.rs
new file mode 100644
index 0000000000..0ed0ca2467
--- /dev/null
+++ b/third_party/rust/tokio-current-thread/tests/current_thread.rs
@@ -0,0 +1,837 @@
+extern crate futures;
+extern crate tokio_current_thread;
+extern crate tokio_executor;
+
+use tokio_current_thread::{block_on_all, CurrentThread};
+
+use std::any::Any;
+use std::cell::{Cell, RefCell};
+use std::rc::Rc;
+use std::thread;
+use std::time::Duration;
+
+use futures::future::{self, lazy};
+use futures::task;
+// This is not actually unused --- we need this trait to be in scope for
+// the tests that sue TaskExecutor::current().execute(). The compiler
+// doesn't realise that.
+#[allow(unused_imports)]
+use futures::future::Executor as _futures_Executor;
+use futures::prelude::*;
+use futures::sync::oneshot;
+
+mod from_block_on_all {
+ use super::*;
+ fn test<F: Fn(Box<Future<Item = (), Error = ()>>) + 'static>(spawn: F) {
+ let cnt = Rc::new(Cell::new(0));
+ let c = cnt.clone();
+
+ let msg = tokio_current_thread::block_on_all(lazy(move || {
+ c.set(1 + c.get());
+
+ // Spawn!
+ spawn(Box::new(lazy(move || {
+ c.set(1 + c.get());
+ Ok::<(), ()>(())
+ })));
+
+ Ok::<_, ()>("hello")
+ }))
+ .unwrap();
+
+ assert_eq!(2, cnt.get());
+ assert_eq!(msg, "hello");
+ }
+
+ #[test]
+ fn spawn() {
+ test(tokio_current_thread::spawn)
+ }
+
+ #[test]
+ fn execute() {
+ test(|f| {
+ tokio_current_thread::TaskExecutor::current()
+ .execute(f)
+ .unwrap();
+ });
+ }
+}
+
+#[test]
+fn block_waits() {
+ let (tx, rx) = oneshot::channel();
+
+ thread::spawn(|| {
+ thread::sleep(Duration::from_millis(1000));
+ tx.send(()).unwrap();
+ });
+
+ let cnt = Rc::new(Cell::new(0));
+ let cnt2 = cnt.clone();
+
+ block_on_all(rx.then(move |_| {
+ cnt.set(1 + cnt.get());
+ Ok::<_, ()>(())
+ }))
+ .unwrap();
+
+ assert_eq!(1, cnt2.get());
+}
+
+#[test]
+fn spawn_many() {
+ const ITER: usize = 200;
+
+ let cnt = Rc::new(Cell::new(0));
+ let mut tokio_current_thread = CurrentThread::new();
+
+ for _ in 0..ITER {
+ let cnt = cnt.clone();
+ tokio_current_thread.spawn(lazy(move || {
+ cnt.set(1 + cnt.get());
+ Ok::<(), ()>(())
+ }));
+ }
+
+ tokio_current_thread.run().unwrap();
+
+ assert_eq!(cnt.get(), ITER);
+}
+
+mod does_not_set_global_executor_by_default {
+ use super::*;
+
+ fn test<F: Fn(Box<Future<Item = (), Error = ()> + Send>) -> Result<(), E> + 'static, E>(
+ spawn: F,
+ ) {
+ block_on_all(lazy(|| {
+ spawn(Box::new(lazy(|| ok()))).unwrap_err();
+ ok()
+ }))
+ .unwrap()
+ }
+
+ #[test]
+ fn spawn() {
+ use tokio_executor::Executor;
+ test(|f| tokio_executor::DefaultExecutor::current().spawn(f))
+ }
+
+ #[test]
+ fn execute() {
+ test(|f| tokio_executor::DefaultExecutor::current().execute(f))
+ }
+}
+
+mod from_block_on_future {
+ use super::*;
+
+ fn test<F: Fn(Box<Future<Item = (), Error = ()>>)>(spawn: F) {
+ let cnt = Rc::new(Cell::new(0));
+
+ let mut tokio_current_thread = CurrentThread::new();
+
+ tokio_current_thread
+ .block_on(lazy(|| {
+ let cnt = cnt.clone();
+
+ spawn(Box::new(lazy(move || {
+ cnt.set(1 + cnt.get());
+ Ok(())
+ })));
+
+ Ok::<_, ()>(())
+ }))
+ .unwrap();
+
+ tokio_current_thread.run().unwrap();
+
+ assert_eq!(1, cnt.get());
+ }
+
+ #[test]
+ fn spawn() {
+ test(tokio_current_thread::spawn);
+ }
+
+ #[test]
+ fn execute() {
+ test(|f| {
+ tokio_current_thread::TaskExecutor::current()
+ .execute(f)
+ .unwrap();
+ });
+ }
+}
+
+struct Never(Rc<()>);
+
+impl Future for Never {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ Ok(Async::NotReady)
+ }
+}
+
+mod outstanding_tasks_are_dropped_when_executor_is_dropped {
+ use super::*;
+
+ fn test<F, G>(spawn: F, dotspawn: G)
+ where
+ F: Fn(Box<Future<Item = (), Error = ()>>) + 'static,
+ G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>),
+ {
+ let mut rc = Rc::new(());
+
+ let mut tokio_current_thread = CurrentThread::new();
+ dotspawn(&mut tokio_current_thread, Box::new(Never(rc.clone())));
+
+ drop(tokio_current_thread);
+
+ // Ensure the daemon is dropped
+ assert!(Rc::get_mut(&mut rc).is_some());
+
+ // Using the global spawn fn
+
+ let mut rc = Rc::new(());
+
+ let mut tokio_current_thread = CurrentThread::new();
+
+ tokio_current_thread
+ .block_on(lazy(|| {
+ spawn(Box::new(Never(rc.clone())));
+ Ok::<_, ()>(())
+ }))
+ .unwrap();
+
+ drop(tokio_current_thread);
+
+ // Ensure the daemon is dropped
+ assert!(Rc::get_mut(&mut rc).is_some());
+ }
+
+ #[test]
+ fn spawn() {
+ test(tokio_current_thread::spawn, |rt, f| {
+ rt.spawn(f);
+ })
+ }
+
+ #[test]
+ fn execute() {
+ test(
+ |f| {
+ tokio_current_thread::TaskExecutor::current()
+ .execute(f)
+ .unwrap();
+ },
+ // Note: `CurrentThread` doesn't currently implement
+ // `futures::Executor`, so we'll call `.spawn(...)` rather than
+ // `.execute(...)` for now. If `CurrentThread` is changed to
+ // implement Executor, change this to `.execute(...).unwrap()`.
+ |rt, f| {
+ rt.spawn(f);
+ },
+ );
+ }
+}
+
+#[test]
+#[should_panic]
+fn nesting_run() {
+ block_on_all(lazy(|| {
+ block_on_all(lazy(|| ok())).unwrap();
+
+ ok()
+ }))
+ .unwrap();
+}
+
+mod run_in_future {
+ use super::*;
+
+ #[test]
+ #[should_panic]
+ fn spawn() {
+ block_on_all(lazy(|| {
+ tokio_current_thread::spawn(lazy(|| {
+ block_on_all(lazy(|| ok())).unwrap();
+ ok()
+ }));
+ ok()
+ }))
+ .unwrap();
+ }
+
+ #[test]
+ #[should_panic]
+ fn execute() {
+ block_on_all(lazy(|| {
+ tokio_current_thread::TaskExecutor::current()
+ .execute(lazy(|| {
+ block_on_all(lazy(|| ok())).unwrap();
+ ok()
+ }))
+ .unwrap();
+ ok()
+ }))
+ .unwrap();
+ }
+}
+
+#[test]
+fn tick_on_infini_future() {
+ let num = Rc::new(Cell::new(0));
+
+ struct Infini {
+ num: Rc<Cell<usize>>,
+ }
+
+ impl Future for Infini {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ self.num.set(1 + self.num.get());
+ task::current().notify();
+ Ok(Async::NotReady)
+ }
+ }
+
+ CurrentThread::new()
+ .spawn(Infini { num: num.clone() })
+ .turn(None)
+ .unwrap();
+
+ assert_eq!(1, num.get());
+}
+
+mod tasks_are_scheduled_fairly {
+ use super::*;
+ struct Spin {
+ state: Rc<RefCell<[i32; 2]>>,
+ idx: usize,
+ }
+
+ impl Future for Spin {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ let mut state = self.state.borrow_mut();
+
+ if self.idx == 0 {
+ let diff = state[0] - state[1];
+
+ assert!(diff.abs() <= 1);
+
+ if state[0] >= 50 {
+ return Ok(().into());
+ }
+ }
+
+ state[self.idx] += 1;
+
+ if state[self.idx] >= 100 {
+ return Ok(().into());
+ }
+
+ task::current().notify();
+ Ok(Async::NotReady)
+ }
+ }
+
+ fn test<F: Fn(Spin)>(spawn: F) {
+ let state = Rc::new(RefCell::new([0, 0]));
+
+ block_on_all(lazy(|| {
+ spawn(Spin {
+ state: state.clone(),
+ idx: 0,
+ });
+
+ spawn(Spin {
+ state: state,
+ idx: 1,
+ });
+
+ ok()
+ }))
+ .unwrap();
+ }
+
+ #[test]
+ fn spawn() {
+ test(tokio_current_thread::spawn)
+ }
+
+ #[test]
+ fn execute() {
+ test(|f| {
+ tokio_current_thread::TaskExecutor::current()
+ .execute(f)
+ .unwrap();
+ })
+ }
+}
+
+mod and_turn {
+ use super::*;
+
+ fn test<F, G>(spawn: F, dotspawn: G)
+ where
+ F: Fn(Box<Future<Item = (), Error = ()>>) + 'static,
+ G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>),
+ {
+ let cnt = Rc::new(Cell::new(0));
+ let c = cnt.clone();
+
+ let mut tokio_current_thread = CurrentThread::new();
+
+ // Spawn a basic task to get the executor to turn
+ dotspawn(&mut tokio_current_thread, Box::new(lazy(move || Ok(()))));
+
+ // Turn once...
+ tokio_current_thread.turn(None).unwrap();
+
+ dotspawn(
+ &mut tokio_current_thread,
+ Box::new(lazy(move || {
+ c.set(1 + c.get());
+
+ // Spawn!
+ spawn(Box::new(lazy(move || {
+ c.set(1 + c.get());
+ Ok::<(), ()>(())
+ })));
+
+ Ok(())
+ })),
+ );
+
+ // This does not run the newly spawned thread
+ tokio_current_thread.turn(None).unwrap();
+ assert_eq!(1, cnt.get());
+
+ // This runs the newly spawned thread
+ tokio_current_thread.turn(None).unwrap();
+ assert_eq!(2, cnt.get());
+ }
+
+ #[test]
+ fn spawn() {
+ test(tokio_current_thread::spawn, |rt, f| {
+ rt.spawn(f);
+ })
+ }
+
+ #[test]
+ fn execute() {
+ test(
+ |f| {
+ tokio_current_thread::TaskExecutor::current()
+ .execute(f)
+ .unwrap();
+ },
+ // Note: `CurrentThread` doesn't currently implement
+ // `futures::Executor`, so we'll call `.spawn(...)` rather than
+ // `.execute(...)` for now. If `CurrentThread` is changed to
+ // implement Executor, change this to `.execute(...).unwrap()`.
+ |rt, f| {
+ rt.spawn(f);
+ },
+ );
+ }
+
+}
+
+mod in_drop {
+ use super::*;
+ struct OnDrop<F: FnOnce()>(Option<F>);
+
+ impl<F: FnOnce()> Drop for OnDrop<F> {
+ fn drop(&mut self) {
+ (self.0.take().unwrap())();
+ }
+ }
+
+ struct MyFuture {
+ _data: Box<Any>,
+ }
+
+ impl Future for MyFuture {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ Ok(().into())
+ }
+ }
+
+ fn test<F, G>(spawn: F, dotspawn: G)
+ where
+ F: Fn(Box<Future<Item = (), Error = ()>>) + 'static,
+ G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>),
+ {
+ let mut tokio_current_thread = CurrentThread::new();
+
+ let (tx, rx) = oneshot::channel();
+
+ dotspawn(
+ &mut tokio_current_thread,
+ Box::new(MyFuture {
+ _data: Box::new(OnDrop(Some(move || {
+ spawn(Box::new(lazy(move || {
+ tx.send(()).unwrap();
+ Ok(())
+ })));
+ }))),
+ }),
+ );
+
+ tokio_current_thread.block_on(rx).unwrap();
+ tokio_current_thread.run().unwrap();
+ }
+
+ #[test]
+ fn spawn() {
+ test(tokio_current_thread::spawn, |rt, f| {
+ rt.spawn(f);
+ })
+ }
+
+ #[test]
+ fn execute() {
+ test(
+ |f| {
+ tokio_current_thread::TaskExecutor::current()
+ .execute(f)
+ .unwrap();
+ },
+ // Note: `CurrentThread` doesn't currently implement
+ // `futures::Executor`, so we'll call `.spawn(...)` rather than
+ // `.execute(...)` for now. If `CurrentThread` is changed to
+ // implement Executor, change this to `.execute(...).unwrap()`.
+ |rt, f| {
+ rt.spawn(f);
+ },
+ );
+ }
+
+}
+
+#[test]
+fn hammer_turn() {
+ use futures::sync::mpsc;
+
+ const ITER: usize = 100;
+ const N: usize = 100;
+ const THREADS: usize = 4;
+
+ for _ in 0..ITER {
+ let mut ths = vec![];
+
+ // Add some jitter
+ for _ in 0..THREADS {
+ let th = thread::spawn(|| {
+ let mut tokio_current_thread = CurrentThread::new();
+
+ let (tx, rx) = mpsc::unbounded();
+
+ tokio_current_thread.spawn({
+ let cnt = Rc::new(Cell::new(0));
+ let c = cnt.clone();
+
+ rx.for_each(move |_| {
+ c.set(1 + c.get());
+ Ok(())
+ })
+ .map_err(|e| panic!("err={:?}", e))
+ .map(move |v| {
+ assert_eq!(N, cnt.get());
+ v
+ })
+ });
+
+ thread::spawn(move || {
+ for _ in 0..N {
+ tx.unbounded_send(()).unwrap();
+ thread::yield_now();
+ }
+ });
+
+ while !tokio_current_thread.is_idle() {
+ tokio_current_thread.turn(None).unwrap();
+ }
+ });
+
+ ths.push(th);
+ }
+
+ for th in ths {
+ th.join().unwrap();
+ }
+ }
+}
+
+#[test]
+fn turn_has_polled() {
+ let mut tokio_current_thread = CurrentThread::new();
+
+ // Spawn oneshot receiver
+ let (sender, receiver) = oneshot::channel::<()>();
+ tokio_current_thread.spawn(receiver.then(|_| Ok(())));
+
+ // Turn once...
+ let res = tokio_current_thread
+ .turn(Some(Duration::from_millis(0)))
+ .unwrap();
+
+ // Should've polled the receiver once, but considered it not ready
+ assert!(res.has_polled());
+
+ // Turn another time
+ let res = tokio_current_thread
+ .turn(Some(Duration::from_millis(0)))
+ .unwrap();
+
+ // Should've polled nothing, the receiver is not ready yet
+ assert!(!res.has_polled());
+
+ // Make the receiver ready
+ sender.send(()).unwrap();
+
+ // Turn another time
+ let res = tokio_current_thread
+ .turn(Some(Duration::from_millis(0)))
+ .unwrap();
+
+ // Should've polled the receiver, it's ready now
+ assert!(res.has_polled());
+
+ // Now the executor should be empty
+ assert!(tokio_current_thread.is_idle());
+ let res = tokio_current_thread
+ .turn(Some(Duration::from_millis(0)))
+ .unwrap();
+
+ // So should've polled nothing
+ assert!(!res.has_polled());
+}
+
+// Our own mock Park that is never really waiting and the only
+// thing it does is to send, on request, something (once) to a oneshot
+// channel
+struct MyPark {
+ sender: Option<oneshot::Sender<()>>,
+ send_now: Rc<Cell<bool>>,
+}
+
+struct MyUnpark;
+
+impl tokio_executor::park::Park for MyPark {
+ type Unpark = MyUnpark;
+ type Error = ();
+
+ fn unpark(&self) -> Self::Unpark {
+ MyUnpark
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ // If called twice with send_now, this will intentionally panic
+ if self.send_now.get() {
+ self.sender.take().unwrap().send(()).unwrap();
+ }
+
+ Ok(())
+ }
+
+ fn park_timeout(&mut self, _duration: Duration) -> Result<(), Self::Error> {
+ self.park()
+ }
+}
+
+impl tokio_executor::park::Unpark for MyUnpark {
+ fn unpark(&self) {}
+}
+
+#[test]
+fn turn_fair() {
+ let send_now = Rc::new(Cell::new(false));
+
+ let (sender, receiver) = oneshot::channel::<()>();
+ let (sender_2, receiver_2) = oneshot::channel::<()>();
+ let (sender_3, receiver_3) = oneshot::channel::<()>();
+
+ let my_park = MyPark {
+ sender: Some(sender_3),
+ send_now: send_now.clone(),
+ };
+
+ let mut tokio_current_thread = CurrentThread::new_with_park(my_park);
+
+ let receiver_1_done = Rc::new(Cell::new(false));
+ let receiver_1_done_clone = receiver_1_done.clone();
+
+ // Once an item is received on the oneshot channel, it will immediately
+ // immediately make the second oneshot channel ready
+ tokio_current_thread.spawn(receiver.map_err(|_| unreachable!()).and_then(move |_| {
+ sender_2.send(()).unwrap();
+ receiver_1_done_clone.set(true);
+
+ Ok(())
+ }));
+
+ let receiver_2_done = Rc::new(Cell::new(false));
+ let receiver_2_done_clone = receiver_2_done.clone();
+
+ tokio_current_thread.spawn(receiver_2.map_err(|_| unreachable!()).and_then(move |_| {
+ receiver_2_done_clone.set(true);
+ Ok(())
+ }));
+
+ // The third receiver is only woken up from our Park implementation, it simulates
+ // e.g. a socket that first has to be polled to know if it is ready now
+ let receiver_3_done = Rc::new(Cell::new(false));
+ let receiver_3_done_clone = receiver_3_done.clone();
+
+ tokio_current_thread.spawn(receiver_3.map_err(|_| unreachable!()).and_then(move |_| {
+ receiver_3_done_clone.set(true);
+ Ok(())
+ }));
+
+ // First turn should've polled both and considered them not ready
+ let res = tokio_current_thread
+ .turn(Some(Duration::from_millis(0)))
+ .unwrap();
+ assert!(res.has_polled());
+
+ // Next turn should've polled nothing
+ let res = tokio_current_thread
+ .turn(Some(Duration::from_millis(0)))
+ .unwrap();
+ assert!(!res.has_polled());
+
+ assert!(!receiver_1_done.get());
+ assert!(!receiver_2_done.get());
+ assert!(!receiver_3_done.get());
+
+ // After this the receiver future will wake up the second receiver future,
+ // so there are pending futures again
+ sender.send(()).unwrap();
+
+ // Now the first receiver should be done, the second receiver should be ready
+ // to be polled again and the socket not yet
+ let res = tokio_current_thread.turn(None).unwrap();
+ assert!(res.has_polled());
+
+ assert!(receiver_1_done.get());
+ assert!(!receiver_2_done.get());
+ assert!(!receiver_3_done.get());
+
+ // Now let our park implementation know that it should send something to sender 3
+ send_now.set(true);
+
+ // This should resolve the second receiver directly, but also poll the socket
+ // and read the packet from it. If it didn't do both here, we would handle
+ // futures that are woken up from the reactor and directly unfairly and would
+ // favour the ones that are woken up directly.
+ let res = tokio_current_thread.turn(None).unwrap();
+ assert!(res.has_polled());
+
+ assert!(receiver_1_done.get());
+ assert!(receiver_2_done.get());
+ assert!(receiver_3_done.get());
+
+ // Don't send again
+ send_now.set(false);
+
+ // Now we should be idle and turning should not poll anything
+ assert!(tokio_current_thread.is_idle());
+ let res = tokio_current_thread.turn(None).unwrap();
+ assert!(!res.has_polled());
+}
+
+#[test]
+fn spawn_from_other_thread() {
+ let mut current_thread = CurrentThread::new();
+
+ let handle = current_thread.handle();
+ let (sender, receiver) = oneshot::channel::<()>();
+
+ thread::spawn(move || {
+ handle
+ .spawn(lazy(move || {
+ sender.send(()).unwrap();
+ Ok(())
+ }))
+ .unwrap();
+ });
+
+ let _ = current_thread.block_on(receiver).unwrap();
+}
+
+#[test]
+fn spawn_from_other_thread_unpark() {
+ use std::sync::mpsc::channel as mpsc_channel;
+
+ let mut current_thread = CurrentThread::new();
+
+ let handle = current_thread.handle();
+ let (sender_1, receiver_1) = oneshot::channel::<()>();
+ let (sender_2, receiver_2) = mpsc_channel::<()>();
+
+ thread::spawn(move || {
+ let _ = receiver_2.recv().unwrap();
+
+ handle
+ .spawn(lazy(move || {
+ sender_1.send(()).unwrap();
+ Ok(())
+ }))
+ .unwrap();
+ });
+
+ // Ensure that unparking the executor works correctly. It will first
+ // check if there are new futures (there are none), then execute the
+ // lazy future below which will cause the future to be spawned from
+ // the other thread. Then the executor will park but should be woken
+ // up because *now* we have a new future to schedule
+ let _ = current_thread
+ .block_on(
+ lazy(move || {
+ sender_2.send(()).unwrap();
+ Ok(())
+ })
+ .and_then(|_| receiver_1),
+ )
+ .unwrap();
+}
+
+#[test]
+fn spawn_from_executor_with_handle() {
+ let mut current_thread = CurrentThread::new();
+ let handle = current_thread.handle();
+ let (tx, rx) = oneshot::channel();
+
+ current_thread.spawn(lazy(move || {
+ handle
+ .spawn(lazy(move || {
+ tx.send(()).unwrap();
+ Ok(())
+ }))
+ .unwrap();
+ Ok::<_, ()>(())
+ }));
+
+ current_thread.run();
+
+ rx.wait().unwrap();
+}
+
+fn ok() -> future::FutureResult<(), ()> {
+ future::ok(())
+}