summaryrefslogtreecommitdiffstats
path: root/third_party/rust/parking_lot/src/condvar.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
commit36d22d82aa202bb199967e9512281e9a53db42c9 (patch)
tree105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/parking_lot/src/condvar.rs
parentInitial commit. (diff)
downloadfirefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz
firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip
Adding upstream version 115.7.0esr.upstream/115.7.0esr
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/parking_lot/src/condvar.rs')
-rw-r--r--third_party/rust/parking_lot/src/condvar.rs1057
1 files changed, 1057 insertions, 0 deletions
diff --git a/third_party/rust/parking_lot/src/condvar.rs b/third_party/rust/parking_lot/src/condvar.rs
new file mode 100644
index 0000000000..534b8aff8b
--- /dev/null
+++ b/third_party/rust/parking_lot/src/condvar.rs
@@ -0,0 +1,1057 @@
+// Copyright 2016 Amanieu d'Antras
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
+// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
+// http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+
+use crate::mutex::MutexGuard;
+use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL};
+use crate::{deadlock, util};
+use core::{
+ fmt, ptr,
+ sync::atomic::{AtomicPtr, Ordering},
+};
+use instant::Instant;
+use lock_api::RawMutex as RawMutex_;
+use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN};
+use std::time::Duration;
+
+/// A type indicating whether a timed wait on a condition variable returned
+/// due to a time out or not.
+#[derive(Debug, PartialEq, Eq, Copy, Clone)]
+pub struct WaitTimeoutResult(bool);
+
+impl WaitTimeoutResult {
+ /// Returns whether the wait was known to have timed out.
+ #[inline]
+ pub fn timed_out(self) -> bool {
+ self.0
+ }
+}
+
+/// A Condition Variable
+///
+/// Condition variables represent the ability to block a thread such that it
+/// consumes no CPU time while waiting for an event to occur. Condition
+/// variables are typically associated with a boolean predicate (a condition)
+/// and a mutex. The predicate is always verified inside of the mutex before
+/// determining that thread must block.
+///
+/// Note that this module places one additional restriction over the system
+/// condition variables: each condvar can be used with only one mutex at a
+/// time. Any attempt to use multiple mutexes on the same condition variable
+/// simultaneously will result in a runtime panic. However it is possible to
+/// switch to a different mutex if there are no threads currently waiting on
+/// the condition variable.
+///
+/// # Differences from the standard library `Condvar`
+///
+/// - No spurious wakeups: A wait will only return a non-timeout result if it
+/// was woken up by `notify_one` or `notify_all`.
+/// - `Condvar::notify_all` will only wake up a single thread, the rest are
+/// requeued to wait for the `Mutex` to be unlocked by the thread that was
+/// woken up.
+/// - Only requires 1 word of space, whereas the standard library boxes the
+/// `Condvar` due to platform limitations.
+/// - Can be statically constructed (requires the `const_fn` nightly feature).
+/// - Does not require any drop glue when dropped.
+/// - Inline fast path for the uncontended case.
+///
+/// # Examples
+///
+/// ```
+/// use parking_lot::{Mutex, Condvar};
+/// use std::sync::Arc;
+/// use std::thread;
+///
+/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
+/// let pair2 = pair.clone();
+///
+/// // Inside of our lock, spawn a new thread, and then wait for it to start
+/// thread::spawn(move|| {
+/// let &(ref lock, ref cvar) = &*pair2;
+/// let mut started = lock.lock();
+/// *started = true;
+/// cvar.notify_one();
+/// });
+///
+/// // wait for the thread to start up
+/// let &(ref lock, ref cvar) = &*pair;
+/// let mut started = lock.lock();
+/// if !*started {
+/// cvar.wait(&mut started);
+/// }
+/// // Note that we used an if instead of a while loop above. This is only
+/// // possible because parking_lot's Condvar will never spuriously wake up.
+/// // This means that wait() will only return after notify_one or notify_all is
+/// // called.
+/// ```
+pub struct Condvar {
+ state: AtomicPtr<RawMutex>,
+}
+
+impl Condvar {
+ /// Creates a new condition variable which is ready to be waited on and
+ /// notified.
+ #[inline]
+ pub const fn new() -> Condvar {
+ Condvar {
+ state: AtomicPtr::new(ptr::null_mut()),
+ }
+ }
+
+ /// Wakes up one blocked thread on this condvar.
+ ///
+ /// Returns whether a thread was woken up.
+ ///
+ /// If there is a blocked thread on this condition variable, then it will
+ /// be woken up from its call to `wait` or `wait_timeout`. Calls to
+ /// `notify_one` are not buffered in any way.
+ ///
+ /// To wake up all threads, see `notify_all()`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use parking_lot::Condvar;
+ ///
+ /// let condvar = Condvar::new();
+ ///
+ /// // do something with condvar, share it with other threads
+ ///
+ /// if !condvar.notify_one() {
+ /// println!("Nobody was listening for this.");
+ /// }
+ /// ```
+ #[inline]
+ pub fn notify_one(&self) -> bool {
+ // Nothing to do if there are no waiting threads
+ let state = self.state.load(Ordering::Relaxed);
+ if state.is_null() {
+ return false;
+ }
+
+ self.notify_one_slow(state)
+ }
+
+ #[cold]
+ fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool {
+ unsafe {
+ // Unpark one thread and requeue the rest onto the mutex
+ let from = self as *const _ as usize;
+ let to = mutex as usize;
+ let validate = || {
+ // Make sure that our atomic state still points to the same
+ // mutex. If not then it means that all threads on the current
+ // mutex were woken up and a new waiting thread switched to a
+ // different mutex. In that case we can get away with doing
+ // nothing.
+ if self.state.load(Ordering::Relaxed) != mutex {
+ return RequeueOp::Abort;
+ }
+
+ // Unpark one thread if the mutex is unlocked, otherwise just
+ // requeue everything to the mutex. This is safe to do here
+ // since unlocking the mutex when the parked bit is set requires
+ // locking the queue. There is the possibility of a race if the
+ // mutex gets locked after we check, but that doesn't matter in
+ // this case.
+ if (*mutex).mark_parked_if_locked() {
+ RequeueOp::RequeueOne
+ } else {
+ RequeueOp::UnparkOne
+ }
+ };
+ let callback = |_op, result: UnparkResult| {
+ // Clear our state if there are no more waiting threads
+ if !result.have_more_threads {
+ self.state.store(ptr::null_mut(), Ordering::Relaxed);
+ }
+ TOKEN_NORMAL
+ };
+ let res = parking_lot_core::unpark_requeue(from, to, validate, callback);
+
+ res.unparked_threads + res.requeued_threads != 0
+ }
+ }
+
+ /// Wakes up all blocked threads on this condvar.
+ ///
+ /// Returns the number of threads woken up.
+ ///
+ /// This method will ensure that any current waiters on the condition
+ /// variable are awoken. Calls to `notify_all()` are not buffered in any
+ /// way.
+ ///
+ /// To wake up only one thread, see `notify_one()`.
+ #[inline]
+ pub fn notify_all(&self) -> usize {
+ // Nothing to do if there are no waiting threads
+ let state = self.state.load(Ordering::Relaxed);
+ if state.is_null() {
+ return 0;
+ }
+
+ self.notify_all_slow(state)
+ }
+
+ #[cold]
+ fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize {
+ unsafe {
+ // Unpark one thread and requeue the rest onto the mutex
+ let from = self as *const _ as usize;
+ let to = mutex as usize;
+ let validate = || {
+ // Make sure that our atomic state still points to the same
+ // mutex. If not then it means that all threads on the current
+ // mutex were woken up and a new waiting thread switched to a
+ // different mutex. In that case we can get away with doing
+ // nothing.
+ if self.state.load(Ordering::Relaxed) != mutex {
+ return RequeueOp::Abort;
+ }
+
+ // Clear our state since we are going to unpark or requeue all
+ // threads.
+ self.state.store(ptr::null_mut(), Ordering::Relaxed);
+
+ // Unpark one thread if the mutex is unlocked, otherwise just
+ // requeue everything to the mutex. This is safe to do here
+ // since unlocking the mutex when the parked bit is set requires
+ // locking the queue. There is the possibility of a race if the
+ // mutex gets locked after we check, but that doesn't matter in
+ // this case.
+ if (*mutex).mark_parked_if_locked() {
+ RequeueOp::RequeueAll
+ } else {
+ RequeueOp::UnparkOneRequeueRest
+ }
+ };
+ let callback = |op, result: UnparkResult| {
+ // If we requeued threads to the mutex, mark it as having
+ // parked threads. The RequeueAll case is already handled above.
+ if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 {
+ (*mutex).mark_parked();
+ }
+ TOKEN_NORMAL
+ };
+ let res = parking_lot_core::unpark_requeue(from, to, validate, callback);
+
+ res.unparked_threads + res.requeued_threads
+ }
+ }
+
+ /// Blocks the current thread until this condition variable receives a
+ /// notification.
+ ///
+ /// This function will atomically unlock the mutex specified (represented by
+ /// `mutex_guard`) and block the current thread. This means that any calls
+ /// to `notify_*()` which happen logically after the mutex is unlocked are
+ /// candidates to wake this thread up. When this function call returns, the
+ /// lock specified will have been re-acquired.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if another thread is waiting on the `Condvar`
+ /// with a different `Mutex` object.
+ #[inline]
+ pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) {
+ self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None);
+ }
+
+ /// Waits on this condition variable for a notification, timing out after
+ /// the specified time instant.
+ ///
+ /// The semantics of this function are equivalent to `wait()` except that
+ /// the thread will be blocked roughly until `timeout` is reached. This
+ /// method should not be used for precise timing due to anomalies such as
+ /// preemption or platform differences that may not cause the maximum
+ /// amount of time waited to be precisely `timeout`.
+ ///
+ /// Note that the best effort is made to ensure that the time waited is
+ /// measured with a monotonic clock, and not affected by the changes made to
+ /// the system time.
+ ///
+ /// The returned `WaitTimeoutResult` value indicates if the timeout is
+ /// known to have elapsed.
+ ///
+ /// Like `wait`, the lock specified will be re-acquired when this function
+ /// returns, regardless of whether the timeout elapsed or not.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if another thread is waiting on the `Condvar`
+ /// with a different `Mutex` object.
+ #[inline]
+ pub fn wait_until<T: ?Sized>(
+ &self,
+ mutex_guard: &mut MutexGuard<'_, T>,
+ timeout: Instant,
+ ) -> WaitTimeoutResult {
+ self.wait_until_internal(
+ unsafe { MutexGuard::mutex(mutex_guard).raw() },
+ Some(timeout),
+ )
+ }
+
+ // This is a non-generic function to reduce the monomorphization cost of
+ // using `wait_until`.
+ fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult {
+ unsafe {
+ let result;
+ let mut bad_mutex = false;
+ let mut requeued = false;
+ {
+ let addr = self as *const _ as usize;
+ let lock_addr = mutex as *const _ as *mut _;
+ let validate = || {
+ // Ensure we don't use two different mutexes with the same
+ // Condvar at the same time. This is done while locked to
+ // avoid races with notify_one
+ let state = self.state.load(Ordering::Relaxed);
+ if state.is_null() {
+ self.state.store(lock_addr, Ordering::Relaxed);
+ } else if state != lock_addr {
+ bad_mutex = true;
+ return false;
+ }
+ true
+ };
+ let before_sleep = || {
+ // Unlock the mutex before sleeping...
+ mutex.unlock();
+ };
+ let timed_out = |k, was_last_thread| {
+ // If we were requeued to a mutex, then we did not time out.
+ // We'll just park ourselves on the mutex again when we try
+ // to lock it later.
+ requeued = k != addr;
+
+ // If we were the last thread on the queue then we need to
+ // clear our state. This is normally done by the
+ // notify_{one,all} functions when not timing out.
+ if !requeued && was_last_thread {
+ self.state.store(ptr::null_mut(), Ordering::Relaxed);
+ }
+ };
+ result = parking_lot_core::park(
+ addr,
+ validate,
+ before_sleep,
+ timed_out,
+ DEFAULT_PARK_TOKEN,
+ timeout,
+ );
+ }
+
+ // Panic if we tried to use multiple mutexes with a Condvar. Note
+ // that at this point the MutexGuard is still locked. It will be
+ // unlocked by the unwinding logic.
+ if bad_mutex {
+ panic!("attempted to use a condition variable with more than one mutex");
+ }
+
+ // ... and re-lock it once we are done sleeping
+ if result == ParkResult::Unparked(TOKEN_HANDOFF) {
+ deadlock::acquire_resource(mutex as *const _ as usize);
+ } else {
+ mutex.lock();
+ }
+
+ WaitTimeoutResult(!(result.is_unparked() || requeued))
+ }
+ }
+
+ /// Waits on this condition variable for a notification, timing out after a
+ /// specified duration.
+ ///
+ /// The semantics of this function are equivalent to `wait()` except that
+ /// the thread will be blocked for roughly no longer than `timeout`. This
+ /// method should not be used for precise timing due to anomalies such as
+ /// preemption or platform differences that may not cause the maximum
+ /// amount of time waited to be precisely `timeout`.
+ ///
+ /// Note that the best effort is made to ensure that the time waited is
+ /// measured with a monotonic clock, and not affected by the changes made to
+ /// the system time.
+ ///
+ /// The returned `WaitTimeoutResult` value indicates if the timeout is
+ /// known to have elapsed.
+ ///
+ /// Like `wait`, the lock specified will be re-acquired when this function
+ /// returns, regardless of whether the timeout elapsed or not.
+ ///
+ /// # Panics
+ ///
+ /// Panics if the given `timeout` is so large that it can't be added to the current time.
+ /// This panic is not possible if the crate is built with the `nightly` feature, then a too
+ /// large `timeout` becomes equivalent to just calling `wait`.
+ #[inline]
+ pub fn wait_for<T: ?Sized>(
+ &self,
+ mutex_guard: &mut MutexGuard<'_, T>,
+ timeout: Duration,
+ ) -> WaitTimeoutResult {
+ let deadline = util::to_deadline(timeout);
+ self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline)
+ }
+}
+
+impl Default for Condvar {
+ #[inline]
+ fn default() -> Condvar {
+ Condvar::new()
+ }
+}
+
+impl fmt::Debug for Condvar {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Condvar { .. }")
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::{Condvar, Mutex, MutexGuard};
+ use instant::Instant;
+ use std::sync::mpsc::channel;
+ use std::sync::Arc;
+ use std::thread;
+ use std::time::Duration;
+
+ #[test]
+ fn smoke() {
+ let c = Condvar::new();
+ c.notify_one();
+ c.notify_all();
+ }
+
+ #[test]
+ fn notify_one() {
+ let m = Arc::new(Mutex::new(()));
+ let m2 = m.clone();
+ let c = Arc::new(Condvar::new());
+ let c2 = c.clone();
+
+ let mut g = m.lock();
+ let _t = thread::spawn(move || {
+ let _g = m2.lock();
+ c2.notify_one();
+ });
+ c.wait(&mut g);
+ }
+
+ #[test]
+ fn notify_all() {
+ const N: usize = 10;
+
+ let data = Arc::new((Mutex::new(0), Condvar::new()));
+ let (tx, rx) = channel();
+ for _ in 0..N {
+ let data = data.clone();
+ let tx = tx.clone();
+ thread::spawn(move || {
+ let &(ref lock, ref cond) = &*data;
+ let mut cnt = lock.lock();
+ *cnt += 1;
+ if *cnt == N {
+ tx.send(()).unwrap();
+ }
+ while *cnt != 0 {
+ cond.wait(&mut cnt);
+ }
+ tx.send(()).unwrap();
+ });
+ }
+ drop(tx);
+
+ let &(ref lock, ref cond) = &*data;
+ rx.recv().unwrap();
+ let mut cnt = lock.lock();
+ *cnt = 0;
+ cond.notify_all();
+ drop(cnt);
+
+ for _ in 0..N {
+ rx.recv().unwrap();
+ }
+ }
+
+ #[test]
+ fn notify_one_return_true() {
+ let m = Arc::new(Mutex::new(()));
+ let m2 = m.clone();
+ let c = Arc::new(Condvar::new());
+ let c2 = c.clone();
+
+ let mut g = m.lock();
+ let _t = thread::spawn(move || {
+ let _g = m2.lock();
+ assert!(c2.notify_one());
+ });
+ c.wait(&mut g);
+ }
+
+ #[test]
+ fn notify_one_return_false() {
+ let m = Arc::new(Mutex::new(()));
+ let c = Arc::new(Condvar::new());
+
+ let _t = thread::spawn(move || {
+ let _g = m.lock();
+ assert!(!c.notify_one());
+ });
+ }
+
+ #[test]
+ fn notify_all_return() {
+ const N: usize = 10;
+
+ let data = Arc::new((Mutex::new(0), Condvar::new()));
+ let (tx, rx) = channel();
+ for _ in 0..N {
+ let data = data.clone();
+ let tx = tx.clone();
+ thread::spawn(move || {
+ let &(ref lock, ref cond) = &*data;
+ let mut cnt = lock.lock();
+ *cnt += 1;
+ if *cnt == N {
+ tx.send(()).unwrap();
+ }
+ while *cnt != 0 {
+ cond.wait(&mut cnt);
+ }
+ tx.send(()).unwrap();
+ });
+ }
+ drop(tx);
+
+ let &(ref lock, ref cond) = &*data;
+ rx.recv().unwrap();
+ let mut cnt = lock.lock();
+ *cnt = 0;
+ assert_eq!(cond.notify_all(), N);
+ drop(cnt);
+
+ for _ in 0..N {
+ rx.recv().unwrap();
+ }
+
+ assert_eq!(cond.notify_all(), 0);
+ }
+
+ #[test]
+ fn wait_for() {
+ let m = Arc::new(Mutex::new(()));
+ let m2 = m.clone();
+ let c = Arc::new(Condvar::new());
+ let c2 = c.clone();
+
+ let mut g = m.lock();
+ let no_timeout = c.wait_for(&mut g, Duration::from_millis(1));
+ assert!(no_timeout.timed_out());
+
+ let _t = thread::spawn(move || {
+ let _g = m2.lock();
+ c2.notify_one();
+ });
+ // Non-nightly panics on too large timeouts. Nightly treats it as indefinite wait.
+ let very_long_timeout = if cfg!(feature = "nightly") {
+ Duration::from_secs(u64::max_value())
+ } else {
+ Duration::from_millis(u32::max_value() as u64)
+ };
+
+ let timeout_res = c.wait_for(&mut g, very_long_timeout);
+ assert!(!timeout_res.timed_out());
+
+ drop(g);
+ }
+
+ #[test]
+ fn wait_until() {
+ let m = Arc::new(Mutex::new(()));
+ let m2 = m.clone();
+ let c = Arc::new(Condvar::new());
+ let c2 = c.clone();
+
+ let mut g = m.lock();
+ let no_timeout = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1));
+ assert!(no_timeout.timed_out());
+ let _t = thread::spawn(move || {
+ let _g = m2.lock();
+ c2.notify_one();
+ });
+ let timeout_res = c.wait_until(
+ &mut g,
+ Instant::now() + Duration::from_millis(u32::max_value() as u64),
+ );
+ assert!(!timeout_res.timed_out());
+ drop(g);
+ }
+
+ #[test]
+ #[should_panic]
+ fn two_mutexes() {
+ let m = Arc::new(Mutex::new(()));
+ let m2 = m.clone();
+ let m3 = Arc::new(Mutex::new(()));
+ let c = Arc::new(Condvar::new());
+ let c2 = c.clone();
+
+ // Make sure we don't leave the child thread dangling
+ struct PanicGuard<'a>(&'a Condvar);
+ impl<'a> Drop for PanicGuard<'a> {
+ fn drop(&mut self) {
+ self.0.notify_one();
+ }
+ }
+
+ let (tx, rx) = channel();
+ let g = m.lock();
+ let _t = thread::spawn(move || {
+ let mut g = m2.lock();
+ tx.send(()).unwrap();
+ c2.wait(&mut g);
+ });
+ drop(g);
+ rx.recv().unwrap();
+ let _g = m.lock();
+ let _guard = PanicGuard(&*c);
+ c.wait(&mut m3.lock());
+ }
+
+ #[test]
+ fn two_mutexes_disjoint() {
+ let m = Arc::new(Mutex::new(()));
+ let m2 = m.clone();
+ let m3 = Arc::new(Mutex::new(()));
+ let c = Arc::new(Condvar::new());
+ let c2 = c.clone();
+
+ let mut g = m.lock();
+ let _t = thread::spawn(move || {
+ let _g = m2.lock();
+ c2.notify_one();
+ });
+ c.wait(&mut g);
+ drop(g);
+
+ let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1));
+ }
+
+ #[test]
+ fn test_debug_condvar() {
+ let c = Condvar::new();
+ assert_eq!(format!("{:?}", c), "Condvar { .. }");
+ }
+
+ #[test]
+ fn test_condvar_requeue() {
+ let m = Arc::new(Mutex::new(()));
+ let m2 = m.clone();
+ let c = Arc::new(Condvar::new());
+ let c2 = c.clone();
+ let t = thread::spawn(move || {
+ let mut g = m2.lock();
+ c2.wait(&mut g);
+ });
+
+ let mut g = m.lock();
+ while !c.notify_one() {
+ // Wait for the thread to get into wait()
+ MutexGuard::bump(&mut g);
+ // Yield, so the other thread gets a chance to do something.
+ // (At least Miri needs this, because it doesn't preempt threads.)
+ thread::yield_now();
+ }
+ // The thread should have been requeued to the mutex, which we wake up now.
+ drop(g);
+ t.join().unwrap();
+ }
+
+ #[test]
+ fn test_issue_129() {
+ let locks = Arc::new((Mutex::new(()), Condvar::new()));
+
+ let (tx, rx) = channel();
+ for _ in 0..4 {
+ let locks = locks.clone();
+ let tx = tx.clone();
+ thread::spawn(move || {
+ let mut guard = locks.0.lock();
+ locks.1.wait(&mut guard);
+ locks.1.wait_for(&mut guard, Duration::from_millis(1));
+ locks.1.notify_one();
+ tx.send(()).unwrap();
+ });
+ }
+
+ thread::sleep(Duration::from_millis(100));
+ locks.1.notify_one();
+
+ for _ in 0..4 {
+ assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(()));
+ }
+ }
+}
+
+/// This module contains an integration test that is heavily inspired from WebKit's own integration
+/// tests for it's own Condvar.
+#[cfg(test)]
+mod webkit_queue_test {
+ use crate::{Condvar, Mutex, MutexGuard};
+ use std::{collections::VecDeque, sync::Arc, thread, time::Duration};
+
+ #[derive(Clone, Copy)]
+ enum Timeout {
+ Bounded(Duration),
+ Forever,
+ }
+
+ #[derive(Clone, Copy)]
+ enum NotifyStyle {
+ One,
+ All,
+ }
+
+ struct Queue {
+ items: VecDeque<usize>,
+ should_continue: bool,
+ }
+
+ impl Queue {
+ fn new() -> Self {
+ Self {
+ items: VecDeque::new(),
+ should_continue: true,
+ }
+ }
+ }
+
+ fn wait<T: ?Sized>(
+ condition: &Condvar,
+ lock: &mut MutexGuard<'_, T>,
+ predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool,
+ timeout: &Timeout,
+ ) {
+ while !predicate(lock) {
+ match timeout {
+ Timeout::Forever => condition.wait(lock),
+ Timeout::Bounded(bound) => {
+ condition.wait_for(lock, *bound);
+ }
+ }
+ }
+ }
+
+ fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) {
+ match style {
+ NotifyStyle::One => {
+ condition.notify_one();
+ }
+ NotifyStyle::All => {
+ if should_notify {
+ condition.notify_all();
+ }
+ }
+ }
+ }
+
+ fn run_queue_test(
+ num_producers: usize,
+ num_consumers: usize,
+ max_queue_size: usize,
+ messages_per_producer: usize,
+ notify_style: NotifyStyle,
+ timeout: Timeout,
+ delay: Duration,
+ ) {
+ let input_queue = Arc::new(Mutex::new(Queue::new()));
+ let empty_condition = Arc::new(Condvar::new());
+ let full_condition = Arc::new(Condvar::new());
+
+ let output_vec = Arc::new(Mutex::new(vec![]));
+
+ let consumers = (0..num_consumers)
+ .map(|_| {
+ consumer_thread(
+ input_queue.clone(),
+ empty_condition.clone(),
+ full_condition.clone(),
+ timeout,
+ notify_style,
+ output_vec.clone(),
+ max_queue_size,
+ )
+ })
+ .collect::<Vec<_>>();
+ let producers = (0..num_producers)
+ .map(|_| {
+ producer_thread(
+ messages_per_producer,
+ input_queue.clone(),
+ empty_condition.clone(),
+ full_condition.clone(),
+ timeout,
+ notify_style,
+ max_queue_size,
+ )
+ })
+ .collect::<Vec<_>>();
+
+ thread::sleep(delay);
+
+ for producer in producers.into_iter() {
+ producer.join().expect("Producer thread panicked");
+ }
+
+ {
+ let mut input_queue = input_queue.lock();
+ input_queue.should_continue = false;
+ }
+ empty_condition.notify_all();
+
+ for consumer in consumers.into_iter() {
+ consumer.join().expect("Consumer thread panicked");
+ }
+
+ let mut output_vec = output_vec.lock();
+ assert_eq!(output_vec.len(), num_producers * messages_per_producer);
+ output_vec.sort();
+ for msg_idx in 0..messages_per_producer {
+ for producer_idx in 0..num_producers {
+ assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]);
+ }
+ }
+ }
+
+ fn consumer_thread(
+ input_queue: Arc<Mutex<Queue>>,
+ empty_condition: Arc<Condvar>,
+ full_condition: Arc<Condvar>,
+ timeout: Timeout,
+ notify_style: NotifyStyle,
+ output_queue: Arc<Mutex<Vec<usize>>>,
+ max_queue_size: usize,
+ ) -> thread::JoinHandle<()> {
+ thread::spawn(move || loop {
+ let (should_notify, result) = {
+ let mut queue = input_queue.lock();
+ wait(
+ &*empty_condition,
+ &mut queue,
+ |state| -> bool { !state.items.is_empty() || !state.should_continue },
+ &timeout,
+ );
+ if queue.items.is_empty() && !queue.should_continue {
+ return;
+ }
+ let should_notify = queue.items.len() == max_queue_size;
+ let result = queue.items.pop_front();
+ std::mem::drop(queue);
+ (should_notify, result)
+ };
+ notify(notify_style, &*full_condition, should_notify);
+
+ if let Some(result) = result {
+ output_queue.lock().push(result);
+ }
+ })
+ }
+
+ fn producer_thread(
+ num_messages: usize,
+ queue: Arc<Mutex<Queue>>,
+ empty_condition: Arc<Condvar>,
+ full_condition: Arc<Condvar>,
+ timeout: Timeout,
+ notify_style: NotifyStyle,
+ max_queue_size: usize,
+ ) -> thread::JoinHandle<()> {
+ thread::spawn(move || {
+ for message in 0..num_messages {
+ let should_notify = {
+ let mut queue = queue.lock();
+ wait(
+ &*full_condition,
+ &mut queue,
+ |state| state.items.len() < max_queue_size,
+ &timeout,
+ );
+ let should_notify = queue.items.is_empty();
+ queue.items.push_back(message);
+ std::mem::drop(queue);
+ should_notify
+ };
+ notify(notify_style, &*empty_condition, should_notify);
+ }
+ })
+ }
+
+ macro_rules! run_queue_tests {
+ ( $( $name:ident(
+ num_producers: $num_producers:expr,
+ num_consumers: $num_consumers:expr,
+ max_queue_size: $max_queue_size:expr,
+ messages_per_producer: $messages_per_producer:expr,
+ notification_style: $notification_style:expr,
+ timeout: $timeout:expr,
+ delay_seconds: $delay_seconds:expr);
+ )* ) => {
+ $(#[test]
+ fn $name() {
+ let delay = Duration::from_secs($delay_seconds);
+ run_queue_test(
+ $num_producers,
+ $num_consumers,
+ $max_queue_size,
+ $messages_per_producer,
+ $notification_style,
+ $timeout,
+ delay,
+ );
+ })*
+ };
+ }
+
+ run_queue_tests! {
+ sanity_check_queue(
+ num_producers: 1,
+ num_consumers: 1,
+ max_queue_size: 1,
+ messages_per_producer: 100_000,
+ notification_style: NotifyStyle::All,
+ timeout: Timeout::Bounded(Duration::from_secs(1)),
+ delay_seconds: 0
+ );
+ sanity_check_queue_timeout(
+ num_producers: 1,
+ num_consumers: 1,
+ max_queue_size: 1,
+ messages_per_producer: 100_000,
+ notification_style: NotifyStyle::All,
+ timeout: Timeout::Forever,
+ delay_seconds: 0
+ );
+ new_test_without_timeout_5(
+ num_producers: 1,
+ num_consumers: 5,
+ max_queue_size: 1,
+ messages_per_producer: 100_000,
+ notification_style: NotifyStyle::All,
+ timeout: Timeout::Forever,
+ delay_seconds: 0
+ );
+ one_producer_one_consumer_one_slot(
+ num_producers: 1,
+ num_consumers: 1,
+ max_queue_size: 1,
+ messages_per_producer: 100_000,
+ notification_style: NotifyStyle::All,
+ timeout: Timeout::Forever,
+ delay_seconds: 0
+ );
+ one_producer_one_consumer_one_slot_timeout(
+ num_producers: 1,
+ num_consumers: 1,
+ max_queue_size: 1,
+ messages_per_producer: 100_000,
+ notification_style: NotifyStyle::All,
+ timeout: Timeout::Forever,
+ delay_seconds: 1
+ );
+ one_producer_one_consumer_hundred_slots(
+ num_producers: 1,
+ num_consumers: 1,
+ max_queue_size: 100,
+ messages_per_producer: 1_000_000,
+ notification_style: NotifyStyle::All,
+ timeout: Timeout::Forever,
+ delay_seconds: 0
+ );
+ ten_producers_one_consumer_one_slot(
+ num_producers: 10,
+ num_consumers: 1,
+ max_queue_size: 1,
+ messages_per_producer: 10000,
+ notification_style: NotifyStyle::All,
+ timeout: Timeout::Forever,
+ delay_seconds: 0
+ );
+ ten_producers_one_consumer_hundred_slots_notify_all(
+ num_producers: 10,
+ num_consumers: 1,
+ max_queue_size: 100,
+ messages_per_producer: 10000,
+ notification_style: NotifyStyle::All,
+ timeout: Timeout::Forever,
+ delay_seconds: 0
+ );
+ ten_producers_one_consumer_hundred_slots_notify_one(
+ num_producers: 10,
+ num_consumers: 1,
+ max_queue_size: 100,
+ messages_per_producer: 10000,
+ notification_style: NotifyStyle::One,
+ timeout: Timeout::Forever,
+ delay_seconds: 0
+ );
+ one_producer_ten_consumers_one_slot(
+ num_producers: 1,
+ num_consumers: 10,
+ max_queue_size: 1,
+ messages_per_producer: 10000,
+ notification_style: NotifyStyle::All,
+ timeout: Timeout::Forever,
+ delay_seconds: 0
+ );
+ one_producer_ten_consumers_hundred_slots_notify_all(
+ num_producers: 1,
+ num_consumers: 10,
+ max_queue_size: 100,
+ messages_per_producer: 100_000,
+ notification_style: NotifyStyle::All,
+ timeout: Timeout::Forever,
+ delay_seconds: 0
+ );
+ one_producer_ten_consumers_hundred_slots_notify_one(
+ num_producers: 1,
+ num_consumers: 10,
+ max_queue_size: 100,
+ messages_per_producer: 100_000,
+ notification_style: NotifyStyle::One,
+ timeout: Timeout::Forever,
+ delay_seconds: 0
+ );
+ ten_producers_ten_consumers_one_slot(
+ num_producers: 10,
+ num_consumers: 10,
+ max_queue_size: 1,
+ messages_per_producer: 50000,
+ notification_style: NotifyStyle::All,
+ timeout: Timeout::Forever,
+ delay_seconds: 0
+ );
+ ten_producers_ten_consumers_hundred_slots_notify_all(
+ num_producers: 10,
+ num_consumers: 10,
+ max_queue_size: 100,
+ messages_per_producer: 50000,
+ notification_style: NotifyStyle::All,
+ timeout: Timeout::Forever,
+ delay_seconds: 0
+ );
+ ten_producers_ten_consumers_hundred_slots_notify_one(
+ num_producers: 10,
+ num_consumers: 10,
+ max_queue_size: 100,
+ messages_per_producer: 50000,
+ notification_style: NotifyStyle::One,
+ timeout: Timeout::Forever,
+ delay_seconds: 0
+ );
+ }
+}