summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-threadpool/tests/threadpool.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-threadpool/tests/threadpool.rs')
-rw-r--r--third_party/rust/tokio-threadpool/tests/threadpool.rs555
1 file changed, 555 insertions, 0 deletions
diff --git a/third_party/rust/tokio-threadpool/tests/threadpool.rs b/third_party/rust/tokio-threadpool/tests/threadpool.rs
new file mode 100644
index 0000000000..bd6736f9aa
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/tests/threadpool.rs
@@ -0,0 +1,555 @@
+extern crate env_logger;
+extern crate futures;
+extern crate tokio_executor;
+extern crate tokio_threadpool;
+
+use tokio_executor::park::{Park, Unpark};
+use tokio_threadpool::park::{DefaultPark, DefaultUnpark};
+use tokio_threadpool::*;
+
+use futures::future::lazy;
+use futures::{Async, Future, Poll, Sink, Stream};
+
+use std::cell::Cell;
+use std::sync::atomic::Ordering::Relaxed;
+use std::sync::atomic::*;
+use std::sync::{mpsc, Arc};
+use std::time::Duration;
+
+// Thread-local marker: set to 1 on the test's main thread, left at the
+// default 0 on pool worker threads, so spawned tasks can assert which
+// kind of thread they are running on.
+thread_local!(static FOO: Cell<u32> = Cell::new(0));
+
+// Erases a future's item and error types so futures with different result
+// types can all be spawned on the pool; both the result and any error are
+// discarded.
+fn ignore_results<F: Future + Send + 'static>(
+    f: F,
+) -> Box<dyn Future<Item = (), Error = ()> + Send> {
+    Box::new(f.map(|_| ()).map_err(|_| ()))
+}
+
+// Spawns two trivial futures per iteration, then drops the last external
+// `Sender` so the pool can shut down naturally once the work drains. The
+// `around_worker` counters verify that every worker thread that started
+// also exited cleanly.
+#[test]
+fn natural_shutdown_simple_futures() {
+    let _ = ::env_logger::try_init();
+
+    for _ in 0..1_000 {
+        // Worker-thread entry/exit counts, bumped in `around_worker` below.
+        let num_inc = Arc::new(AtomicUsize::new(0));
+        let num_dec = Arc::new(AtomicUsize::new(0));
+
+        FOO.with(|f| {
+            // Mark the test thread; worker threads see the default value 0,
+            // so tasks can assert they are NOT running on this thread.
+            f.set(1);
+
+            let pool = {
+                let num_inc = num_inc.clone();
+                let num_dec = num_dec.clone();
+
+                Builder::new()
+                    .around_worker(move |w, _| {
+                        num_inc.fetch_add(1, Relaxed);
+                        w.run();
+                        num_dec.fetch_add(1, Relaxed);
+                    })
+                    .build()
+            };
+
+            let tx = pool.sender().clone();
+
+            let a = {
+                let (t, rx) = mpsc::channel();
+                tx.spawn(lazy(move || {
+                    // Makes sure this runs on a worker thread
+                    FOO.with(|f| assert_eq!(f.get(), 0));
+
+                    t.send("one").unwrap();
+                    Ok(())
+                }))
+                .unwrap();
+                rx
+            };
+
+            let b = {
+                let (t, rx) = mpsc::channel();
+                tx.spawn(lazy(move || {
+                    // Makes sure this runs on a worker thread
+                    FOO.with(|f| assert_eq!(f.get(), 0));
+
+                    t.send("two").unwrap();
+                    Ok(())
+                }))
+                .unwrap();
+                rx
+            };
+
+            // Drop the last external handle so shutdown can proceed.
+            drop(tx);
+
+            assert_eq!("one", a.recv().unwrap());
+            assert_eq!("two", b.recv().unwrap());
+
+            // Wait for the pool to shutdown
+            pool.shutdown().wait().unwrap();
+
+            // Assert that at least one thread started
+            let num_inc = num_inc.load(Relaxed);
+            assert!(num_inc > 0);
+
+            // Assert that all threads shutdown
+            let num_dec = num_dec.load(Relaxed);
+            assert_eq!(num_inc, num_dec);
+        });
+    }
+}
+
+// `shutdown_now` must drop in-flight futures even though they never
+// complete; `Never`'s `Drop` impl counts how many times it was dropped.
+#[test]
+fn force_shutdown_drops_futures() {
+    let _ = ::env_logger::try_init();
+
+    for _ in 0..1_000 {
+        let num_inc = Arc::new(AtomicUsize::new(0));
+        let num_dec = Arc::new(AtomicUsize::new(0));
+        let num_drop = Arc::new(AtomicUsize::new(0));
+
+        // A future that never resolves; bumps its counter when dropped.
+        struct Never(Arc<AtomicUsize>);
+
+        impl Future for Never {
+            type Item = ();
+            type Error = ();
+
+            fn poll(&mut self) -> Poll<(), ()> {
+                Ok(Async::NotReady)
+            }
+        }
+
+        impl Drop for Never {
+            fn drop(&mut self) {
+                self.0.fetch_add(1, Relaxed);
+            }
+        }
+
+        let a = num_inc.clone();
+        let b = num_dec.clone();
+
+        let pool = Builder::new()
+            .around_worker(move |w, _| {
+                a.fetch_add(1, Relaxed);
+                w.run();
+                b.fetch_add(1, Relaxed);
+            })
+            .build();
+        let mut tx = pool.sender().clone();
+
+        tx.spawn(Never(num_drop.clone())).unwrap();
+
+        // Wait for the pool to shutdown
+        pool.shutdown_now().wait().unwrap();
+
+        // Assert that at least one thread was spawned.
+        let a = num_inc.load(Relaxed);
+        assert!(a >= 1);
+
+        // Assert that all threads shutdown
+        let b = num_dec.load(Relaxed);
+        assert_eq!(a, b);
+
+        // Assert that the future was dropped
+        let c = num_drop.load(Relaxed);
+        assert_eq!(c, 1);
+    }
+}
+
+// Dropping the pool itself (rather than calling `shutdown_now`) must also
+// force-drop pending futures and join all worker threads.
+#[test]
+fn drop_threadpool_drops_futures() {
+    let _ = ::env_logger::try_init();
+
+    for _ in 0..1_000 {
+        let num_inc = Arc::new(AtomicUsize::new(0));
+        let num_dec = Arc::new(AtomicUsize::new(0));
+        let num_drop = Arc::new(AtomicUsize::new(0));
+
+        // A future that never resolves; bumps its counter when dropped.
+        struct Never(Arc<AtomicUsize>);
+
+        impl Future for Never {
+            type Item = ();
+            type Error = ();
+
+            fn poll(&mut self) -> Poll<(), ()> {
+                Ok(Async::NotReady)
+            }
+        }
+
+        impl Drop for Never {
+            fn drop(&mut self) {
+                self.0.fetch_add(1, Relaxed);
+            }
+        }
+
+        let a = num_inc.clone();
+        let b = num_dec.clone();
+
+        let pool = Builder::new()
+            .max_blocking(2)
+            .pool_size(20)
+            .around_worker(move |w, _| {
+                a.fetch_add(1, Relaxed);
+                w.run();
+                b.fetch_add(1, Relaxed);
+            })
+            .build();
+        let mut tx = pool.sender().clone();
+
+        tx.spawn(Never(num_drop.clone())).unwrap();
+
+        // Dropping the pool shuts it down and drops pending futures.
+        drop(pool);
+
+        // Assert that at least one thread was spawned.
+        let a = num_inc.load(Relaxed);
+        assert!(a >= 1);
+
+        // Assert that all threads shutdown
+        let b = num_dec.load(Relaxed);
+        assert_eq!(a, b);
+
+        // Assert that the future was dropped
+        let c = num_drop.load(Relaxed);
+        assert_eq!(c, 1);
+    }
+}
+
+// Stress test: 10,000 one-shot futures per iteration must each run exactly
+// once before shutdown completes.
+#[test]
+fn many_oneshot_futures() {
+    const NUM: usize = 10_000;
+
+    let _ = ::env_logger::try_init();
+
+    for _ in 0..50 {
+        let pool = ThreadPool::new();
+        let mut tx = pool.sender().clone();
+        let cnt = Arc::new(AtomicUsize::new(0));
+
+        for _ in 0..NUM {
+            let cnt = cnt.clone();
+            tx.spawn(lazy(move || {
+                cnt.fetch_add(1, Relaxed);
+                Ok(())
+            }))
+            .unwrap();
+        }
+
+        // Wait for the pool to shutdown
+        pool.shutdown().wait().unwrap();
+
+        let num = cnt.load(Relaxed);
+        assert_eq!(num, NUM);
+    }
+}
+
+// Builds TRACKS independent rings: each ring is a chain of CHAIN forwarding
+// tasks whose tail feeds back into its head. A single message is cycled
+// around each ring CYCLES times before being delivered on the final channel.
+#[test]
+fn many_multishot_futures() {
+    use futures::sync::mpsc;
+
+    const CHAIN: usize = 200;
+    const CYCLES: usize = 5;
+    const TRACKS: usize = 50;
+
+    let _ = ::env_logger::try_init();
+
+    for _ in 0..50 {
+        let pool = ThreadPool::new();
+        let mut pool_tx = pool.sender().clone();
+
+        let mut start_txs = Vec::with_capacity(TRACKS);
+        let mut final_rxs = Vec::with_capacity(TRACKS);
+
+        for _ in 0..TRACKS {
+            let (start_tx, mut chain_rx) = mpsc::channel(10);
+
+            for _ in 0..CHAIN {
+                let (next_tx, next_rx) = mpsc::channel(10);
+
+                let rx = chain_rx.map_err(|e| panic!("{:?}", e));
+
+                // Forward all the messages
+                pool_tx
+                    .spawn(
+                        next_tx
+                            .send_all(rx)
+                            .map(|_| ())
+                            .map_err(|e| panic!("{:?}", e)),
+                    )
+                    .unwrap();
+
+                chain_rx = next_rx;
+            }
+
+            // This final task cycles if needed
+            let (final_tx, final_rx) = mpsc::channel(10);
+            let cycle_tx = start_tx.clone();
+            let mut rem = CYCLES;
+
+            let task = chain_rx.take(CYCLES as u64).for_each(move |msg| {
+                rem -= 1;
+                // On the last cycle deliver the message; otherwise feed it
+                // back into the head of the chain.
+                let send = if rem == 0 {
+                    final_tx.clone().send(msg)
+                } else {
+                    cycle_tx.clone().send(msg)
+                };
+
+                send.then(|res| {
+                    res.unwrap();
+                    Ok(())
+                })
+            });
+            pool_tx.spawn(ignore_results(task)).unwrap();
+
+            start_txs.push(start_tx);
+            final_rxs.push(final_rx);
+        }
+
+        // Kick off every track with a single message.
+        for start_tx in start_txs {
+            start_tx.send("ping").wait().unwrap();
+        }
+
+        // Each track must eventually deliver its cycled message.
+        for final_rx in final_rxs {
+            final_rx.wait().next().unwrap().unwrap();
+        }
+
+        // Shutdown the pool
+        pool.shutdown().wait().unwrap();
+    }
+}
+
+// A task running on the pool can spawn via `tokio_executor::spawn`, i.e.
+// the pool installs itself as the default executor on its worker threads.
+#[test]
+fn global_executor_is_configured() {
+    let pool = ThreadPool::new();
+    let tx = pool.sender().clone();
+
+    let (signal_tx, signal_rx) = mpsc::channel();
+
+    tx.spawn(lazy(move || {
+        // Spawn through the *global* executor, not the pool handle.
+        tokio_executor::spawn(lazy(move || {
+            signal_tx.send(()).unwrap();
+            Ok(())
+        }));
+
+        Ok(())
+    }))
+    .unwrap();
+
+    // The inner task only runs if the global spawn succeeded.
+    signal_rx.recv().unwrap();
+
+    pool.shutdown().wait().unwrap();
+}
+
+// A pool with no spawned work is immediately idle, so `shutdown_on_idle`
+// completes right away.
+#[test]
+fn new_threadpool_is_idle() {
+    let pool = ThreadPool::new();
+    pool.shutdown_on_idle().wait().unwrap();
+}
+
+// `shutdown_on_idle` must not complete while a spawned task is still
+// pending on a oneshot; it completes once the oneshot fires.
+#[test]
+fn busy_threadpool_is_not_idle() {
+    use futures::sync::oneshot;
+
+    // let pool = ThreadPool::new();
+    let pool = Builder::new().pool_size(4).max_blocking(2).build();
+    let tx = pool.sender().clone();
+
+    let (term_tx, term_rx) = oneshot::channel();
+
+    // Keep the pool busy until `term_tx` fires.
+    tx.spawn(term_rx.then(|_| Ok(()))).unwrap();
+
+    let mut idle = pool.shutdown_on_idle();
+
+    // Polls the shutdown future exactly once and asserts that it is NOT
+    // ready while the pool still has a pending task.
+    struct IdleFut<'a>(&'a mut Shutdown);
+
+    impl<'a> Future for IdleFut<'a> {
+        type Item = ();
+        type Error = ();
+        fn poll(&mut self) -> Poll<(), ()> {
+            assert!(self.0.poll().unwrap().is_not_ready());
+            Ok(Async::Ready(()))
+        }
+    }
+
+    IdleFut(&mut idle).wait().unwrap();
+
+    // Complete the pending task; the pool can now become idle.
+    term_tx.send(()).unwrap();
+
+    idle.wait().unwrap();
+}
+
+// A panicking task must not bring down the pool: the pool still reaches an
+// idle shutdown afterwards. `Boom::drop` additionally asserts the future is
+// dropped while the worker thread is unwinding from the panic.
+#[test]
+fn panic_in_task() {
+    let pool = ThreadPool::new();
+    let tx = pool.sender().clone();
+
+    struct Boom;
+
+    impl Future for Boom {
+        type Item = ();
+        type Error = ();
+
+        fn poll(&mut self) -> Poll<(), ()> {
+            panic!();
+        }
+    }
+
+    impl Drop for Boom {
+        fn drop(&mut self) {
+            assert!(::std::thread::panicking());
+        }
+    }
+
+    tx.spawn(Boom).unwrap();
+
+    pool.shutdown_on_idle().wait().unwrap();
+}
+
+// The `panic_handler` callback configured on the builder is invoked exactly
+// once for the single panicking task.
+#[test]
+fn count_panics() {
+    let counter = Arc::new(AtomicUsize::new(0));
+    let counter_ = counter.clone();
+    let pool = tokio_threadpool::Builder::new()
+        .panic_handler(move |_err| {
+            // We caught a panic.
+            counter_.fetch_add(1, Relaxed);
+        })
+        .build();
+    // Spawn a future that will panic.
+    pool.spawn(lazy(|| -> Result<(), ()> { panic!() }));
+    pool.shutdown_on_idle().wait().unwrap();
+    let counter = counter.load(Relaxed);
+    assert_eq!(counter, 1);
+}
+
+// A oneshot completed by a task on one pool wakes a task waiting on a
+// second, independent pool.
+#[test]
+fn multi_threadpool() {
+    use futures::sync::oneshot;
+
+    let pool1 = ThreadPool::new();
+    let pool2 = ThreadPool::new();
+
+    let (tx, rx) = oneshot::channel();
+    let (done_tx, done_rx) = mpsc::channel();
+
+    // pool2 waits on the oneshot...
+    pool2.spawn({
+        rx.and_then(move |_| {
+            done_tx.send(()).unwrap();
+            Ok(())
+        })
+        .map_err(|e| panic!("err={:?}", e))
+    });
+
+    // ...which a task on pool1 completes.
+    pool1.spawn(lazy(move || {
+        tx.send(()).unwrap();
+        Ok(())
+    }));
+
+    done_rx.recv().unwrap();
+}
+
+// Dropping the pool must eagerly drop pending futures and the custom
+// park/unpark handles, even while a `Task` handle is still held externally.
+#[test]
+fn eagerly_drops_futures() {
+    use futures::future::{empty, lazy, Future};
+    use futures::task;
+    use std::sync::mpsc;
+
+    // Sends on its channel when dropped, so the test can observe the
+    // future being destroyed.
+    struct NotifyOnDrop(mpsc::Sender<()>);
+
+    impl Drop for NotifyOnDrop {
+        fn drop(&mut self) {
+            self.0.send(()).unwrap();
+        }
+    }
+
+    // Wraps `DefaultPark`; the sync-channel senders it owns are dropped
+    // with it, which is how the test detects the park handles were freed.
+    struct MyPark {
+        inner: DefaultPark,
+        #[allow(dead_code)]
+        park_tx: mpsc::SyncSender<()>,
+        unpark_tx: mpsc::SyncSender<()>,
+    }
+
+    impl Park for MyPark {
+        type Unpark = MyUnpark;
+        type Error = <DefaultPark as Park>::Error;
+
+        fn unpark(&self) -> Self::Unpark {
+            MyUnpark {
+                inner: self.inner.unpark(),
+                unpark_tx: self.unpark_tx.clone(),
+            }
+        }
+
+        fn park(&mut self) -> Result<(), Self::Error> {
+            self.inner.park()
+        }
+
+        fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+            self.inner.park_timeout(duration)
+        }
+    }
+
+    struct MyUnpark {
+        inner: DefaultUnpark,
+        #[allow(dead_code)]
+        unpark_tx: mpsc::SyncSender<()>,
+    }
+
+    impl Unpark for MyUnpark {
+        fn unpark(&self) {
+            self.inner.unpark()
+        }
+    }
+
+    let (task_tx, task_rx) = mpsc::channel();
+    let (drop_tx, drop_rx) = mpsc::channel();
+    let (park_tx, park_rx) = mpsc::sync_channel(0);
+    let (unpark_tx, unpark_rx) = mpsc::sync_channel(0);
+
+    // Get the signal that the handler dropped.
+    let notify_on_drop = NotifyOnDrop(drop_tx);
+
+    let pool = tokio_threadpool::Builder::new()
+        .custom_park(move |_| MyPark {
+            inner: DefaultPark::new(),
+            park_tx: park_tx.clone(),
+            unpark_tx: unpark_tx.clone(),
+        })
+        .build();
+
+    pool.spawn(lazy(move || {
+        // Get a handle to the current task.
+        let task = task::current();
+
+        // Send it to the main thread to hold on to.
+        task_tx.send(task).unwrap();
+
+        // This future will never resolve, it is only used to hold on to the
+        // `notify_on_drop` handle.
+        empty::<(), ()>().then(move |_| {
+            // This code path should never be reached.
+            if true {
+                panic!()
+            }
+
+            // Explicitly drop `notify_on_drop` here, this is mostly to ensure
+            // that the `notify_on_drop` handle gets moved into the task. It
+            // will actually get dropped when the runtime is dropped.
+            drop(notify_on_drop);
+
+            Ok(())
+        })
+    }));
+
+    // Wait until we get the task handle.
+    let task = task_rx.recv().unwrap();
+
+    // Drop the pool, this should result in futures being forcefully dropped.
+    drop(pool);
+
+    // Make sure `MyPark` and `MyUnpark` were dropped during shutdown.
+    assert_eq!(park_rx.try_recv(), Err(mpsc::TryRecvError::Disconnected));
+    assert_eq!(unpark_rx.try_recv(), Err(mpsc::TryRecvError::Disconnected));
+
+    // If the future is forcefully dropped, then we will get a signal here.
+    drop_rx.recv().unwrap();
+
+    // Ensure `task` lives until after the test completes.
+    drop(task);
+}