summaryrefslogtreecommitdiffstats
path: root/library/std/src/sync/mpmc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:20:29 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:20:29 +0000
commit631cd5845e8de329d0e227aaa707d7ea228b8f8f (patch)
treea1b87c8f8cad01cf18f7c5f57a08f102771ed303 /library/std/src/sync/mpmc
parentAdding debian version 1.69.0+dfsg1-1. (diff)
downloadrustc-631cd5845e8de329d0e227aaa707d7ea228b8f8f.tar.xz
rustc-631cd5845e8de329d0e227aaa707d7ea228b8f8f.zip
Merging upstream version 1.70.0+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'library/std/src/sync/mpmc')
-rw-r--r--library/std/src/sync/mpmc/array.rs107
-rw-r--r--library/std/src/sync/mpmc/list.rs12
-rw-r--r--library/std/src/sync/mpmc/mod.rs4
3 files changed, 97 insertions, 26 deletions
diff --git a/library/std/src/sync/mpmc/array.rs b/library/std/src/sync/mpmc/array.rs
index c6bb09b04..492e21d9b 100644
--- a/library/std/src/sync/mpmc/array.rs
+++ b/library/std/src/sync/mpmc/array.rs
@@ -25,7 +25,8 @@ struct Slot<T> {
/// The current stamp.
stamp: AtomicUsize,
- /// The message in this slot.
+ /// The message in this slot. Either read out in `read` or dropped through
+ /// `discard_all_messages`.
msg: UnsafeCell<MaybeUninit<T>>,
}
@@ -439,14 +440,13 @@ impl<T> Channel<T> {
Some(self.cap)
}
- /// Disconnects the channel and wakes up all blocked senders and receivers.
+ /// Disconnects senders and wakes up all blocked receivers.
///
/// Returns `true` if this call disconnected the channel.
- pub(crate) fn disconnect(&self) -> bool {
+ pub(crate) fn disconnect_senders(&self) -> bool {
let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
if tail & self.mark_bit == 0 {
- self.senders.disconnect();
self.receivers.disconnect();
true
} else {
@@ -454,6 +454,85 @@ impl<T> Channel<T> {
}
}
+ /// Disconnects receivers and wakes up all blocked senders.
+ ///
+ /// Returns `true` if this call disconnected the channel.
+ ///
+ /// # Safety
+ /// May only be called once upon dropping the last receiver. The
+ /// destruction of all other receivers must have been observed with acquire
+ /// ordering or stronger.
+ pub(crate) unsafe fn disconnect_receivers(&self) -> bool {
+ let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
+ let disconnected = if tail & self.mark_bit == 0 {
+ self.senders.disconnect();
+ true
+ } else {
+ false
+ };
+
+ self.discard_all_messages(tail);
+ disconnected
+ }
+
+ /// Discards all messages.
+ ///
+ /// `tail` should be the current (and therefore last) value of `tail`.
+ ///
+ /// # Panicking
+ /// If a destructor panics, the remaining messages are leaked, matching the
+ /// behaviour of the unbounded channel.
+ ///
+ /// # Safety
+ /// This method must only be called when dropping the last receiver. The
+ /// destruction of all other receivers must have been observed with acquire
+ /// ordering or stronger.
+ unsafe fn discard_all_messages(&self, tail: usize) {
+ debug_assert!(self.is_disconnected());
+
+ // Only receivers modify `head`, so since we are the last one,
+ // this value will not change and will not be observed (since
+ // no new messages can be sent after disconnection).
+ let mut head = self.head.load(Ordering::Relaxed);
+ let tail = tail & !self.mark_bit;
+
+ let backoff = Backoff::new();
+ loop {
+ // Deconstruct the head.
+ let index = head & (self.mark_bit - 1);
+ let lap = head & !(self.one_lap - 1);
+
+ // Inspect the corresponding slot.
+ debug_assert!(index < self.buffer.len());
+ let slot = unsafe { self.buffer.get_unchecked(index) };
+ let stamp = slot.stamp.load(Ordering::Acquire);
+
+ // If the stamp is ahead of the head by 1, we may drop the message.
+ if head + 1 == stamp {
+ head = if index + 1 < self.cap {
+ // Same lap, incremented index.
+ // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
+ head + 1
+ } else {
+ // One lap forward, index wraps around to zero.
+ // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
+ lap.wrapping_add(self.one_lap)
+ };
+
+ unsafe {
+ (*slot.msg.get()).assume_init_drop();
+ }
+ // If the tail equals the head, that means the channel is empty.
+ } else if tail == head {
+ return;
+ // Otherwise, a sender is about to write into the slot, so we need
+ // to wait for it to update the stamp.
+ } else {
+ backoff.spin_heavy();
+ }
+ }
+ }
+
/// Returns `true` if the channel is disconnected.
pub(crate) fn is_disconnected(&self) -> bool {
self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
@@ -483,23 +562,3 @@ impl<T> Channel<T> {
head.wrapping_add(self.one_lap) == tail & !self.mark_bit
}
}
-
-impl<T> Drop for Channel<T> {
- fn drop(&mut self) {
- // Get the index of the head.
- let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1);
-
- // Loop over all slots that hold a message and drop them.
- for i in 0..self.len() {
- // Compute the index of the next slot holding a message.
- let index = if hix + i < self.cap { hix + i } else { hix + i - self.cap };
-
- unsafe {
- debug_assert!(index < self.buffer.len());
- let slot = self.buffer.get_unchecked_mut(index);
- let msg = &mut *slot.msg.get();
- msg.as_mut_ptr().drop_in_place();
- }
- }
- }
-}
diff --git a/library/std/src/sync/mpmc/list.rs b/library/std/src/sync/mpmc/list.rs
index ec6c0726a..406a331a3 100644
--- a/library/std/src/sync/mpmc/list.rs
+++ b/library/std/src/sync/mpmc/list.rs
@@ -549,6 +549,18 @@ impl<T> Channel<T> {
let mut head = self.head.index.load(Ordering::Acquire);
let mut block = self.head.block.load(Ordering::Acquire);
+ // If we're going to be dropping messages we need to synchronize with initialization
+ if head >> SHIFT != tail >> SHIFT {
+ // The block can be null here only if a sender is in the process of initializing the
+ // channel while another sender managed to send a message by inserting it into the
+ // semi-initialized channel and advanced the tail.
+ // In that case, just wait until it gets initialized.
+ while block.is_null() {
+ backoff.spin_heavy();
+ block = self.head.block.load(Ordering::Acquire);
+ }
+ }
+
unsafe {
// Drop all messages between head and tail and deallocate the heap-allocated blocks.
while head >> SHIFT != tail >> SHIFT {
diff --git a/library/std/src/sync/mpmc/mod.rs b/library/std/src/sync/mpmc/mod.rs
index 7a602cecd..2068dda39 100644
--- a/library/std/src/sync/mpmc/mod.rs
+++ b/library/std/src/sync/mpmc/mod.rs
@@ -227,7 +227,7 @@ impl<T> Drop for Sender<T> {
fn drop(&mut self) {
unsafe {
match &self.flavor {
- SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
+ SenderFlavor::Array(chan) => chan.release(|c| c.disconnect_senders()),
SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
}
@@ -403,7 +403,7 @@ impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
unsafe {
match &self.flavor {
- ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
+ ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect_receivers()),
ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
}