//! Reference counter for channels. use std::isize; use std::ops; use std::process; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; /// Reference counter internals. struct Counter { /// The number of senders associated with the channel. senders: AtomicUsize, /// The number of receivers associated with the channel. receivers: AtomicUsize, /// Set to `true` if the last sender or the last receiver reference deallocates the channel. destroy: AtomicBool, /// The internal channel. chan: C, } /// Wraps a channel into the reference counter. pub fn new(chan: C) -> (Sender, Receiver) { let counter = Box::into_raw(Box::new(Counter { senders: AtomicUsize::new(1), receivers: AtomicUsize::new(1), destroy: AtomicBool::new(false), chan, })); let s = Sender { counter }; let r = Receiver { counter }; (s, r) } /// The sending side. pub struct Sender { counter: *mut Counter, } impl Sender { /// Returns the internal `Counter`. fn counter(&self) -> &Counter { unsafe { &*self.counter } } /// Acquires another sender reference. pub fn acquire(&self) -> Sender { let count = self.counter().senders.fetch_add(1, Ordering::Relaxed); // Cloning senders and calling `mem::forget` on the clones could potentially overflow the // counter. It's very difficult to recover sensibly from such degenerate scenarios so we // just abort when the count becomes very large. if count > isize::MAX as usize { process::abort(); } Sender { counter: self.counter, } } /// Releases the sender reference. /// /// Function `disconnect` will be called if this is the last sender reference. pub unsafe fn release bool>(&self, disconnect: F) { if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 { disconnect(&self.counter().chan); if self.counter().destroy.swap(true, Ordering::AcqRel) { drop(Box::from_raw(self.counter)); } } } } impl ops::Deref for Sender { type Target = C; fn deref(&self) -> &C { &self.counter().chan } } impl PartialEq for Sender { fn eq(&self, other: &Sender) -> bool { self.counter == other.counter } } /// The receiving side. pub struct Receiver { counter: *mut Counter, } impl Receiver { /// Returns the internal `Counter`. fn counter(&self) -> &Counter { unsafe { &*self.counter } } /// Acquires another receiver reference. pub fn acquire(&self) -> Receiver { let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed); // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the // counter. It's very difficult to recover sensibly from such degenerate scenarios so we // just abort when the count becomes very large. if count > isize::MAX as usize { process::abort(); } Receiver { counter: self.counter, } } /// Releases the receiver reference. /// /// Function `disconnect` will be called if this is the last receiver reference. pub unsafe fn release bool>(&self, disconnect: F) { if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 { disconnect(&self.counter().chan); if self.counter().destroy.swap(true, Ordering::AcqRel) { drop(Box::from_raw(self.counter)); } } } } impl ops::Deref for Receiver { type Target = C; fn deref(&self) -> &C { &self.counter().chan } } impl PartialEq for Receiver { fn eq(&self, other: &Receiver) -> bool { self.counter == other.counter } }