diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:20:29 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:20:29 +0000 |
commit | 631cd5845e8de329d0e227aaa707d7ea228b8f8f (patch) | |
tree | a1b87c8f8cad01cf18f7c5f57a08f102771ed303 /library/std/src/sync/mpmc | |
parent | Adding debian version 1.69.0+dfsg1-1. (diff) | |
download | rustc-631cd5845e8de329d0e227aaa707d7ea228b8f8f.tar.xz rustc-631cd5845e8de329d0e227aaa707d7ea228b8f8f.zip |
Merging upstream version 1.70.0+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'library/std/src/sync/mpmc')
-rw-r--r-- | library/std/src/sync/mpmc/array.rs | 107 | ||||
-rw-r--r-- | library/std/src/sync/mpmc/list.rs | 12 | ||||
-rw-r--r-- | library/std/src/sync/mpmc/mod.rs | 4 |
3 files changed, 97 insertions, 26 deletions
diff --git a/library/std/src/sync/mpmc/array.rs b/library/std/src/sync/mpmc/array.rs index c6bb09b04..492e21d9b 100644 --- a/library/std/src/sync/mpmc/array.rs +++ b/library/std/src/sync/mpmc/array.rs @@ -25,7 +25,8 @@ struct Slot<T> { /// The current stamp. stamp: AtomicUsize, - /// The message in this slot. + /// The message in this slot. Either read out in `read` or dropped through + /// `discard_all_messages`. msg: UnsafeCell<MaybeUninit<T>>, } @@ -439,14 +440,13 @@ impl<T> Channel<T> { Some(self.cap) } - /// Disconnects the channel and wakes up all blocked senders and receivers. + /// Disconnects senders and wakes up all blocked receivers. /// /// Returns `true` if this call disconnected the channel. - pub(crate) fn disconnect(&self) -> bool { + pub(crate) fn disconnect_senders(&self) -> bool { let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); if tail & self.mark_bit == 0 { - self.senders.disconnect(); self.receivers.disconnect(); true } else { @@ -454,6 +454,85 @@ impl<T> Channel<T> { } } + /// Disconnects receivers and wakes up all blocked senders. + /// + /// Returns `true` if this call disconnected the channel. + /// + /// # Safety + /// May only be called once upon dropping the last receiver. The + /// destruction of all other receivers must have been observed with acquire + /// ordering or stronger. + pub(crate) unsafe fn disconnect_receivers(&self) -> bool { + let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); + let disconnected = if tail & self.mark_bit == 0 { + self.senders.disconnect(); + true + } else { + false + }; + + self.discard_all_messages(tail); + disconnected + } + + /// Discards all messages. + /// + /// `tail` should be the current (and therefore last) value of `tail`. + /// + /// # Panicking + /// If a destructor panics, the remaining messages are leaked, matching the + /// behaviour of the unbounded channel. + /// + /// # Safety + /// This method must only be called when dropping the last receiver. The + /// destruction of all other receivers must have been observed with acquire + /// ordering or stronger. + unsafe fn discard_all_messages(&self, tail: usize) { + debug_assert!(self.is_disconnected()); + + // Only receivers modify `head`, so since we are the last one, + // this value will not change and will not be observed (since + // no new messages can be sent after disconnection). + let mut head = self.head.load(Ordering::Relaxed); + let tail = tail & !self.mark_bit; + + let backoff = Backoff::new(); + loop { + // Deconstruct the head. + let index = head & (self.mark_bit - 1); + let lap = head & !(self.one_lap - 1); + + // Inspect the corresponding slot. + debug_assert!(index < self.buffer.len()); + let slot = unsafe { self.buffer.get_unchecked(index) }; + let stamp = slot.stamp.load(Ordering::Acquire); + + // If the stamp is ahead of the head by 1, we may drop the message. + if head + 1 == stamp { + head = if index + 1 < self.cap { + // Same lap, incremented index. + // Set to `{ lap: lap, mark: 0, index: index + 1 }`. + head + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. + lap.wrapping_add(self.one_lap) + }; + + unsafe { + (*slot.msg.get()).assume_init_drop(); + } + // If the tail equals the head, that means the channel is empty. + } else if tail == head { + return; + // Otherwise, a sender is about to write into the slot, so we need + // to wait for it to update the stamp. + } else { + backoff.spin_heavy(); + } + } + } + /// Returns `true` if the channel is disconnected. pub(crate) fn is_disconnected(&self) -> bool { self.tail.load(Ordering::SeqCst) & self.mark_bit != 0 @@ -483,23 +562,3 @@ impl<T> Channel<T> { head.wrapping_add(self.one_lap) == tail & !self.mark_bit } } - -impl<T> Drop for Channel<T> { - fn drop(&mut self) { - // Get the index of the head. - let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1); - - // Loop over all slots that hold a message and drop them. - for i in 0..self.len() { - // Compute the index of the next slot holding a message. - let index = if hix + i < self.cap { hix + i } else { hix + i - self.cap }; - - unsafe { - debug_assert!(index < self.buffer.len()); - let slot = self.buffer.get_unchecked_mut(index); - let msg = &mut *slot.msg.get(); - msg.as_mut_ptr().drop_in_place(); - } - } - } -} diff --git a/library/std/src/sync/mpmc/list.rs b/library/std/src/sync/mpmc/list.rs index ec6c0726a..406a331a3 100644 --- a/library/std/src/sync/mpmc/list.rs +++ b/library/std/src/sync/mpmc/list.rs @@ -549,6 +549,18 @@ impl<T> Channel<T> { let mut head = self.head.index.load(Ordering::Acquire); let mut block = self.head.block.load(Ordering::Acquire); + // If we're going to be dropping messages we need to synchronize with initialization + if head >> SHIFT != tail >> SHIFT { + // The block can be null here only if a sender is in the process of initializing the + // channel while another sender managed to send a message by inserting it into the + // semi-initialized channel and advanced the tail. + // In that case, just wait until it gets initialized. + while block.is_null() { + backoff.spin_heavy(); + block = self.head.block.load(Ordering::Acquire); + } + } + unsafe { // Drop all messages between head and tail and deallocate the heap-allocated blocks. while head >> SHIFT != tail >> SHIFT { diff --git a/library/std/src/sync/mpmc/mod.rs b/library/std/src/sync/mpmc/mod.rs index 7a602cecd..2068dda39 100644 --- a/library/std/src/sync/mpmc/mod.rs +++ b/library/std/src/sync/mpmc/mod.rs @@ -227,7 +227,7 @@ impl<T> Drop for Sender<T> { fn drop(&mut self) { unsafe { match &self.flavor { - SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()), + SenderFlavor::Array(chan) => chan.release(|c| c.disconnect_senders()), SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()), SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()), } @@ -403,7 +403,7 @@ impl<T> Drop for Receiver<T> { fn drop(&mut self) { unsafe { match &self.flavor { - ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()), + ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect_receivers()), ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()), ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()), } |