//! Bounded channel based on a preallocated array. //! //! This flavor has a fixed, positive capacity. //! //! The implementation is based on Dmitry Vyukov's bounded MPMC queue. //! //! Source: //! - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue //! - https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub //! //! Copyright & License: //! - Copyright (c) 2010-2011 Dmitry Vyukov //! - Simplified BSD License and Apache License, Version 2.0 //! - http://www.1024cores.net/home/code-license use std::cell::UnsafeCell; use std::marker::PhantomData; use std::mem::{self, MaybeUninit}; use std::ptr; use std::sync::atomic::{self, AtomicUsize, Ordering}; use std::time::Instant; use crossbeam_utils::{Backoff, CachePadded}; use crate::context::Context; use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; use crate::select::{Operation, SelectHandle, Selected, Token}; use crate::waker::SyncWaker; /// A slot in a channel. struct Slot { /// The current stamp. stamp: AtomicUsize, /// The message in this slot. msg: UnsafeCell>, } /// The token type for the array flavor. #[derive(Debug)] pub struct ArrayToken { /// Slot to read from or write to. slot: *const u8, /// Stamp to store into the slot after reading or writing. stamp: usize, } impl Default for ArrayToken { #[inline] fn default() -> Self { ArrayToken { slot: ptr::null(), stamp: 0, } } } /// Bounded channel based on a preallocated array. pub struct Channel { /// The head of the channel. /// /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but /// packed into a single `usize`. The lower bits represent the index, while the upper bits /// represent the lap. The mark bit in the head is always zero. /// /// Messages are popped from the head of the channel. head: CachePadded, /// The tail of the channel. /// /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but /// packed into a single `usize`. The lower bits represent the index, while the upper bits /// represent the lap. The mark bit indicates that the channel is disconnected. /// /// Messages are pushed into the tail of the channel. tail: CachePadded, /// The buffer holding slots. buffer: *mut Slot, /// The channel capacity. cap: usize, /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`. one_lap: usize, /// If this bit is set in the tail, that means the channel is disconnected. mark_bit: usize, /// Senders waiting while the channel is full. senders: SyncWaker, /// Receivers waiting while the channel is empty and not disconnected. receivers: SyncWaker, /// Indicates that dropping a `Channel` may drop values of type `T`. _marker: PhantomData, } impl Channel { /// Creates a bounded channel of capacity `cap`. pub fn with_capacity(cap: usize) -> Self { assert!(cap > 0, "capacity must be positive"); // Compute constants `mark_bit` and `one_lap`. let mark_bit = (cap + 1).next_power_of_two(); let one_lap = mark_bit * 2; // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`. let head = 0; // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`. let tail = 0; // Allocate a buffer of `cap` slots initialized // with stamps. let buffer = { let mut boxed: Box<[Slot]> = (0..cap) .map(|i| { // Set the stamp to `{ lap: 0, mark: 0, index: i }`. Slot { stamp: AtomicUsize::new(i), msg: UnsafeCell::new(MaybeUninit::uninit()), } }) .collect(); let ptr = boxed.as_mut_ptr(); mem::forget(boxed); ptr }; Channel { buffer, cap, one_lap, mark_bit, head: CachePadded::new(AtomicUsize::new(head)), tail: CachePadded::new(AtomicUsize::new(tail)), senders: SyncWaker::new(), receivers: SyncWaker::new(), _marker: PhantomData, } } /// Returns a receiver handle to the channel. pub fn receiver(&self) -> Receiver<'_, T> { Receiver(self) } /// Returns a sender handle to the channel. pub fn sender(&self) -> Sender<'_, T> { Sender(self) } /// Attempts to reserve a slot for sending a message. fn start_send(&self, token: &mut Token) -> bool { let backoff = Backoff::new(); let mut tail = self.tail.load(Ordering::Relaxed); loop { // Check if the channel is disconnected. if tail & self.mark_bit != 0 { token.array.slot = ptr::null(); token.array.stamp = 0; return true; } // Deconstruct the tail. let index = tail & (self.mark_bit - 1); let lap = tail & !(self.one_lap - 1); // Inspect the corresponding slot. let slot = unsafe { &*self.buffer.add(index) }; let stamp = slot.stamp.load(Ordering::Acquire); // If the tail and the stamp match, we may attempt to push. if tail == stamp { let new_tail = if index + 1 < self.cap { // Same lap, incremented index. // Set to `{ lap: lap, mark: 0, index: index + 1 }`. tail + 1 } else { // One lap forward, index wraps around to zero. // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. lap.wrapping_add(self.one_lap) }; // Try moving the tail. match self.tail.compare_exchange_weak( tail, new_tail, Ordering::SeqCst, Ordering::Relaxed, ) { Ok(_) => { // Prepare the token for the follow-up call to `write`. token.array.slot = slot as *const Slot as *const u8; token.array.stamp = tail + 1; return true; } Err(t) => { tail = t; backoff.spin(); } } } else if stamp.wrapping_add(self.one_lap) == tail + 1 { atomic::fence(Ordering::SeqCst); let head = self.head.load(Ordering::Relaxed); // If the head lags one lap behind the tail as well... if head.wrapping_add(self.one_lap) == tail { // ...then the channel is full. return false; } backoff.spin(); tail = self.tail.load(Ordering::Relaxed); } else { // Snooze because we need to wait for the stamp to get updated. backoff.snooze(); tail = self.tail.load(Ordering::Relaxed); } } } /// Writes a message into the channel. pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { // If there is no slot, the channel is disconnected. if token.array.slot.is_null() { return Err(msg); } let slot: &Slot = &*(token.array.slot as *const Slot); // Write the message into the slot and update the stamp. slot.msg.get().write(MaybeUninit::new(msg)); slot.stamp.store(token.array.stamp, Ordering::Release); // Wake a sleeping receiver. self.receivers.notify(); Ok(()) } /// Attempts to reserve a slot for receiving a message. fn start_recv(&self, token: &mut Token) -> bool { let backoff = Backoff::new(); let mut head = self.head.load(Ordering::Relaxed); loop { // Deconstruct the head. let index = head & (self.mark_bit - 1); let lap = head & !(self.one_lap - 1); // Inspect the corresponding slot. let slot = unsafe { &*self.buffer.add(index) }; let stamp = slot.stamp.load(Ordering::Acquire); // If the the stamp is ahead of the head by 1, we may attempt to pop. if head + 1 == stamp { let new = if index + 1 < self.cap { // Same lap, incremented index. // Set to `{ lap: lap, mark: 0, index: index + 1 }`. head + 1 } else { // One lap forward, index wraps around to zero. // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. lap.wrapping_add(self.one_lap) }; // Try moving the head. match self.head.compare_exchange_weak( head, new, Ordering::SeqCst, Ordering::Relaxed, ) { Ok(_) => { // Prepare the token for the follow-up call to `read`. token.array.slot = slot as *const Slot as *const u8; token.array.stamp = head.wrapping_add(self.one_lap); return true; } Err(h) => { head = h; backoff.spin(); } } } else if stamp == head { atomic::fence(Ordering::SeqCst); let tail = self.tail.load(Ordering::Relaxed); // If the tail equals the head, that means the channel is empty. if (tail & !self.mark_bit) == head { // If the channel is disconnected... if tail & self.mark_bit != 0 { // ...then receive an error. token.array.slot = ptr::null(); token.array.stamp = 0; return true; } else { // Otherwise, the receive operation is not ready. return false; } } backoff.spin(); head = self.head.load(Ordering::Relaxed); } else { // Snooze because we need to wait for the stamp to get updated. backoff.snooze(); head = self.head.load(Ordering::Relaxed); } } } /// Reads a message from the channel. pub unsafe fn read(&self, token: &mut Token) -> Result { if token.array.slot.is_null() { // The channel is disconnected. return Err(()); } let slot: &Slot = &*(token.array.slot as *const Slot); // Read the message from the slot and update the stamp. let msg = slot.msg.get().read().assume_init(); slot.stamp.store(token.array.stamp, Ordering::Release); // Wake a sleeping sender. self.senders.notify(); Ok(msg) } /// Attempts to send a message into the channel. pub fn try_send(&self, msg: T) -> Result<(), TrySendError> { let token = &mut Token::default(); if self.start_send(token) { unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) } } else { Err(TrySendError::Full(msg)) } } /// Sends a message into the channel. pub fn send(&self, msg: T, deadline: Option) -> Result<(), SendTimeoutError> { let token = &mut Token::default(); loop { // Try sending a message several times. let backoff = Backoff::new(); loop { if self.start_send(token) { let res = unsafe { self.write(token, msg) }; return res.map_err(SendTimeoutError::Disconnected); } if backoff.is_completed() { break; } else { backoff.snooze(); } } if let Some(d) = deadline { if Instant::now() >= d { return Err(SendTimeoutError::Timeout(msg)); } } Context::with(|cx| { // Prepare for blocking until a receiver wakes us up. let oper = Operation::hook(token); self.senders.register(oper, cx); // Has the channel become ready just now? if !self.is_full() || self.is_disconnected() { let _ = cx.try_select(Selected::Aborted); } // Block the current thread. let sel = cx.wait_until(deadline); match sel { Selected::Waiting => unreachable!(), Selected::Aborted | Selected::Disconnected => { self.senders.unregister(oper).unwrap(); } Selected::Operation(_) => {} } }); } } /// Attempts to receive a message without blocking. pub fn try_recv(&self) -> Result { let token = &mut Token::default(); if self.start_recv(token) { unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } } else { Err(TryRecvError::Empty) } } /// Receives a message from the channel. pub fn recv(&self, deadline: Option) -> Result { let token = &mut Token::default(); loop { // Try receiving a message several times. let backoff = Backoff::new(); loop { if self.start_recv(token) { let res = unsafe { self.read(token) }; return res.map_err(|_| RecvTimeoutError::Disconnected); } if backoff.is_completed() { break; } else { backoff.snooze(); } } if let Some(d) = deadline { if Instant::now() >= d { return Err(RecvTimeoutError::Timeout); } } Context::with(|cx| { // Prepare for blocking until a sender wakes us up. let oper = Operation::hook(token); self.receivers.register(oper, cx); // Has the channel become ready just now? if !self.is_empty() || self.is_disconnected() { let _ = cx.try_select(Selected::Aborted); } // Block the current thread. let sel = cx.wait_until(deadline); match sel { Selected::Waiting => unreachable!(), Selected::Aborted | Selected::Disconnected => { self.receivers.unregister(oper).unwrap(); // If the channel was disconnected, we still have to check for remaining // messages. } Selected::Operation(_) => {} } }); } } /// Returns the current number of messages inside the channel. pub fn len(&self) -> usize { loop { // Load the tail, then load the head. let tail = self.tail.load(Ordering::SeqCst); let head = self.head.load(Ordering::SeqCst); // If the tail didn't change, we've got consistent values to work with. if self.tail.load(Ordering::SeqCst) == tail { let hix = head & (self.mark_bit - 1); let tix = tail & (self.mark_bit - 1); return if hix < tix { tix - hix } else if hix > tix { self.cap - hix + tix } else if (tail & !self.mark_bit) == head { 0 } else { self.cap }; } } } /// Returns the capacity of the channel. pub fn capacity(&self) -> Option { Some(self.cap) } /// Disconnects the channel and wakes up all blocked senders and receivers. /// /// Returns `true` if this call disconnected the channel. pub fn disconnect(&self) -> bool { let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); if tail & self.mark_bit == 0 { self.senders.disconnect(); self.receivers.disconnect(); true } else { false } } /// Returns `true` if the channel is disconnected. pub fn is_disconnected(&self) -> bool { self.tail.load(Ordering::SeqCst) & self.mark_bit != 0 } /// Returns `true` if the channel is empty. pub fn is_empty(&self) -> bool { let head = self.head.load(Ordering::SeqCst); let tail = self.tail.load(Ordering::SeqCst); // Is the tail equal to the head? // // Note: If the head changes just before we load the tail, that means there was a moment // when the channel was not empty, so it is safe to just return `false`. (tail & !self.mark_bit) == head } /// Returns `true` if the channel is full. pub fn is_full(&self) -> bool { let tail = self.tail.load(Ordering::SeqCst); let head = self.head.load(Ordering::SeqCst); // Is the head lagging one lap behind tail? // // Note: If the tail changes just before we load the head, that means there was a moment // when the channel was not full, so it is safe to just return `false`. head.wrapping_add(self.one_lap) == tail & !self.mark_bit } } impl Drop for Channel { fn drop(&mut self) { // Get the index of the head. let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1); // Loop over all slots that hold a message and drop them. for i in 0..self.len() { // Compute the index of the next slot holding a message. let index = if hix + i < self.cap { hix + i } else { hix + i - self.cap }; unsafe { let p = { let slot = &mut *self.buffer.add(index); let msg = &mut *slot.msg.get(); msg.as_mut_ptr() }; p.drop_in_place(); } } // Finally, deallocate the buffer, but don't run any destructors. unsafe { // Create a slice from the buffer to make // a fat pointer. Then, use Box::from_raw // to deallocate it. let ptr = std::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot]; Box::from_raw(ptr); } } } /// Receiver handle to a channel. pub struct Receiver<'a, T>(&'a Channel); /// Sender handle to a channel. pub struct Sender<'a, T>(&'a Channel); impl SelectHandle for Receiver<'_, T> { fn try_select(&self, token: &mut Token) -> bool { self.0.start_recv(token) } fn deadline(&self) -> Option { None } fn register(&self, oper: Operation, cx: &Context) -> bool { self.0.receivers.register(oper, cx); self.is_ready() } fn unregister(&self, oper: Operation) { self.0.receivers.unregister(oper); } fn accept(&self, token: &mut Token, _cx: &Context) -> bool { self.try_select(token) } fn is_ready(&self) -> bool { !self.0.is_empty() || self.0.is_disconnected() } fn watch(&self, oper: Operation, cx: &Context) -> bool { self.0.receivers.watch(oper, cx); self.is_ready() } fn unwatch(&self, oper: Operation) { self.0.receivers.unwatch(oper); } } impl SelectHandle for Sender<'_, T> { fn try_select(&self, token: &mut Token) -> bool { self.0.start_send(token) } fn deadline(&self) -> Option { None } fn register(&self, oper: Operation, cx: &Context) -> bool { self.0.senders.register(oper, cx); self.is_ready() } fn unregister(&self, oper: Operation) { self.0.senders.unregister(oper); } fn accept(&self, token: &mut Token, _cx: &Context) -> bool { self.try_select(token) } fn is_ready(&self) -> bool { !self.0.is_full() || self.0.is_disconnected() } fn watch(&self, oper: Operation, cx: &Context) -> bool { self.0.senders.watch(oper, cx); self.is_ready() } fn unwatch(&self, oper: Operation) { self.0.senders.unwatch(oper); } }