//! Thread-local context used in select. use std::cell::Cell; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::thread::{self, Thread, ThreadId}; use std::time::Instant; use crossbeam_utils::Backoff; use crate::select::Selected; /// Thread-local context used in select. #[derive(Debug, Clone)] pub struct Context { inner: Arc, } /// Inner representation of `Context`. #[derive(Debug)] struct Inner { /// Selected operation. select: AtomicUsize, /// A slot into which another thread may store a pointer to its `Packet`. packet: AtomicUsize, /// Thread handle. thread: Thread, /// Thread id. thread_id: ThreadId, } impl Context { /// Creates a new context for the duration of the closure. #[inline] pub fn with(f: F) -> R where F: FnOnce(&Context) -> R, { thread_local! { /// Cached thread-local context. static CONTEXT: Cell> = Cell::new(Some(Context::new())); } let mut f = Some(f); let mut f = move |cx: &Context| -> R { let f = f.take().unwrap(); f(cx) }; CONTEXT .try_with(|cell| match cell.take() { None => f(&Context::new()), Some(cx) => { cx.reset(); let res = f(&cx); cell.set(Some(cx)); res } }) .unwrap_or_else(|_| f(&Context::new())) } /// Creates a new `Context`. #[cold] fn new() -> Context { Context { inner: Arc::new(Inner { select: AtomicUsize::new(Selected::Waiting.into()), packet: AtomicUsize::new(0), thread: thread::current(), thread_id: thread::current().id(), }), } } /// Resets `select` and `packet`. #[inline] fn reset(&self) { self.inner .select .store(Selected::Waiting.into(), Ordering::Release); self.inner.packet.store(0, Ordering::Release); } /// Attempts to select an operation. /// /// On failure, the previously selected operation is returned. #[inline] pub fn try_select(&self, select: Selected) -> Result<(), Selected> { self.inner .select .compare_exchange( Selected::Waiting.into(), select.into(), Ordering::AcqRel, Ordering::Acquire, ) .map(|_| ()) .map_err(|e| e.into()) } /// Returns the selected operation. #[inline] pub fn selected(&self) -> Selected { Selected::from(self.inner.select.load(Ordering::Acquire)) } /// Stores a packet. /// /// This method must be called after `try_select` succeeds and there is a packet to provide. #[inline] pub fn store_packet(&self, packet: usize) { if packet != 0 { self.inner.packet.store(packet, Ordering::Release); } } /// Waits until a packet is provided and returns it. #[inline] pub fn wait_packet(&self) -> usize { let backoff = Backoff::new(); loop { let packet = self.inner.packet.load(Ordering::Acquire); if packet != 0 { return packet; } backoff.snooze(); } } /// Waits until an operation is selected and returns it. /// /// If the deadline is reached, `Selected::Aborted` will be selected. #[inline] pub fn wait_until(&self, deadline: Option) -> Selected { // Spin for a short time, waiting until an operation is selected. let backoff = Backoff::new(); loop { let sel = Selected::from(self.inner.select.load(Ordering::Acquire)); if sel != Selected::Waiting { return sel; } if backoff.is_completed() { break; } else { backoff.snooze(); } } loop { // Check whether an operation has been selected. let sel = Selected::from(self.inner.select.load(Ordering::Acquire)); if sel != Selected::Waiting { return sel; } // If there's a deadline, park the current thread until the deadline is reached. if let Some(end) = deadline { let now = Instant::now(); if now < end { thread::park_timeout(end - now); } else { // The deadline has been reached. Try aborting select. return match self.try_select(Selected::Aborted) { Ok(()) => Selected::Aborted, Err(s) => s, }; } } else { thread::park(); } } } /// Unparks the thread this context belongs to. #[inline] pub fn unpark(&self) { self.inner.thread.unpark(); } /// Returns the id of the thread this context belongs to. #[inline] pub fn thread_id(&self) -> ThreadId { self.inner.thread_id } }