diff options
Diffstat (limited to 'library/std/src/sync/mpmc/context.rs')
-rw-r--r-- | library/std/src/sync/mpmc/context.rs | 155 |
1 files changed, 155 insertions, 0 deletions
diff --git a/library/std/src/sync/mpmc/context.rs b/library/std/src/sync/mpmc/context.rs new file mode 100644 index 000000000..bbfc6ce00 --- /dev/null +++ b/library/std/src/sync/mpmc/context.rs @@ -0,0 +1,155 @@ +//! Thread-local channel context. + +use super::select::Selected; +use super::waker::current_thread_id; + +use crate::cell::Cell; +use crate::ptr; +use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; +use crate::sync::Arc; +use crate::thread::{self, Thread}; +use crate::time::Instant; + +/// Thread-local context. +#[derive(Debug, Clone)] +pub struct Context { + inner: Arc<Inner>, +} + +/// Inner representation of `Context`. +#[derive(Debug)] +struct Inner { + /// Selected operation. + select: AtomicUsize, + + /// A slot into which another thread may store a pointer to its `Packet`. + packet: AtomicPtr<()>, + + /// Thread handle. + thread: Thread, + + /// Thread id. + thread_id: usize, +} + +impl Context { + /// Creates a new context for the duration of the closure. + #[inline] + pub fn with<F, R>(f: F) -> R + where + F: FnOnce(&Context) -> R, + { + thread_local! { + /// Cached thread-local context. + static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new())); + } + + let mut f = Some(f); + let mut f = |cx: &Context| -> R { + let f = f.take().unwrap(); + f(cx) + }; + + CONTEXT + .try_with(|cell| match cell.take() { + None => f(&Context::new()), + Some(cx) => { + cx.reset(); + let res = f(&cx); + cell.set(Some(cx)); + res + } + }) + .unwrap_or_else(|_| f(&Context::new())) + } + + /// Creates a new `Context`. + #[cold] + fn new() -> Context { + Context { + inner: Arc::new(Inner { + select: AtomicUsize::new(Selected::Waiting.into()), + packet: AtomicPtr::new(ptr::null_mut()), + thread: thread::current(), + thread_id: current_thread_id(), + }), + } + } + + /// Resets `select` and `packet`. + #[inline] + fn reset(&self) { + self.inner.select.store(Selected::Waiting.into(), Ordering::Release); + self.inner.packet.store(ptr::null_mut(), Ordering::Release); + } + + /// Attempts to select an operation. + /// + /// On failure, the previously selected operation is returned. + #[inline] + pub fn try_select(&self, select: Selected) -> Result<(), Selected> { + self.inner + .select + .compare_exchange( + Selected::Waiting.into(), + select.into(), + Ordering::AcqRel, + Ordering::Acquire, + ) + .map(|_| ()) + .map_err(|e| e.into()) + } + + /// Stores a packet. + /// + /// This method must be called after `try_select` succeeds and there is a packet to provide. + #[inline] + pub fn store_packet(&self, packet: *mut ()) { + if !packet.is_null() { + self.inner.packet.store(packet, Ordering::Release); + } + } + + /// Waits until an operation is selected and returns it. + /// + /// If the deadline is reached, `Selected::Aborted` will be selected. + #[inline] + pub fn wait_until(&self, deadline: Option<Instant>) -> Selected { + loop { + // Check whether an operation has been selected. + let sel = Selected::from(self.inner.select.load(Ordering::Acquire)); + if sel != Selected::Waiting { + return sel; + } + + // If there's a deadline, park the current thread until the deadline is reached. + if let Some(end) = deadline { + let now = Instant::now(); + + if now < end { + thread::park_timeout(end - now); + } else { + // The deadline has been reached. Try aborting select. + return match self.try_select(Selected::Aborted) { + Ok(()) => Selected::Aborted, + Err(s) => s, + }; + } + } else { + thread::park(); + } + } + } + + /// Unparks the thread this context belongs to. + #[inline] + pub fn unpark(&self) { + self.inner.thread.unpark(); + } + + /// Returns the id of the thread this context belongs to. + #[inline] + pub fn thread_id(&self) -> usize { + self.inner.thread_id + } +} |