summaryrefslogtreecommitdiffstats
path: root/vendor/parking_lot-0.11.2/src/condvar.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-06-19 09:26:03 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-06-19 09:26:03 +0000
commit9918693037dce8aa4bb6f08741b6812923486c18 (patch)
tree21d2b40bec7e6a7ea664acee056eb3d08e15a1cf /vendor/parking_lot-0.11.2/src/condvar.rs
parentReleasing progress-linux version 1.75.0+dfsg1-5~progress7.99u1. (diff)
downloadrustc-9918693037dce8aa4bb6f08741b6812923486c18.tar.xz
rustc-9918693037dce8aa4bb6f08741b6812923486c18.zip
Merging upstream version 1.76.0+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/parking_lot-0.11.2/src/condvar.rs')
-rw-r--r--vendor/parking_lot-0.11.2/src/condvar.rs1057
1 files changed, 0 insertions, 1057 deletions
diff --git a/vendor/parking_lot-0.11.2/src/condvar.rs b/vendor/parking_lot-0.11.2/src/condvar.rs
deleted file mode 100644
index 534b8aff8..000000000
--- a/vendor/parking_lot-0.11.2/src/condvar.rs
+++ /dev/null
@@ -1,1057 +0,0 @@
-// Copyright 2016 Amanieu d'Antras
-//
-// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
-// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
-// http://opensource.org/licenses/MIT>, at your option. This file may not be
-// copied, modified, or distributed except according to those terms.
-
-use crate::mutex::MutexGuard;
-use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL};
-use crate::{deadlock, util};
-use core::{
- fmt, ptr,
- sync::atomic::{AtomicPtr, Ordering},
-};
-use instant::Instant;
-use lock_api::RawMutex as RawMutex_;
-use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN};
-use std::time::Duration;
-
-/// A type indicating whether a timed wait on a condition variable returned
-/// due to a time out or not.
-#[derive(Debug, PartialEq, Eq, Copy, Clone)]
-pub struct WaitTimeoutResult(bool);
-
-impl WaitTimeoutResult {
- /// Returns whether the wait was known to have timed out.
- #[inline]
- pub fn timed_out(self) -> bool {
- self.0
- }
-}
-
-/// A Condition Variable
-///
-/// Condition variables represent the ability to block a thread such that it
-/// consumes no CPU time while waiting for an event to occur. Condition
-/// variables are typically associated with a boolean predicate (a condition)
-/// and a mutex. The predicate is always verified inside of the mutex before
-/// determining that thread must block.
-///
-/// Note that this module places one additional restriction over the system
-/// condition variables: each condvar can be used with only one mutex at a
-/// time. Any attempt to use multiple mutexes on the same condition variable
-/// simultaneously will result in a runtime panic. However it is possible to
-/// switch to a different mutex if there are no threads currently waiting on
-/// the condition variable.
-///
-/// # Differences from the standard library `Condvar`
-///
-/// - No spurious wakeups: A wait will only return a non-timeout result if it
-/// was woken up by `notify_one` or `notify_all`.
-/// - `Condvar::notify_all` will only wake up a single thread, the rest are
-/// requeued to wait for the `Mutex` to be unlocked by the thread that was
-/// woken up.
-/// - Only requires 1 word of space, whereas the standard library boxes the
-/// `Condvar` due to platform limitations.
-/// - Can be statically constructed (requires the `const_fn` nightly feature).
-/// - Does not require any drop glue when dropped.
-/// - Inline fast path for the uncontended case.
-///
-/// # Examples
-///
-/// ```
-/// use parking_lot::{Mutex, Condvar};
-/// use std::sync::Arc;
-/// use std::thread;
-///
-/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
-/// let pair2 = pair.clone();
-///
-/// // Inside of our lock, spawn a new thread, and then wait for it to start
-/// thread::spawn(move|| {
-/// let &(ref lock, ref cvar) = &*pair2;
-/// let mut started = lock.lock();
-/// *started = true;
-/// cvar.notify_one();
-/// });
-///
-/// // wait for the thread to start up
-/// let &(ref lock, ref cvar) = &*pair;
-/// let mut started = lock.lock();
-/// if !*started {
-/// cvar.wait(&mut started);
-/// }
-/// // Note that we used an if instead of a while loop above. This is only
-/// // possible because parking_lot's Condvar will never spuriously wake up.
-/// // This means that wait() will only return after notify_one or notify_all is
-/// // called.
-/// ```
-pub struct Condvar {
- state: AtomicPtr<RawMutex>,
-}
-
-impl Condvar {
- /// Creates a new condition variable which is ready to be waited on and
- /// notified.
- #[inline]
- pub const fn new() -> Condvar {
- Condvar {
- state: AtomicPtr::new(ptr::null_mut()),
- }
- }
-
- /// Wakes up one blocked thread on this condvar.
- ///
- /// Returns whether a thread was woken up.
- ///
- /// If there is a blocked thread on this condition variable, then it will
- /// be woken up from its call to `wait` or `wait_timeout`. Calls to
- /// `notify_one` are not buffered in any way.
- ///
- /// To wake up all threads, see `notify_all()`.
- ///
- /// # Examples
- ///
- /// ```
- /// use parking_lot::Condvar;
- ///
- /// let condvar = Condvar::new();
- ///
- /// // do something with condvar, share it with other threads
- ///
- /// if !condvar.notify_one() {
- /// println!("Nobody was listening for this.");
- /// }
- /// ```
- #[inline]
- pub fn notify_one(&self) -> bool {
- // Nothing to do if there are no waiting threads
- let state = self.state.load(Ordering::Relaxed);
- if state.is_null() {
- return false;
- }
-
- self.notify_one_slow(state)
- }
-
- #[cold]
- fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool {
- unsafe {
- // Unpark one thread and requeue the rest onto the mutex
- let from = self as *const _ as usize;
- let to = mutex as usize;
- let validate = || {
- // Make sure that our atomic state still points to the same
- // mutex. If not then it means that all threads on the current
- // mutex were woken up and a new waiting thread switched to a
- // different mutex. In that case we can get away with doing
- // nothing.
- if self.state.load(Ordering::Relaxed) != mutex {
- return RequeueOp::Abort;
- }
-
- // Unpark one thread if the mutex is unlocked, otherwise just
- // requeue everything to the mutex. This is safe to do here
- // since unlocking the mutex when the parked bit is set requires
- // locking the queue. There is the possibility of a race if the
- // mutex gets locked after we check, but that doesn't matter in
- // this case.
- if (*mutex).mark_parked_if_locked() {
- RequeueOp::RequeueOne
- } else {
- RequeueOp::UnparkOne
- }
- };
- let callback = |_op, result: UnparkResult| {
- // Clear our state if there are no more waiting threads
- if !result.have_more_threads {
- self.state.store(ptr::null_mut(), Ordering::Relaxed);
- }
- TOKEN_NORMAL
- };
- let res = parking_lot_core::unpark_requeue(from, to, validate, callback);
-
- res.unparked_threads + res.requeued_threads != 0
- }
- }
-
- /// Wakes up all blocked threads on this condvar.
- ///
- /// Returns the number of threads woken up.
- ///
- /// This method will ensure that any current waiters on the condition
- /// variable are awoken. Calls to `notify_all()` are not buffered in any
- /// way.
- ///
- /// To wake up only one thread, see `notify_one()`.
- #[inline]
- pub fn notify_all(&self) -> usize {
- // Nothing to do if there are no waiting threads
- let state = self.state.load(Ordering::Relaxed);
- if state.is_null() {
- return 0;
- }
-
- self.notify_all_slow(state)
- }
-
- #[cold]
- fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize {
- unsafe {
- // Unpark one thread and requeue the rest onto the mutex
- let from = self as *const _ as usize;
- let to = mutex as usize;
- let validate = || {
- // Make sure that our atomic state still points to the same
- // mutex. If not then it means that all threads on the current
- // mutex were woken up and a new waiting thread switched to a
- // different mutex. In that case we can get away with doing
- // nothing.
- if self.state.load(Ordering::Relaxed) != mutex {
- return RequeueOp::Abort;
- }
-
- // Clear our state since we are going to unpark or requeue all
- // threads.
- self.state.store(ptr::null_mut(), Ordering::Relaxed);
-
- // Unpark one thread if the mutex is unlocked, otherwise just
- // requeue everything to the mutex. This is safe to do here
- // since unlocking the mutex when the parked bit is set requires
- // locking the queue. There is the possibility of a race if the
- // mutex gets locked after we check, but that doesn't matter in
- // this case.
- if (*mutex).mark_parked_if_locked() {
- RequeueOp::RequeueAll
- } else {
- RequeueOp::UnparkOneRequeueRest
- }
- };
- let callback = |op, result: UnparkResult| {
- // If we requeued threads to the mutex, mark it as having
- // parked threads. The RequeueAll case is already handled above.
- if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 {
- (*mutex).mark_parked();
- }
- TOKEN_NORMAL
- };
- let res = parking_lot_core::unpark_requeue(from, to, validate, callback);
-
- res.unparked_threads + res.requeued_threads
- }
- }
-
- /// Blocks the current thread until this condition variable receives a
- /// notification.
- ///
- /// This function will atomically unlock the mutex specified (represented by
- /// `mutex_guard`) and block the current thread. This means that any calls
- /// to `notify_*()` which happen logically after the mutex is unlocked are
- /// candidates to wake this thread up. When this function call returns, the
- /// lock specified will have been re-acquired.
- ///
- /// # Panics
- ///
- /// This function will panic if another thread is waiting on the `Condvar`
- /// with a different `Mutex` object.
- #[inline]
- pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) {
- self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None);
- }
-
- /// Waits on this condition variable for a notification, timing out after
- /// the specified time instant.
- ///
- /// The semantics of this function are equivalent to `wait()` except that
- /// the thread will be blocked roughly until `timeout` is reached. This
- /// method should not be used for precise timing due to anomalies such as
- /// preemption or platform differences that may not cause the maximum
- /// amount of time waited to be precisely `timeout`.
- ///
- /// Note that the best effort is made to ensure that the time waited is
- /// measured with a monotonic clock, and not affected by the changes made to
- /// the system time.
- ///
- /// The returned `WaitTimeoutResult` value indicates if the timeout is
- /// known to have elapsed.
- ///
- /// Like `wait`, the lock specified will be re-acquired when this function
- /// returns, regardless of whether the timeout elapsed or not.
- ///
- /// # Panics
- ///
- /// This function will panic if another thread is waiting on the `Condvar`
- /// with a different `Mutex` object.
- #[inline]
- pub fn wait_until<T: ?Sized>(
- &self,
- mutex_guard: &mut MutexGuard<'_, T>,
- timeout: Instant,
- ) -> WaitTimeoutResult {
- self.wait_until_internal(
- unsafe { MutexGuard::mutex(mutex_guard).raw() },
- Some(timeout),
- )
- }
-
- // This is a non-generic function to reduce the monomorphization cost of
- // using `wait_until`.
- fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult {
- unsafe {
- let result;
- let mut bad_mutex = false;
- let mut requeued = false;
- {
- let addr = self as *const _ as usize;
- let lock_addr = mutex as *const _ as *mut _;
- let validate = || {
- // Ensure we don't use two different mutexes with the same
- // Condvar at the same time. This is done while locked to
- // avoid races with notify_one
- let state = self.state.load(Ordering::Relaxed);
- if state.is_null() {
- self.state.store(lock_addr, Ordering::Relaxed);
- } else if state != lock_addr {
- bad_mutex = true;
- return false;
- }
- true
- };
- let before_sleep = || {
- // Unlock the mutex before sleeping...
- mutex.unlock();
- };
- let timed_out = |k, was_last_thread| {
- // If we were requeued to a mutex, then we did not time out.
- // We'll just park ourselves on the mutex again when we try
- // to lock it later.
- requeued = k != addr;
-
- // If we were the last thread on the queue then we need to
- // clear our state. This is normally done by the
- // notify_{one,all} functions when not timing out.
- if !requeued && was_last_thread {
- self.state.store(ptr::null_mut(), Ordering::Relaxed);
- }
- };
- result = parking_lot_core::park(
- addr,
- validate,
- before_sleep,
- timed_out,
- DEFAULT_PARK_TOKEN,
- timeout,
- );
- }
-
- // Panic if we tried to use multiple mutexes with a Condvar. Note
- // that at this point the MutexGuard is still locked. It will be
- // unlocked by the unwinding logic.
- if bad_mutex {
- panic!("attempted to use a condition variable with more than one mutex");
- }
-
- // ... and re-lock it once we are done sleeping
- if result == ParkResult::Unparked(TOKEN_HANDOFF) {
- deadlock::acquire_resource(mutex as *const _ as usize);
- } else {
- mutex.lock();
- }
-
- WaitTimeoutResult(!(result.is_unparked() || requeued))
- }
- }
-
- /// Waits on this condition variable for a notification, timing out after a
- /// specified duration.
- ///
- /// The semantics of this function are equivalent to `wait()` except that
- /// the thread will be blocked for roughly no longer than `timeout`. This
- /// method should not be used for precise timing due to anomalies such as
- /// preemption or platform differences that may not cause the maximum
- /// amount of time waited to be precisely `timeout`.
- ///
- /// Note that the best effort is made to ensure that the time waited is
- /// measured with a monotonic clock, and not affected by the changes made to
- /// the system time.
- ///
- /// The returned `WaitTimeoutResult` value indicates if the timeout is
- /// known to have elapsed.
- ///
- /// Like `wait`, the lock specified will be re-acquired when this function
- /// returns, regardless of whether the timeout elapsed or not.
- ///
- /// # Panics
- ///
- /// Panics if the given `timeout` is so large that it can't be added to the current time.
- /// This panic is not possible if the crate is built with the `nightly` feature, then a too
- /// large `timeout` becomes equivalent to just calling `wait`.
- #[inline]
- pub fn wait_for<T: ?Sized>(
- &self,
- mutex_guard: &mut MutexGuard<'_, T>,
- timeout: Duration,
- ) -> WaitTimeoutResult {
- let deadline = util::to_deadline(timeout);
- self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline)
- }
-}
-
-impl Default for Condvar {
- #[inline]
- fn default() -> Condvar {
- Condvar::new()
- }
-}
-
-impl fmt::Debug for Condvar {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.pad("Condvar { .. }")
- }
-}
-
-#[cfg(test)]
-mod tests {
- use crate::{Condvar, Mutex, MutexGuard};
- use instant::Instant;
- use std::sync::mpsc::channel;
- use std::sync::Arc;
- use std::thread;
- use std::time::Duration;
-
- #[test]
- fn smoke() {
- let c = Condvar::new();
- c.notify_one();
- c.notify_all();
- }
-
- #[test]
- fn notify_one() {
- let m = Arc::new(Mutex::new(()));
- let m2 = m.clone();
- let c = Arc::new(Condvar::new());
- let c2 = c.clone();
-
- let mut g = m.lock();
- let _t = thread::spawn(move || {
- let _g = m2.lock();
- c2.notify_one();
- });
- c.wait(&mut g);
- }
-
- #[test]
- fn notify_all() {
- const N: usize = 10;
-
- let data = Arc::new((Mutex::new(0), Condvar::new()));
- let (tx, rx) = channel();
- for _ in 0..N {
- let data = data.clone();
- let tx = tx.clone();
- thread::spawn(move || {
- let &(ref lock, ref cond) = &*data;
- let mut cnt = lock.lock();
- *cnt += 1;
- if *cnt == N {
- tx.send(()).unwrap();
- }
- while *cnt != 0 {
- cond.wait(&mut cnt);
- }
- tx.send(()).unwrap();
- });
- }
- drop(tx);
-
- let &(ref lock, ref cond) = &*data;
- rx.recv().unwrap();
- let mut cnt = lock.lock();
- *cnt = 0;
- cond.notify_all();
- drop(cnt);
-
- for _ in 0..N {
- rx.recv().unwrap();
- }
- }
-
- #[test]
- fn notify_one_return_true() {
- let m = Arc::new(Mutex::new(()));
- let m2 = m.clone();
- let c = Arc::new(Condvar::new());
- let c2 = c.clone();
-
- let mut g = m.lock();
- let _t = thread::spawn(move || {
- let _g = m2.lock();
- assert!(c2.notify_one());
- });
- c.wait(&mut g);
- }
-
- #[test]
- fn notify_one_return_false() {
- let m = Arc::new(Mutex::new(()));
- let c = Arc::new(Condvar::new());
-
- let _t = thread::spawn(move || {
- let _g = m.lock();
- assert!(!c.notify_one());
- });
- }
-
- #[test]
- fn notify_all_return() {
- const N: usize = 10;
-
- let data = Arc::new((Mutex::new(0), Condvar::new()));
- let (tx, rx) = channel();
- for _ in 0..N {
- let data = data.clone();
- let tx = tx.clone();
- thread::spawn(move || {
- let &(ref lock, ref cond) = &*data;
- let mut cnt = lock.lock();
- *cnt += 1;
- if *cnt == N {
- tx.send(()).unwrap();
- }
- while *cnt != 0 {
- cond.wait(&mut cnt);
- }
- tx.send(()).unwrap();
- });
- }
- drop(tx);
-
- let &(ref lock, ref cond) = &*data;
- rx.recv().unwrap();
- let mut cnt = lock.lock();
- *cnt = 0;
- assert_eq!(cond.notify_all(), N);
- drop(cnt);
-
- for _ in 0..N {
- rx.recv().unwrap();
- }
-
- assert_eq!(cond.notify_all(), 0);
- }
-
- #[test]
- fn wait_for() {
- let m = Arc::new(Mutex::new(()));
- let m2 = m.clone();
- let c = Arc::new(Condvar::new());
- let c2 = c.clone();
-
- let mut g = m.lock();
- let no_timeout = c.wait_for(&mut g, Duration::from_millis(1));
- assert!(no_timeout.timed_out());
-
- let _t = thread::spawn(move || {
- let _g = m2.lock();
- c2.notify_one();
- });
- // Non-nightly panics on too large timeouts. Nightly treats it as indefinite wait.
- let very_long_timeout = if cfg!(feature = "nightly") {
- Duration::from_secs(u64::max_value())
- } else {
- Duration::from_millis(u32::max_value() as u64)
- };
-
- let timeout_res = c.wait_for(&mut g, very_long_timeout);
- assert!(!timeout_res.timed_out());
-
- drop(g);
- }
-
- #[test]
- fn wait_until() {
- let m = Arc::new(Mutex::new(()));
- let m2 = m.clone();
- let c = Arc::new(Condvar::new());
- let c2 = c.clone();
-
- let mut g = m.lock();
- let no_timeout = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1));
- assert!(no_timeout.timed_out());
- let _t = thread::spawn(move || {
- let _g = m2.lock();
- c2.notify_one();
- });
- let timeout_res = c.wait_until(
- &mut g,
- Instant::now() + Duration::from_millis(u32::max_value() as u64),
- );
- assert!(!timeout_res.timed_out());
- drop(g);
- }
-
- #[test]
- #[should_panic]
- fn two_mutexes() {
- let m = Arc::new(Mutex::new(()));
- let m2 = m.clone();
- let m3 = Arc::new(Mutex::new(()));
- let c = Arc::new(Condvar::new());
- let c2 = c.clone();
-
- // Make sure we don't leave the child thread dangling
- struct PanicGuard<'a>(&'a Condvar);
- impl<'a> Drop for PanicGuard<'a> {
- fn drop(&mut self) {
- self.0.notify_one();
- }
- }
-
- let (tx, rx) = channel();
- let g = m.lock();
- let _t = thread::spawn(move || {
- let mut g = m2.lock();
- tx.send(()).unwrap();
- c2.wait(&mut g);
- });
- drop(g);
- rx.recv().unwrap();
- let _g = m.lock();
- let _guard = PanicGuard(&*c);
- c.wait(&mut m3.lock());
- }
-
- #[test]
- fn two_mutexes_disjoint() {
- let m = Arc::new(Mutex::new(()));
- let m2 = m.clone();
- let m3 = Arc::new(Mutex::new(()));
- let c = Arc::new(Condvar::new());
- let c2 = c.clone();
-
- let mut g = m.lock();
- let _t = thread::spawn(move || {
- let _g = m2.lock();
- c2.notify_one();
- });
- c.wait(&mut g);
- drop(g);
-
- let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1));
- }
-
- #[test]
- fn test_debug_condvar() {
- let c = Condvar::new();
- assert_eq!(format!("{:?}", c), "Condvar { .. }");
- }
-
- #[test]
- fn test_condvar_requeue() {
- let m = Arc::new(Mutex::new(()));
- let m2 = m.clone();
- let c = Arc::new(Condvar::new());
- let c2 = c.clone();
- let t = thread::spawn(move || {
- let mut g = m2.lock();
- c2.wait(&mut g);
- });
-
- let mut g = m.lock();
- while !c.notify_one() {
- // Wait for the thread to get into wait()
- MutexGuard::bump(&mut g);
- // Yield, so the other thread gets a chance to do something.
- // (At least Miri needs this, because it doesn't preempt threads.)
- thread::yield_now();
- }
- // The thread should have been requeued to the mutex, which we wake up now.
- drop(g);
- t.join().unwrap();
- }
-
- #[test]
- fn test_issue_129() {
- let locks = Arc::new((Mutex::new(()), Condvar::new()));
-
- let (tx, rx) = channel();
- for _ in 0..4 {
- let locks = locks.clone();
- let tx = tx.clone();
- thread::spawn(move || {
- let mut guard = locks.0.lock();
- locks.1.wait(&mut guard);
- locks.1.wait_for(&mut guard, Duration::from_millis(1));
- locks.1.notify_one();
- tx.send(()).unwrap();
- });
- }
-
- thread::sleep(Duration::from_millis(100));
- locks.1.notify_one();
-
- for _ in 0..4 {
- assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(()));
- }
- }
-}
-
-/// This module contains an integration test that is heavily inspired from WebKit's own integration
-/// tests for it's own Condvar.
-#[cfg(test)]
-mod webkit_queue_test {
- use crate::{Condvar, Mutex, MutexGuard};
- use std::{collections::VecDeque, sync::Arc, thread, time::Duration};
-
- #[derive(Clone, Copy)]
- enum Timeout {
- Bounded(Duration),
- Forever,
- }
-
- #[derive(Clone, Copy)]
- enum NotifyStyle {
- One,
- All,
- }
-
- struct Queue {
- items: VecDeque<usize>,
- should_continue: bool,
- }
-
- impl Queue {
- fn new() -> Self {
- Self {
- items: VecDeque::new(),
- should_continue: true,
- }
- }
- }
-
- fn wait<T: ?Sized>(
- condition: &Condvar,
- lock: &mut MutexGuard<'_, T>,
- predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool,
- timeout: &Timeout,
- ) {
- while !predicate(lock) {
- match timeout {
- Timeout::Forever => condition.wait(lock),
- Timeout::Bounded(bound) => {
- condition.wait_for(lock, *bound);
- }
- }
- }
- }
-
- fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) {
- match style {
- NotifyStyle::One => {
- condition.notify_one();
- }
- NotifyStyle::All => {
- if should_notify {
- condition.notify_all();
- }
- }
- }
- }
-
- fn run_queue_test(
- num_producers: usize,
- num_consumers: usize,
- max_queue_size: usize,
- messages_per_producer: usize,
- notify_style: NotifyStyle,
- timeout: Timeout,
- delay: Duration,
- ) {
- let input_queue = Arc::new(Mutex::new(Queue::new()));
- let empty_condition = Arc::new(Condvar::new());
- let full_condition = Arc::new(Condvar::new());
-
- let output_vec = Arc::new(Mutex::new(vec![]));
-
- let consumers = (0..num_consumers)
- .map(|_| {
- consumer_thread(
- input_queue.clone(),
- empty_condition.clone(),
- full_condition.clone(),
- timeout,
- notify_style,
- output_vec.clone(),
- max_queue_size,
- )
- })
- .collect::<Vec<_>>();
- let producers = (0..num_producers)
- .map(|_| {
- producer_thread(
- messages_per_producer,
- input_queue.clone(),
- empty_condition.clone(),
- full_condition.clone(),
- timeout,
- notify_style,
- max_queue_size,
- )
- })
- .collect::<Vec<_>>();
-
- thread::sleep(delay);
-
- for producer in producers.into_iter() {
- producer.join().expect("Producer thread panicked");
- }
-
- {
- let mut input_queue = input_queue.lock();
- input_queue.should_continue = false;
- }
- empty_condition.notify_all();
-
- for consumer in consumers.into_iter() {
- consumer.join().expect("Consumer thread panicked");
- }
-
- let mut output_vec = output_vec.lock();
- assert_eq!(output_vec.len(), num_producers * messages_per_producer);
- output_vec.sort();
- for msg_idx in 0..messages_per_producer {
- for producer_idx in 0..num_producers {
- assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]);
- }
- }
- }
-
- fn consumer_thread(
- input_queue: Arc<Mutex<Queue>>,
- empty_condition: Arc<Condvar>,
- full_condition: Arc<Condvar>,
- timeout: Timeout,
- notify_style: NotifyStyle,
- output_queue: Arc<Mutex<Vec<usize>>>,
- max_queue_size: usize,
- ) -> thread::JoinHandle<()> {
- thread::spawn(move || loop {
- let (should_notify, result) = {
- let mut queue = input_queue.lock();
- wait(
- &*empty_condition,
- &mut queue,
- |state| -> bool { !state.items.is_empty() || !state.should_continue },
- &timeout,
- );
- if queue.items.is_empty() && !queue.should_continue {
- return;
- }
- let should_notify = queue.items.len() == max_queue_size;
- let result = queue.items.pop_front();
- std::mem::drop(queue);
- (should_notify, result)
- };
- notify(notify_style, &*full_condition, should_notify);
-
- if let Some(result) = result {
- output_queue.lock().push(result);
- }
- })
- }
-
- fn producer_thread(
- num_messages: usize,
- queue: Arc<Mutex<Queue>>,
- empty_condition: Arc<Condvar>,
- full_condition: Arc<Condvar>,
- timeout: Timeout,
- notify_style: NotifyStyle,
- max_queue_size: usize,
- ) -> thread::JoinHandle<()> {
- thread::spawn(move || {
- for message in 0..num_messages {
- let should_notify = {
- let mut queue = queue.lock();
- wait(
- &*full_condition,
- &mut queue,
- |state| state.items.len() < max_queue_size,
- &timeout,
- );
- let should_notify = queue.items.is_empty();
- queue.items.push_back(message);
- std::mem::drop(queue);
- should_notify
- };
- notify(notify_style, &*empty_condition, should_notify);
- }
- })
- }
-
- macro_rules! run_queue_tests {
- ( $( $name:ident(
- num_producers: $num_producers:expr,
- num_consumers: $num_consumers:expr,
- max_queue_size: $max_queue_size:expr,
- messages_per_producer: $messages_per_producer:expr,
- notification_style: $notification_style:expr,
- timeout: $timeout:expr,
- delay_seconds: $delay_seconds:expr);
- )* ) => {
- $(#[test]
- fn $name() {
- let delay = Duration::from_secs($delay_seconds);
- run_queue_test(
- $num_producers,
- $num_consumers,
- $max_queue_size,
- $messages_per_producer,
- $notification_style,
- $timeout,
- delay,
- );
- })*
- };
- }
-
- run_queue_tests! {
- sanity_check_queue(
- num_producers: 1,
- num_consumers: 1,
- max_queue_size: 1,
- messages_per_producer: 100_000,
- notification_style: NotifyStyle::All,
- timeout: Timeout::Bounded(Duration::from_secs(1)),
- delay_seconds: 0
- );
- sanity_check_queue_timeout(
- num_producers: 1,
- num_consumers: 1,
- max_queue_size: 1,
- messages_per_producer: 100_000,
- notification_style: NotifyStyle::All,
- timeout: Timeout::Forever,
- delay_seconds: 0
- );
- new_test_without_timeout_5(
- num_producers: 1,
- num_consumers: 5,
- max_queue_size: 1,
- messages_per_producer: 100_000,
- notification_style: NotifyStyle::All,
- timeout: Timeout::Forever,
- delay_seconds: 0
- );
- one_producer_one_consumer_one_slot(
- num_producers: 1,
- num_consumers: 1,
- max_queue_size: 1,
- messages_per_producer: 100_000,
- notification_style: NotifyStyle::All,
- timeout: Timeout::Forever,
- delay_seconds: 0
- );
- one_producer_one_consumer_one_slot_timeout(
- num_producers: 1,
- num_consumers: 1,
- max_queue_size: 1,
- messages_per_producer: 100_000,
- notification_style: NotifyStyle::All,
- timeout: Timeout::Forever,
- delay_seconds: 1
- );
- one_producer_one_consumer_hundred_slots(
- num_producers: 1,
- num_consumers: 1,
- max_queue_size: 100,
- messages_per_producer: 1_000_000,
- notification_style: NotifyStyle::All,
- timeout: Timeout::Forever,
- delay_seconds: 0
- );
- ten_producers_one_consumer_one_slot(
- num_producers: 10,
- num_consumers: 1,
- max_queue_size: 1,
- messages_per_producer: 10000,
- notification_style: NotifyStyle::All,
- timeout: Timeout::Forever,
- delay_seconds: 0
- );
- ten_producers_one_consumer_hundred_slots_notify_all(
- num_producers: 10,
- num_consumers: 1,
- max_queue_size: 100,
- messages_per_producer: 10000,
- notification_style: NotifyStyle::All,
- timeout: Timeout::Forever,
- delay_seconds: 0
- );
- ten_producers_one_consumer_hundred_slots_notify_one(
- num_producers: 10,
- num_consumers: 1,
- max_queue_size: 100,
- messages_per_producer: 10000,
- notification_style: NotifyStyle::One,
- timeout: Timeout::Forever,
- delay_seconds: 0
- );
- one_producer_ten_consumers_one_slot(
- num_producers: 1,
- num_consumers: 10,
- max_queue_size: 1,
- messages_per_producer: 10000,
- notification_style: NotifyStyle::All,
- timeout: Timeout::Forever,
- delay_seconds: 0
- );
- one_producer_ten_consumers_hundred_slots_notify_all(
- num_producers: 1,
- num_consumers: 10,
- max_queue_size: 100,
- messages_per_producer: 100_000,
- notification_style: NotifyStyle::All,
- timeout: Timeout::Forever,
- delay_seconds: 0
- );
- one_producer_ten_consumers_hundred_slots_notify_one(
- num_producers: 1,
- num_consumers: 10,
- max_queue_size: 100,
- messages_per_producer: 100_000,
- notification_style: NotifyStyle::One,
- timeout: Timeout::Forever,
- delay_seconds: 0
- );
- ten_producers_ten_consumers_one_slot(
- num_producers: 10,
- num_consumers: 10,
- max_queue_size: 1,
- messages_per_producer: 50000,
- notification_style: NotifyStyle::All,
- timeout: Timeout::Forever,
- delay_seconds: 0
- );
- ten_producers_ten_consumers_hundred_slots_notify_all(
- num_producers: 10,
- num_consumers: 10,
- max_queue_size: 100,
- messages_per_producer: 50000,
- notification_style: NotifyStyle::All,
- timeout: Timeout::Forever,
- delay_seconds: 0
- );
- ten_producers_ten_consumers_hundred_slots_notify_one(
- num_producers: 10,
- num_consumers: 10,
- max_queue_size: 100,
- messages_per_producer: 50000,
- notification_style: NotifyStyle::One,
- timeout: Timeout::Forever,
- delay_seconds: 0
- );
- }
-}