summaryrefslogtreecommitdiffstats
path: root/third_party/rust/crossbeam-channel/src/flavors
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/crossbeam-channel/src/flavors')
-rw-r--r--third_party/rust/crossbeam-channel/src/flavors/array.rs635
-rw-r--r--third_party/rust/crossbeam-channel/src/flavors/at.rs202
-rw-r--r--third_party/rust/crossbeam-channel/src/flavors/list.rs745
-rw-r--r--third_party/rust/crossbeam-channel/src/flavors/mod.rs17
-rw-r--r--third_party/rust/crossbeam-channel/src/flavors/never.rs110
-rw-r--r--third_party/rust/crossbeam-channel/src/flavors/tick.rs168
-rw-r--r--third_party/rust/crossbeam-channel/src/flavors/zero.rs495
7 files changed, 2372 insertions, 0 deletions
diff --git a/third_party/rust/crossbeam-channel/src/flavors/array.rs b/third_party/rust/crossbeam-channel/src/flavors/array.rs
new file mode 100644
index 0000000000..63b82eb859
--- /dev/null
+++ b/third_party/rust/crossbeam-channel/src/flavors/array.rs
@@ -0,0 +1,635 @@
+//! Bounded channel based on a preallocated array.
+//!
+//! This flavor has a fixed, positive capacity.
+//!
+//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
+//!
+//! Source:
+//! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
+//! - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub>
+
+use std::cell::UnsafeCell;
+use std::mem::MaybeUninit;
+use std::ptr;
+use std::sync::atomic::{self, AtomicUsize, Ordering};
+use std::time::Instant;
+
+use crossbeam_utils::{Backoff, CachePadded};
+
+use crate::context::Context;
+use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
+use crate::select::{Operation, SelectHandle, Selected, Token};
+use crate::waker::SyncWaker;
+
+/// A slot in a channel.
+struct Slot<T> {
+ /// The current stamp.
+ stamp: AtomicUsize,
+
+ /// The message in this slot.
+ msg: UnsafeCell<MaybeUninit<T>>,
+}
+
+/// The token type for the array flavor.
+#[derive(Debug)]
+pub(crate) struct ArrayToken {
+ /// Slot to read from or write to.
+ slot: *const u8,
+
+ /// Stamp to store into the slot after reading or writing.
+ stamp: usize,
+}
+
+impl Default for ArrayToken {
+ #[inline]
+ fn default() -> Self {
+ ArrayToken {
+ slot: ptr::null(),
+ stamp: 0,
+ }
+ }
+}
+
+/// Bounded channel based on a preallocated array.
+pub(crate) struct Channel<T> {
+ /// The head of the channel.
+ ///
+ /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
+ /// packed into a single `usize`. The lower bits represent the index, while the upper bits
+ /// represent the lap. The mark bit in the head is always zero.
+ ///
+ /// Messages are popped from the head of the channel.
+ head: CachePadded<AtomicUsize>,
+
+ /// The tail of the channel.
+ ///
+ /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
+ /// packed into a single `usize`. The lower bits represent the index, while the upper bits
+ /// represent the lap. The mark bit indicates that the channel is disconnected.
+ ///
+ /// Messages are pushed into the tail of the channel.
+ tail: CachePadded<AtomicUsize>,
+
+ /// The buffer holding slots.
+ buffer: Box<[Slot<T>]>,
+
+ /// The channel capacity.
+ cap: usize,
+
+ /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
+ one_lap: usize,
+
+ /// If this bit is set in the tail, that means the channel is disconnected.
+ mark_bit: usize,
+
+ /// Senders waiting while the channel is full.
+ senders: SyncWaker,
+
+ /// Receivers waiting while the channel is empty and not disconnected.
+ receivers: SyncWaker,
+}
+
+impl<T> Channel<T> {
+ /// Creates a bounded channel of capacity `cap`.
+ pub(crate) fn with_capacity(cap: usize) -> Self {
+ assert!(cap > 0, "capacity must be positive");
+
+ // Compute constants `mark_bit` and `one_lap`.
+ let mark_bit = (cap + 1).next_power_of_two();
+ let one_lap = mark_bit * 2;
+
+ // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
+ let head = 0;
+ // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
+ let tail = 0;
+
+ // Allocate a buffer of `cap` slots initialized
+ // with stamps.
+ let buffer: Box<[Slot<T>]> = (0..cap)
+ .map(|i| {
+ // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
+ Slot {
+ stamp: AtomicUsize::new(i),
+ msg: UnsafeCell::new(MaybeUninit::uninit()),
+ }
+ })
+ .collect();
+
+ Channel {
+ buffer,
+ cap,
+ one_lap,
+ mark_bit,
+ head: CachePadded::new(AtomicUsize::new(head)),
+ tail: CachePadded::new(AtomicUsize::new(tail)),
+ senders: SyncWaker::new(),
+ receivers: SyncWaker::new(),
+ }
+ }
+
+ /// Returns a receiver handle to the channel.
+ pub(crate) fn receiver(&self) -> Receiver<'_, T> {
+ Receiver(self)
+ }
+
+ /// Returns a sender handle to the channel.
+ pub(crate) fn sender(&self) -> Sender<'_, T> {
+ Sender(self)
+ }
+
+ /// Attempts to reserve a slot for sending a message.
+ fn start_send(&self, token: &mut Token) -> bool {
+ let backoff = Backoff::new();
+ let mut tail = self.tail.load(Ordering::Relaxed);
+
+ loop {
+ // Check if the channel is disconnected.
+ if tail & self.mark_bit != 0 {
+ token.array.slot = ptr::null();
+ token.array.stamp = 0;
+ return true;
+ }
+
+ // Deconstruct the tail.
+ let index = tail & (self.mark_bit - 1);
+ let lap = tail & !(self.one_lap - 1);
+
+ // Inspect the corresponding slot.
+ debug_assert!(index < self.buffer.len());
+ let slot = unsafe { self.buffer.get_unchecked(index) };
+ let stamp = slot.stamp.load(Ordering::Acquire);
+
+ // If the tail and the stamp match, we may attempt to push.
+ if tail == stamp {
+ let new_tail = if index + 1 < self.cap {
+ // Same lap, incremented index.
+ // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
+ tail + 1
+ } else {
+ // One lap forward, index wraps around to zero.
+ // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
+ lap.wrapping_add(self.one_lap)
+ };
+
+ // Try moving the tail.
+ match self.tail.compare_exchange_weak(
+ tail,
+ new_tail,
+ Ordering::SeqCst,
+ Ordering::Relaxed,
+ ) {
+ Ok(_) => {
+ // Prepare the token for the follow-up call to `write`.
+ token.array.slot = slot as *const Slot<T> as *const u8;
+ token.array.stamp = tail + 1;
+ return true;
+ }
+ Err(t) => {
+ tail = t;
+ backoff.spin();
+ }
+ }
+ } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
+ atomic::fence(Ordering::SeqCst);
+ let head = self.head.load(Ordering::Relaxed);
+
+ // If the head lags one lap behind the tail as well...
+ if head.wrapping_add(self.one_lap) == tail {
+ // ...then the channel is full.
+ return false;
+ }
+
+ backoff.spin();
+ tail = self.tail.load(Ordering::Relaxed);
+ } else {
+ // Snooze because we need to wait for the stamp to get updated.
+ backoff.snooze();
+ tail = self.tail.load(Ordering::Relaxed);
+ }
+ }
+ }
+
+ /// Writes a message into the channel.
+ pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
+ // If there is no slot, the channel is disconnected.
+ if token.array.slot.is_null() {
+ return Err(msg);
+ }
+
+ let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>();
+
+ // Write the message into the slot and update the stamp.
+ slot.msg.get().write(MaybeUninit::new(msg));
+ slot.stamp.store(token.array.stamp, Ordering::Release);
+
+ // Wake a sleeping receiver.
+ self.receivers.notify();
+ Ok(())
+ }
+
+ /// Attempts to reserve a slot for receiving a message.
+ fn start_recv(&self, token: &mut Token) -> bool {
+ let backoff = Backoff::new();
+ let mut head = self.head.load(Ordering::Relaxed);
+
+ loop {
+ // Deconstruct the head.
+ let index = head & (self.mark_bit - 1);
+ let lap = head & !(self.one_lap - 1);
+
+ // Inspect the corresponding slot.
+ debug_assert!(index < self.buffer.len());
+ let slot = unsafe { self.buffer.get_unchecked(index) };
+ let stamp = slot.stamp.load(Ordering::Acquire);
+
+ // If the the stamp is ahead of the head by 1, we may attempt to pop.
+ if head + 1 == stamp {
+ let new = if index + 1 < self.cap {
+ // Same lap, incremented index.
+ // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
+ head + 1
+ } else {
+ // One lap forward, index wraps around to zero.
+ // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
+ lap.wrapping_add(self.one_lap)
+ };
+
+ // Try moving the head.
+ match self.head.compare_exchange_weak(
+ head,
+ new,
+ Ordering::SeqCst,
+ Ordering::Relaxed,
+ ) {
+ Ok(_) => {
+ // Prepare the token for the follow-up call to `read`.
+ token.array.slot = slot as *const Slot<T> as *const u8;
+ token.array.stamp = head.wrapping_add(self.one_lap);
+ return true;
+ }
+ Err(h) => {
+ head = h;
+ backoff.spin();
+ }
+ }
+ } else if stamp == head {
+ atomic::fence(Ordering::SeqCst);
+ let tail = self.tail.load(Ordering::Relaxed);
+
+ // If the tail equals the head, that means the channel is empty.
+ if (tail & !self.mark_bit) == head {
+ // If the channel is disconnected...
+ if tail & self.mark_bit != 0 {
+ // ...then receive an error.
+ token.array.slot = ptr::null();
+ token.array.stamp = 0;
+ return true;
+ } else {
+ // Otherwise, the receive operation is not ready.
+ return false;
+ }
+ }
+
+ backoff.spin();
+ head = self.head.load(Ordering::Relaxed);
+ } else {
+ // Snooze because we need to wait for the stamp to get updated.
+ backoff.snooze();
+ head = self.head.load(Ordering::Relaxed);
+ }
+ }
+ }
+
+ /// Reads a message from the channel.
+ pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
+ if token.array.slot.is_null() {
+ // The channel is disconnected.
+ return Err(());
+ }
+
+ let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>();
+
+ // Read the message from the slot and update the stamp.
+ let msg = slot.msg.get().read().assume_init();
+ slot.stamp.store(token.array.stamp, Ordering::Release);
+
+ // Wake a sleeping sender.
+ self.senders.notify();
+ Ok(msg)
+ }
+
+ /// Attempts to send a message into the channel.
+ pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ let token = &mut Token::default();
+ if self.start_send(token) {
+ unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
+ } else {
+ Err(TrySendError::Full(msg))
+ }
+ }
+
+ /// Sends a message into the channel.
+ pub(crate) fn send(
+ &self,
+ msg: T,
+ deadline: Option<Instant>,
+ ) -> Result<(), SendTimeoutError<T>> {
+ let token = &mut Token::default();
+ loop {
+ // Try sending a message several times.
+ let backoff = Backoff::new();
+ loop {
+ if self.start_send(token) {
+ let res = unsafe { self.write(token, msg) };
+ return res.map_err(SendTimeoutError::Disconnected);
+ }
+
+ if backoff.is_completed() {
+ break;
+ } else {
+ backoff.snooze();
+ }
+ }
+
+ if let Some(d) = deadline {
+ if Instant::now() >= d {
+ return Err(SendTimeoutError::Timeout(msg));
+ }
+ }
+
+ Context::with(|cx| {
+ // Prepare for blocking until a receiver wakes us up.
+ let oper = Operation::hook(token);
+ self.senders.register(oper, cx);
+
+ // Has the channel become ready just now?
+ if !self.is_full() || self.is_disconnected() {
+ let _ = cx.try_select(Selected::Aborted);
+ }
+
+ // Block the current thread.
+ let sel = cx.wait_until(deadline);
+
+ match sel {
+ Selected::Waiting => unreachable!(),
+ Selected::Aborted | Selected::Disconnected => {
+ self.senders.unregister(oper).unwrap();
+ }
+ Selected::Operation(_) => {}
+ }
+ });
+ }
+ }
+
+ /// Attempts to receive a message without blocking.
+ pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
+ let token = &mut Token::default();
+
+ if self.start_recv(token) {
+ unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
+ } else {
+ Err(TryRecvError::Empty)
+ }
+ }
+
+ /// Receives a message from the channel.
+ pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
+ let token = &mut Token::default();
+ loop {
+ // Try receiving a message several times.
+ let backoff = Backoff::new();
+ loop {
+ if self.start_recv(token) {
+ let res = unsafe { self.read(token) };
+ return res.map_err(|_| RecvTimeoutError::Disconnected);
+ }
+
+ if backoff.is_completed() {
+ break;
+ } else {
+ backoff.snooze();
+ }
+ }
+
+ if let Some(d) = deadline {
+ if Instant::now() >= d {
+ return Err(RecvTimeoutError::Timeout);
+ }
+ }
+
+ Context::with(|cx| {
+ // Prepare for blocking until a sender wakes us up.
+ let oper = Operation::hook(token);
+ self.receivers.register(oper, cx);
+
+ // Has the channel become ready just now?
+ if !self.is_empty() || self.is_disconnected() {
+ let _ = cx.try_select(Selected::Aborted);
+ }
+
+ // Block the current thread.
+ let sel = cx.wait_until(deadline);
+
+ match sel {
+ Selected::Waiting => unreachable!(),
+ Selected::Aborted | Selected::Disconnected => {
+ self.receivers.unregister(oper).unwrap();
+ // If the channel was disconnected, we still have to check for remaining
+ // messages.
+ }
+ Selected::Operation(_) => {}
+ }
+ });
+ }
+ }
+
+ /// Returns the current number of messages inside the channel.
+ pub(crate) fn len(&self) -> usize {
+ loop {
+ // Load the tail, then load the head.
+ let tail = self.tail.load(Ordering::SeqCst);
+ let head = self.head.load(Ordering::SeqCst);
+
+ // If the tail didn't change, we've got consistent values to work with.
+ if self.tail.load(Ordering::SeqCst) == tail {
+ let hix = head & (self.mark_bit - 1);
+ let tix = tail & (self.mark_bit - 1);
+
+ return if hix < tix {
+ tix - hix
+ } else if hix > tix {
+ self.cap - hix + tix
+ } else if (tail & !self.mark_bit) == head {
+ 0
+ } else {
+ self.cap
+ };
+ }
+ }
+ }
+
+ /// Returns the capacity of the channel.
+ pub(crate) fn capacity(&self) -> Option<usize> {
+ Some(self.cap)
+ }
+
+ /// Disconnects the channel and wakes up all blocked senders and receivers.
+ ///
+ /// Returns `true` if this call disconnected the channel.
+ pub(crate) fn disconnect(&self) -> bool {
+ let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
+
+ if tail & self.mark_bit == 0 {
+ self.senders.disconnect();
+ self.receivers.disconnect();
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Returns `true` if the channel is disconnected.
+ pub(crate) fn is_disconnected(&self) -> bool {
+ self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
+ }
+
+ /// Returns `true` if the channel is empty.
+ pub(crate) fn is_empty(&self) -> bool {
+ let head = self.head.load(Ordering::SeqCst);
+ let tail = self.tail.load(Ordering::SeqCst);
+
+ // Is the tail equal to the head?
+ //
+ // Note: If the head changes just before we load the tail, that means there was a moment
+ // when the channel was not empty, so it is safe to just return `false`.
+ (tail & !self.mark_bit) == head
+ }
+
+ /// Returns `true` if the channel is full.
+ pub(crate) fn is_full(&self) -> bool {
+ let tail = self.tail.load(Ordering::SeqCst);
+ let head = self.head.load(Ordering::SeqCst);
+
+ // Is the head lagging one lap behind tail?
+ //
+ // Note: If the tail changes just before we load the head, that means there was a moment
+ // when the channel was not full, so it is safe to just return `false`.
+ head.wrapping_add(self.one_lap) == tail & !self.mark_bit
+ }
+}
+
+impl<T> Drop for Channel<T> {
+ fn drop(&mut self) {
+ // Get the index of the head.
+ let head = *self.head.get_mut();
+ let tail = *self.tail.get_mut();
+
+ let hix = head & (self.mark_bit - 1);
+ let tix = tail & (self.mark_bit - 1);
+
+ let len = if hix < tix {
+ tix - hix
+ } else if hix > tix {
+ self.cap - hix + tix
+ } else if (tail & !self.mark_bit) == head {
+ 0
+ } else {
+ self.cap
+ };
+
+ // Loop over all slots that hold a message and drop them.
+ for i in 0..len {
+ // Compute the index of the next slot holding a message.
+ let index = if hix + i < self.cap {
+ hix + i
+ } else {
+ hix + i - self.cap
+ };
+
+ unsafe {
+ debug_assert!(index < self.buffer.len());
+ let slot = self.buffer.get_unchecked_mut(index);
+ let msg = &mut *slot.msg.get();
+ msg.as_mut_ptr().drop_in_place();
+ }
+ }
+ }
+}
+
+/// Receiver handle to a channel.
+pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
+
+/// Sender handle to a channel.
+pub(crate) struct Sender<'a, T>(&'a Channel<T>);
+
+impl<T> SelectHandle for Receiver<'_, T> {
+ fn try_select(&self, token: &mut Token) -> bool {
+ self.0.start_recv(token)
+ }
+
+ fn deadline(&self) -> Option<Instant> {
+ None
+ }
+
+ fn register(&self, oper: Operation, cx: &Context) -> bool {
+ self.0.receivers.register(oper, cx);
+ self.is_ready()
+ }
+
+ fn unregister(&self, oper: Operation) {
+ self.0.receivers.unregister(oper);
+ }
+
+ fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
+ self.try_select(token)
+ }
+
+ fn is_ready(&self) -> bool {
+ !self.0.is_empty() || self.0.is_disconnected()
+ }
+
+ fn watch(&self, oper: Operation, cx: &Context) -> bool {
+ self.0.receivers.watch(oper, cx);
+ self.is_ready()
+ }
+
+ fn unwatch(&self, oper: Operation) {
+ self.0.receivers.unwatch(oper);
+ }
+}
+
+impl<T> SelectHandle for Sender<'_, T> {
+ fn try_select(&self, token: &mut Token) -> bool {
+ self.0.start_send(token)
+ }
+
+ fn deadline(&self) -> Option<Instant> {
+ None
+ }
+
+ fn register(&self, oper: Operation, cx: &Context) -> bool {
+ self.0.senders.register(oper, cx);
+ self.is_ready()
+ }
+
+ fn unregister(&self, oper: Operation) {
+ self.0.senders.unregister(oper);
+ }
+
+ fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
+ self.try_select(token)
+ }
+
+ fn is_ready(&self) -> bool {
+ !self.0.is_full() || self.0.is_disconnected()
+ }
+
+ fn watch(&self, oper: Operation, cx: &Context) -> bool {
+ self.0.senders.watch(oper, cx);
+ self.is_ready()
+ }
+
+ fn unwatch(&self, oper: Operation) {
+ self.0.senders.unwatch(oper);
+ }
+}
diff --git a/third_party/rust/crossbeam-channel/src/flavors/at.rs b/third_party/rust/crossbeam-channel/src/flavors/at.rs
new file mode 100644
index 0000000000..ca5ee60f52
--- /dev/null
+++ b/third_party/rust/crossbeam-channel/src/flavors/at.rs
@@ -0,0 +1,202 @@
+//! Channel that delivers a message at a certain moment in time.
+//!
+//! Messages cannot be sent into this kind of channel; they are materialized on demand.
+
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::thread;
+use std::time::{Duration, Instant};
+
+use crate::context::Context;
+use crate::err::{RecvTimeoutError, TryRecvError};
+use crate::select::{Operation, SelectHandle, Token};
+use crate::utils;
+
+/// Result of a receive operation.
+pub(crate) type AtToken = Option<Instant>;
+
+/// Channel that delivers a message at a certain moment in time
+pub(crate) struct Channel {
+ /// The instant at which the message will be delivered.
+ delivery_time: Instant,
+
+ /// `true` if the message has been received.
+ received: AtomicBool,
+}
+
+impl Channel {
+ /// Creates a channel that delivers a message at a certain instant in time.
+ #[inline]
+ pub(crate) fn new_deadline(when: Instant) -> Self {
+ Channel {
+ delivery_time: when,
+ received: AtomicBool::new(false),
+ }
+ }
+ /// Creates a channel that delivers a message after a certain duration of time.
+ #[inline]
+ pub(crate) fn new_timeout(dur: Duration) -> Self {
+ Self::new_deadline(utils::convert_timeout_to_deadline(dur))
+ }
+
+ /// Attempts to receive a message without blocking.
+ #[inline]
+ pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
+ // We use relaxed ordering because this is just an optional optimistic check.
+ if self.received.load(Ordering::Relaxed) {
+ // The message has already been received.
+ return Err(TryRecvError::Empty);
+ }
+
+ if Instant::now() < self.delivery_time {
+ // The message was not delivered yet.
+ return Err(TryRecvError::Empty);
+ }
+
+ // Try receiving the message if it is still available.
+ if !self.received.swap(true, Ordering::SeqCst) {
+ // Success! Return delivery time as the message.
+ Ok(self.delivery_time)
+ } else {
+ // The message was already received.
+ Err(TryRecvError::Empty)
+ }
+ }
+
+ /// Receives a message from the channel.
+ #[inline]
+ pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
+ // We use relaxed ordering because this is just an optional optimistic check.
+ if self.received.load(Ordering::Relaxed) {
+ // The message has already been received.
+ utils::sleep_until(deadline);
+ return Err(RecvTimeoutError::Timeout);
+ }
+
+ // Wait until the message is received or the deadline is reached.
+ loop {
+ let now = Instant::now();
+
+ let deadline = match deadline {
+ // Check if we can receive the next message.
+ _ if now >= self.delivery_time => break,
+ // Check if the timeout deadline has been reached.
+ Some(d) if now >= d => return Err(RecvTimeoutError::Timeout),
+
+ // Sleep until one of the above happens
+ Some(d) if d < self.delivery_time => d,
+ _ => self.delivery_time,
+ };
+
+ thread::sleep(deadline - now);
+ }
+
+ // Try receiving the message if it is still available.
+ if !self.received.swap(true, Ordering::SeqCst) {
+ // Success! Return the message, which is the instant at which it was delivered.
+ Ok(self.delivery_time)
+ } else {
+ // The message was already received. Block forever.
+ utils::sleep_until(None);
+ unreachable!()
+ }
+ }
+
+ /// Reads a message from the channel.
+ #[inline]
+ pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
+ token.at.ok_or(())
+ }
+
+ /// Returns `true` if the channel is empty.
+ #[inline]
+ pub(crate) fn is_empty(&self) -> bool {
+ // We use relaxed ordering because this is just an optional optimistic check.
+ if self.received.load(Ordering::Relaxed) {
+ return true;
+ }
+
+ // If the delivery time hasn't been reached yet, the channel is empty.
+ if Instant::now() < self.delivery_time {
+ return true;
+ }
+
+ // The delivery time has been reached. The channel is empty only if the message has already
+ // been received.
+ self.received.load(Ordering::SeqCst)
+ }
+
+ /// Returns `true` if the channel is full.
+ #[inline]
+ pub(crate) fn is_full(&self) -> bool {
+ !self.is_empty()
+ }
+
+ /// Returns the number of messages in the channel.
+ #[inline]
+ pub(crate) fn len(&self) -> usize {
+ if self.is_empty() {
+ 0
+ } else {
+ 1
+ }
+ }
+
+ /// Returns the capacity of the channel.
+ #[inline]
+ pub(crate) fn capacity(&self) -> Option<usize> {
+ Some(1)
+ }
+}
+
+impl SelectHandle for Channel {
+ #[inline]
+ fn try_select(&self, token: &mut Token) -> bool {
+ match self.try_recv() {
+ Ok(msg) => {
+ token.at = Some(msg);
+ true
+ }
+ Err(TryRecvError::Disconnected) => {
+ token.at = None;
+ true
+ }
+ Err(TryRecvError::Empty) => false,
+ }
+ }
+
+ #[inline]
+ fn deadline(&self) -> Option<Instant> {
+ // We use relaxed ordering because this is just an optional optimistic check.
+ if self.received.load(Ordering::Relaxed) {
+ None
+ } else {
+ Some(self.delivery_time)
+ }
+ }
+
+ #[inline]
+ fn register(&self, _oper: Operation, _cx: &Context) -> bool {
+ self.is_ready()
+ }
+
+ #[inline]
+ fn unregister(&self, _oper: Operation) {}
+
+ #[inline]
+ fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
+ self.try_select(token)
+ }
+
+ #[inline]
+ fn is_ready(&self) -> bool {
+ !self.is_empty()
+ }
+
+ #[inline]
+ fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
+ self.is_ready()
+ }
+
+ #[inline]
+ fn unwatch(&self, _oper: Operation) {}
+}
diff --git a/third_party/rust/crossbeam-channel/src/flavors/list.rs b/third_party/rust/crossbeam-channel/src/flavors/list.rs
new file mode 100644
index 0000000000..6090b8d471
--- /dev/null
+++ b/third_party/rust/crossbeam-channel/src/flavors/list.rs
@@ -0,0 +1,745 @@
+//! Unbounded channel implemented as a linked list.
+
+use std::cell::UnsafeCell;
+use std::marker::PhantomData;
+use std::mem::MaybeUninit;
+use std::ptr;
+use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
+use std::time::Instant;
+
+use crossbeam_utils::{Backoff, CachePadded};
+
+use crate::context::Context;
+use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
+use crate::select::{Operation, SelectHandle, Selected, Token};
+use crate::waker::SyncWaker;
+
+// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
+// following changes by @kleimkuhler:
+//
+// 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100
+// 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101
+
+// Bits indicating the state of a slot:
+// * If a message has been written into the slot, `WRITE` is set.
+// * If a message has been read from the slot, `READ` is set.
+// * If the block is being destroyed, `DESTROY` is set.
+const WRITE: usize = 1;
+const READ: usize = 2;
+const DESTROY: usize = 4;
+
+// Each block covers one "lap" of indices.
+const LAP: usize = 32;
+// The maximum number of messages a block can hold.
+const BLOCK_CAP: usize = LAP - 1;
+// How many lower bits are reserved for metadata.
+const SHIFT: usize = 1;
+// Has two different purposes:
+// * If set in head, indicates that the block is not the last one.
+// * If set in tail, indicates that the channel is disconnected.
+const MARK_BIT: usize = 1;
+
+/// A slot in a block.
+struct Slot<T> {
+ /// The message.
+ msg: UnsafeCell<MaybeUninit<T>>,
+
+ /// The state of the slot.
+ state: AtomicUsize,
+}
+
+impl<T> Slot<T> {
+ const UNINIT: Self = Self {
+ msg: UnsafeCell::new(MaybeUninit::uninit()),
+ state: AtomicUsize::new(0),
+ };
+
+ /// Waits until a message is written into the slot.
+ fn wait_write(&self) {
+ let backoff = Backoff::new();
+ while self.state.load(Ordering::Acquire) & WRITE == 0 {
+ backoff.snooze();
+ }
+ }
+}
+
+/// A block in a linked list.
+///
+/// Each block in the list can hold up to `BLOCK_CAP` messages.
+struct Block<T> {
+ /// The next block in the linked list.
+ next: AtomicPtr<Block<T>>,
+
+ /// Slots for messages.
+ slots: [Slot<T>; BLOCK_CAP],
+}
+
+impl<T> Block<T> {
+ /// Creates an empty block.
+ fn new() -> Block<T> {
+ Self {
+ next: AtomicPtr::new(ptr::null_mut()),
+ slots: [Slot::UNINIT; BLOCK_CAP],
+ }
+ }
+
+ /// Waits until the next pointer is set.
+ fn wait_next(&self) -> *mut Block<T> {
+ let backoff = Backoff::new();
+ loop {
+ let next = self.next.load(Ordering::Acquire);
+ if !next.is_null() {
+ return next;
+ }
+ backoff.snooze();
+ }
+ }
+
+ /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
+ unsafe fn destroy(this: *mut Block<T>, start: usize) {
+ // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
+ // begun destruction of the block.
+ for i in start..BLOCK_CAP - 1 {
+ let slot = (*this).slots.get_unchecked(i);
+
+ // Mark the `DESTROY` bit if a thread is still using the slot.
+ if slot.state.load(Ordering::Acquire) & READ == 0
+ && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
+ {
+ // If a thread is still using the slot, it will continue destruction of the block.
+ return;
+ }
+ }
+
+ // No thread is using the block, now it is safe to destroy it.
+ drop(Box::from_raw(this));
+ }
+}
+
+/// A position in a channel.
+#[derive(Debug)]
+struct Position<T> {
+ /// The index in the channel.
+ index: AtomicUsize,
+
+ /// The block in the linked list.
+ block: AtomicPtr<Block<T>>,
+}
+
+/// The token type for the list flavor.
+#[derive(Debug)]
+pub(crate) struct ListToken {
+ /// The block of slots.
+ block: *const u8,
+
+ /// The offset into the block.
+ offset: usize,
+}
+
+impl Default for ListToken {
+ #[inline]
+ fn default() -> Self {
+ ListToken {
+ block: ptr::null(),
+ offset: 0,
+ }
+ }
+}
+
+/// Unbounded channel implemented as a linked list.
+///
+/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
+/// represented as numbers of type `usize` and wrap on overflow.
+///
+/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
+/// improve cache efficiency.
+pub(crate) struct Channel<T> {
+ /// The head of the channel.
+ head: CachePadded<Position<T>>,
+
+ /// The tail of the channel.
+ tail: CachePadded<Position<T>>,
+
+ /// Receivers waiting while the channel is empty and not disconnected.
+ receivers: SyncWaker,
+
+ /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
+ _marker: PhantomData<T>,
+}
+
+impl<T> Channel<T> {
+ /// Creates a new unbounded channel.
+ pub(crate) fn new() -> Self {
+ Channel {
+ head: CachePadded::new(Position {
+ block: AtomicPtr::new(ptr::null_mut()),
+ index: AtomicUsize::new(0),
+ }),
+ tail: CachePadded::new(Position {
+ block: AtomicPtr::new(ptr::null_mut()),
+ index: AtomicUsize::new(0),
+ }),
+ receivers: SyncWaker::new(),
+ _marker: PhantomData,
+ }
+ }
+
+ /// Returns a receiver handle to the channel.
+ pub(crate) fn receiver(&self) -> Receiver<'_, T> {
+ Receiver(self)
+ }
+
+ /// Returns a sender handle to the channel.
+ pub(crate) fn sender(&self) -> Sender<'_, T> {
+ Sender(self)
+ }
+
+ /// Attempts to reserve a slot for sending a message.
+ fn start_send(&self, token: &mut Token) -> bool {
+ let backoff = Backoff::new();
+ let mut tail = self.tail.index.load(Ordering::Acquire);
+ let mut block = self.tail.block.load(Ordering::Acquire);
+ let mut next_block = None;
+
+ loop {
+ // Check if the channel is disconnected.
+ if tail & MARK_BIT != 0 {
+ token.list.block = ptr::null();
+ return true;
+ }
+
+ // Calculate the offset of the index into the block.
+ let offset = (tail >> SHIFT) % LAP;
+
+ // If we reached the end of the block, wait until the next one is installed.
+ if offset == BLOCK_CAP {
+ backoff.snooze();
+ tail = self.tail.index.load(Ordering::Acquire);
+ block = self.tail.block.load(Ordering::Acquire);
+ continue;
+ }
+
+ // If we're going to have to install the next block, allocate it in advance in order to
+ // make the wait for other threads as short as possible.
+ if offset + 1 == BLOCK_CAP && next_block.is_none() {
+ next_block = Some(Box::new(Block::<T>::new()));
+ }
+
+ // If this is the first message to be sent into the channel, we need to allocate the
+ // first block and install it.
+ if block.is_null() {
+ let new = Box::into_raw(Box::new(Block::<T>::new()));
+
+ if self
+ .tail
+ .block
+ .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
+ .is_ok()
+ {
+ self.head.block.store(new, Ordering::Release);
+ block = new;
+ } else {
+ next_block = unsafe { Some(Box::from_raw(new)) };
+ tail = self.tail.index.load(Ordering::Acquire);
+ block = self.tail.block.load(Ordering::Acquire);
+ continue;
+ }
+ }
+
+ let new_tail = tail + (1 << SHIFT);
+
+ // Try advancing the tail forward.
+ match self.tail.index.compare_exchange_weak(
+ tail,
+ new_tail,
+ Ordering::SeqCst,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => unsafe {
+ // If we've reached the end of the block, install the next one.
+ if offset + 1 == BLOCK_CAP {
+ let next_block = Box::into_raw(next_block.unwrap());
+ self.tail.block.store(next_block, Ordering::Release);
+ self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
+ (*block).next.store(next_block, Ordering::Release);
+ }
+
+ token.list.block = block as *const u8;
+ token.list.offset = offset;
+ return true;
+ },
+ Err(t) => {
+ tail = t;
+ block = self.tail.block.load(Ordering::Acquire);
+ backoff.spin();
+ }
+ }
+ }
+ }
+
+ /// Writes a message into the channel.
+ pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
+ // If there is no slot, the channel is disconnected.
+ if token.list.block.is_null() {
+ return Err(msg);
+ }
+
+ // Write the message into the slot.
+ let block = token.list.block.cast::<Block<T>>();
+ let offset = token.list.offset;
+ let slot = (*block).slots.get_unchecked(offset);
+ slot.msg.get().write(MaybeUninit::new(msg));
+ slot.state.fetch_or(WRITE, Ordering::Release);
+
+ // Wake a sleeping receiver.
+ self.receivers.notify();
+ Ok(())
+ }
+
+ /// Attempts to reserve a slot for receiving a message.
+ fn start_recv(&self, token: &mut Token) -> bool {
+ let backoff = Backoff::new();
+ let mut head = self.head.index.load(Ordering::Acquire);
+ let mut block = self.head.block.load(Ordering::Acquire);
+
+ loop {
+ // Calculate the offset of the index into the block.
+ let offset = (head >> SHIFT) % LAP;
+
+ // If we reached the end of the block, wait until the next one is installed.
+ if offset == BLOCK_CAP {
+ backoff.snooze();
+ head = self.head.index.load(Ordering::Acquire);
+ block = self.head.block.load(Ordering::Acquire);
+ continue;
+ }
+
+ let mut new_head = head + (1 << SHIFT);
+
+ if new_head & MARK_BIT == 0 {
+ atomic::fence(Ordering::SeqCst);
+ let tail = self.tail.index.load(Ordering::Relaxed);
+
+ // If the tail equals the head, that means the channel is empty.
+ if head >> SHIFT == tail >> SHIFT {
+ // If the channel is disconnected...
+ if tail & MARK_BIT != 0 {
+ // ...then receive an error.
+ token.list.block = ptr::null();
+ return true;
+ } else {
+ // Otherwise, the receive operation is not ready.
+ return false;
+ }
+ }
+
+ // If head and tail are not in the same block, set `MARK_BIT` in head.
+ if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
+ new_head |= MARK_BIT;
+ }
+ }
+
+ // The block can be null here only if the first message is being sent into the channel.
+ // In that case, just wait until it gets initialized.
+ if block.is_null() {
+ backoff.snooze();
+ head = self.head.index.load(Ordering::Acquire);
+ block = self.head.block.load(Ordering::Acquire);
+ continue;
+ }
+
+ // Try moving the head index forward.
+ match self.head.index.compare_exchange_weak(
+ head,
+ new_head,
+ Ordering::SeqCst,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => unsafe {
+ // If we've reached the end of the block, move to the next one.
+ if offset + 1 == BLOCK_CAP {
+ let next = (*block).wait_next();
+ let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
+ if !(*next).next.load(Ordering::Relaxed).is_null() {
+ next_index |= MARK_BIT;
+ }
+
+ self.head.block.store(next, Ordering::Release);
+ self.head.index.store(next_index, Ordering::Release);
+ }
+
+ token.list.block = block as *const u8;
+ token.list.offset = offset;
+ return true;
+ },
+ Err(h) => {
+ head = h;
+ block = self.head.block.load(Ordering::Acquire);
+ backoff.spin();
+ }
+ }
+ }
+ }
+
+ /// Reads a message from the channel.
+ pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
+ if token.list.block.is_null() {
+ // The channel is disconnected.
+ return Err(());
+ }
+
+ // Read the message.
+ let block = token.list.block as *mut Block<T>;
+ let offset = token.list.offset;
+ let slot = (*block).slots.get_unchecked(offset);
+ slot.wait_write();
+ let msg = slot.msg.get().read().assume_init();
+
+ // Destroy the block if we've reached the end, or if another thread wanted to destroy but
+ // couldn't because we were busy reading from the slot.
+ if offset + 1 == BLOCK_CAP {
+ Block::destroy(block, 0);
+ } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
+ Block::destroy(block, offset + 1);
+ }
+
+ Ok(msg)
+ }
+
+ /// Attempts to send a message into the channel.
+ pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ self.send(msg, None).map_err(|err| match err {
+ SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
+ SendTimeoutError::Timeout(_) => unreachable!(),
+ })
+ }
+
+ /// Sends a message into the channel.
+ pub(crate) fn send(
+ &self,
+ msg: T,
+ _deadline: Option<Instant>,
+ ) -> Result<(), SendTimeoutError<T>> {
+ let token = &mut Token::default();
+ assert!(self.start_send(token));
+ unsafe {
+ self.write(token, msg)
+ .map_err(SendTimeoutError::Disconnected)
+ }
+ }
+
+ /// Attempts to receive a message without blocking.
+ pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
+ let token = &mut Token::default();
+
+ if self.start_recv(token) {
+ unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
+ } else {
+ Err(TryRecvError::Empty)
+ }
+ }
+
+ /// Receives a message from the channel.
+ pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
+ let token = &mut Token::default();
+ loop {
+ // Try receiving a message several times.
+ let backoff = Backoff::new();
+ loop {
+ if self.start_recv(token) {
+ unsafe {
+ return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
+ }
+ }
+
+ if backoff.is_completed() {
+ break;
+ } else {
+ backoff.snooze();
+ }
+ }
+
+ if let Some(d) = deadline {
+ if Instant::now() >= d {
+ return Err(RecvTimeoutError::Timeout);
+ }
+ }
+
+ // Prepare for blocking until a sender wakes us up.
+ Context::with(|cx| {
+ let oper = Operation::hook(token);
+ self.receivers.register(oper, cx);
+
+ // Has the channel become ready just now?
+ if !self.is_empty() || self.is_disconnected() {
+ let _ = cx.try_select(Selected::Aborted);
+ }
+
+ // Block the current thread.
+ let sel = cx.wait_until(deadline);
+
+ match sel {
+ Selected::Waiting => unreachable!(),
+ Selected::Aborted | Selected::Disconnected => {
+ self.receivers.unregister(oper).unwrap();
+ // If the channel was disconnected, we still have to check for remaining
+ // messages.
+ }
+ Selected::Operation(_) => {}
+ }
+ });
+ }
+ }
+
+ /// Returns the current number of messages inside the channel.
+ pub(crate) fn len(&self) -> usize {
+ loop {
+ // Load the tail index, then load the head index.
+ let mut tail = self.tail.index.load(Ordering::SeqCst);
+ let mut head = self.head.index.load(Ordering::SeqCst);
+
+ // If the tail index didn't change, we've got consistent indices to work with.
+ if self.tail.index.load(Ordering::SeqCst) == tail {
+ // Erase the lower bits.
+ tail &= !((1 << SHIFT) - 1);
+ head &= !((1 << SHIFT) - 1);
+
+ // Fix up indices if they fall onto block ends.
+ if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
+ tail = tail.wrapping_add(1 << SHIFT);
+ }
+ if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
+ head = head.wrapping_add(1 << SHIFT);
+ }
+
+ // Rotate indices so that head falls into the first block.
+ let lap = (head >> SHIFT) / LAP;
+ tail = tail.wrapping_sub((lap * LAP) << SHIFT);
+ head = head.wrapping_sub((lap * LAP) << SHIFT);
+
+ // Remove the lower bits.
+ tail >>= SHIFT;
+ head >>= SHIFT;
+
+ // Return the difference minus the number of blocks between tail and head.
+ return tail - head - tail / LAP;
+ }
+ }
+ }
+
+ /// Returns the capacity of the channel.
+ pub(crate) fn capacity(&self) -> Option<usize> {
+ None
+ }
+
+ /// Disconnects senders and wakes up all blocked receivers.
+ ///
+ /// Returns `true` if this call disconnected the channel.
+ pub(crate) fn disconnect_senders(&self) -> bool {
+ let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
+
+ if tail & MARK_BIT == 0 {
+ self.receivers.disconnect();
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Disconnects receivers.
+ ///
+ /// Returns `true` if this call disconnected the channel.
+ pub(crate) fn disconnect_receivers(&self) -> bool {
+ let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
+
+ if tail & MARK_BIT == 0 {
+ // If receivers are dropped first, discard all messages to free
+ // memory eagerly.
+ self.discard_all_messages();
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Discards all messages.
+ ///
+ /// This method should only be called when all receivers are dropped.
+ fn discard_all_messages(&self) {
+ let backoff = Backoff::new();
+ let mut tail = self.tail.index.load(Ordering::Acquire);
+ loop {
+ let offset = (tail >> SHIFT) % LAP;
+ if offset != BLOCK_CAP {
+ break;
+ }
+
+ // New updates to tail will be rejected by MARK_BIT and aborted unless it's
+ // at boundary. We need to wait for the updates take affect otherwise there
+ // can be memory leaks.
+ backoff.snooze();
+ tail = self.tail.index.load(Ordering::Acquire);
+ }
+
+ let mut head = self.head.index.load(Ordering::Acquire);
+ let mut block = self.head.block.load(Ordering::Acquire);
+
+ unsafe {
+ // Drop all messages between head and tail and deallocate the heap-allocated blocks.
+ while head >> SHIFT != tail >> SHIFT {
+ let offset = (head >> SHIFT) % LAP;
+
+ if offset < BLOCK_CAP {
+ // Drop the message in the slot.
+ let slot = (*block).slots.get_unchecked(offset);
+ slot.wait_write();
+ let p = &mut *slot.msg.get();
+ p.as_mut_ptr().drop_in_place();
+ } else {
+ (*block).wait_next();
+ // Deallocate the block and move to the next one.
+ let next = (*block).next.load(Ordering::Acquire);
+ drop(Box::from_raw(block));
+ block = next;
+ }
+
+ head = head.wrapping_add(1 << SHIFT);
+ }
+
+ // Deallocate the last remaining block.
+ if !block.is_null() {
+ drop(Box::from_raw(block));
+ }
+ }
+ head &= !MARK_BIT;
+ self.head.block.store(ptr::null_mut(), Ordering::Release);
+ self.head.index.store(head, Ordering::Release);
+ }
+
+ /// Returns `true` if the channel is disconnected.
+ pub(crate) fn is_disconnected(&self) -> bool {
+ self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
+ }
+
+ /// Returns `true` if the channel is empty.
+ pub(crate) fn is_empty(&self) -> bool {
+ let head = self.head.index.load(Ordering::SeqCst);
+ let tail = self.tail.index.load(Ordering::SeqCst);
+ head >> SHIFT == tail >> SHIFT
+ }
+
+ /// Returns `true` if the channel is full.
+ pub(crate) fn is_full(&self) -> bool {
+ false
+ }
+}
+
+impl<T> Drop for Channel<T> {
+ fn drop(&mut self) {
+ let mut head = *self.head.index.get_mut();
+ let mut tail = *self.tail.index.get_mut();
+ let mut block = *self.head.block.get_mut();
+
+ // Erase the lower bits.
+ head &= !((1 << SHIFT) - 1);
+ tail &= !((1 << SHIFT) - 1);
+
+ unsafe {
+ // Drop all messages between head and tail and deallocate the heap-allocated blocks.
+ while head != tail {
+ let offset = (head >> SHIFT) % LAP;
+
+ if offset < BLOCK_CAP {
+ // Drop the message in the slot.
+ let slot = (*block).slots.get_unchecked(offset);
+ let p = &mut *slot.msg.get();
+ p.as_mut_ptr().drop_in_place();
+ } else {
+ // Deallocate the block and move to the next one.
+ let next = *(*block).next.get_mut();
+ drop(Box::from_raw(block));
+ block = next;
+ }
+
+ head = head.wrapping_add(1 << SHIFT);
+ }
+
+ // Deallocate the last remaining block.
+ if !block.is_null() {
+ drop(Box::from_raw(block));
+ }
+ }
+ }
+}
+
+/// Receiver handle to a channel.
+pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
+
+/// Sender handle to a channel.
+pub(crate) struct Sender<'a, T>(&'a Channel<T>);
+
+impl<T> SelectHandle for Receiver<'_, T> {
+ fn try_select(&self, token: &mut Token) -> bool {
+ self.0.start_recv(token)
+ }
+
+ fn deadline(&self) -> Option<Instant> {
+ None
+ }
+
+ fn register(&self, oper: Operation, cx: &Context) -> bool {
+ self.0.receivers.register(oper, cx);
+ self.is_ready()
+ }
+
+ fn unregister(&self, oper: Operation) {
+ self.0.receivers.unregister(oper);
+ }
+
+ fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
+ self.try_select(token)
+ }
+
+ fn is_ready(&self) -> bool {
+ !self.0.is_empty() || self.0.is_disconnected()
+ }
+
+ fn watch(&self, oper: Operation, cx: &Context) -> bool {
+ self.0.receivers.watch(oper, cx);
+ self.is_ready()
+ }
+
+ fn unwatch(&self, oper: Operation) {
+ self.0.receivers.unwatch(oper);
+ }
+}
+
+impl<T> SelectHandle for Sender<'_, T> {
+ fn try_select(&self, token: &mut Token) -> bool {
+ self.0.start_send(token)
+ }
+
+ fn deadline(&self) -> Option<Instant> {
+ None
+ }
+
+ fn register(&self, _oper: Operation, _cx: &Context) -> bool {
+ self.is_ready()
+ }
+
+ fn unregister(&self, _oper: Operation) {}
+
+ fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
+ self.try_select(token)
+ }
+
+ fn is_ready(&self) -> bool {
+ true
+ }
+
+ fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
+ self.is_ready()
+ }
+
+ fn unwatch(&self, _oper: Operation) {}
+}
diff --git a/third_party/rust/crossbeam-channel/src/flavors/mod.rs b/third_party/rust/crossbeam-channel/src/flavors/mod.rs
new file mode 100644
index 0000000000..0314bf4209
--- /dev/null
+++ b/third_party/rust/crossbeam-channel/src/flavors/mod.rs
@@ -0,0 +1,17 @@
+//! Channel flavors.
+//!
+//! There are six flavors:
+//!
+//! 1. `at` - Channel that delivers a message after a certain amount of time.
+//! 2. `array` - Bounded channel based on a preallocated array.
+//! 3. `list` - Unbounded channel implemented as a linked list.
+//! 4. `never` - Channel that never delivers messages.
+//! 5. `tick` - Channel that delivers messages periodically.
+//! 6. `zero` - Zero-capacity channel.
+
+pub(crate) mod array;
+pub(crate) mod at;
+pub(crate) mod list;
+pub(crate) mod never;
+pub(crate) mod tick;
+pub(crate) mod zero;
diff --git a/third_party/rust/crossbeam-channel/src/flavors/never.rs b/third_party/rust/crossbeam-channel/src/flavors/never.rs
new file mode 100644
index 0000000000..277a61dc1c
--- /dev/null
+++ b/third_party/rust/crossbeam-channel/src/flavors/never.rs
@@ -0,0 +1,110 @@
+//! Channel that never delivers messages.
+//!
+//! Messages cannot be sent into this kind of channel.
+
+use std::marker::PhantomData;
+use std::time::Instant;
+
+use crate::context::Context;
+use crate::err::{RecvTimeoutError, TryRecvError};
+use crate::select::{Operation, SelectHandle, Token};
+use crate::utils;
+
+/// This flavor doesn't need a token.
+pub(crate) type NeverToken = ();
+
+/// Channel that never delivers messages.
+pub(crate) struct Channel<T> {
+ _marker: PhantomData<T>,
+}
+
+impl<T> Channel<T> {
+ /// Creates a channel that never delivers messages.
+ #[inline]
+ pub(crate) fn new() -> Self {
+ Channel {
+ _marker: PhantomData,
+ }
+ }
+
+ /// Attempts to receive a message without blocking.
+ #[inline]
+ pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
+ Err(TryRecvError::Empty)
+ }
+
+ /// Receives a message from the channel.
+ #[inline]
+ pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
+ utils::sleep_until(deadline);
+ Err(RecvTimeoutError::Timeout)
+ }
+
+ /// Reads a message from the channel.
+ #[inline]
+ pub(crate) unsafe fn read(&self, _token: &mut Token) -> Result<T, ()> {
+ Err(())
+ }
+
+ /// Returns `true` if the channel is empty.
+ #[inline]
+ pub(crate) fn is_empty(&self) -> bool {
+ true
+ }
+
+ /// Returns `true` if the channel is full.
+ #[inline]
+ pub(crate) fn is_full(&self) -> bool {
+ true
+ }
+
+ /// Returns the number of messages in the channel.
+ #[inline]
+ pub(crate) fn len(&self) -> usize {
+ 0
+ }
+
+ /// Returns the capacity of the channel.
+ #[inline]
+ pub(crate) fn capacity(&self) -> Option<usize> {
+ Some(0)
+ }
+}
+
+impl<T> SelectHandle for Channel<T> {
+ #[inline]
+ fn try_select(&self, _token: &mut Token) -> bool {
+ false
+ }
+
+ #[inline]
+ fn deadline(&self) -> Option<Instant> {
+ None
+ }
+
+ #[inline]
+ fn register(&self, _oper: Operation, _cx: &Context) -> bool {
+ self.is_ready()
+ }
+
+ #[inline]
+ fn unregister(&self, _oper: Operation) {}
+
+ #[inline]
+ fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
+ self.try_select(token)
+ }
+
+ #[inline]
+ fn is_ready(&self) -> bool {
+ false
+ }
+
+ #[inline]
+ fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
+ self.is_ready()
+ }
+
+ #[inline]
+ fn unwatch(&self, _oper: Operation) {}
+}
diff --git a/third_party/rust/crossbeam-channel/src/flavors/tick.rs b/third_party/rust/crossbeam-channel/src/flavors/tick.rs
new file mode 100644
index 0000000000..4201b6eb0b
--- /dev/null
+++ b/third_party/rust/crossbeam-channel/src/flavors/tick.rs
@@ -0,0 +1,168 @@
+//! Channel that delivers messages periodically.
+//!
+//! Messages cannot be sent into this kind of channel; they are materialized on demand.
+
+use std::thread;
+use std::time::{Duration, Instant};
+
+use crossbeam_utils::atomic::AtomicCell;
+
+use crate::context::Context;
+use crate::err::{RecvTimeoutError, TryRecvError};
+use crate::select::{Operation, SelectHandle, Token};
+use crate::utils;
+
+/// Result of a receive operation.
+pub(crate) type TickToken = Option<Instant>;
+
+/// Channel that delivers messages periodically.
+pub(crate) struct Channel {
+ /// The instant at which the next message will be delivered.
+ delivery_time: AtomicCell<Instant>,
+
+ /// The time interval in which messages get delivered.
+ duration: Duration,
+}
+
+impl Channel {
+ /// Creates a channel that delivers messages periodically.
+ #[inline]
+ pub(crate) fn new(dur: Duration) -> Self {
+ Channel {
+ delivery_time: AtomicCell::new(utils::convert_timeout_to_deadline(dur)),
+ duration: dur,
+ }
+ }
+
+ /// Attempts to receive a message without blocking.
+ #[inline]
+ pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
+ loop {
+ let now = Instant::now();
+ let delivery_time = self.delivery_time.load();
+
+ if now < delivery_time {
+ return Err(TryRecvError::Empty);
+ }
+
+ if self
+ .delivery_time
+ .compare_exchange(delivery_time, now + self.duration)
+ .is_ok()
+ {
+ return Ok(delivery_time);
+ }
+ }
+ }
+
+ /// Receives a message from the channel.
+ #[inline]
+ pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
+ loop {
+ let delivery_time = self.delivery_time.load();
+ let now = Instant::now();
+
+ if let Some(d) = deadline {
+ if d < delivery_time {
+ if now < d {
+ thread::sleep(d - now);
+ }
+ return Err(RecvTimeoutError::Timeout);
+ }
+ }
+
+ if self
+ .delivery_time
+ .compare_exchange(delivery_time, delivery_time.max(now) + self.duration)
+ .is_ok()
+ {
+ if now < delivery_time {
+ thread::sleep(delivery_time - now);
+ }
+ return Ok(delivery_time);
+ }
+ }
+ }
+
+ /// Reads a message from the channel.
+ #[inline]
+ pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
+ token.tick.ok_or(())
+ }
+
+ /// Returns `true` if the channel is empty.
+ #[inline]
+ pub(crate) fn is_empty(&self) -> bool {
+ Instant::now() < self.delivery_time.load()
+ }
+
+ /// Returns `true` if the channel is full.
+ #[inline]
+ pub(crate) fn is_full(&self) -> bool {
+ !self.is_empty()
+ }
+
+ /// Returns the number of messages in the channel.
+ #[inline]
+ pub(crate) fn len(&self) -> usize {
+ if self.is_empty() {
+ 0
+ } else {
+ 1
+ }
+ }
+
+ /// Returns the capacity of the channel.
+ #[inline]
+ pub(crate) fn capacity(&self) -> Option<usize> {
+ Some(1)
+ }
+}
+
+impl SelectHandle for Channel {
+ #[inline]
+ fn try_select(&self, token: &mut Token) -> bool {
+ match self.try_recv() {
+ Ok(msg) => {
+ token.tick = Some(msg);
+ true
+ }
+ Err(TryRecvError::Disconnected) => {
+ token.tick = None;
+ true
+ }
+ Err(TryRecvError::Empty) => false,
+ }
+ }
+
+ #[inline]
+ fn deadline(&self) -> Option<Instant> {
+ Some(self.delivery_time.load())
+ }
+
+ #[inline]
+ fn register(&self, _oper: Operation, _cx: &Context) -> bool {
+ self.is_ready()
+ }
+
+ #[inline]
+ fn unregister(&self, _oper: Operation) {}
+
+ #[inline]
+ fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
+ self.try_select(token)
+ }
+
+ #[inline]
+ fn is_ready(&self) -> bool {
+ !self.is_empty()
+ }
+
+ #[inline]
+ fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
+ self.is_ready()
+ }
+
+ #[inline]
+ fn unwatch(&self, _oper: Operation) {}
+}
diff --git a/third_party/rust/crossbeam-channel/src/flavors/zero.rs b/third_party/rust/crossbeam-channel/src/flavors/zero.rs
new file mode 100644
index 0000000000..aae2ea3002
--- /dev/null
+++ b/third_party/rust/crossbeam-channel/src/flavors/zero.rs
@@ -0,0 +1,495 @@
+//! Zero-capacity channel.
+//!
+//! This kind of channel is also known as *rendezvous* channel.
+
+use std::cell::UnsafeCell;
+use std::marker::PhantomData;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Mutex;
+use std::time::Instant;
+use std::{fmt, ptr};
+
+use crossbeam_utils::Backoff;
+
+use crate::context::Context;
+use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
+use crate::select::{Operation, SelectHandle, Selected, Token};
+use crate::waker::Waker;
+
+/// A pointer to a packet.
+pub(crate) struct ZeroToken(*mut ());
+
+impl Default for ZeroToken {
+ fn default() -> Self {
+ Self(ptr::null_mut())
+ }
+}
+
+impl fmt::Debug for ZeroToken {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Debug::fmt(&(self.0 as usize), f)
+ }
+}
+
+/// A slot for passing one message from a sender to a receiver.
+struct Packet<T> {
+ /// Equals `true` if the packet is allocated on the stack.
+ on_stack: bool,
+
+ /// Equals `true` once the packet is ready for reading or writing.
+ ready: AtomicBool,
+
+ /// The message.
+ msg: UnsafeCell<Option<T>>,
+}
+
+impl<T> Packet<T> {
+ /// Creates an empty packet on the stack.
+ fn empty_on_stack() -> Packet<T> {
+ Packet {
+ on_stack: true,
+ ready: AtomicBool::new(false),
+ msg: UnsafeCell::new(None),
+ }
+ }
+
+ /// Creates an empty packet on the heap.
+ fn empty_on_heap() -> Box<Packet<T>> {
+ Box::new(Packet {
+ on_stack: false,
+ ready: AtomicBool::new(false),
+ msg: UnsafeCell::new(None),
+ })
+ }
+
+ /// Creates a packet on the stack, containing a message.
+ fn message_on_stack(msg: T) -> Packet<T> {
+ Packet {
+ on_stack: true,
+ ready: AtomicBool::new(false),
+ msg: UnsafeCell::new(Some(msg)),
+ }
+ }
+
+ /// Waits until the packet becomes ready for reading or writing.
+ fn wait_ready(&self) {
+ let backoff = Backoff::new();
+ while !self.ready.load(Ordering::Acquire) {
+ backoff.snooze();
+ }
+ }
+}
+
+/// Inner representation of a zero-capacity channel.
+struct Inner {
+ /// Senders waiting to pair up with a receive operation.
+ senders: Waker,
+
+ /// Receivers waiting to pair up with a send operation.
+ receivers: Waker,
+
+ /// Equals `true` when the channel is disconnected.
+ is_disconnected: bool,
+}
+
+/// Zero-capacity channel.
+pub(crate) struct Channel<T> {
+ /// Inner representation of the channel.
+ inner: Mutex<Inner>,
+
+ /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
+ _marker: PhantomData<T>,
+}
+
+impl<T> Channel<T> {
+ /// Constructs a new zero-capacity channel.
+ pub(crate) fn new() -> Self {
+ Channel {
+ inner: Mutex::new(Inner {
+ senders: Waker::new(),
+ receivers: Waker::new(),
+ is_disconnected: false,
+ }),
+ _marker: PhantomData,
+ }
+ }
+
+ /// Returns a receiver handle to the channel.
+ pub(crate) fn receiver(&self) -> Receiver<'_, T> {
+ Receiver(self)
+ }
+
+ /// Returns a sender handle to the channel.
+ pub(crate) fn sender(&self) -> Sender<'_, T> {
+ Sender(self)
+ }
+
+ /// Attempts to reserve a slot for sending a message.
+ fn start_send(&self, token: &mut Token) -> bool {
+ let mut inner = self.inner.lock().unwrap();
+
+ // If there's a waiting receiver, pair up with it.
+ if let Some(operation) = inner.receivers.try_select() {
+ token.zero.0 = operation.packet;
+ true
+ } else if inner.is_disconnected {
+ token.zero.0 = ptr::null_mut();
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Writes a message into the packet.
+ pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
+ // If there is no packet, the channel is disconnected.
+ if token.zero.0.is_null() {
+ return Err(msg);
+ }
+
+ let packet = &*(token.zero.0 as *const Packet<T>);
+ packet.msg.get().write(Some(msg));
+ packet.ready.store(true, Ordering::Release);
+ Ok(())
+ }
+
+ /// Attempts to pair up with a sender.
+ fn start_recv(&self, token: &mut Token) -> bool {
+ let mut inner = self.inner.lock().unwrap();
+
+ // If there's a waiting sender, pair up with it.
+ if let Some(operation) = inner.senders.try_select() {
+ token.zero.0 = operation.packet;
+ true
+ } else if inner.is_disconnected {
+ token.zero.0 = ptr::null_mut();
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Reads a message from the packet.
+ pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
+ // If there is no packet, the channel is disconnected.
+ if token.zero.0.is_null() {
+ return Err(());
+ }
+
+ let packet = &*(token.zero.0 as *const Packet<T>);
+
+ if packet.on_stack {
+ // The message has been in the packet from the beginning, so there is no need to wait
+ // for it. However, after reading the message, we need to set `ready` to `true` in
+ // order to signal that the packet can be destroyed.
+ let msg = packet.msg.get().replace(None).unwrap();
+ packet.ready.store(true, Ordering::Release);
+ Ok(msg)
+ } else {
+ // Wait until the message becomes available, then read it and destroy the
+ // heap-allocated packet.
+ packet.wait_ready();
+ let msg = packet.msg.get().replace(None).unwrap();
+ drop(Box::from_raw(token.zero.0.cast::<Packet<T>>()));
+ Ok(msg)
+ }
+ }
+
+ /// Attempts to send a message into the channel.
+ pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ let token = &mut Token::default();
+ let mut inner = self.inner.lock().unwrap();
+
+ // If there's a waiting receiver, pair up with it.
+ if let Some(operation) = inner.receivers.try_select() {
+ token.zero.0 = operation.packet;
+ drop(inner);
+ unsafe {
+ self.write(token, msg).ok().unwrap();
+ }
+ Ok(())
+ } else if inner.is_disconnected {
+ Err(TrySendError::Disconnected(msg))
+ } else {
+ Err(TrySendError::Full(msg))
+ }
+ }
+
+ /// Sends a message into the channel.
+ pub(crate) fn send(
+ &self,
+ msg: T,
+ deadline: Option<Instant>,
+ ) -> Result<(), SendTimeoutError<T>> {
+ let token = &mut Token::default();
+ let mut inner = self.inner.lock().unwrap();
+
+ // If there's a waiting receiver, pair up with it.
+ if let Some(operation) = inner.receivers.try_select() {
+ token.zero.0 = operation.packet;
+ drop(inner);
+ unsafe {
+ self.write(token, msg).ok().unwrap();
+ }
+ return Ok(());
+ }
+
+ if inner.is_disconnected {
+ return Err(SendTimeoutError::Disconnected(msg));
+ }
+
+ Context::with(|cx| {
+ // Prepare for blocking until a receiver wakes us up.
+ let oper = Operation::hook(token);
+ let mut packet = Packet::<T>::message_on_stack(msg);
+ inner
+ .senders
+ .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
+ inner.receivers.notify();
+ drop(inner);
+
+ // Block the current thread.
+ let sel = cx.wait_until(deadline);
+
+ match sel {
+ Selected::Waiting => unreachable!(),
+ Selected::Aborted => {
+ self.inner.lock().unwrap().senders.unregister(oper).unwrap();
+ let msg = unsafe { packet.msg.get().replace(None).unwrap() };
+ Err(SendTimeoutError::Timeout(msg))
+ }
+ Selected::Disconnected => {
+ self.inner.lock().unwrap().senders.unregister(oper).unwrap();
+ let msg = unsafe { packet.msg.get().replace(None).unwrap() };
+ Err(SendTimeoutError::Disconnected(msg))
+ }
+ Selected::Operation(_) => {
+ // Wait until the message is read, then drop the packet.
+ packet.wait_ready();
+ Ok(())
+ }
+ }
+ })
+ }
+
+ /// Attempts to receive a message without blocking.
+ pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
+ let token = &mut Token::default();
+ let mut inner = self.inner.lock().unwrap();
+
+ // If there's a waiting sender, pair up with it.
+ if let Some(operation) = inner.senders.try_select() {
+ token.zero.0 = operation.packet;
+ drop(inner);
+ unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
+ } else if inner.is_disconnected {
+ Err(TryRecvError::Disconnected)
+ } else {
+ Err(TryRecvError::Empty)
+ }
+ }
+
+ /// Receives a message from the channel.
+ pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
+ let token = &mut Token::default();
+ let mut inner = self.inner.lock().unwrap();
+
+ // If there's a waiting sender, pair up with it.
+ if let Some(operation) = inner.senders.try_select() {
+ token.zero.0 = operation.packet;
+ drop(inner);
+ unsafe {
+ return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
+ }
+ }
+
+ if inner.is_disconnected {
+ return Err(RecvTimeoutError::Disconnected);
+ }
+
+ Context::with(|cx| {
+ // Prepare for blocking until a sender wakes us up.
+ let oper = Operation::hook(token);
+ let mut packet = Packet::<T>::empty_on_stack();
+ inner.receivers.register_with_packet(
+ oper,
+ &mut packet as *mut Packet<T> as *mut (),
+ cx,
+ );
+ inner.senders.notify();
+ drop(inner);
+
+ // Block the current thread.
+ let sel = cx.wait_until(deadline);
+
+ match sel {
+ Selected::Waiting => unreachable!(),
+ Selected::Aborted => {
+ self.inner
+ .lock()
+ .unwrap()
+ .receivers
+ .unregister(oper)
+ .unwrap();
+ Err(RecvTimeoutError::Timeout)
+ }
+ Selected::Disconnected => {
+ self.inner
+ .lock()
+ .unwrap()
+ .receivers
+ .unregister(oper)
+ .unwrap();
+ Err(RecvTimeoutError::Disconnected)
+ }
+ Selected::Operation(_) => {
+ // Wait until the message is provided, then read it.
+ packet.wait_ready();
+ unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
+ }
+ }
+ })
+ }
+
+ /// Disconnects the channel and wakes up all blocked senders and receivers.
+ ///
+ /// Returns `true` if this call disconnected the channel.
+ pub(crate) fn disconnect(&self) -> bool {
+ let mut inner = self.inner.lock().unwrap();
+
+ if !inner.is_disconnected {
+ inner.is_disconnected = true;
+ inner.senders.disconnect();
+ inner.receivers.disconnect();
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Returns the current number of messages inside the channel.
+ pub(crate) fn len(&self) -> usize {
+ 0
+ }
+
+ /// Returns the capacity of the channel.
+ pub(crate) fn capacity(&self) -> Option<usize> {
+ Some(0)
+ }
+
+ /// Returns `true` if the channel is empty.
+ pub(crate) fn is_empty(&self) -> bool {
+ true
+ }
+
+ /// Returns `true` if the channel is full.
+ pub(crate) fn is_full(&self) -> bool {
+ true
+ }
+}
+
+/// Receiver handle to a channel.
+pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
+
+/// Sender handle to a channel.
+pub(crate) struct Sender<'a, T>(&'a Channel<T>);
+
+impl<T> SelectHandle for Receiver<'_, T> {
+ fn try_select(&self, token: &mut Token) -> bool {
+ self.0.start_recv(token)
+ }
+
+ fn deadline(&self) -> Option<Instant> {
+ None
+ }
+
+ fn register(&self, oper: Operation, cx: &Context) -> bool {
+ let packet = Box::into_raw(Packet::<T>::empty_on_heap());
+
+ let mut inner = self.0.inner.lock().unwrap();
+ inner
+ .receivers
+ .register_with_packet(oper, packet.cast::<()>(), cx);
+ inner.senders.notify();
+ inner.senders.can_select() || inner.is_disconnected
+ }
+
+ fn unregister(&self, oper: Operation) {
+ if let Some(operation) = self.0.inner.lock().unwrap().receivers.unregister(oper) {
+ unsafe {
+ drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
+ }
+ }
+ }
+
+ fn accept(&self, token: &mut Token, cx: &Context) -> bool {
+ token.zero.0 = cx.wait_packet();
+ true
+ }
+
+ fn is_ready(&self) -> bool {
+ let inner = self.0.inner.lock().unwrap();
+ inner.senders.can_select() || inner.is_disconnected
+ }
+
+ fn watch(&self, oper: Operation, cx: &Context) -> bool {
+ let mut inner = self.0.inner.lock().unwrap();
+ inner.receivers.watch(oper, cx);
+ inner.senders.can_select() || inner.is_disconnected
+ }
+
+ fn unwatch(&self, oper: Operation) {
+ let mut inner = self.0.inner.lock().unwrap();
+ inner.receivers.unwatch(oper);
+ }
+}
+
+impl<T> SelectHandle for Sender<'_, T> {
+ fn try_select(&self, token: &mut Token) -> bool {
+ self.0.start_send(token)
+ }
+
+ fn deadline(&self) -> Option<Instant> {
+ None
+ }
+
+ fn register(&self, oper: Operation, cx: &Context) -> bool {
+ let packet = Box::into_raw(Packet::<T>::empty_on_heap());
+
+ let mut inner = self.0.inner.lock().unwrap();
+ inner
+ .senders
+ .register_with_packet(oper, packet.cast::<()>(), cx);
+ inner.receivers.notify();
+ inner.receivers.can_select() || inner.is_disconnected
+ }
+
+ fn unregister(&self, oper: Operation) {
+ if let Some(operation) = self.0.inner.lock().unwrap().senders.unregister(oper) {
+ unsafe {
+ drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
+ }
+ }
+ }
+
+ fn accept(&self, token: &mut Token, cx: &Context) -> bool {
+ token.zero.0 = cx.wait_packet();
+ true
+ }
+
+ fn is_ready(&self) -> bool {
+ let inner = self.0.inner.lock().unwrap();
+ inner.receivers.can_select() || inner.is_disconnected
+ }
+
+ fn watch(&self, oper: Operation, cx: &Context) -> bool {
+ let mut inner = self.0.inner.lock().unwrap();
+ inner.senders.watch(oper, cx);
+ inner.receivers.can_select() || inner.is_disconnected
+ }
+
+ fn unwatch(&self, oper: Operation) {
+ let mut inner = self.0.inner.lock().unwrap();
+ inner.senders.unwatch(oper);
+ }
+}