summaryrefslogtreecommitdiffstats
path: root/third_party/rust/crossbeam-channel/src/waker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/crossbeam-channel/src/waker.rs')
-rw-r--r--third_party/rust/crossbeam-channel/src/waker.rs286
1 files changed, 286 insertions, 0 deletions
diff --git a/third_party/rust/crossbeam-channel/src/waker.rs b/third_party/rust/crossbeam-channel/src/waker.rs
new file mode 100644
index 0000000000..7eb58ba7f3
--- /dev/null
+++ b/third_party/rust/crossbeam-channel/src/waker.rs
@@ -0,0 +1,286 @@
+//! Waking mechanism for threads blocked on channel operations.
+
+use std::ptr;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Mutex;
+use std::thread::{self, ThreadId};
+
+use crate::context::Context;
+use crate::select::{Operation, Selected};
+
+/// Represents a thread blocked on a specific channel operation.
+pub(crate) struct Entry {
+ /// The operation.
+ pub(crate) oper: Operation,
+
+ /// Optional packet.
+ pub(crate) packet: *mut (),
+
+ /// Context associated with the thread owning this operation.
+ pub(crate) cx: Context,
+}
+
+/// A queue of threads blocked on channel operations.
+///
+/// This data structure is used by threads to register blocking operations and get woken up once
+/// an operation becomes ready.
+pub(crate) struct Waker {
+ /// A list of select operations.
+ selectors: Vec<Entry>,
+
+ /// A list of operations waiting to be ready.
+ observers: Vec<Entry>,
+}
+
+impl Waker {
+ /// Creates a new `Waker`.
+ #[inline]
+ pub(crate) fn new() -> Self {
+ Waker {
+ selectors: Vec::new(),
+ observers: Vec::new(),
+ }
+ }
+
+ /// Registers a select operation.
+ #[inline]
+ pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
+ self.register_with_packet(oper, ptr::null_mut(), cx);
+ }
+
+ /// Registers a select operation and a packet.
+ #[inline]
+ pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
+ self.selectors.push(Entry {
+ oper,
+ packet,
+ cx: cx.clone(),
+ });
+ }
+
+ /// Unregisters a select operation.
+ #[inline]
+ pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
+ if let Some((i, _)) = self
+ .selectors
+ .iter()
+ .enumerate()
+ .find(|&(_, entry)| entry.oper == oper)
+ {
+ let entry = self.selectors.remove(i);
+ Some(entry)
+ } else {
+ None
+ }
+ }
+
+ /// Attempts to find another thread's entry, select the operation, and wake it up.
+ #[inline]
+ pub(crate) fn try_select(&mut self) -> Option<Entry> {
+ if self.selectors.is_empty() {
+ None
+ } else {
+ let thread_id = current_thread_id();
+
+ self.selectors
+ .iter()
+ .position(|selector| {
+ // Does the entry belong to a different thread?
+ selector.cx.thread_id() != thread_id
+ && selector // Try selecting this operation.
+ .cx
+ .try_select(Selected::Operation(selector.oper))
+ .is_ok()
+ && {
+ // Provide the packet.
+ selector.cx.store_packet(selector.packet);
+ // Wake the thread up.
+ selector.cx.unpark();
+ true
+ }
+ })
+ // Remove the entry from the queue to keep it clean and improve
+ // performance.
+ .map(|pos| self.selectors.remove(pos))
+ }
+ }
+
+ /// Returns `true` if there is an entry which can be selected by the current thread.
+ #[inline]
+ pub(crate) fn can_select(&self) -> bool {
+ if self.selectors.is_empty() {
+ false
+ } else {
+ let thread_id = current_thread_id();
+
+ self.selectors.iter().any(|entry| {
+ entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting
+ })
+ }
+ }
+
+ /// Registers an operation waiting to be ready.
+ #[inline]
+ pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) {
+ self.observers.push(Entry {
+ oper,
+ packet: ptr::null_mut(),
+ cx: cx.clone(),
+ });
+ }
+
+ /// Unregisters an operation waiting to be ready.
+ #[inline]
+ pub(crate) fn unwatch(&mut self, oper: Operation) {
+ self.observers.retain(|e| e.oper != oper);
+ }
+
+ /// Notifies all operations waiting to be ready.
+ #[inline]
+ pub(crate) fn notify(&mut self) {
+ for entry in self.observers.drain(..) {
+ if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
+ entry.cx.unpark();
+ }
+ }
+ }
+
+ /// Notifies all registered operations that the channel is disconnected.
+ #[inline]
+ pub(crate) fn disconnect(&mut self) {
+ for entry in self.selectors.iter() {
+ if entry.cx.try_select(Selected::Disconnected).is_ok() {
+ // Wake the thread up.
+ //
+ // Here we don't remove the entry from the queue. Registered threads must
+ // unregister from the waker by themselves. They might also want to recover the
+ // packet value and destroy it, if necessary.
+ entry.cx.unpark();
+ }
+ }
+
+ self.notify();
+ }
+}
+
+impl Drop for Waker {
+ #[inline]
+ fn drop(&mut self) {
+ debug_assert_eq!(self.selectors.len(), 0);
+ debug_assert_eq!(self.observers.len(), 0);
+ }
+}
+
+/// A waker that can be shared among threads without locking.
+///
+/// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
+pub(crate) struct SyncWaker {
+ /// The inner `Waker`.
+ inner: Mutex<Waker>,
+
+ /// `true` if the waker is empty.
+ is_empty: AtomicBool,
+}
+
+impl SyncWaker {
+ /// Creates a new `SyncWaker`.
+ #[inline]
+ pub(crate) fn new() -> Self {
+ SyncWaker {
+ inner: Mutex::new(Waker::new()),
+ is_empty: AtomicBool::new(true),
+ }
+ }
+
+ /// Registers the current thread with an operation.
+ #[inline]
+ pub(crate) fn register(&self, oper: Operation, cx: &Context) {
+ let mut inner = self.inner.lock().unwrap();
+ inner.register(oper, cx);
+ self.is_empty.store(
+ inner.selectors.is_empty() && inner.observers.is_empty(),
+ Ordering::SeqCst,
+ );
+ }
+
+ /// Unregisters an operation previously registered by the current thread.
+ #[inline]
+ pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
+ let mut inner = self.inner.lock().unwrap();
+ let entry = inner.unregister(oper);
+ self.is_empty.store(
+ inner.selectors.is_empty() && inner.observers.is_empty(),
+ Ordering::SeqCst,
+ );
+ entry
+ }
+
+ /// Attempts to find one thread (not the current one), select its operation, and wake it up.
+ #[inline]
+ pub(crate) fn notify(&self) {
+ if !self.is_empty.load(Ordering::SeqCst) {
+ let mut inner = self.inner.lock().unwrap();
+ if !self.is_empty.load(Ordering::SeqCst) {
+ inner.try_select();
+ inner.notify();
+ self.is_empty.store(
+ inner.selectors.is_empty() && inner.observers.is_empty(),
+ Ordering::SeqCst,
+ );
+ }
+ }
+ }
+
+ /// Registers an operation waiting to be ready.
+ #[inline]
+ pub(crate) fn watch(&self, oper: Operation, cx: &Context) {
+ let mut inner = self.inner.lock().unwrap();
+ inner.watch(oper, cx);
+ self.is_empty.store(
+ inner.selectors.is_empty() && inner.observers.is_empty(),
+ Ordering::SeqCst,
+ );
+ }
+
+ /// Unregisters an operation waiting to be ready.
+ #[inline]
+ pub(crate) fn unwatch(&self, oper: Operation) {
+ let mut inner = self.inner.lock().unwrap();
+ inner.unwatch(oper);
+ self.is_empty.store(
+ inner.selectors.is_empty() && inner.observers.is_empty(),
+ Ordering::SeqCst,
+ );
+ }
+
+ /// Notifies all threads that the channel is disconnected.
+ #[inline]
+ pub(crate) fn disconnect(&self) {
+ let mut inner = self.inner.lock().unwrap();
+ inner.disconnect();
+ self.is_empty.store(
+ inner.selectors.is_empty() && inner.observers.is_empty(),
+ Ordering::SeqCst,
+ );
+ }
+}
+
+impl Drop for SyncWaker {
+ #[inline]
+ fn drop(&mut self) {
+ debug_assert!(self.is_empty.load(Ordering::SeqCst));
+ }
+}
+
+/// Returns the id of the current thread.
+#[inline]
+fn current_thread_id() -> ThreadId {
+ thread_local! {
+ /// Cached thread-local id.
+ static THREAD_ID: ThreadId = thread::current().id();
+ }
+
+ THREAD_ID
+ .try_with(|id| *id)
+ .unwrap_or_else(|_| thread::current().id())
+}