//! Interface to the select mechanism. use std::fmt; use std::marker::PhantomData; use std::mem; use std::time::{Duration, Instant}; use crossbeam_utils::Backoff; use crate::channel::{self, Receiver, Sender}; use crate::context::Context; use crate::err::{ReadyTimeoutError, TryReadyError}; use crate::err::{RecvError, SendError}; use crate::err::{SelectTimeoutError, TrySelectError}; use crate::flavors; use crate::utils; /// Temporary data that gets initialized during select or a blocking operation, and is consumed by /// `read` or `write`. /// /// Each field contains data associated with a specific channel flavor. // This is a private API that is used by the select macro. #[derive(Debug, Default)] pub struct Token { pub(crate) at: flavors::at::AtToken, pub(crate) array: flavors::array::ArrayToken, pub(crate) list: flavors::list::ListToken, #[allow(dead_code)] pub(crate) never: flavors::never::NeverToken, pub(crate) tick: flavors::tick::TickToken, pub(crate) zero: flavors::zero::ZeroToken, } /// Identifier associated with an operation by a specific thread on a specific channel. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct Operation(usize); impl Operation { /// Creates an operation identifier from a mutable reference. /// /// This function essentially just turns the address of the reference into a number. The /// reference should point to a variable that is specific to the thread and the operation, /// and is alive for the entire duration of select or blocking operation. #[inline] pub fn hook(r: &mut T) -> Operation { let val = r as *mut T as usize; // Make sure that the pointer address doesn't equal the numerical representation of // `Selected::{Waiting, Aborted, Disconnected}`. assert!(val > 2); Operation(val) } } /// Current state of a select or a blocking operation. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Selected { /// Still waiting for an operation. Waiting, /// The attempt to block the current thread has been aborted. Aborted, /// An operation became ready because a channel is disconnected. Disconnected, /// An operation became ready because a message can be sent or received. Operation(Operation), } impl From for Selected { #[inline] fn from(val: usize) -> Selected { match val { 0 => Selected::Waiting, 1 => Selected::Aborted, 2 => Selected::Disconnected, oper => Selected::Operation(Operation(oper)), } } } impl Into for Selected { #[inline] fn into(self) -> usize { match self { Selected::Waiting => 0, Selected::Aborted => 1, Selected::Disconnected => 2, Selected::Operation(Operation(val)) => val, } } } /// A receiver or a sender that can participate in select. /// /// This is a handle that assists select in executing an operation, registration, deciding on the /// appropriate deadline for blocking, etc. // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro. pub trait SelectHandle { /// Attempts to select an operation and returns `true` on success. fn try_select(&self, token: &mut Token) -> bool; /// Returns a deadline for an operation, if there is one. fn deadline(&self) -> Option; /// Registers an operation for execution and returns `true` if it is now ready. fn register(&self, oper: Operation, cx: &Context) -> bool; /// Unregisters an operation for execution. fn unregister(&self, oper: Operation); /// Attempts to select an operation the thread got woken up for and returns `true` on success. fn accept(&self, token: &mut Token, cx: &Context) -> bool; /// Returns `true` if an operation can be executed without blocking. fn is_ready(&self) -> bool; /// Registers an operation for readiness notification and returns `true` if it is now ready. fn watch(&self, oper: Operation, cx: &Context) -> bool; /// Unregisters an operation for readiness notification. fn unwatch(&self, oper: Operation); } impl SelectHandle for &T { fn try_select(&self, token: &mut Token) -> bool { (**self).try_select(token) } fn deadline(&self) -> Option { (**self).deadline() } fn register(&self, oper: Operation, cx: &Context) -> bool { (**self).register(oper, cx) } fn unregister(&self, oper: Operation) { (**self).unregister(oper); } fn accept(&self, token: &mut Token, cx: &Context) -> bool { (**self).accept(token, cx) } fn is_ready(&self) -> bool { (**self).is_ready() } fn watch(&self, oper: Operation, cx: &Context) -> bool { (**self).watch(oper, cx) } fn unwatch(&self, oper: Operation) { (**self).unwatch(oper) } } /// Determines when a select operation should time out. #[derive(Clone, Copy, Eq, PartialEq)] enum Timeout { /// No blocking. Now, /// Block forever. Never, /// Time out after the time instant. At(Instant), } /// Runs until one of the operations is selected, potentially blocking the current thread. /// /// Successful receive operations will have to be followed up by `channel::read()` and successful /// send operations by `channel::write()`. fn run_select( handles: &mut [(&dyn SelectHandle, usize, *const u8)], timeout: Timeout, ) -> Option<(Token, usize, *const u8)> { if handles.is_empty() { // Wait until the timeout and return. match timeout { Timeout::Now => return None, Timeout::Never => { utils::sleep_until(None); unreachable!(); } Timeout::At(when) => { utils::sleep_until(Some(when)); return None; } } } // Shuffle the operations for fairness. utils::shuffle(handles); // Create a token, which serves as a temporary variable that gets initialized in this function // and is later used by a call to `channel::read()` or `channel::write()` that completes the // selected operation. let mut token = Token::default(); // Try selecting one of the operations without blocking. for &(handle, i, ptr) in handles.iter() { if handle.try_select(&mut token) { return Some((token, i, ptr)); } } loop { // Prepare for blocking. let res = Context::with(|cx| { let mut sel = Selected::Waiting; let mut registered_count = 0; let mut index_ready = None; if let Timeout::Now = timeout { cx.try_select(Selected::Aborted).unwrap(); } // Register all operations. for (handle, i, _) in handles.iter_mut() { registered_count += 1; // If registration returns `false`, that means the operation has just become ready. if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) { // Try aborting select. sel = match cx.try_select(Selected::Aborted) { Ok(()) => { index_ready = Some(*i); Selected::Aborted } Err(s) => s, }; break; } // If another thread has already selected one of the operations, stop registration. sel = cx.selected(); if sel != Selected::Waiting { break; } } if sel == Selected::Waiting { // Check with each operation for how long we're allowed to block, and compute the // earliest deadline. let mut deadline: Option = match timeout { Timeout::Now => return None, Timeout::Never => None, Timeout::At(when) => Some(when), }; for &(handle, _, _) in handles.iter() { if let Some(x) = handle.deadline() { deadline = deadline.map(|y| x.min(y)).or(Some(x)); } } // Block the current thread. sel = cx.wait_until(deadline); } // Unregister all registered operations. for (handle, _, _) in handles.iter_mut().take(registered_count) { handle.unregister(Operation::hook::<&dyn SelectHandle>(handle)); } match sel { Selected::Waiting => unreachable!(), Selected::Aborted => { // If an operation became ready during registration, try selecting it. if let Some(index_ready) = index_ready { for &(handle, i, ptr) in handles.iter() { if i == index_ready && handle.try_select(&mut token) { return Some((i, ptr)); } } } } Selected::Disconnected => {} Selected::Operation(_) => { // Find the selected operation. for (handle, i, ptr) in handles.iter_mut() { // Is this the selected operation? if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle)) { // Try selecting this operation. if handle.accept(&mut token, cx) { return Some((*i, *ptr)); } } } } } None }); // Return if an operation was selected. if let Some((i, ptr)) = res { return Some((token, i, ptr)); } // Try selecting one of the operations without blocking. for &(handle, i, ptr) in handles.iter() { if handle.try_select(&mut token) { return Some((token, i, ptr)); } } match timeout { Timeout::Now => return None, Timeout::Never => {} Timeout::At(when) => { if Instant::now() >= when { return None; } } } } } /// Runs until one of the operations becomes ready, potentially blocking the current thread. fn run_ready( handles: &mut [(&dyn SelectHandle, usize, *const u8)], timeout: Timeout, ) -> Option { if handles.is_empty() { // Wait until the timeout and return. match timeout { Timeout::Now => return None, Timeout::Never => { utils::sleep_until(None); unreachable!(); } Timeout::At(when) => { utils::sleep_until(Some(when)); return None; } } } // Shuffle the operations for fairness. utils::shuffle(handles); loop { let backoff = Backoff::new(); loop { // Check operations for readiness. for &(handle, i, _) in handles.iter() { if handle.is_ready() { return Some(i); } } if backoff.is_completed() { break; } else { backoff.snooze(); } } // Check for timeout. match timeout { Timeout::Now => return None, Timeout::Never => {} Timeout::At(when) => { if Instant::now() >= when { return None; } } } // Prepare for blocking. let res = Context::with(|cx| { let mut sel = Selected::Waiting; let mut registered_count = 0; // Begin watching all operations. for (handle, _, _) in handles.iter_mut() { registered_count += 1; let oper = Operation::hook::<&dyn SelectHandle>(handle); // If registration returns `false`, that means the operation has just become ready. if handle.watch(oper, cx) { sel = match cx.try_select(Selected::Operation(oper)) { Ok(()) => Selected::Operation(oper), Err(s) => s, }; break; } // If another thread has already chosen one of the operations, stop registration. sel = cx.selected(); if sel != Selected::Waiting { break; } } if sel == Selected::Waiting { // Check with each operation for how long we're allowed to block, and compute the // earliest deadline. let mut deadline: Option = match timeout { Timeout::Now => unreachable!(), Timeout::Never => None, Timeout::At(when) => Some(when), }; for &(handle, _, _) in handles.iter() { if let Some(x) = handle.deadline() { deadline = deadline.map(|y| x.min(y)).or(Some(x)); } } // Block the current thread. sel = cx.wait_until(deadline); } // Unwatch all operations. for (handle, _, _) in handles.iter_mut().take(registered_count) { handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle)); } match sel { Selected::Waiting => unreachable!(), Selected::Aborted => {} Selected::Disconnected => {} Selected::Operation(_) => { for (handle, i, _) in handles.iter_mut() { let oper = Operation::hook::<&dyn SelectHandle>(handle); if sel == Selected::Operation(oper) { return Some(*i); } } } } None }); // Return if an operation became ready. if res.is_some() { return res; } } } /// Attempts to select one of the operations without blocking. // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro. #[inline] pub fn try_select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], ) -> Result, TrySelectError> { match run_select(handles, Timeout::Now) { None => Err(TrySelectError), Some((token, index, ptr)) => Ok(SelectedOperation { token, index, ptr, _marker: PhantomData, }), } } /// Blocks until one of the operations becomes ready and selects it. // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro. #[inline] pub fn select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], ) -> SelectedOperation<'a> { if handles.is_empty() { panic!("no operations have been added to `Select`"); } let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap(); SelectedOperation { token, index, ptr, _marker: PhantomData, } } /// Blocks for a limited time until one of the operations becomes ready and selects it. // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro. #[inline] pub fn select_timeout<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], timeout: Duration, ) -> Result, SelectTimeoutError> { select_deadline(handles, utils::convert_timeout_to_deadline(timeout)) } /// Blocks until a given deadline, or until one of the operations becomes ready and selects it. #[inline] pub(crate) fn select_deadline<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], deadline: Instant, ) -> Result, SelectTimeoutError> { match run_select(handles, Timeout::At(deadline)) { None => Err(SelectTimeoutError), Some((token, index, ptr)) => Ok(SelectedOperation { token, index, ptr, _marker: PhantomData, }), } } /// Selects from a set of channel operations. /// /// `Select` allows you to define a set of channel operations, wait until any one of them becomes /// ready, and finally execute it. If multiple operations are ready at the same time, a random one /// among them is selected. /// /// An operation is considered to be ready if it doesn't have to block. Note that it is ready even /// when it will simply return an error because the channel is disconnected. /// /// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a /// dynamically created list of channel operations. /// /// [`select!`]: crate::select! /// /// Once a list of operations has been built with `Select`, there are two different ways of /// proceeding: /// /// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful, /// the returned selected operation has already begun and **must** be completed. If we don't /// complete it, a panic will occur. /// /// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If /// successful, we may attempt to execute the operation, but are not obliged to. In fact, it's /// possible for another thread to make the operation not ready just before we try executing it, /// so it's wise to use a retry loop. However, note that these methods might return with success /// spuriously, so it's a good idea to always double check if the operation is really ready. /// /// # Examples /// /// Use [`select`] to receive a message from a list of receivers: /// /// ``` /// use crossbeam_channel::{Receiver, RecvError, Select}; /// /// fn recv_multiple(rs: &[Receiver]) -> Result { /// // Build a list of operations. /// let mut sel = Select::new(); /// for r in rs { /// sel.recv(r); /// } /// /// // Complete the selected operation. /// let oper = sel.select(); /// let index = oper.index(); /// oper.recv(&rs[index]) /// } /// ``` /// /// Use [`ready`] to receive a message from a list of receivers: /// /// ``` /// use crossbeam_channel::{Receiver, RecvError, Select}; /// /// fn recv_multiple(rs: &[Receiver]) -> Result { /// // Build a list of operations. /// let mut sel = Select::new(); /// for r in rs { /// sel.recv(r); /// } /// /// loop { /// // Wait until a receive operation becomes ready and try executing it. /// let index = sel.ready(); /// let res = rs[index].try_recv(); /// /// // If the operation turns out not to be ready, retry. /// if let Err(e) = res { /// if e.is_empty() { /// continue; /// } /// } /// /// // Success! /// return res.map_err(|_| RecvError); /// } /// } /// ``` /// /// [`try_select`]: Select::try_select /// [`select`]: Select::select /// [`select_timeout`]: Select::select_timeout /// [`try_ready`]: Select::try_ready /// [`ready`]: Select::ready /// [`ready_timeout`]: Select::ready_timeout pub struct Select<'a> { /// A list of senders and receivers participating in selection. handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>, /// The next index to assign to an operation. next_index: usize, } unsafe impl Send for Select<'_> {} unsafe impl Sync for Select<'_> {} impl<'a> Select<'a> { /// Creates an empty list of channel operations for selection. /// /// # Examples /// /// ``` /// use crossbeam_channel::Select; /// /// let mut sel = Select::new(); /// /// // The list of operations is empty, which means no operation can be selected. /// assert!(sel.try_select().is_err()); /// ``` pub fn new() -> Select<'a> { Select { handles: Vec::with_capacity(4), next_index: 0, } } /// Adds a send operation. /// /// Returns the index of the added operation. /// /// # Examples /// /// ``` /// use crossbeam_channel::{unbounded, Select}; /// /// let (s, r) = unbounded::(); /// /// let mut sel = Select::new(); /// let index = sel.send(&s); /// ``` pub fn send(&mut self, s: &'a Sender) -> usize { let i = self.next_index; let ptr = s as *const Sender<_> as *const u8; self.handles.push((s, i, ptr)); self.next_index += 1; i } /// Adds a receive operation. /// /// Returns the index of the added operation. /// /// # Examples /// /// ``` /// use crossbeam_channel::{unbounded, Select}; /// /// let (s, r) = unbounded::(); /// /// let mut sel = Select::new(); /// let index = sel.recv(&r); /// ``` pub fn recv(&mut self, r: &'a Receiver) -> usize { let i = self.next_index; let ptr = r as *const Receiver<_> as *const u8; self.handles.push((r, i, ptr)); self.next_index += 1; i } /// Removes a previously added operation. /// /// This is useful when an operation is selected because the channel got disconnected and we /// want to try again to select a different operation instead. /// /// If new operations are added after removing some, the indices of removed operations will not /// be reused. /// /// # Panics /// /// An attempt to remove a non-existing or already removed operation will panic. /// /// # Examples /// /// ``` /// use crossbeam_channel::{unbounded, Select}; /// /// let (s1, r1) = unbounded::(); /// let (_, r2) = unbounded::(); /// /// let mut sel = Select::new(); /// let oper1 = sel.recv(&r1); /// let oper2 = sel.recv(&r2); /// /// // Both operations are initially ready, so a random one will be executed. /// let oper = sel.select(); /// assert_eq!(oper.index(), oper2); /// assert!(oper.recv(&r2).is_err()); /// sel.remove(oper2); /// /// s1.send(10).unwrap(); /// /// let oper = sel.select(); /// assert_eq!(oper.index(), oper1); /// assert_eq!(oper.recv(&r1), Ok(10)); /// ``` pub fn remove(&mut self, index: usize) { assert!( index < self.next_index, "index out of bounds; {} >= {}", index, self.next_index, ); let i = self .handles .iter() .enumerate() .find(|(_, (_, i, _))| *i == index) .expect("no operation with this index") .0; self.handles.swap_remove(i); } /// Attempts to select one of the operations without blocking. /// /// If an operation is ready, it is selected and returned. If multiple operations are ready at /// the same time, a random one among them is selected. If none of the operations are ready, an /// error is returned. /// /// An operation is considered to be ready if it doesn't have to block. Note that it is ready /// even when it will simply return an error because the channel is disconnected. /// /// The selected operation must be completed with [`SelectedOperation::send`] /// or [`SelectedOperation::recv`]. /// /// # Examples /// /// ``` /// use crossbeam_channel::{unbounded, Select}; /// /// let (s1, r1) = unbounded(); /// let (s2, r2) = unbounded(); /// /// s1.send(10).unwrap(); /// s2.send(20).unwrap(); /// /// let mut sel = Select::new(); /// let oper1 = sel.recv(&r1); /// let oper2 = sel.recv(&r2); /// /// // Both operations are initially ready, so a random one will be executed. /// let oper = sel.try_select(); /// match oper { /// Err(_) => panic!("both operations should be ready"), /// Ok(oper) => match oper.index() { /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)), /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)), /// _ => unreachable!(), /// } /// } /// ``` pub fn try_select(&mut self) -> Result, TrySelectError> { try_select(&mut self.handles) } /// Blocks until one of the operations becomes ready and selects it. /// /// Once an operation becomes ready, it is selected and returned. If multiple operations are /// ready at the same time, a random one among them is selected. /// /// An operation is considered to be ready if it doesn't have to block. Note that it is ready /// even when it will simply return an error because the channel is disconnected. /// /// The selected operation must be completed with [`SelectedOperation::send`] /// or [`SelectedOperation::recv`]. /// /// # Panics /// /// Panics if no operations have been added to `Select`. /// /// # Examples /// /// ``` /// use std::thread; /// use std::time::Duration; /// use crossbeam_channel::{unbounded, Select}; /// /// let (s1, r1) = unbounded(); /// let (s2, r2) = unbounded(); /// /// thread::spawn(move || { /// thread::sleep(Duration::from_secs(1)); /// s1.send(10).unwrap(); /// }); /// thread::spawn(move || s2.send(20).unwrap()); /// /// let mut sel = Select::new(); /// let oper1 = sel.recv(&r1); /// let oper2 = sel.recv(&r2); /// /// // The second operation will be selected because it becomes ready first. /// let oper = sel.select(); /// match oper.index() { /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)), /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)), /// _ => unreachable!(), /// } /// ``` pub fn select(&mut self) -> SelectedOperation<'a> { select(&mut self.handles) } /// Blocks for a limited time until one of the operations becomes ready and selects it. /// /// If an operation becomes ready, it is selected and returned. If multiple operations are /// ready at the same time, a random one among them is selected. If none of the operations /// become ready for the specified duration, an error is returned. /// /// An operation is considered to be ready if it doesn't have to block. Note that it is ready /// even when it will simply return an error because the channel is disconnected. /// /// The selected operation must be completed with [`SelectedOperation::send`] /// or [`SelectedOperation::recv`]. /// /// # Examples /// /// ``` /// use std::thread; /// use std::time::Duration; /// use crossbeam_channel::{unbounded, Select}; /// /// let (s1, r1) = unbounded(); /// let (s2, r2) = unbounded(); /// /// thread::spawn(move || { /// thread::sleep(Duration::from_secs(1)); /// s1.send(10).unwrap(); /// }); /// thread::spawn(move || s2.send(20).unwrap()); /// /// let mut sel = Select::new(); /// let oper1 = sel.recv(&r1); /// let oper2 = sel.recv(&r2); /// /// // The second operation will be selected because it becomes ready first. /// let oper = sel.select_timeout(Duration::from_millis(500)); /// match oper { /// Err(_) => panic!("should not have timed out"), /// Ok(oper) => match oper.index() { /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)), /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)), /// _ => unreachable!(), /// } /// } /// ``` pub fn select_timeout( &mut self, timeout: Duration, ) -> Result, SelectTimeoutError> { select_timeout(&mut self.handles, timeout) } /// Blocks until a given deadline, or until one of the operations becomes ready and selects it. /// /// If an operation becomes ready, it is selected and returned. If multiple operations are /// ready at the same time, a random one among them is selected. If none of the operations /// become ready before the given deadline, an error is returned. /// /// An operation is considered to be ready if it doesn't have to block. Note that it is ready /// even when it will simply return an error because the channel is disconnected. /// /// The selected operation must be completed with [`SelectedOperation::send`] /// or [`SelectedOperation::recv`]. /// /// # Examples /// /// ``` /// use std::thread; /// use std::time::{Instant, Duration}; /// use crossbeam_channel::{unbounded, Select}; /// /// let (s1, r1) = unbounded(); /// let (s2, r2) = unbounded(); /// /// thread::spawn(move || { /// thread::sleep(Duration::from_secs(1)); /// s1.send(10).unwrap(); /// }); /// thread::spawn(move || s2.send(20).unwrap()); /// /// let mut sel = Select::new(); /// let oper1 = sel.recv(&r1); /// let oper2 = sel.recv(&r2); /// /// let deadline = Instant::now() + Duration::from_millis(500); /// /// // The second operation will be selected because it becomes ready first. /// let oper = sel.select_deadline(deadline); /// match oper { /// Err(_) => panic!("should not have timed out"), /// Ok(oper) => match oper.index() { /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)), /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)), /// _ => unreachable!(), /// } /// } /// ``` pub fn select_deadline( &mut self, deadline: Instant, ) -> Result, SelectTimeoutError> { select_deadline(&mut self.handles, deadline) } /// Attempts to find a ready operation without blocking. /// /// If an operation is ready, its index is returned. If multiple operations are ready at the /// same time, a random one among them is chosen. If none of the operations are ready, an error /// is returned. /// /// An operation is considered to be ready if it doesn't have to block. Note that it is ready /// even when it will simply return an error because the channel is disconnected. /// /// Note that this method might return with success spuriously, so it's a good idea to always /// double check if the operation is really ready. /// /// # Examples /// /// ``` /// use crossbeam_channel::{unbounded, Select}; /// /// let (s1, r1) = unbounded(); /// let (s2, r2) = unbounded(); /// /// s1.send(10).unwrap(); /// s2.send(20).unwrap(); /// /// let mut sel = Select::new(); /// let oper1 = sel.recv(&r1); /// let oper2 = sel.recv(&r2); /// /// // Both operations are initially ready, so a random one will be chosen. /// match sel.try_ready() { /// Err(_) => panic!("both operations should be ready"), /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)), /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)), /// Ok(_) => unreachable!(), /// } /// ``` pub fn try_ready(&mut self) -> Result { match run_ready(&mut self.handles, Timeout::Now) { None => Err(TryReadyError), Some(index) => Ok(index), } } /// Blocks until one of the operations becomes ready. /// /// Once an operation becomes ready, its index is returned. If multiple operations are ready at /// the same time, a random one among them is chosen. /// /// An operation is considered to be ready if it doesn't have to block. Note that it is ready /// even when it will simply return an error because the channel is disconnected. /// /// Note that this method might return with success spuriously, so it's a good idea to always /// double check if the operation is really ready. /// /// # Panics /// /// Panics if no operations have been added to `Select`. /// /// # Examples /// /// ``` /// use std::thread; /// use std::time::Duration; /// use crossbeam_channel::{unbounded, Select}; /// /// let (s1, r1) = unbounded(); /// let (s2, r2) = unbounded(); /// /// thread::spawn(move || { /// thread::sleep(Duration::from_secs(1)); /// s1.send(10).unwrap(); /// }); /// thread::spawn(move || s2.send(20).unwrap()); /// /// let mut sel = Select::new(); /// let oper1 = sel.recv(&r1); /// let oper2 = sel.recv(&r2); /// /// // The second operation will be selected because it becomes ready first. /// match sel.ready() { /// i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)), /// i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)), /// _ => unreachable!(), /// } /// ``` pub fn ready(&mut self) -> usize { if self.handles.is_empty() { panic!("no operations have been added to `Select`"); } run_ready(&mut self.handles, Timeout::Never).unwrap() } /// Blocks for a limited time until one of the operations becomes ready. /// /// If an operation becomes ready, its index is returned. If multiple operations are ready at /// the same time, a random one among them is chosen. If none of the operations become ready /// for the specified duration, an error is returned. /// /// An operation is considered to be ready if it doesn't have to block. Note that it is ready /// even when it will simply return an error because the channel is disconnected. /// /// Note that this method might return with success spuriously, so it's a good idea to double /// check if the operation is really ready. /// /// # Examples /// /// ``` /// use std::thread; /// use std::time::Duration; /// use crossbeam_channel::{unbounded, Select}; /// /// let (s1, r1) = unbounded(); /// let (s2, r2) = unbounded(); /// /// thread::spawn(move || { /// thread::sleep(Duration::from_secs(1)); /// s1.send(10).unwrap(); /// }); /// thread::spawn(move || s2.send(20).unwrap()); /// /// let mut sel = Select::new(); /// let oper1 = sel.recv(&r1); /// let oper2 = sel.recv(&r2); /// /// // The second operation will be selected because it becomes ready first. /// match sel.ready_timeout(Duration::from_millis(500)) { /// Err(_) => panic!("should not have timed out"), /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)), /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)), /// Ok(_) => unreachable!(), /// } /// ``` pub fn ready_timeout(&mut self, timeout: Duration) -> Result { self.ready_deadline(utils::convert_timeout_to_deadline(timeout)) } /// Blocks until a given deadline, or until one of the operations becomes ready. /// /// If an operation becomes ready, its index is returned. If multiple operations are ready at /// the same time, a random one among them is chosen. If none of the operations become ready /// before the deadline, an error is returned. /// /// An operation is considered to be ready if it doesn't have to block. Note that it is ready /// even when it will simply return an error because the channel is disconnected. /// /// Note that this method might return with success spuriously, so it's a good idea to double /// check if the operation is really ready. /// /// # Examples /// /// ``` /// use std::thread; /// use std::time::{Duration, Instant}; /// use crossbeam_channel::{unbounded, Select}; /// /// let deadline = Instant::now() + Duration::from_millis(500); /// /// let (s1, r1) = unbounded(); /// let (s2, r2) = unbounded(); /// /// thread::spawn(move || { /// thread::sleep(Duration::from_secs(1)); /// s1.send(10).unwrap(); /// }); /// thread::spawn(move || s2.send(20).unwrap()); /// /// let mut sel = Select::new(); /// let oper1 = sel.recv(&r1); /// let oper2 = sel.recv(&r2); /// /// // The second operation will be selected because it becomes ready first. /// match sel.ready_deadline(deadline) { /// Err(_) => panic!("should not have timed out"), /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)), /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)), /// Ok(_) => unreachable!(), /// } /// ``` pub fn ready_deadline(&mut self, deadline: Instant) -> Result { match run_ready(&mut self.handles, Timeout::At(deadline)) { None => Err(ReadyTimeoutError), Some(index) => Ok(index), } } } impl<'a> Clone for Select<'a> { fn clone(&self) -> Select<'a> { Select { handles: self.handles.clone(), next_index: self.next_index, } } } impl<'a> Default for Select<'a> { fn default() -> Select<'a> { Select::new() } } impl fmt::Debug for Select<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.pad("Select { .. }") } } /// A selected operation that needs to be completed. /// /// To complete the operation, call [`send`] or [`recv`]. /// /// # Panics /// /// Forgetting to complete the operation is an error and might lead to deadlocks. If a /// `SelectedOperation` is dropped without completion, a panic occurs. /// /// [`send`]: SelectedOperation::send /// [`recv`]: SelectedOperation::recv #[must_use] pub struct SelectedOperation<'a> { /// Token needed to complete the operation. token: Token, /// The index of the selected operation. index: usize, /// The address of the selected `Sender` or `Receiver`. ptr: *const u8, /// Indicates that `Sender`s and `Receiver`s are borrowed. _marker: PhantomData<&'a ()>, } impl SelectedOperation<'_> { /// Returns the index of the selected operation. /// /// # Examples /// /// ``` /// use crossbeam_channel::{bounded, Select}; /// /// let (s1, r1) = bounded::<()>(0); /// let (s2, r2) = bounded::<()>(0); /// let (s3, r3) = bounded::<()>(1); /// /// let mut sel = Select::new(); /// let oper1 = sel.send(&s1); /// let oper2 = sel.recv(&r2); /// let oper3 = sel.send(&s3); /// /// // Only the last operation is ready. /// let oper = sel.select(); /// assert_eq!(oper.index(), 2); /// assert_eq!(oper.index(), oper3); /// /// // Complete the operation. /// oper.send(&s3, ()).unwrap(); /// ``` pub fn index(&self) -> usize { self.index } /// Completes the send operation. /// /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`] /// when the operation was added. /// /// # Panics /// /// Panics if an incorrect [`Sender`] reference is passed. /// /// # Examples /// /// ``` /// use crossbeam_channel::{bounded, Select, SendError}; /// /// let (s, r) = bounded::(0); /// drop(r); /// /// let mut sel = Select::new(); /// let oper1 = sel.send(&s); /// /// let oper = sel.select(); /// assert_eq!(oper.index(), oper1); /// assert_eq!(oper.send(&s, 10), Err(SendError(10))); /// ``` pub fn send(mut self, s: &Sender, msg: T) -> Result<(), SendError> { assert!( s as *const Sender as *const u8 == self.ptr, "passed a sender that wasn't selected", ); let res = unsafe { channel::write(s, &mut self.token, msg) }; mem::forget(self); res.map_err(SendError) } /// Completes the receive operation. /// /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`] /// when the operation was added. /// /// # Panics /// /// Panics if an incorrect [`Receiver`] reference is passed. /// /// # Examples /// /// ``` /// use crossbeam_channel::{bounded, Select, RecvError}; /// /// let (s, r) = bounded::(0); /// drop(s); /// /// let mut sel = Select::new(); /// let oper1 = sel.recv(&r); /// /// let oper = sel.select(); /// assert_eq!(oper.index(), oper1); /// assert_eq!(oper.recv(&r), Err(RecvError)); /// ``` pub fn recv(mut self, r: &Receiver) -> Result { assert!( r as *const Receiver as *const u8 == self.ptr, "passed a receiver that wasn't selected", ); let res = unsafe { channel::read(r, &mut self.token) }; mem::forget(self); res.map_err(|_| RecvError) } } impl fmt::Debug for SelectedOperation<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.pad("SelectedOperation { .. }") } } impl Drop for SelectedOperation<'_> { fn drop(&mut self) { panic!("dropped `SelectedOperation` without completing the operation"); } }