summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-executor/src/unpark_mutex.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-executor/src/unpark_mutex.rs')
-rw-r--r--third_party/rust/futures-executor/src/unpark_mutex.rs137
1 files changed, 137 insertions, 0 deletions
diff --git a/third_party/rust/futures-executor/src/unpark_mutex.rs b/third_party/rust/futures-executor/src/unpark_mutex.rs
new file mode 100644
index 0000000000..ac5112cfa2
--- /dev/null
+++ b/third_party/rust/futures-executor/src/unpark_mutex.rs
@@ -0,0 +1,137 @@
+use std::cell::UnsafeCell;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+
+/// A "lock" around data `D`, which employs a *helping* strategy.
+///
+/// Used to ensure that concurrent `unpark` invocations lead to (1) `poll` being
+/// invoked on only a single thread at a time (2) `poll` being invoked at least
+/// once after each `unpark` (unless the future has completed).
+pub(crate) struct UnparkMutex<D> {
+ // The state of task execution (state machine described below)
+ status: AtomicUsize,
+
+ // The actual task data, accessible only in the POLLING state
+ inner: UnsafeCell<Option<D>>,
+}
+
+// `UnparkMutex<D>` functions in many ways like a `Mutex<D>`, except that on
+// acquisition failure, the current lock holder performs the desired work --
+// re-polling.
+//
+// As such, these impls mirror those for `Mutex<D>`. In particular, a reference
+// to `UnparkMutex` can be used to gain `&mut` access to the inner data, which
+// must therefore be `Send`.
+unsafe impl<D: Send> Send for UnparkMutex<D> {}
+unsafe impl<D: Send> Sync for UnparkMutex<D> {}
+
+// There are four possible task states, listed below with their possible
+// transitions:
+
+// The task is blocked, waiting on an event
+const WAITING: usize = 0; // --> POLLING
+
+// The task is actively being polled by a thread; arrival of additional events
+// of interest should move it to the REPOLL state
+const POLLING: usize = 1; // --> WAITING, REPOLL, or COMPLETE
+
+// The task is actively being polled, but will need to be re-polled upon
+// completion to ensure that all events were observed.
+const REPOLL: usize = 2; // --> POLLING
+
+// The task has finished executing (either successfully or with an error/panic)
+const COMPLETE: usize = 3; // No transitions out
+
+impl<D> UnparkMutex<D> {
+ pub(crate) fn new() -> Self {
+ Self { status: AtomicUsize::new(WAITING), inner: UnsafeCell::new(None) }
+ }
+
+ /// Attempt to "notify" the mutex that a poll should occur.
+ ///
+ /// An `Ok` result indicates that the `POLLING` state has been entered, and
+ /// the caller can proceed to poll the future. An `Err` result indicates
+ /// that polling is not necessary (because the task is finished or the
+ /// polling has been delegated).
+ pub(crate) fn notify(&self) -> Result<D, ()> {
+ let mut status = self.status.load(SeqCst);
+ loop {
+ match status {
+ // The task is idle, so try to run it immediately.
+ WAITING => {
+ match self.status.compare_exchange(WAITING, POLLING, SeqCst, SeqCst) {
+ Ok(_) => {
+ let data = unsafe {
+ // SAFETY: we've ensured mutual exclusion via
+ // the status protocol; we are the only thread
+ // that has transitioned to the POLLING state,
+ // and we won't transition back to QUEUED until
+ // the lock is "released" by this thread. See
+ // the protocol diagram above.
+ (*self.inner.get()).take().unwrap()
+ };
+ return Ok(data);
+ }
+ Err(cur) => status = cur,
+ }
+ }
+
+ // The task is being polled, so we need to record that it should
+ // be *repolled* when complete.
+ POLLING => match self.status.compare_exchange(POLLING, REPOLL, SeqCst, SeqCst) {
+ Ok(_) => return Err(()),
+ Err(cur) => status = cur,
+ },
+
+ // The task is already scheduled for polling, or is complete, so
+ // we've got nothing to do.
+ _ => return Err(()),
+ }
+ }
+ }
+
+ /// Alert the mutex that polling is about to begin, clearing any accumulated
+ /// re-poll requests.
+ ///
+ /// # Safety
+ ///
+ /// Callable only from the `POLLING`/`REPOLL` states, i.e. between
+ /// successful calls to `notify` and `wait`/`complete`.
+ pub(crate) unsafe fn start_poll(&self) {
+ self.status.store(POLLING, SeqCst);
+ }
+
+ /// Alert the mutex that polling completed with `Pending`.
+ ///
+ /// # Safety
+ ///
+ /// Callable only from the `POLLING`/`REPOLL` states, i.e. between
+ /// successful calls to `notify` and `wait`/`complete`.
+ pub(crate) unsafe fn wait(&self, data: D) -> Result<(), D> {
+ *self.inner.get() = Some(data);
+
+ match self.status.compare_exchange(POLLING, WAITING, SeqCst, SeqCst) {
+ // no unparks came in while we were running
+ Ok(_) => Ok(()),
+
+ // guaranteed to be in REPOLL state; just clobber the
+ // state and run again.
+ Err(status) => {
+ assert_eq!(status, REPOLL);
+ self.status.store(POLLING, SeqCst);
+ Err((*self.inner.get()).take().unwrap())
+ }
+ }
+ }
+
+ /// Alert the mutex that the task has completed execution and should not be
+ /// notified again.
+ ///
+ /// # Safety
+ ///
+ /// Callable only from the `POLLING`/`REPOLL` states, i.e. between
+ /// successful calls to `notify` and `wait`/`complete`.
+ pub(crate) unsafe fn complete(&self) {
+ self.status.store(COMPLETE, SeqCst);
+ }
+}