diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/parking_lot/src/condvar.rs | |
parent | Initial commit. (diff) | |
download | firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esr
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/parking_lot/src/condvar.rs')
-rw-r--r-- | third_party/rust/parking_lot/src/condvar.rs | 1057 |
1 files changed, 1057 insertions, 0 deletions
diff --git a/third_party/rust/parking_lot/src/condvar.rs b/third_party/rust/parking_lot/src/condvar.rs new file mode 100644 index 0000000000..534b8aff8b --- /dev/null +++ b/third_party/rust/parking_lot/src/condvar.rs @@ -0,0 +1,1057 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or +// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or +// http://opensource.org/licenses/MIT>, at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use crate::mutex::MutexGuard; +use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL}; +use crate::{deadlock, util}; +use core::{ + fmt, ptr, + sync::atomic::{AtomicPtr, Ordering}, +}; +use instant::Instant; +use lock_api::RawMutex as RawMutex_; +use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN}; +use std::time::Duration; + +/// A type indicating whether a timed wait on a condition variable returned +/// due to a time out or not. +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub struct WaitTimeoutResult(bool); + +impl WaitTimeoutResult { + /// Returns whether the wait was known to have timed out. + #[inline] + pub fn timed_out(self) -> bool { + self.0 + } +} + +/// A Condition Variable +/// +/// Condition variables represent the ability to block a thread such that it +/// consumes no CPU time while waiting for an event to occur. Condition +/// variables are typically associated with a boolean predicate (a condition) +/// and a mutex. The predicate is always verified inside of the mutex before +/// determining that thread must block. +/// +/// Note that this module places one additional restriction over the system +/// condition variables: each condvar can be used with only one mutex at a +/// time. Any attempt to use multiple mutexes on the same condition variable +/// simultaneously will result in a runtime panic. However it is possible to +/// switch to a different mutex if there are no threads currently waiting on +/// the condition variable. +/// +/// # Differences from the standard library `Condvar` +/// +/// - No spurious wakeups: A wait will only return a non-timeout result if it +/// was woken up by `notify_one` or `notify_all`. +/// - `Condvar::notify_all` will only wake up a single thread, the rest are +/// requeued to wait for the `Mutex` to be unlocked by the thread that was +/// woken up. +/// - Only requires 1 word of space, whereas the standard library boxes the +/// `Condvar` due to platform limitations. +/// - Can be statically constructed (requires the `const_fn` nightly feature). +/// - Does not require any drop glue when dropped. +/// - Inline fast path for the uncontended case. +/// +/// # Examples +/// +/// ``` +/// use parking_lot::{Mutex, Condvar}; +/// use std::sync::Arc; +/// use std::thread; +/// +/// let pair = Arc::new((Mutex::new(false), Condvar::new())); +/// let pair2 = pair.clone(); +/// +/// // Inside of our lock, spawn a new thread, and then wait for it to start +/// thread::spawn(move|| { +/// let &(ref lock, ref cvar) = &*pair2; +/// let mut started = lock.lock(); +/// *started = true; +/// cvar.notify_one(); +/// }); +/// +/// // wait for the thread to start up +/// let &(ref lock, ref cvar) = &*pair; +/// let mut started = lock.lock(); +/// if !*started { +/// cvar.wait(&mut started); +/// } +/// // Note that we used an if instead of a while loop above. This is only +/// // possible because parking_lot's Condvar will never spuriously wake up. +/// // This means that wait() will only return after notify_one or notify_all is +/// // called. +/// ``` +pub struct Condvar { + state: AtomicPtr<RawMutex>, +} + +impl Condvar { + /// Creates a new condition variable which is ready to be waited on and + /// notified. + #[inline] + pub const fn new() -> Condvar { + Condvar { + state: AtomicPtr::new(ptr::null_mut()), + } + } + + /// Wakes up one blocked thread on this condvar. + /// + /// Returns whether a thread was woken up. + /// + /// If there is a blocked thread on this condition variable, then it will + /// be woken up from its call to `wait` or `wait_timeout`. Calls to + /// `notify_one` are not buffered in any way. + /// + /// To wake up all threads, see `notify_all()`. + /// + /// # Examples + /// + /// ``` + /// use parking_lot::Condvar; + /// + /// let condvar = Condvar::new(); + /// + /// // do something with condvar, share it with other threads + /// + /// if !condvar.notify_one() { + /// println!("Nobody was listening for this."); + /// } + /// ``` + #[inline] + pub fn notify_one(&self) -> bool { + // Nothing to do if there are no waiting threads + let state = self.state.load(Ordering::Relaxed); + if state.is_null() { + return false; + } + + self.notify_one_slow(state) + } + + #[cold] + fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool { + unsafe { + // Unpark one thread and requeue the rest onto the mutex + let from = self as *const _ as usize; + let to = mutex as usize; + let validate = || { + // Make sure that our atomic state still points to the same + // mutex. If not then it means that all threads on the current + // mutex were woken up and a new waiting thread switched to a + // different mutex. In that case we can get away with doing + // nothing. + if self.state.load(Ordering::Relaxed) != mutex { + return RequeueOp::Abort; + } + + // Unpark one thread if the mutex is unlocked, otherwise just + // requeue everything to the mutex. This is safe to do here + // since unlocking the mutex when the parked bit is set requires + // locking the queue. There is the possibility of a race if the + // mutex gets locked after we check, but that doesn't matter in + // this case. + if (*mutex).mark_parked_if_locked() { + RequeueOp::RequeueOne + } else { + RequeueOp::UnparkOne + } + }; + let callback = |_op, result: UnparkResult| { + // Clear our state if there are no more waiting threads + if !result.have_more_threads { + self.state.store(ptr::null_mut(), Ordering::Relaxed); + } + TOKEN_NORMAL + }; + let res = parking_lot_core::unpark_requeue(from, to, validate, callback); + + res.unparked_threads + res.requeued_threads != 0 + } + } + + /// Wakes up all blocked threads on this condvar. + /// + /// Returns the number of threads woken up. + /// + /// This method will ensure that any current waiters on the condition + /// variable are awoken. Calls to `notify_all()` are not buffered in any + /// way. + /// + /// To wake up only one thread, see `notify_one()`. + #[inline] + pub fn notify_all(&self) -> usize { + // Nothing to do if there are no waiting threads + let state = self.state.load(Ordering::Relaxed); + if state.is_null() { + return 0; + } + + self.notify_all_slow(state) + } + + #[cold] + fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize { + unsafe { + // Unpark one thread and requeue the rest onto the mutex + let from = self as *const _ as usize; + let to = mutex as usize; + let validate = || { + // Make sure that our atomic state still points to the same + // mutex. If not then it means that all threads on the current + // mutex were woken up and a new waiting thread switched to a + // different mutex. In that case we can get away with doing + // nothing. + if self.state.load(Ordering::Relaxed) != mutex { + return RequeueOp::Abort; + } + + // Clear our state since we are going to unpark or requeue all + // threads. + self.state.store(ptr::null_mut(), Ordering::Relaxed); + + // Unpark one thread if the mutex is unlocked, otherwise just + // requeue everything to the mutex. This is safe to do here + // since unlocking the mutex when the parked bit is set requires + // locking the queue. There is the possibility of a race if the + // mutex gets locked after we check, but that doesn't matter in + // this case. + if (*mutex).mark_parked_if_locked() { + RequeueOp::RequeueAll + } else { + RequeueOp::UnparkOneRequeueRest + } + }; + let callback = |op, result: UnparkResult| { + // If we requeued threads to the mutex, mark it as having + // parked threads. The RequeueAll case is already handled above. + if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 { + (*mutex).mark_parked(); + } + TOKEN_NORMAL + }; + let res = parking_lot_core::unpark_requeue(from, to, validate, callback); + + res.unparked_threads + res.requeued_threads + } + } + + /// Blocks the current thread until this condition variable receives a + /// notification. + /// + /// This function will atomically unlock the mutex specified (represented by + /// `mutex_guard`) and block the current thread. This means that any calls + /// to `notify_*()` which happen logically after the mutex is unlocked are + /// candidates to wake this thread up. When this function call returns, the + /// lock specified will have been re-acquired. + /// + /// # Panics + /// + /// This function will panic if another thread is waiting on the `Condvar` + /// with a different `Mutex` object. + #[inline] + pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) { + self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None); + } + + /// Waits on this condition variable for a notification, timing out after + /// the specified time instant. + /// + /// The semantics of this function are equivalent to `wait()` except that + /// the thread will be blocked roughly until `timeout` is reached. This + /// method should not be used for precise timing due to anomalies such as + /// preemption or platform differences that may not cause the maximum + /// amount of time waited to be precisely `timeout`. + /// + /// Note that the best effort is made to ensure that the time waited is + /// measured with a monotonic clock, and not affected by the changes made to + /// the system time. + /// + /// The returned `WaitTimeoutResult` value indicates if the timeout is + /// known to have elapsed. + /// + /// Like `wait`, the lock specified will be re-acquired when this function + /// returns, regardless of whether the timeout elapsed or not. + /// + /// # Panics + /// + /// This function will panic if another thread is waiting on the `Condvar` + /// with a different `Mutex` object. + #[inline] + pub fn wait_until<T: ?Sized>( + &self, + mutex_guard: &mut MutexGuard<'_, T>, + timeout: Instant, + ) -> WaitTimeoutResult { + self.wait_until_internal( + unsafe { MutexGuard::mutex(mutex_guard).raw() }, + Some(timeout), + ) + } + + // This is a non-generic function to reduce the monomorphization cost of + // using `wait_until`. + fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult { + unsafe { + let result; + let mut bad_mutex = false; + let mut requeued = false; + { + let addr = self as *const _ as usize; + let lock_addr = mutex as *const _ as *mut _; + let validate = || { + // Ensure we don't use two different mutexes with the same + // Condvar at the same time. This is done while locked to + // avoid races with notify_one + let state = self.state.load(Ordering::Relaxed); + if state.is_null() { + self.state.store(lock_addr, Ordering::Relaxed); + } else if state != lock_addr { + bad_mutex = true; + return false; + } + true + }; + let before_sleep = || { + // Unlock the mutex before sleeping... + mutex.unlock(); + }; + let timed_out = |k, was_last_thread| { + // If we were requeued to a mutex, then we did not time out. + // We'll just park ourselves on the mutex again when we try + // to lock it later. + requeued = k != addr; + + // If we were the last thread on the queue then we need to + // clear our state. This is normally done by the + // notify_{one,all} functions when not timing out. + if !requeued && was_last_thread { + self.state.store(ptr::null_mut(), Ordering::Relaxed); + } + }; + result = parking_lot_core::park( + addr, + validate, + before_sleep, + timed_out, + DEFAULT_PARK_TOKEN, + timeout, + ); + } + + // Panic if we tried to use multiple mutexes with a Condvar. Note + // that at this point the MutexGuard is still locked. It will be + // unlocked by the unwinding logic. + if bad_mutex { + panic!("attempted to use a condition variable with more than one mutex"); + } + + // ... and re-lock it once we are done sleeping + if result == ParkResult::Unparked(TOKEN_HANDOFF) { + deadlock::acquire_resource(mutex as *const _ as usize); + } else { + mutex.lock(); + } + + WaitTimeoutResult(!(result.is_unparked() || requeued)) + } + } + + /// Waits on this condition variable for a notification, timing out after a + /// specified duration. + /// + /// The semantics of this function are equivalent to `wait()` except that + /// the thread will be blocked for roughly no longer than `timeout`. This + /// method should not be used for precise timing due to anomalies such as + /// preemption or platform differences that may not cause the maximum + /// amount of time waited to be precisely `timeout`. + /// + /// Note that the best effort is made to ensure that the time waited is + /// measured with a monotonic clock, and not affected by the changes made to + /// the system time. + /// + /// The returned `WaitTimeoutResult` value indicates if the timeout is + /// known to have elapsed. + /// + /// Like `wait`, the lock specified will be re-acquired when this function + /// returns, regardless of whether the timeout elapsed or not. + /// + /// # Panics + /// + /// Panics if the given `timeout` is so large that it can't be added to the current time. + /// This panic is not possible if the crate is built with the `nightly` feature, then a too + /// large `timeout` becomes equivalent to just calling `wait`. + #[inline] + pub fn wait_for<T: ?Sized>( + &self, + mutex_guard: &mut MutexGuard<'_, T>, + timeout: Duration, + ) -> WaitTimeoutResult { + let deadline = util::to_deadline(timeout); + self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline) + } +} + +impl Default for Condvar { + #[inline] + fn default() -> Condvar { + Condvar::new() + } +} + +impl fmt::Debug for Condvar { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Condvar { .. }") + } +} + +#[cfg(test)] +mod tests { + use crate::{Condvar, Mutex, MutexGuard}; + use instant::Instant; + use std::sync::mpsc::channel; + use std::sync::Arc; + use std::thread; + use std::time::Duration; + + #[test] + fn smoke() { + let c = Condvar::new(); + c.notify_one(); + c.notify_all(); + } + + #[test] + fn notify_one() { + let m = Arc::new(Mutex::new(())); + let m2 = m.clone(); + let c = Arc::new(Condvar::new()); + let c2 = c.clone(); + + let mut g = m.lock(); + let _t = thread::spawn(move || { + let _g = m2.lock(); + c2.notify_one(); + }); + c.wait(&mut g); + } + + #[test] + fn notify_all() { + const N: usize = 10; + + let data = Arc::new((Mutex::new(0), Condvar::new())); + let (tx, rx) = channel(); + for _ in 0..N { + let data = data.clone(); + let tx = tx.clone(); + thread::spawn(move || { + let &(ref lock, ref cond) = &*data; + let mut cnt = lock.lock(); + *cnt += 1; + if *cnt == N { + tx.send(()).unwrap(); + } + while *cnt != 0 { + cond.wait(&mut cnt); + } + tx.send(()).unwrap(); + }); + } + drop(tx); + + let &(ref lock, ref cond) = &*data; + rx.recv().unwrap(); + let mut cnt = lock.lock(); + *cnt = 0; + cond.notify_all(); + drop(cnt); + + for _ in 0..N { + rx.recv().unwrap(); + } + } + + #[test] + fn notify_one_return_true() { + let m = Arc::new(Mutex::new(())); + let m2 = m.clone(); + let c = Arc::new(Condvar::new()); + let c2 = c.clone(); + + let mut g = m.lock(); + let _t = thread::spawn(move || { + let _g = m2.lock(); + assert!(c2.notify_one()); + }); + c.wait(&mut g); + } + + #[test] + fn notify_one_return_false() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + let _t = thread::spawn(move || { + let _g = m.lock(); + assert!(!c.notify_one()); + }); + } + + #[test] + fn notify_all_return() { + const N: usize = 10; + + let data = Arc::new((Mutex::new(0), Condvar::new())); + let (tx, rx) = channel(); + for _ in 0..N { + let data = data.clone(); + let tx = tx.clone(); + thread::spawn(move || { + let &(ref lock, ref cond) = &*data; + let mut cnt = lock.lock(); + *cnt += 1; + if *cnt == N { + tx.send(()).unwrap(); + } + while *cnt != 0 { + cond.wait(&mut cnt); + } + tx.send(()).unwrap(); + }); + } + drop(tx); + + let &(ref lock, ref cond) = &*data; + rx.recv().unwrap(); + let mut cnt = lock.lock(); + *cnt = 0; + assert_eq!(cond.notify_all(), N); + drop(cnt); + + for _ in 0..N { + rx.recv().unwrap(); + } + + assert_eq!(cond.notify_all(), 0); + } + + #[test] + fn wait_for() { + let m = Arc::new(Mutex::new(())); + let m2 = m.clone(); + let c = Arc::new(Condvar::new()); + let c2 = c.clone(); + + let mut g = m.lock(); + let no_timeout = c.wait_for(&mut g, Duration::from_millis(1)); + assert!(no_timeout.timed_out()); + + let _t = thread::spawn(move || { + let _g = m2.lock(); + c2.notify_one(); + }); + // Non-nightly panics on too large timeouts. Nightly treats it as indefinite wait. + let very_long_timeout = if cfg!(feature = "nightly") { + Duration::from_secs(u64::max_value()) + } else { + Duration::from_millis(u32::max_value() as u64) + }; + + let timeout_res = c.wait_for(&mut g, very_long_timeout); + assert!(!timeout_res.timed_out()); + + drop(g); + } + + #[test] + fn wait_until() { + let m = Arc::new(Mutex::new(())); + let m2 = m.clone(); + let c = Arc::new(Condvar::new()); + let c2 = c.clone(); + + let mut g = m.lock(); + let no_timeout = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1)); + assert!(no_timeout.timed_out()); + let _t = thread::spawn(move || { + let _g = m2.lock(); + c2.notify_one(); + }); + let timeout_res = c.wait_until( + &mut g, + Instant::now() + Duration::from_millis(u32::max_value() as u64), + ); + assert!(!timeout_res.timed_out()); + drop(g); + } + + #[test] + #[should_panic] + fn two_mutexes() { + let m = Arc::new(Mutex::new(())); + let m2 = m.clone(); + let m3 = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + let c2 = c.clone(); + + // Make sure we don't leave the child thread dangling + struct PanicGuard<'a>(&'a Condvar); + impl<'a> Drop for PanicGuard<'a> { + fn drop(&mut self) { + self.0.notify_one(); + } + } + + let (tx, rx) = channel(); + let g = m.lock(); + let _t = thread::spawn(move || { + let mut g = m2.lock(); + tx.send(()).unwrap(); + c2.wait(&mut g); + }); + drop(g); + rx.recv().unwrap(); + let _g = m.lock(); + let _guard = PanicGuard(&*c); + c.wait(&mut m3.lock()); + } + + #[test] + fn two_mutexes_disjoint() { + let m = Arc::new(Mutex::new(())); + let m2 = m.clone(); + let m3 = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + let c2 = c.clone(); + + let mut g = m.lock(); + let _t = thread::spawn(move || { + let _g = m2.lock(); + c2.notify_one(); + }); + c.wait(&mut g); + drop(g); + + let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1)); + } + + #[test] + fn test_debug_condvar() { + let c = Condvar::new(); + assert_eq!(format!("{:?}", c), "Condvar { .. }"); + } + + #[test] + fn test_condvar_requeue() { + let m = Arc::new(Mutex::new(())); + let m2 = m.clone(); + let c = Arc::new(Condvar::new()); + let c2 = c.clone(); + let t = thread::spawn(move || { + let mut g = m2.lock(); + c2.wait(&mut g); + }); + + let mut g = m.lock(); + while !c.notify_one() { + // Wait for the thread to get into wait() + MutexGuard::bump(&mut g); + // Yield, so the other thread gets a chance to do something. + // (At least Miri needs this, because it doesn't preempt threads.) + thread::yield_now(); + } + // The thread should have been requeued to the mutex, which we wake up now. + drop(g); + t.join().unwrap(); + } + + #[test] + fn test_issue_129() { + let locks = Arc::new((Mutex::new(()), Condvar::new())); + + let (tx, rx) = channel(); + for _ in 0..4 { + let locks = locks.clone(); + let tx = tx.clone(); + thread::spawn(move || { + let mut guard = locks.0.lock(); + locks.1.wait(&mut guard); + locks.1.wait_for(&mut guard, Duration::from_millis(1)); + locks.1.notify_one(); + tx.send(()).unwrap(); + }); + } + + thread::sleep(Duration::from_millis(100)); + locks.1.notify_one(); + + for _ in 0..4 { + assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(())); + } + } +} + +/// This module contains an integration test that is heavily inspired from WebKit's own integration +/// tests for it's own Condvar. +#[cfg(test)] +mod webkit_queue_test { + use crate::{Condvar, Mutex, MutexGuard}; + use std::{collections::VecDeque, sync::Arc, thread, time::Duration}; + + #[derive(Clone, Copy)] + enum Timeout { + Bounded(Duration), + Forever, + } + + #[derive(Clone, Copy)] + enum NotifyStyle { + One, + All, + } + + struct Queue { + items: VecDeque<usize>, + should_continue: bool, + } + + impl Queue { + fn new() -> Self { + Self { + items: VecDeque::new(), + should_continue: true, + } + } + } + + fn wait<T: ?Sized>( + condition: &Condvar, + lock: &mut MutexGuard<'_, T>, + predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool, + timeout: &Timeout, + ) { + while !predicate(lock) { + match timeout { + Timeout::Forever => condition.wait(lock), + Timeout::Bounded(bound) => { + condition.wait_for(lock, *bound); + } + } + } + } + + fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) { + match style { + NotifyStyle::One => { + condition.notify_one(); + } + NotifyStyle::All => { + if should_notify { + condition.notify_all(); + } + } + } + } + + fn run_queue_test( + num_producers: usize, + num_consumers: usize, + max_queue_size: usize, + messages_per_producer: usize, + notify_style: NotifyStyle, + timeout: Timeout, + delay: Duration, + ) { + let input_queue = Arc::new(Mutex::new(Queue::new())); + let empty_condition = Arc::new(Condvar::new()); + let full_condition = Arc::new(Condvar::new()); + + let output_vec = Arc::new(Mutex::new(vec![])); + + let consumers = (0..num_consumers) + .map(|_| { + consumer_thread( + input_queue.clone(), + empty_condition.clone(), + full_condition.clone(), + timeout, + notify_style, + output_vec.clone(), + max_queue_size, + ) + }) + .collect::<Vec<_>>(); + let producers = (0..num_producers) + .map(|_| { + producer_thread( + messages_per_producer, + input_queue.clone(), + empty_condition.clone(), + full_condition.clone(), + timeout, + notify_style, + max_queue_size, + ) + }) + .collect::<Vec<_>>(); + + thread::sleep(delay); + + for producer in producers.into_iter() { + producer.join().expect("Producer thread panicked"); + } + + { + let mut input_queue = input_queue.lock(); + input_queue.should_continue = false; + } + empty_condition.notify_all(); + + for consumer in consumers.into_iter() { + consumer.join().expect("Consumer thread panicked"); + } + + let mut output_vec = output_vec.lock(); + assert_eq!(output_vec.len(), num_producers * messages_per_producer); + output_vec.sort(); + for msg_idx in 0..messages_per_producer { + for producer_idx in 0..num_producers { + assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]); + } + } + } + + fn consumer_thread( + input_queue: Arc<Mutex<Queue>>, + empty_condition: Arc<Condvar>, + full_condition: Arc<Condvar>, + timeout: Timeout, + notify_style: NotifyStyle, + output_queue: Arc<Mutex<Vec<usize>>>, + max_queue_size: usize, + ) -> thread::JoinHandle<()> { + thread::spawn(move || loop { + let (should_notify, result) = { + let mut queue = input_queue.lock(); + wait( + &*empty_condition, + &mut queue, + |state| -> bool { !state.items.is_empty() || !state.should_continue }, + &timeout, + ); + if queue.items.is_empty() && !queue.should_continue { + return; + } + let should_notify = queue.items.len() == max_queue_size; + let result = queue.items.pop_front(); + std::mem::drop(queue); + (should_notify, result) + }; + notify(notify_style, &*full_condition, should_notify); + + if let Some(result) = result { + output_queue.lock().push(result); + } + }) + } + + fn producer_thread( + num_messages: usize, + queue: Arc<Mutex<Queue>>, + empty_condition: Arc<Condvar>, + full_condition: Arc<Condvar>, + timeout: Timeout, + notify_style: NotifyStyle, + max_queue_size: usize, + ) -> thread::JoinHandle<()> { + thread::spawn(move || { + for message in 0..num_messages { + let should_notify = { + let mut queue = queue.lock(); + wait( + &*full_condition, + &mut queue, + |state| state.items.len() < max_queue_size, + &timeout, + ); + let should_notify = queue.items.is_empty(); + queue.items.push_back(message); + std::mem::drop(queue); + should_notify + }; + notify(notify_style, &*empty_condition, should_notify); + } + }) + } + + macro_rules! run_queue_tests { + ( $( $name:ident( + num_producers: $num_producers:expr, + num_consumers: $num_consumers:expr, + max_queue_size: $max_queue_size:expr, + messages_per_producer: $messages_per_producer:expr, + notification_style: $notification_style:expr, + timeout: $timeout:expr, + delay_seconds: $delay_seconds:expr); + )* ) => { + $(#[test] + fn $name() { + let delay = Duration::from_secs($delay_seconds); + run_queue_test( + $num_producers, + $num_consumers, + $max_queue_size, + $messages_per_producer, + $notification_style, + $timeout, + delay, + ); + })* + }; + } + + run_queue_tests! { + sanity_check_queue( + num_producers: 1, + num_consumers: 1, + max_queue_size: 1, + messages_per_producer: 100_000, + notification_style: NotifyStyle::All, + timeout: Timeout::Bounded(Duration::from_secs(1)), + delay_seconds: 0 + ); + sanity_check_queue_timeout( + num_producers: 1, + num_consumers: 1, + max_queue_size: 1, + messages_per_producer: 100_000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + new_test_without_timeout_5( + num_producers: 1, + num_consumers: 5, + max_queue_size: 1, + messages_per_producer: 100_000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + one_producer_one_consumer_one_slot( + num_producers: 1, + num_consumers: 1, + max_queue_size: 1, + messages_per_producer: 100_000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + one_producer_one_consumer_one_slot_timeout( + num_producers: 1, + num_consumers: 1, + max_queue_size: 1, + messages_per_producer: 100_000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 1 + ); + one_producer_one_consumer_hundred_slots( + num_producers: 1, + num_consumers: 1, + max_queue_size: 100, + messages_per_producer: 1_000_000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + ten_producers_one_consumer_one_slot( + num_producers: 10, + num_consumers: 1, + max_queue_size: 1, + messages_per_producer: 10000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + ten_producers_one_consumer_hundred_slots_notify_all( + num_producers: 10, + num_consumers: 1, + max_queue_size: 100, + messages_per_producer: 10000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + ten_producers_one_consumer_hundred_slots_notify_one( + num_producers: 10, + num_consumers: 1, + max_queue_size: 100, + messages_per_producer: 10000, + notification_style: NotifyStyle::One, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + one_producer_ten_consumers_one_slot( + num_producers: 1, + num_consumers: 10, + max_queue_size: 1, + messages_per_producer: 10000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + one_producer_ten_consumers_hundred_slots_notify_all( + num_producers: 1, + num_consumers: 10, + max_queue_size: 100, + messages_per_producer: 100_000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + one_producer_ten_consumers_hundred_slots_notify_one( + num_producers: 1, + num_consumers: 10, + max_queue_size: 100, + messages_per_producer: 100_000, + notification_style: NotifyStyle::One, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + ten_producers_ten_consumers_one_slot( + num_producers: 10, + num_consumers: 10, + max_queue_size: 1, + messages_per_producer: 50000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + ten_producers_ten_consumers_hundred_slots_notify_all( + num_producers: 10, + num_consumers: 10, + max_queue_size: 100, + messages_per_producer: 50000, + notification_style: NotifyStyle::All, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + ten_producers_ten_consumers_hundred_slots_notify_one( + num_producers: 10, + num_consumers: 10, + max_queue_size: 100, + messages_per_producer: 50000, + notification_style: NotifyStyle::One, + timeout: Timeout::Forever, + delay_seconds: 0 + ); + } +} |