summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/tests/task_local.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio/tests/task_local.rs')
-rw-r--r--third_party/rust/tokio/tests/task_local.rs119
1 files changed, 119 insertions, 0 deletions
diff --git a/third_party/rust/tokio/tests/task_local.rs b/third_party/rust/tokio/tests/task_local.rs
new file mode 100644
index 0000000000..949a40c2a4
--- /dev/null
+++ b/third_party/rust/tokio/tests/task_local.rs
@@ -0,0 +1,119 @@
+#![cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads
+#![allow(clippy::declare_interior_mutable_const)]
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::sync::oneshot;
+
+#[tokio::test(flavor = "multi_thread")]
+async fn local() {
+ tokio::task_local! {
+ static REQ_ID: u32;
+ pub static FOO: bool;
+ }
+
+ let j1 = tokio::spawn(REQ_ID.scope(1, async move {
+ assert_eq!(REQ_ID.get(), 1);
+ assert_eq!(REQ_ID.get(), 1);
+ }));
+
+ let j2 = tokio::spawn(REQ_ID.scope(2, async move {
+ REQ_ID.with(|v| {
+ assert_eq!(REQ_ID.get(), 2);
+ assert_eq!(*v, 2);
+ });
+
+ tokio::time::sleep(std::time::Duration::from_millis(10)).await;
+
+ assert_eq!(REQ_ID.get(), 2);
+ }));
+
+ let j3 = tokio::spawn(FOO.scope(true, async move {
+ assert!(FOO.get());
+ }));
+
+ j1.await.unwrap();
+ j2.await.unwrap();
+ j3.await.unwrap();
+}
+
+#[tokio::test]
+async fn task_local_available_on_abort() {
+ tokio::task_local! {
+ static KEY: u32;
+ }
+
+ struct MyFuture {
+ tx_poll: Option<oneshot::Sender<()>>,
+ tx_drop: Option<oneshot::Sender<u32>>,
+ }
+ impl Future for MyFuture {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
+ if let Some(tx_poll) = self.tx_poll.take() {
+ let _ = tx_poll.send(());
+ }
+ Poll::Pending
+ }
+ }
+ impl Drop for MyFuture {
+ fn drop(&mut self) {
+ let _ = self.tx_drop.take().unwrap().send(KEY.get());
+ }
+ }
+
+ let (tx_drop, rx_drop) = oneshot::channel();
+ let (tx_poll, rx_poll) = oneshot::channel();
+
+ let h = tokio::spawn(KEY.scope(
+ 42,
+ MyFuture {
+ tx_poll: Some(tx_poll),
+ tx_drop: Some(tx_drop),
+ },
+ ));
+
+ rx_poll.await.unwrap();
+ h.abort();
+ assert_eq!(rx_drop.await.unwrap(), 42);
+
+ let err = h.await.unwrap_err();
+ if !err.is_cancelled() {
+ if let Ok(panic) = err.try_into_panic() {
+ std::panic::resume_unwind(panic);
+ } else {
+ panic!();
+ }
+ }
+}
+
+#[tokio::test]
+async fn task_local_available_on_completion_drop() {
+ tokio::task_local! {
+ static KEY: u32;
+ }
+
+ struct MyFuture {
+ tx: Option<oneshot::Sender<u32>>,
+ }
+ impl Future for MyFuture {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
+ Poll::Ready(())
+ }
+ }
+ impl Drop for MyFuture {
+ fn drop(&mut self) {
+ let _ = self.tx.take().unwrap().send(KEY.get());
+ }
+ }
+
+ let (tx, rx) = oneshot::channel();
+
+ let h = tokio::spawn(KEY.scope(42, MyFuture { tx: Some(tx) }));
+
+ assert_eq!(rx.await.unwrap(), 42);
+ h.await.unwrap();
+}