summaryrefslogtreecommitdiffstats
path: root/vendor/tokio/src/runtime/park.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
commitdc0db358abe19481e475e10c32149b53370f1a1c (patch)
treeab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/src/runtime/park.rs
parentReleasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff)
downloadrustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz
rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/runtime/park.rs')
-rw-r--r--vendor/tokio/src/runtime/park.rs337
1 files changed, 214 insertions, 123 deletions
diff --git a/vendor/tokio/src/runtime/park.rs b/vendor/tokio/src/runtime/park.rs
index 033b9f20b..2392846ab 100644
--- a/vendor/tokio/src/runtime/park.rs
+++ b/vendor/tokio/src/runtime/park.rs
@@ -1,153 +1,102 @@
-//! Parks the runtime.
-//!
-//! A combination of the various resource driver park handles.
+#![cfg_attr(not(feature = "full"), allow(dead_code))]
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Arc, Condvar, Mutex};
-use crate::loom::thread;
-use crate::park::{Park, Unpark};
-use crate::runtime::driver::Driver;
-use crate::util::TryLock;
use std::sync::atomic::Ordering::SeqCst;
use std::time::Duration;
-pub(crate) struct Parker {
+#[derive(Debug)]
+pub(crate) struct ParkThread {
inner: Arc<Inner>,
}
-pub(crate) struct Unparker {
+/// Unblocks a thread that was blocked by `ParkThread`.
+#[derive(Clone, Debug)]
+pub(crate) struct UnparkThread {
inner: Arc<Inner>,
}
+#[derive(Debug)]
struct Inner {
- /// Avoids entering the park if possible
state: AtomicUsize,
-
- /// Used to coordinate access to the driver / condvar
mutex: Mutex<()>,
-
- /// Condvar to block on if the driver is unavailable.
condvar: Condvar,
-
- /// Resource (I/O, time, ...) driver
- shared: Arc<Shared>,
}
const EMPTY: usize = 0;
-const PARKED_CONDVAR: usize = 1;
-const PARKED_DRIVER: usize = 2;
-const NOTIFIED: usize = 3;
+const PARKED: usize = 1;
+const NOTIFIED: usize = 2;
-/// Shared across multiple Parker handles
-struct Shared {
- /// Shared driver. Only one thread at a time can use this
- driver: TryLock<Driver>,
+tokio_thread_local! {
+ static CURRENT_PARKER: ParkThread = ParkThread::new();
+}
- /// Unpark handle
- handle: <Driver as Park>::Unpark,
+// Bit of a hack, but it is only for loom
+#[cfg(loom)]
+tokio_thread_local! {
+ static CURRENT_THREAD_PARK_COUNT: AtomicUsize = AtomicUsize::new(0);
}
-impl Parker {
- pub(crate) fn new(driver: Driver) -> Parker {
- let handle = driver.unpark();
+// ==== impl ParkThread ====
- Parker {
+impl ParkThread {
+ pub(crate) fn new() -> Self {
+ Self {
inner: Arc::new(Inner {
state: AtomicUsize::new(EMPTY),
mutex: Mutex::new(()),
condvar: Condvar::new(),
- shared: Arc::new(Shared {
- driver: TryLock::new(driver),
- handle,
- }),
}),
}
}
-}
-impl Clone for Parker {
- fn clone(&self) -> Parker {
- Parker {
- inner: Arc::new(Inner {
- state: AtomicUsize::new(EMPTY),
- mutex: Mutex::new(()),
- condvar: Condvar::new(),
- shared: self.inner.shared.clone(),
- }),
- }
+ pub(crate) fn unpark(&self) -> UnparkThread {
+ let inner = self.inner.clone();
+ UnparkThread { inner }
}
-}
-
-impl Park for Parker {
- type Unpark = Unparker;
- type Error = ();
- fn unpark(&self) -> Unparker {
- Unparker {
- inner: self.inner.clone(),
- }
- }
-
- fn park(&mut self) -> Result<(), Self::Error> {
+ pub(crate) fn park(&mut self) {
+ #[cfg(loom)]
+ CURRENT_THREAD_PARK_COUNT.with(|count| count.fetch_add(1, SeqCst));
self.inner.park();
- Ok(())
}
- fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
- // Only parking with zero is supported...
- assert_eq!(duration, Duration::from_millis(0));
+ pub(crate) fn park_timeout(&mut self, duration: Duration) {
+ #[cfg(loom)]
+ CURRENT_THREAD_PARK_COUNT.with(|count| count.fetch_add(1, SeqCst));
- if let Some(mut driver) = self.inner.shared.driver.try_lock() {
- driver.park_timeout(duration).map_err(|_| ())
- } else {
- Ok(())
- }
+ // Wasm doesn't have threads, so just sleep.
+ #[cfg(not(tokio_wasm))]
+ self.inner.park_timeout(duration);
+ #[cfg(tokio_wasm)]
+ std::thread::sleep(duration);
}
- fn shutdown(&mut self) {
+ pub(crate) fn shutdown(&mut self) {
self.inner.shutdown();
}
}
-impl Unpark for Unparker {
- fn unpark(&self) {
- self.inner.unpark();
- }
-}
+// ==== impl Inner ====
impl Inner {
/// Parks the current thread for at most `dur`.
fn park(&self) {
- for _ in 0..3 {
- // If we were previously notified then we consume this notification and
- // return quickly.
- if self
- .state
- .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
- .is_ok()
- {
- return;
- }
-
- thread::yield_now();
- }
-
- if let Some(mut driver) = self.shared.driver.try_lock() {
- self.park_driver(&mut driver);
- } else {
- self.park_condvar();
+ // If we were previously notified then we consume this notification and
+ // return quickly.
+ if self
+ .state
+ .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
+ .is_ok()
+ {
+ return;
}
- }
- fn park_condvar(&self) {
// Otherwise we need to coordinate going to sleep
let mut m = self.mutex.lock();
- match self
- .state
- .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst)
- {
+ match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
Err(NOTIFIED) => {
// We must read here, even though we know it will be `NOTIFIED`.
@@ -180,33 +129,44 @@ impl Inner {
}
}
- fn park_driver(&self, driver: &mut Driver) {
- match self
+ fn park_timeout(&self, dur: Duration) {
+ // Like `park` above we have a fast path for an already-notified thread,
+ // and afterwards we start coordinating for a sleep. Return quickly.
+ if self
.state
- .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
+ .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
+ .is_ok()
{
+ return;
+ }
+
+ if dur == Duration::from_millis(0) {
+ return;
+ }
+
+ let m = self.mutex.lock();
+
+ match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
Err(NOTIFIED) => {
- // We must read here, even though we know it will be `NOTIFIED`.
- // This is because `unpark` may have been called again since we read
- // `NOTIFIED` in the `compare_exchange` above. We must perform an
- // acquire operation that synchronizes with that `unpark` to observe
- // any writes it made before the call to unpark. To do that we must
- // read from the write it made to `state`.
+ // We must read again here, see `park`.
let old = self.state.swap(EMPTY, SeqCst);
debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
return;
}
- Err(actual) => panic!("inconsistent park state; actual = {}", actual),
+ Err(actual) => panic!("inconsistent park_timeout state; actual = {}", actual),
}
- // TODO: don't unwrap
- driver.park().unwrap();
+ // Wait with a timeout, and if we spuriously wake up or otherwise wake up
+ // from a notification, we just want to unconditionally set the state back to
+ // empty, either consuming a notification or un-flagging ourselves as
+ // parked.
+ let (_m, _result) = self.condvar.wait_timeout(m, dur).unwrap();
match self.state.swap(EMPTY, SeqCst) {
- NOTIFIED => {} // got a notification, hurray!
- PARKED_DRIVER => {} // no notification, alas
+ NOTIFIED => {} // got a notification, hurray!
+ PARKED => {} // no notification, alas
n => panic!("inconsistent park_timeout state: {}", n),
}
}
@@ -218,15 +178,12 @@ impl Inner {
// is already `NOTIFIED`. That is why this must be a swap rather than a
// compare-and-swap that returns if it reads `NOTIFIED` on failure.
match self.state.swap(NOTIFIED, SeqCst) {
- EMPTY => {} // no one was waiting
- NOTIFIED => {} // already unparked
- PARKED_CONDVAR => self.unpark_condvar(),
- PARKED_DRIVER => self.unpark_driver(),
- actual => panic!("inconsistent state in unpark; actual = {}", actual),
+ EMPTY => return, // no one was waiting
+ NOTIFIED => return, // already unparked
+ PARKED => {} // gotta go wake someone up
+ _ => panic!("inconsistent state in unpark"),
}
- }
- fn unpark_condvar(&self) {
// There is a period between when the parked thread sets `state` to
// `PARKED` (or last checked `state` in the case of a spurious wake
// up) and when it actually waits on `cvar`. If we were to notify
@@ -243,15 +200,149 @@ impl Inner {
self.condvar.notify_one()
}
- fn unpark_driver(&self) {
- self.shared.handle.unpark();
+ fn shutdown(&self) {
+ self.condvar.notify_all();
}
+}
- fn shutdown(&self) {
- if let Some(mut driver) = self.shared.driver.try_lock() {
- driver.shutdown();
+impl Default for ParkThread {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+// ===== impl UnparkThread =====
+
+impl UnparkThread {
+ pub(crate) fn unpark(&self) {
+ self.inner.unpark();
+ }
+}
+
+use crate::loom::thread::AccessError;
+use std::future::Future;
+use std::marker::PhantomData;
+use std::mem;
+use std::rc::Rc;
+use std::task::{RawWaker, RawWakerVTable, Waker};
+
+/// Blocks the current thread using a condition variable.
+#[derive(Debug)]
+pub(crate) struct CachedParkThread {
+ _anchor: PhantomData<Rc<()>>,
+}
+
+impl CachedParkThread {
+ /// Creates a new `ParkThread` handle for the current thread.
+ ///
+ /// This type cannot be moved to other threads, so it should be created on
+ /// the thread that the caller intends to park.
+ pub(crate) fn new() -> CachedParkThread {
+ CachedParkThread {
+ _anchor: PhantomData,
}
+ }
- self.condvar.notify_all();
+ pub(crate) fn waker(&self) -> Result<Waker, AccessError> {
+ self.unpark().map(|unpark| unpark.into_waker())
}
+
+ fn unpark(&self) -> Result<UnparkThread, AccessError> {
+ self.with_current(|park_thread| park_thread.unpark())
+ }
+
+ pub(crate) fn park(&mut self) {
+ self.with_current(|park_thread| park_thread.inner.park())
+ .unwrap();
+ }
+
+ pub(crate) fn park_timeout(&mut self, duration: Duration) {
+ self.with_current(|park_thread| park_thread.inner.park_timeout(duration))
+ .unwrap();
+ }
+
+ /// Gets a reference to the `ParkThread` handle for this thread.
+ fn with_current<F, R>(&self, f: F) -> Result<R, AccessError>
+ where
+ F: FnOnce(&ParkThread) -> R,
+ {
+ CURRENT_PARKER.try_with(|inner| f(inner))
+ }
+
+ pub(crate) fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError> {
+ use std::task::Context;
+ use std::task::Poll::Ready;
+
+ // `get_unpark()` should not return a Result
+ let waker = self.waker()?;
+ let mut cx = Context::from_waker(&waker);
+
+ pin!(f);
+
+ loop {
+ if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
+ return Ok(v);
+ }
+
+ self.park();
+ }
+ }
+}
+
+impl UnparkThread {
+ pub(crate) fn into_waker(self) -> Waker {
+ unsafe {
+ let raw = unparker_to_raw_waker(self.inner);
+ Waker::from_raw(raw)
+ }
+ }
+}
+
+impl Inner {
+ #[allow(clippy::wrong_self_convention)]
+ fn into_raw(this: Arc<Inner>) -> *const () {
+ Arc::into_raw(this) as *const ()
+ }
+
+ unsafe fn from_raw(ptr: *const ()) -> Arc<Inner> {
+ Arc::from_raw(ptr as *const Inner)
+ }
+}
+
+unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker {
+ RawWaker::new(
+ Inner::into_raw(unparker),
+ &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker),
+ )
+}
+
+unsafe fn clone(raw: *const ()) -> RawWaker {
+ let unparker = Inner::from_raw(raw);
+
+ // Increment the ref count
+ mem::forget(unparker.clone());
+
+ unparker_to_raw_waker(unparker)
+}
+
+unsafe fn drop_waker(raw: *const ()) {
+ let _ = Inner::from_raw(raw);
+}
+
+unsafe fn wake(raw: *const ()) {
+ let unparker = Inner::from_raw(raw);
+ unparker.unpark();
+}
+
+unsafe fn wake_by_ref(raw: *const ()) {
+ let unparker = Inner::from_raw(raw);
+ unparker.unpark();
+
+ // We don't actually own a reference to the unparker
+ mem::forget(unparker);
+}
+
+#[cfg(loom)]
+pub(crate) fn current_thread_park_count() -> usize {
+ CURRENT_THREAD_PARK_COUNT.with(|count| count.load(SeqCst))
}