diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-06-19 09:26:03 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-06-19 09:26:03 +0000 |
commit | 9918693037dce8aa4bb6f08741b6812923486c18 (patch) | |
tree | 21d2b40bec7e6a7ea664acee056eb3d08e15a1cf /vendor/parking_lot-0.11.2/src/condvar.rs | |
parent | Releasing progress-linux version 1.75.0+dfsg1-5~progress7.99u1. (diff) | |
download | rustc-9918693037dce8aa4bb6f08741b6812923486c18.tar.xz rustc-9918693037dce8aa4bb6f08741b6812923486c18.zip |
Merging upstream version 1.76.0+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/parking_lot-0.11.2/src/condvar.rs')
-rw-r--r-- | vendor/parking_lot-0.11.2/src/condvar.rs | 1057 |
1 files changed, 0 insertions, 1057 deletions
diff --git a/vendor/parking_lot-0.11.2/src/condvar.rs b/vendor/parking_lot-0.11.2/src/condvar.rs deleted file mode 100644 index 534b8aff8..000000000 --- a/vendor/parking_lot-0.11.2/src/condvar.rs +++ /dev/null @@ -1,1057 +0,0 @@ -// Copyright 2016 Amanieu d'Antras -// -// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or -// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or -// http://opensource.org/licenses/MIT>, at your option. This file may not be -// copied, modified, or distributed except according to those terms. - -use crate::mutex::MutexGuard; -use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL}; -use crate::{deadlock, util}; -use core::{ - fmt, ptr, - sync::atomic::{AtomicPtr, Ordering}, -}; -use instant::Instant; -use lock_api::RawMutex as RawMutex_; -use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN}; -use std::time::Duration; - -/// A type indicating whether a timed wait on a condition variable returned -/// due to a time out or not. -#[derive(Debug, PartialEq, Eq, Copy, Clone)] -pub struct WaitTimeoutResult(bool); - -impl WaitTimeoutResult { - /// Returns whether the wait was known to have timed out. - #[inline] - pub fn timed_out(self) -> bool { - self.0 - } -} - -/// A Condition Variable -/// -/// Condition variables represent the ability to block a thread such that it -/// consumes no CPU time while waiting for an event to occur. Condition -/// variables are typically associated with a boolean predicate (a condition) -/// and a mutex. The predicate is always verified inside of the mutex before -/// determining that thread must block. -/// -/// Note that this module places one additional restriction over the system -/// condition variables: each condvar can be used with only one mutex at a -/// time. Any attempt to use multiple mutexes on the same condition variable -/// simultaneously will result in a runtime panic. However it is possible to -/// switch to a different mutex if there are no threads currently waiting on -/// the condition variable. -/// -/// # Differences from the standard library `Condvar` -/// -/// - No spurious wakeups: A wait will only return a non-timeout result if it -/// was woken up by `notify_one` or `notify_all`. -/// - `Condvar::notify_all` will only wake up a single thread, the rest are -/// requeued to wait for the `Mutex` to be unlocked by the thread that was -/// woken up. -/// - Only requires 1 word of space, whereas the standard library boxes the -/// `Condvar` due to platform limitations. -/// - Can be statically constructed (requires the `const_fn` nightly feature). -/// - Does not require any drop glue when dropped. -/// - Inline fast path for the uncontended case. -/// -/// # Examples -/// -/// ``` -/// use parking_lot::{Mutex, Condvar}; -/// use std::sync::Arc; -/// use std::thread; -/// -/// let pair = Arc::new((Mutex::new(false), Condvar::new())); -/// let pair2 = pair.clone(); -/// -/// // Inside of our lock, spawn a new thread, and then wait for it to start -/// thread::spawn(move|| { -/// let &(ref lock, ref cvar) = &*pair2; -/// let mut started = lock.lock(); -/// *started = true; -/// cvar.notify_one(); -/// }); -/// -/// // wait for the thread to start up -/// let &(ref lock, ref cvar) = &*pair; -/// let mut started = lock.lock(); -/// if !*started { -/// cvar.wait(&mut started); -/// } -/// // Note that we used an if instead of a while loop above. This is only -/// // possible because parking_lot's Condvar will never spuriously wake up. -/// // This means that wait() will only return after notify_one or notify_all is -/// // called. -/// ``` -pub struct Condvar { - state: AtomicPtr<RawMutex>, -} - -impl Condvar { - /// Creates a new condition variable which is ready to be waited on and - /// notified. - #[inline] - pub const fn new() -> Condvar { - Condvar { - state: AtomicPtr::new(ptr::null_mut()), - } - } - - /// Wakes up one blocked thread on this condvar. - /// - /// Returns whether a thread was woken up. - /// - /// If there is a blocked thread on this condition variable, then it will - /// be woken up from its call to `wait` or `wait_timeout`. Calls to - /// `notify_one` are not buffered in any way. - /// - /// To wake up all threads, see `notify_all()`. - /// - /// # Examples - /// - /// ``` - /// use parking_lot::Condvar; - /// - /// let condvar = Condvar::new(); - /// - /// // do something with condvar, share it with other threads - /// - /// if !condvar.notify_one() { - /// println!("Nobody was listening for this."); - /// } - /// ``` - #[inline] - pub fn notify_one(&self) -> bool { - // Nothing to do if there are no waiting threads - let state = self.state.load(Ordering::Relaxed); - if state.is_null() { - return false; - } - - self.notify_one_slow(state) - } - - #[cold] - fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool { - unsafe { - // Unpark one thread and requeue the rest onto the mutex - let from = self as *const _ as usize; - let to = mutex as usize; - let validate = || { - // Make sure that our atomic state still points to the same - // mutex. If not then it means that all threads on the current - // mutex were woken up and a new waiting thread switched to a - // different mutex. In that case we can get away with doing - // nothing. - if self.state.load(Ordering::Relaxed) != mutex { - return RequeueOp::Abort; - } - - // Unpark one thread if the mutex is unlocked, otherwise just - // requeue everything to the mutex. This is safe to do here - // since unlocking the mutex when the parked bit is set requires - // locking the queue. There is the possibility of a race if the - // mutex gets locked after we check, but that doesn't matter in - // this case. - if (*mutex).mark_parked_if_locked() { - RequeueOp::RequeueOne - } else { - RequeueOp::UnparkOne - } - }; - let callback = |_op, result: UnparkResult| { - // Clear our state if there are no more waiting threads - if !result.have_more_threads { - self.state.store(ptr::null_mut(), Ordering::Relaxed); - } - TOKEN_NORMAL - }; - let res = parking_lot_core::unpark_requeue(from, to, validate, callback); - - res.unparked_threads + res.requeued_threads != 0 - } - } - - /// Wakes up all blocked threads on this condvar. - /// - /// Returns the number of threads woken up. - /// - /// This method will ensure that any current waiters on the condition - /// variable are awoken. Calls to `notify_all()` are not buffered in any - /// way. - /// - /// To wake up only one thread, see `notify_one()`. - #[inline] - pub fn notify_all(&self) -> usize { - // Nothing to do if there are no waiting threads - let state = self.state.load(Ordering::Relaxed); - if state.is_null() { - return 0; - } - - self.notify_all_slow(state) - } - - #[cold] - fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize { - unsafe { - // Unpark one thread and requeue the rest onto the mutex - let from = self as *const _ as usize; - let to = mutex as usize; - let validate = || { - // Make sure that our atomic state still points to the same - // mutex. If not then it means that all threads on the current - // mutex were woken up and a new waiting thread switched to a - // different mutex. In that case we can get away with doing - // nothing. - if self.state.load(Ordering::Relaxed) != mutex { - return RequeueOp::Abort; - } - - // Clear our state since we are going to unpark or requeue all - // threads. - self.state.store(ptr::null_mut(), Ordering::Relaxed); - - // Unpark one thread if the mutex is unlocked, otherwise just - // requeue everything to the mutex. This is safe to do here - // since unlocking the mutex when the parked bit is set requires - // locking the queue. There is the possibility of a race if the - // mutex gets locked after we check, but that doesn't matter in - // this case. - if (*mutex).mark_parked_if_locked() { - RequeueOp::RequeueAll - } else { - RequeueOp::UnparkOneRequeueRest - } - }; - let callback = |op, result: UnparkResult| { - // If we requeued threads to the mutex, mark it as having - // parked threads. The RequeueAll case is already handled above. - if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 { - (*mutex).mark_parked(); - } - TOKEN_NORMAL - }; - let res = parking_lot_core::unpark_requeue(from, to, validate, callback); - - res.unparked_threads + res.requeued_threads - } - } - - /// Blocks the current thread until this condition variable receives a - /// notification. - /// - /// This function will atomically unlock the mutex specified (represented by - /// `mutex_guard`) and block the current thread. This means that any calls - /// to `notify_*()` which happen logically after the mutex is unlocked are - /// candidates to wake this thread up. When this function call returns, the - /// lock specified will have been re-acquired. - /// - /// # Panics - /// - /// This function will panic if another thread is waiting on the `Condvar` - /// with a different `Mutex` object. - #[inline] - pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) { - self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None); - } - - /// Waits on this condition variable for a notification, timing out after - /// the specified time instant. - /// - /// The semantics of this function are equivalent to `wait()` except that - /// the thread will be blocked roughly until `timeout` is reached. This - /// method should not be used for precise timing due to anomalies such as - /// preemption or platform differences that may not cause the maximum - /// amount of time waited to be precisely `timeout`. - /// - /// Note that the best effort is made to ensure that the time waited is - /// measured with a monotonic clock, and not affected by the changes made to - /// the system time. - /// - /// The returned `WaitTimeoutResult` value indicates if the timeout is - /// known to have elapsed. - /// - /// Like `wait`, the lock specified will be re-acquired when this function - /// returns, regardless of whether the timeout elapsed or not. - /// - /// # Panics - /// - /// This function will panic if another thread is waiting on the `Condvar` - /// with a different `Mutex` object. - #[inline] - pub fn wait_until<T: ?Sized>( - &self, - mutex_guard: &mut MutexGuard<'_, T>, - timeout: Instant, - ) -> WaitTimeoutResult { - self.wait_until_internal( - unsafe { MutexGuard::mutex(mutex_guard).raw() }, - Some(timeout), - ) - } - - // This is a non-generic function to reduce the monomorphization cost of - // using `wait_until`. - fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult { - unsafe { - let result; - let mut bad_mutex = false; - let mut requeued = false; - { - let addr = self as *const _ as usize; - let lock_addr = mutex as *const _ as *mut _; - let validate = || { - // Ensure we don't use two different mutexes with the same - // Condvar at the same time. This is done while locked to - // avoid races with notify_one - let state = self.state.load(Ordering::Relaxed); - if state.is_null() { - self.state.store(lock_addr, Ordering::Relaxed); - } else if state != lock_addr { - bad_mutex = true; - return false; - } - true - }; - let before_sleep = || { - // Unlock the mutex before sleeping... - mutex.unlock(); - }; - let timed_out = |k, was_last_thread| { - // If we were requeued to a mutex, then we did not time out. - // We'll just park ourselves on the mutex again when we try - // to lock it later. - requeued = k != addr; - - // If we were the last thread on the queue then we need to - // clear our state. This is normally done by the - // notify_{one,all} functions when not timing out. - if !requeued && was_last_thread { - self.state.store(ptr::null_mut(), Ordering::Relaxed); - } - }; - result = parking_lot_core::park( - addr, - validate, - before_sleep, - timed_out, - DEFAULT_PARK_TOKEN, - timeout, - ); - } - - // Panic if we tried to use multiple mutexes with a Condvar. Note - // that at this point the MutexGuard is still locked. It will be - // unlocked by the unwinding logic. - if bad_mutex { - panic!("attempted to use a condition variable with more than one mutex"); - } - - // ... and re-lock it once we are done sleeping - if result == ParkResult::Unparked(TOKEN_HANDOFF) { - deadlock::acquire_resource(mutex as *const _ as usize); - } else { - mutex.lock(); - } - - WaitTimeoutResult(!(result.is_unparked() || requeued)) - } - } - - /// Waits on this condition variable for a notification, timing out after a - /// specified duration. - /// - /// The semantics of this function are equivalent to `wait()` except that - /// the thread will be blocked for roughly no longer than `timeout`. This - /// method should not be used for precise timing due to anomalies such as - /// preemption or platform differences that may not cause the maximum - /// amount of time waited to be precisely `timeout`. - /// - /// Note that the best effort is made to ensure that the time waited is - /// measured with a monotonic clock, and not affected by the changes made to - /// the system time. - /// - /// The returned `WaitTimeoutResult` value indicates if the timeout is - /// known to have elapsed. - /// - /// Like `wait`, the lock specified will be re-acquired when this function - /// returns, regardless of whether the timeout elapsed or not. - /// - /// # Panics - /// - /// Panics if the given `timeout` is so large that it can't be added to the current time. - /// This panic is not possible if the crate is built with the `nightly` feature, then a too - /// large `timeout` becomes equivalent to just calling `wait`. - #[inline] - pub fn wait_for<T: ?Sized>( - &self, - mutex_guard: &mut MutexGuard<'_, T>, - timeout: Duration, - ) -> WaitTimeoutResult { - let deadline = util::to_deadline(timeout); - self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline) - } -} - -impl Default for Condvar { - #[inline] - fn default() -> Condvar { - Condvar::new() - } -} - -impl fmt::Debug for Condvar { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.pad("Condvar { .. }") - } -} - -#[cfg(test)] -mod tests { - use crate::{Condvar, Mutex, MutexGuard}; - use instant::Instant; - use std::sync::mpsc::channel; - use std::sync::Arc; - use std::thread; - use std::time::Duration; - - #[test] - fn smoke() { - let c = Condvar::new(); - c.notify_one(); - c.notify_all(); - } - - #[test] - fn notify_one() { - let m = Arc::new(Mutex::new(())); - let m2 = m.clone(); - let c = Arc::new(Condvar::new()); - let c2 = c.clone(); - - let mut g = m.lock(); - let _t = thread::spawn(move || { - let _g = m2.lock(); - c2.notify_one(); - }); - c.wait(&mut g); - } - - #[test] - fn notify_all() { - const N: usize = 10; - - let data = Arc::new((Mutex::new(0), Condvar::new())); - let (tx, rx) = channel(); - for _ in 0..N { - let data = data.clone(); - let tx = tx.clone(); - thread::spawn(move || { - let &(ref lock, ref cond) = &*data; - let mut cnt = lock.lock(); - *cnt += 1; - if *cnt == N { - tx.send(()).unwrap(); - } - while *cnt != 0 { - cond.wait(&mut cnt); - } - tx.send(()).unwrap(); - }); - } - drop(tx); - - let &(ref lock, ref cond) = &*data; - rx.recv().unwrap(); - let mut cnt = lock.lock(); - *cnt = 0; - cond.notify_all(); - drop(cnt); - - for _ in 0..N { - rx.recv().unwrap(); - } - } - - #[test] - fn notify_one_return_true() { - let m = Arc::new(Mutex::new(())); - let m2 = m.clone(); - let c = Arc::new(Condvar::new()); - let c2 = c.clone(); - - let mut g = m.lock(); - let _t = thread::spawn(move || { - let _g = m2.lock(); - assert!(c2.notify_one()); - }); - c.wait(&mut g); - } - - #[test] - fn notify_one_return_false() { - let m = Arc::new(Mutex::new(())); - let c = Arc::new(Condvar::new()); - - let _t = thread::spawn(move || { - let _g = m.lock(); - assert!(!c.notify_one()); - }); - } - - #[test] - fn notify_all_return() { - const N: usize = 10; - - let data = Arc::new((Mutex::new(0), Condvar::new())); - let (tx, rx) = channel(); - for _ in 0..N { - let data = data.clone(); - let tx = tx.clone(); - thread::spawn(move || { - let &(ref lock, ref cond) = &*data; - let mut cnt = lock.lock(); - *cnt += 1; - if *cnt == N { - tx.send(()).unwrap(); - } - while *cnt != 0 { - cond.wait(&mut cnt); - } - tx.send(()).unwrap(); - }); - } - drop(tx); - - let &(ref lock, ref cond) = &*data; - rx.recv().unwrap(); - let mut cnt = lock.lock(); - *cnt = 0; - assert_eq!(cond.notify_all(), N); - drop(cnt); - - for _ in 0..N { - rx.recv().unwrap(); - } - - assert_eq!(cond.notify_all(), 0); - } - - #[test] - fn wait_for() { - let m = Arc::new(Mutex::new(())); - let m2 = m.clone(); - let c = Arc::new(Condvar::new()); - let c2 = c.clone(); - - let mut g = m.lock(); - let no_timeout = c.wait_for(&mut g, Duration::from_millis(1)); - assert!(no_timeout.timed_out()); - - let _t = thread::spawn(move || { - let _g = m2.lock(); - c2.notify_one(); - }); - // Non-nightly panics on too large timeouts. Nightly treats it as indefinite wait. - let very_long_timeout = if cfg!(feature = "nightly") { - Duration::from_secs(u64::max_value()) - } else { - Duration::from_millis(u32::max_value() as u64) - }; - - let timeout_res = c.wait_for(&mut g, very_long_timeout); - assert!(!timeout_res.timed_out()); - - drop(g); - } - - #[test] - fn wait_until() { - let m = Arc::new(Mutex::new(())); - let m2 = m.clone(); - let c = Arc::new(Condvar::new()); - let c2 = c.clone(); - - let mut g = m.lock(); - let no_timeout = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1)); - assert!(no_timeout.timed_out()); - let _t = thread::spawn(move || { - let _g = m2.lock(); - c2.notify_one(); - }); - let timeout_res = c.wait_until( - &mut g, - Instant::now() + Duration::from_millis(u32::max_value() as u64), - ); - assert!(!timeout_res.timed_out()); - drop(g); - } - - #[test] - #[should_panic] - fn two_mutexes() { - let m = Arc::new(Mutex::new(())); - let m2 = m.clone(); - let m3 = Arc::new(Mutex::new(())); - let c = Arc::new(Condvar::new()); - let c2 = c.clone(); - - // Make sure we don't leave the child thread dangling - struct PanicGuard<'a>(&'a Condvar); - impl<'a> Drop for PanicGuard<'a> { - fn drop(&mut self) { - self.0.notify_one(); - } - } - - let (tx, rx) = channel(); - let g = m.lock(); - let _t = thread::spawn(move || { - let mut g = m2.lock(); - tx.send(()).unwrap(); - c2.wait(&mut g); - }); - drop(g); - rx.recv().unwrap(); - let _g = m.lock(); - let _guard = PanicGuard(&*c); - c.wait(&mut m3.lock()); - } - - #[test] - fn two_mutexes_disjoint() { - let m = Arc::new(Mutex::new(())); - let m2 = m.clone(); - let m3 = Arc::new(Mutex::new(())); - let c = Arc::new(Condvar::new()); - let c2 = c.clone(); - - let mut g = m.lock(); - let _t = thread::spawn(move || { - let _g = m2.lock(); - c2.notify_one(); - }); - c.wait(&mut g); - drop(g); - - let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1)); - } - - #[test] - fn test_debug_condvar() { - let c = Condvar::new(); - assert_eq!(format!("{:?}", c), "Condvar { .. }"); - } - - #[test] - fn test_condvar_requeue() { - let m = Arc::new(Mutex::new(())); - let m2 = m.clone(); - let c = Arc::new(Condvar::new()); - let c2 = c.clone(); - let t = thread::spawn(move || { - let mut g = m2.lock(); - c2.wait(&mut g); - }); - - let mut g = m.lock(); - while !c.notify_one() { - // Wait for the thread to get into wait() - MutexGuard::bump(&mut g); - // Yield, so the other thread gets a chance to do something. - // (At least Miri needs this, because it doesn't preempt threads.) - thread::yield_now(); - } - // The thread should have been requeued to the mutex, which we wake up now. - drop(g); - t.join().unwrap(); - } - - #[test] - fn test_issue_129() { - let locks = Arc::new((Mutex::new(()), Condvar::new())); - - let (tx, rx) = channel(); - for _ in 0..4 { - let locks = locks.clone(); - let tx = tx.clone(); - thread::spawn(move || { - let mut guard = locks.0.lock(); - locks.1.wait(&mut guard); - locks.1.wait_for(&mut guard, Duration::from_millis(1)); - locks.1.notify_one(); - tx.send(()).unwrap(); - }); - } - - thread::sleep(Duration::from_millis(100)); - locks.1.notify_one(); - - for _ in 0..4 { - assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(())); - } - } -} - -/// This module contains an integration test that is heavily inspired from WebKit's own integration -/// tests for it's own Condvar. -#[cfg(test)] -mod webkit_queue_test { - use crate::{Condvar, Mutex, MutexGuard}; - use std::{collections::VecDeque, sync::Arc, thread, time::Duration}; - - #[derive(Clone, Copy)] - enum Timeout { - Bounded(Duration), - Forever, - } - - #[derive(Clone, Copy)] - enum NotifyStyle { - One, - All, - } - - struct Queue { - items: VecDeque<usize>, - should_continue: bool, - } - - impl Queue { - fn new() -> Self { - Self { - items: VecDeque::new(), - should_continue: true, - } - } - } - - fn wait<T: ?Sized>( - condition: &Condvar, - lock: &mut MutexGuard<'_, T>, - predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool, - timeout: &Timeout, - ) { - while !predicate(lock) { - match timeout { - Timeout::Forever => condition.wait(lock), - Timeout::Bounded(bound) => { - condition.wait_for(lock, *bound); - } - } - } - } - - fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) { - match style { - NotifyStyle::One => { - condition.notify_one(); - } - NotifyStyle::All => { - if should_notify { - condition.notify_all(); - } - } - } - } - - fn run_queue_test( - num_producers: usize, - num_consumers: usize, - max_queue_size: usize, - messages_per_producer: usize, - notify_style: NotifyStyle, - timeout: Timeout, - delay: Duration, - ) { - let input_queue = Arc::new(Mutex::new(Queue::new())); - let empty_condition = Arc::new(Condvar::new()); - let full_condition = Arc::new(Condvar::new()); - - let output_vec = Arc::new(Mutex::new(vec![])); - - let consumers = (0..num_consumers) - .map(|_| { - consumer_thread( - input_queue.clone(), - empty_condition.clone(), - full_condition.clone(), - timeout, - notify_style, - output_vec.clone(), - max_queue_size, - ) - }) - .collect::<Vec<_>>(); - let producers = (0..num_producers) - .map(|_| { - producer_thread( - messages_per_producer, - input_queue.clone(), - empty_condition.clone(), - full_condition.clone(), - timeout, - notify_style, - max_queue_size, - ) - }) - .collect::<Vec<_>>(); - - thread::sleep(delay); - - for producer in producers.into_iter() { - producer.join().expect("Producer thread panicked"); - } - - { - let mut input_queue = input_queue.lock(); - input_queue.should_continue = false; - } - empty_condition.notify_all(); - - for consumer in consumers.into_iter() { - consumer.join().expect("Consumer thread panicked"); - } - - let mut output_vec = output_vec.lock(); - assert_eq!(output_vec.len(), num_producers * messages_per_producer); - output_vec.sort(); - for msg_idx in 0..messages_per_producer { - for producer_idx in 0..num_producers { - assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]); - } - } - } - - fn consumer_thread( - input_queue: Arc<Mutex<Queue>>, - empty_condition: Arc<Condvar>, - full_condition: Arc<Condvar>, - timeout: Timeout, - notify_style: NotifyStyle, - output_queue: Arc<Mutex<Vec<usize>>>, - max_queue_size: usize, - ) -> thread::JoinHandle<()> { - thread::spawn(move || loop { - let (should_notify, result) = { - let mut queue = input_queue.lock(); - wait( - &*empty_condition, - &mut queue, - |state| -> bool { !state.items.is_empty() || !state.should_continue }, - &timeout, - ); - if queue.items.is_empty() && !queue.should_continue { - return; - } - let should_notify = queue.items.len() == max_queue_size; - let result = queue.items.pop_front(); - std::mem::drop(queue); - (should_notify, result) - }; - notify(notify_style, &*full_condition, should_notify); - - if let Some(result) = result { - output_queue.lock().push(result); - } - }) - } - - fn producer_thread( - num_messages: usize, - queue: Arc<Mutex<Queue>>, - empty_condition: Arc<Condvar>, - full_condition: Arc<Condvar>, - timeout: Timeout, - notify_style: NotifyStyle, - max_queue_size: usize, - ) -> thread::JoinHandle<()> { - thread::spawn(move || { - for message in 0..num_messages { - let should_notify = { - let mut queue = queue.lock(); - wait( - &*full_condition, - &mut queue, - |state| state.items.len() < max_queue_size, - &timeout, - ); - let should_notify = queue.items.is_empty(); - queue.items.push_back(message); - std::mem::drop(queue); - should_notify - }; - notify(notify_style, &*empty_condition, should_notify); - } - }) - } - - macro_rules! run_queue_tests { - ( $( $name:ident( - num_producers: $num_producers:expr, - num_consumers: $num_consumers:expr, - max_queue_size: $max_queue_size:expr, - messages_per_producer: $messages_per_producer:expr, - notification_style: $notification_style:expr, - timeout: $timeout:expr, - delay_seconds: $delay_seconds:expr); - )* ) => { - $(#[test] - fn $name() { - let delay = Duration::from_secs($delay_seconds); - run_queue_test( - $num_producers, - $num_consumers, - $max_queue_size, - $messages_per_producer, - $notification_style, - $timeout, - delay, - ); - })* - }; - } - - run_queue_tests! { - sanity_check_queue( - num_producers: 1, - num_consumers: 1, - max_queue_size: 1, - messages_per_producer: 100_000, - notification_style: NotifyStyle::All, - timeout: Timeout::Bounded(Duration::from_secs(1)), - delay_seconds: 0 - ); - sanity_check_queue_timeout( - num_producers: 1, - num_consumers: 1, - max_queue_size: 1, - messages_per_producer: 100_000, - notification_style: NotifyStyle::All, - timeout: Timeout::Forever, - delay_seconds: 0 - ); - new_test_without_timeout_5( - num_producers: 1, - num_consumers: 5, - max_queue_size: 1, - messages_per_producer: 100_000, - notification_style: NotifyStyle::All, - timeout: Timeout::Forever, - delay_seconds: 0 - ); - one_producer_one_consumer_one_slot( - num_producers: 1, - num_consumers: 1, - max_queue_size: 1, - messages_per_producer: 100_000, - notification_style: NotifyStyle::All, - timeout: Timeout::Forever, - delay_seconds: 0 - ); - one_producer_one_consumer_one_slot_timeout( - num_producers: 1, - num_consumers: 1, - max_queue_size: 1, - messages_per_producer: 100_000, - notification_style: NotifyStyle::All, - timeout: Timeout::Forever, - delay_seconds: 1 - ); - one_producer_one_consumer_hundred_slots( - num_producers: 1, - num_consumers: 1, - max_queue_size: 100, - messages_per_producer: 1_000_000, - notification_style: NotifyStyle::All, - timeout: Timeout::Forever, - delay_seconds: 0 - ); - ten_producers_one_consumer_one_slot( - num_producers: 10, - num_consumers: 1, - max_queue_size: 1, - messages_per_producer: 10000, - notification_style: NotifyStyle::All, - timeout: Timeout::Forever, - delay_seconds: 0 - ); - ten_producers_one_consumer_hundred_slots_notify_all( - num_producers: 10, - num_consumers: 1, - max_queue_size: 100, - messages_per_producer: 10000, - notification_style: NotifyStyle::All, - timeout: Timeout::Forever, - delay_seconds: 0 - ); - ten_producers_one_consumer_hundred_slots_notify_one( - num_producers: 10, - num_consumers: 1, - max_queue_size: 100, - messages_per_producer: 10000, - notification_style: NotifyStyle::One, - timeout: Timeout::Forever, - delay_seconds: 0 - ); - one_producer_ten_consumers_one_slot( - num_producers: 1, - num_consumers: 10, - max_queue_size: 1, - messages_per_producer: 10000, - notification_style: NotifyStyle::All, - timeout: Timeout::Forever, - delay_seconds: 0 - ); - one_producer_ten_consumers_hundred_slots_notify_all( - num_producers: 1, - num_consumers: 10, - max_queue_size: 100, - messages_per_producer: 100_000, - notification_style: NotifyStyle::All, - timeout: Timeout::Forever, - delay_seconds: 0 - ); - one_producer_ten_consumers_hundred_slots_notify_one( - num_producers: 1, - num_consumers: 10, - max_queue_size: 100, - messages_per_producer: 100_000, - notification_style: NotifyStyle::One, - timeout: Timeout::Forever, - delay_seconds: 0 - ); - ten_producers_ten_consumers_one_slot( - num_producers: 10, - num_consumers: 10, - max_queue_size: 1, - messages_per_producer: 50000, - notification_style: NotifyStyle::All, - timeout: Timeout::Forever, - delay_seconds: 0 - ); - ten_producers_ten_consumers_hundred_slots_notify_all( - num_producers: 10, - num_consumers: 10, - max_queue_size: 100, - messages_per_producer: 50000, - notification_style: NotifyStyle::All, - timeout: Timeout::Forever, - delay_seconds: 0 - ); - ten_producers_ten_consumers_hundred_slots_notify_one( - num_producers: 10, - num_consumers: 10, - max_queue_size: 100, - messages_per_producer: 50000, - notification_style: NotifyStyle::One, - timeout: Timeout::Forever, - delay_seconds: 0 - ); - } -} |