diff options
Diffstat (limited to 'third_party/rust/tokio-threadpool/tests/threadpool.rs')
-rw-r--r-- | third_party/rust/tokio-threadpool/tests/threadpool.rs | 555 |
1 files changed, 555 insertions, 0 deletions
diff --git a/third_party/rust/tokio-threadpool/tests/threadpool.rs b/third_party/rust/tokio-threadpool/tests/threadpool.rs new file mode 100644 index 0000000000..bd6736f9aa --- /dev/null +++ b/third_party/rust/tokio-threadpool/tests/threadpool.rs @@ -0,0 +1,555 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio_executor; +extern crate tokio_threadpool; + +use tokio_executor::park::{Park, Unpark}; +use tokio_threadpool::park::{DefaultPark, DefaultUnpark}; +use tokio_threadpool::*; + +use futures::future::lazy; +use futures::{Async, Future, Poll, Sink, Stream}; + +use std::cell::Cell; +use std::sync::atomic::Ordering::Relaxed; +use std::sync::atomic::*; +use std::sync::{mpsc, Arc}; +use std::time::Duration; + +thread_local!(static FOO: Cell<u32> = Cell::new(0)); + +fn ignore_results<F: Future + Send + 'static>( + f: F, +) -> Box<dyn Future<Item = (), Error = ()> + Send> { + Box::new(f.map(|_| ()).map_err(|_| ())) +} + +#[test] +fn natural_shutdown_simple_futures() { + let _ = ::env_logger::try_init(); + + for _ in 0..1_000 { + let num_inc = Arc::new(AtomicUsize::new(0)); + let num_dec = Arc::new(AtomicUsize::new(0)); + + FOO.with(|f| { + f.set(1); + + let pool = { + let num_inc = num_inc.clone(); + let num_dec = num_dec.clone(); + + Builder::new() + .around_worker(move |w, _| { + num_inc.fetch_add(1, Relaxed); + w.run(); + num_dec.fetch_add(1, Relaxed); + }) + .build() + }; + + let tx = pool.sender().clone(); + + let a = { + let (t, rx) = mpsc::channel(); + tx.spawn(lazy(move || { + // Makes sure this runs on a worker thread + FOO.with(|f| assert_eq!(f.get(), 0)); + + t.send("one").unwrap(); + Ok(()) + })) + .unwrap(); + rx + }; + + let b = { + let (t, rx) = mpsc::channel(); + tx.spawn(lazy(move || { + // Makes sure this runs on a worker thread + FOO.with(|f| assert_eq!(f.get(), 0)); + + t.send("two").unwrap(); + Ok(()) + })) + .unwrap(); + rx + }; + + drop(tx); + + assert_eq!("one", a.recv().unwrap()); + assert_eq!("two", b.recv().unwrap()); + + // Wait for the pool to shutdown + pool.shutdown().wait().unwrap(); + + // Assert that at least one thread started + let num_inc = num_inc.load(Relaxed); + assert!(num_inc > 0); + + // Assert that all threads shutdown + let num_dec = num_dec.load(Relaxed); + assert_eq!(num_inc, num_dec); + }); + } +} + +#[test] +fn force_shutdown_drops_futures() { + let _ = ::env_logger::try_init(); + + for _ in 0..1_000 { + let num_inc = Arc::new(AtomicUsize::new(0)); + let num_dec = Arc::new(AtomicUsize::new(0)); + let num_drop = Arc::new(AtomicUsize::new(0)); + + struct Never(Arc<AtomicUsize>); + + impl Future for Never { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + Ok(Async::NotReady) + } + } + + impl Drop for Never { + fn drop(&mut self) { + self.0.fetch_add(1, Relaxed); + } + } + + let a = num_inc.clone(); + let b = num_dec.clone(); + + let pool = Builder::new() + .around_worker(move |w, _| { + a.fetch_add(1, Relaxed); + w.run(); + b.fetch_add(1, Relaxed); + }) + .build(); + let mut tx = pool.sender().clone(); + + tx.spawn(Never(num_drop.clone())).unwrap(); + + // Wait for the pool to shutdown + pool.shutdown_now().wait().unwrap(); + + // Assert that only a single thread was spawned. + let a = num_inc.load(Relaxed); + assert!(a >= 1); + + // Assert that all threads shutdown + let b = num_dec.load(Relaxed); + assert_eq!(a, b); + + // Assert that the future was dropped + let c = num_drop.load(Relaxed); + assert_eq!(c, 1); + } +} + +#[test] +fn drop_threadpool_drops_futures() { + let _ = ::env_logger::try_init(); + + for _ in 0..1_000 { + let num_inc = Arc::new(AtomicUsize::new(0)); + let num_dec = Arc::new(AtomicUsize::new(0)); + let num_drop = Arc::new(AtomicUsize::new(0)); + + struct Never(Arc<AtomicUsize>); + + impl Future for Never { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + Ok(Async::NotReady) + } + } + + impl Drop for Never { + fn drop(&mut self) { + self.0.fetch_add(1, Relaxed); + } + } + + let a = num_inc.clone(); + let b = num_dec.clone(); + + let pool = Builder::new() + .max_blocking(2) + .pool_size(20) + .around_worker(move |w, _| { + a.fetch_add(1, Relaxed); + w.run(); + b.fetch_add(1, Relaxed); + }) + .build(); + let mut tx = pool.sender().clone(); + + tx.spawn(Never(num_drop.clone())).unwrap(); + + // Wait for the pool to shutdown + drop(pool); + + // Assert that only a single thread was spawned. + let a = num_inc.load(Relaxed); + assert!(a >= 1); + + // Assert that all threads shutdown + let b = num_dec.load(Relaxed); + assert_eq!(a, b); + + // Assert that the future was dropped + let c = num_drop.load(Relaxed); + assert_eq!(c, 1); + } +} + +#[test] +fn many_oneshot_futures() { + const NUM: usize = 10_000; + + let _ = ::env_logger::try_init(); + + for _ in 0..50 { + let pool = ThreadPool::new(); + let mut tx = pool.sender().clone(); + let cnt = Arc::new(AtomicUsize::new(0)); + + for _ in 0..NUM { + let cnt = cnt.clone(); + tx.spawn(lazy(move || { + cnt.fetch_add(1, Relaxed); + Ok(()) + })) + .unwrap(); + } + + // Wait for the pool to shutdown + pool.shutdown().wait().unwrap(); + + let num = cnt.load(Relaxed); + assert_eq!(num, NUM); + } +} + +#[test] +fn many_multishot_futures() { + use futures::sync::mpsc; + + const CHAIN: usize = 200; + const CYCLES: usize = 5; + const TRACKS: usize = 50; + + let _ = ::env_logger::try_init(); + + for _ in 0..50 { + let pool = ThreadPool::new(); + let mut pool_tx = pool.sender().clone(); + + let mut start_txs = Vec::with_capacity(TRACKS); + let mut final_rxs = Vec::with_capacity(TRACKS); + + for _ in 0..TRACKS { + let (start_tx, mut chain_rx) = mpsc::channel(10); + + for _ in 0..CHAIN { + let (next_tx, next_rx) = mpsc::channel(10); + + let rx = chain_rx.map_err(|e| panic!("{:?}", e)); + + // Forward all the messages + pool_tx + .spawn( + next_tx + .send_all(rx) + .map(|_| ()) + .map_err(|e| panic!("{:?}", e)), + ) + .unwrap(); + + chain_rx = next_rx; + } + + // This final task cycles if needed + let (final_tx, final_rx) = mpsc::channel(10); + let cycle_tx = start_tx.clone(); + let mut rem = CYCLES; + + let task = chain_rx.take(CYCLES as u64).for_each(move |msg| { + rem -= 1; + let send = if rem == 0 { + final_tx.clone().send(msg) + } else { + cycle_tx.clone().send(msg) + }; + + send.then(|res| { + res.unwrap(); + Ok(()) + }) + }); + pool_tx.spawn(ignore_results(task)).unwrap(); + + start_txs.push(start_tx); + final_rxs.push(final_rx); + } + + for start_tx in start_txs { + start_tx.send("ping").wait().unwrap(); + } + + for final_rx in final_rxs { + final_rx.wait().next().unwrap().unwrap(); + } + + // Shutdown the pool + pool.shutdown().wait().unwrap(); + } +} + +#[test] +fn global_executor_is_configured() { + let pool = ThreadPool::new(); + let tx = pool.sender().clone(); + + let (signal_tx, signal_rx) = mpsc::channel(); + + tx.spawn(lazy(move || { + tokio_executor::spawn(lazy(move || { + signal_tx.send(()).unwrap(); + Ok(()) + })); + + Ok(()) + })) + .unwrap(); + + signal_rx.recv().unwrap(); + + pool.shutdown().wait().unwrap(); +} + +#[test] +fn new_threadpool_is_idle() { + let pool = ThreadPool::new(); + pool.shutdown_on_idle().wait().unwrap(); +} + +#[test] +fn busy_threadpool_is_not_idle() { + use futures::sync::oneshot; + + // let pool = ThreadPool::new(); + let pool = Builder::new().pool_size(4).max_blocking(2).build(); + let tx = pool.sender().clone(); + + let (term_tx, term_rx) = oneshot::channel(); + + tx.spawn(term_rx.then(|_| Ok(()))).unwrap(); + + let mut idle = pool.shutdown_on_idle(); + + struct IdleFut<'a>(&'a mut Shutdown); + + impl<'a> Future for IdleFut<'a> { + type Item = (); + type Error = (); + fn poll(&mut self) -> Poll<(), ()> { + assert!(self.0.poll().unwrap().is_not_ready()); + Ok(Async::Ready(())) + } + } + + IdleFut(&mut idle).wait().unwrap(); + + term_tx.send(()).unwrap(); + + idle.wait().unwrap(); +} + +#[test] +fn panic_in_task() { + let pool = ThreadPool::new(); + let tx = pool.sender().clone(); + + struct Boom; + + impl Future for Boom { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + panic!(); + } + } + + impl Drop for Boom { + fn drop(&mut self) { + assert!(::std::thread::panicking()); + } + } + + tx.spawn(Boom).unwrap(); + + pool.shutdown_on_idle().wait().unwrap(); +} + +#[test] +fn count_panics() { + let counter = Arc::new(AtomicUsize::new(0)); + let counter_ = counter.clone(); + let pool = tokio_threadpool::Builder::new() + .panic_handler(move |_err| { + // We caught a panic. + counter_.fetch_add(1, Relaxed); + }) + .build(); + // Spawn a future that will panic. + pool.spawn(lazy(|| -> Result<(), ()> { panic!() })); + pool.shutdown_on_idle().wait().unwrap(); + let counter = counter.load(Relaxed); + assert_eq!(counter, 1); +} + +#[test] +fn multi_threadpool() { + use futures::sync::oneshot; + + let pool1 = ThreadPool::new(); + let pool2 = ThreadPool::new(); + + let (tx, rx) = oneshot::channel(); + let (done_tx, done_rx) = mpsc::channel(); + + pool2.spawn({ + rx.and_then(move |_| { + done_tx.send(()).unwrap(); + Ok(()) + }) + .map_err(|e| panic!("err={:?}", e)) + }); + + pool1.spawn(lazy(move || { + tx.send(()).unwrap(); + Ok(()) + })); + + done_rx.recv().unwrap(); +} + +#[test] +fn eagerly_drops_futures() { + use futures::future::{empty, lazy, Future}; + use futures::task; + use std::sync::mpsc; + + struct NotifyOnDrop(mpsc::Sender<()>); + + impl Drop for NotifyOnDrop { + fn drop(&mut self) { + self.0.send(()).unwrap(); + } + } + + struct MyPark { + inner: DefaultPark, + #[allow(dead_code)] + park_tx: mpsc::SyncSender<()>, + unpark_tx: mpsc::SyncSender<()>, + } + + impl Park for MyPark { + type Unpark = MyUnpark; + type Error = <DefaultPark as Park>::Error; + + fn unpark(&self) -> Self::Unpark { + MyUnpark { + inner: self.inner.unpark(), + unpark_tx: self.unpark_tx.clone(), + } + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.inner.park() + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.inner.park_timeout(duration) + } + } + + struct MyUnpark { + inner: DefaultUnpark, + #[allow(dead_code)] + unpark_tx: mpsc::SyncSender<()>, + } + + impl Unpark for MyUnpark { + fn unpark(&self) { + self.inner.unpark() + } + } + + let (task_tx, task_rx) = mpsc::channel(); + let (drop_tx, drop_rx) = mpsc::channel(); + let (park_tx, park_rx) = mpsc::sync_channel(0); + let (unpark_tx, unpark_rx) = mpsc::sync_channel(0); + + // Get the signal that the handler dropped. + let notify_on_drop = NotifyOnDrop(drop_tx); + + let pool = tokio_threadpool::Builder::new() + .custom_park(move |_| MyPark { + inner: DefaultPark::new(), + park_tx: park_tx.clone(), + unpark_tx: unpark_tx.clone(), + }) + .build(); + + pool.spawn(lazy(move || { + // Get a handle to the current task. + let task = task::current(); + + // Send it to the main thread to hold on to. + task_tx.send(task).unwrap(); + + // This future will never resolve, it is only used to hold on to thee + // `notify_on_drop` handle. + empty::<(), ()>().then(move |_| { + // This code path should never be reached. + if true { + panic!() + } + + // Explicitly drop `notify_on_drop` here, this is mostly to ensure + // that the `notify_on_drop` handle gets moved into the task. It + // will actually get dropped when the runtime is dropped. + drop(notify_on_drop); + + Ok(()) + }) + })); + + // Wait until we get the task handle. + let task = task_rx.recv().unwrap(); + + // Drop the pool, this should result in futures being forcefully dropped. + drop(pool); + + // Make sure `MyPark` and `MyUnpark` were dropped during shutdown. + assert_eq!(park_rx.try_recv(), Err(mpsc::TryRecvError::Disconnected)); + assert_eq!(unpark_rx.try_recv(), Err(mpsc::TryRecvError::Disconnected)); + + // If the future is forcefully dropped, then we will get a signal here. + drop_rx.recv().unwrap(); + + // Ensure `task` lives until after the test completes. + drop(task); +} |