193 lines
6.1 KiB
Rust
193 lines
6.1 KiB
Rust
#![warn(rust_2018_idioms)]
|
|
|
|
use std::rc::Rc;
|
|
use std::sync::Arc;
|
|
use tokio_util::task;
|
|
|
|
/// Simple test of running a !Send future via spawn_pinned
|
|
#[tokio::test]
|
|
async fn can_spawn_not_send_future() {
|
|
let pool = task::LocalPoolHandle::new(1);
|
|
|
|
let output = pool
|
|
.spawn_pinned(|| {
|
|
// Rc is !Send + !Sync
|
|
let local_data = Rc::new("test");
|
|
|
|
// This future holds an Rc, so it is !Send
|
|
async move { local_data.to_string() }
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(output, "test");
|
|
}
|
|
|
|
/// Discarding the join handle must not stop the task: it still runs and its
/// result is observed through a side channel.
#[test]
fn can_drop_future_and_still_get_output() {
    let pool = task::LocalPoolHandle::new(1);
    let (tx, rx) = std::sync::mpsc::channel();

    // The join handle is thrown away as soon as it is returned.
    drop(pool.spawn_pinned(move || {
        // `Rc` is `!Send + !Sync`, which makes the future below `!Send`.
        let payload = Rc::new("test");
        async move {
            let _ = tx.send(payload.to_string());
        }
    }));

    // Blocks until the task sends its value (or the sender goes away).
    let received = rx.recv();
    assert_eq!(received, Ok("test".to_string()));
}
|
|
|
|
/// Requesting a pool with zero workers is expected to panic.
#[test]
#[should_panic(expected = "assertion failed: pool_size > 0")]
fn cannot_create_zero_sized_pool() {
    // The constructor should panic before any handle is produced.
    drop(task::LocalPoolHandle::new(0));
}
|
|
|
|
/// We should be able to spawn multiple futures onto the pool at the same time.
|
|
#[tokio::test]
|
|
async fn can_spawn_multiple_futures() {
|
|
let pool = task::LocalPoolHandle::new(2);
|
|
|
|
let join_handle1 = pool.spawn_pinned(|| {
|
|
let local_data = Rc::new("test1");
|
|
async move { local_data.to_string() }
|
|
});
|
|
let join_handle2 = pool.spawn_pinned(|| {
|
|
let local_data = Rc::new("test2");
|
|
async move { local_data.to_string() }
|
|
});
|
|
|
|
assert_eq!(join_handle1.await.unwrap(), "test1");
|
|
assert_eq!(join_handle2.await.unwrap(), "test2");
|
|
}
|
|
|
|
/// A panic in the spawned task causes the join handle to return an error.
|
|
/// But, you can continue to spawn tasks.
|
|
#[tokio::test]
|
|
async fn task_panic_propagates() {
|
|
let pool = task::LocalPoolHandle::new(1);
|
|
|
|
let join_handle = pool.spawn_pinned(|| async {
|
|
panic!("Test panic");
|
|
});
|
|
|
|
let result = join_handle.await;
|
|
assert!(result.is_err());
|
|
let error = result.unwrap_err();
|
|
assert!(error.is_panic());
|
|
let panic_str: &str = *error.into_panic().downcast().unwrap();
|
|
assert_eq!(panic_str, "Test panic");
|
|
|
|
// Trying again with a "safe" task still works
|
|
let join_handle = pool.spawn_pinned(|| async { "test" });
|
|
let result = join_handle.await;
|
|
assert!(result.is_ok());
|
|
assert_eq!(result.unwrap(), "test");
|
|
}
|
|
|
|
/// A panic during task creation causes the join handle to return an error.
|
|
/// But, you can continue to spawn tasks.
|
|
#[tokio::test]
|
|
async fn callback_panic_does_not_kill_worker() {
|
|
let pool = task::LocalPoolHandle::new(1);
|
|
|
|
let join_handle = pool.spawn_pinned(|| {
|
|
panic!("Test panic");
|
|
#[allow(unreachable_code)]
|
|
async {}
|
|
});
|
|
|
|
let result = join_handle.await;
|
|
assert!(result.is_err());
|
|
let error = result.unwrap_err();
|
|
assert!(error.is_panic());
|
|
let panic_str: &str = *error.into_panic().downcast().unwrap();
|
|
assert_eq!(panic_str, "Test panic");
|
|
|
|
// Trying again with a "safe" callback works
|
|
let join_handle = pool.spawn_pinned(|| async { "test" });
|
|
let result = join_handle.await;
|
|
assert!(result.is_ok());
|
|
assert_eq!(result.unwrap(), "test");
|
|
}
|
|
|
|
/// Canceling the task via the returned join handle cancels the spawned task
|
|
/// (which has a different, internal join handle).
|
|
#[tokio::test]
|
|
async fn task_cancellation_propagates() {
|
|
let pool = task::LocalPoolHandle::new(1);
|
|
let notify_dropped = Arc::new(());
|
|
let weak_notify_dropped = Arc::downgrade(¬ify_dropped);
|
|
|
|
let (start_sender, start_receiver) = tokio::sync::oneshot::channel();
|
|
let (drop_sender, drop_receiver) = tokio::sync::oneshot::channel::<()>();
|
|
let join_handle = pool.spawn_pinned(|| async move {
|
|
let _drop_sender = drop_sender;
|
|
// Move the Arc into the task
|
|
let _notify_dropped = notify_dropped;
|
|
let _ = start_sender.send(());
|
|
|
|
// Keep the task running until it gets aborted
|
|
futures::future::pending::<()>().await;
|
|
});
|
|
|
|
// Wait for the task to start
|
|
let _ = start_receiver.await;
|
|
|
|
join_handle.abort();
|
|
|
|
// Wait for the inner task to abort, dropping the sender.
|
|
// The top level join handle aborts quicker than the inner task (the abort
|
|
// needs to propagate and get processed on the worker thread), so we can't
|
|
// just await the top level join handle.
|
|
let _ = drop_receiver.await;
|
|
|
|
// Check that the Arc has been dropped. This verifies that the inner task
|
|
// was canceled as well.
|
|
assert!(weak_notify_dropped.upgrade().is_none());
|
|
}
|
|
|
|
/// Tasks should be given to the least burdened worker. When spawning two tasks
|
|
/// on a pool with two empty workers the tasks should be spawned on separate
|
|
/// workers.
|
|
#[tokio::test]
|
|
async fn tasks_are_balanced() {
|
|
let pool = task::LocalPoolHandle::new(2);
|
|
|
|
// Spawn a task so one thread has a task count of 1
|
|
let (start_sender1, start_receiver1) = tokio::sync::oneshot::channel();
|
|
let (end_sender1, end_receiver1) = tokio::sync::oneshot::channel();
|
|
let join_handle1 = pool.spawn_pinned(|| async move {
|
|
let _ = start_sender1.send(());
|
|
let _ = end_receiver1.await;
|
|
std::thread::current().id()
|
|
});
|
|
|
|
// Wait for the first task to start up
|
|
let _ = start_receiver1.await;
|
|
|
|
// This task should be spawned on the other thread
|
|
let (start_sender2, start_receiver2) = tokio::sync::oneshot::channel();
|
|
let join_handle2 = pool.spawn_pinned(|| async move {
|
|
let _ = start_sender2.send(());
|
|
std::thread::current().id()
|
|
});
|
|
|
|
// Wait for the second task to start up
|
|
let _ = start_receiver2.await;
|
|
|
|
// Allow the first task to end
|
|
let _ = end_sender1.send(());
|
|
|
|
let thread_id1 = join_handle1.await.unwrap();
|
|
let thread_id2 = join_handle2.await.unwrap();
|
|
|
|
// Since the first task was active when the second task spawned, they should
|
|
// be on separate workers/threads.
|
|
assert_ne!(thread_id1, thread_id2);
|
|
}
|