//! A restricted channel to pass data from signal handler. //! //! When trying to communicate data from signal handler to the outside world, one can use an atomic //! variable (as it doesn't lock, so it can be made async-signal-safe). But this won't work for //! larger data. //! //! This module provides a channel that can be used for that purpose. It is used by certain //! [exfiltrators][crate::iterator::exfiltrator], but can be used as building block for custom //! actions. In general, this is not a ready-made end-user API. //! //! # How does it work //! //! Each channel has a fixed number of slots and two queues (one for empty slots, one for full //! slots). A signal handler takes a slot out of the empty one, fills it and passes it into the //! full one. Outside of signal handler, it can take the value out of the full queue and return the //! slot to the empty queue. //! //! The queues are implemented as bit-encoded indexes of the slots in the storage. The bits are //! stored in an atomic variable. //! //! Note that the algorithm allows for a slot to be in neither queue (when it is being emptied or //! filled). //! //! # Fallible allocation of a slot //! //! It is apparent that allocation of a new slot can fail (there's nothing in the empty slot). In //! such case, there's no way to send the new value out of the handler (there's no way to safely //! wait for a slot to appear, because the handler can be blocking the thread that is responsible //! for emptying them). But that's considered acceptable ‒ even the kernel collates the same kinds //! of signals together if they are not consumed by application fast enough and there are no free //! slots exactly because some are being filled, emptied or are full ‒ in particular, the whole //! system will yield a signal. //! //! This assumes that separate signals don't share the same buffer and that there's only one reader //! (using multiple readers is still safe, but it is possible that all slots would be inside the //! readers, but already empty, so the above argument would not hold). // TODO: Other sizes? Does anyone need more than 5 slots? use std::cell::UnsafeCell; use std::sync::atomic::{AtomicU16, Ordering}; const SLOTS: usize = 5; const BITS: u16 = 3; const MASK: u16 = 0b111; fn get(n: u16, idx: u16) -> u16 { (n >> (BITS * idx)) & MASK } fn set(n: u16, idx: u16, v: u16) -> u16 { let v = v << (BITS * idx); let mask = MASK << (BITS * idx); (n & !mask) | v } fn enqueue(q: &AtomicU16, val: u16) { let mut current = q.load(Ordering::Relaxed); loop { let empty = (0..SLOTS as u16) .find(|i| get(current, *i) == 0) .expect("No empty slot available"); let modified = set(current, empty, val); match q.compare_exchange_weak(current, modified, Ordering::Release, Ordering::Relaxed) { Ok(_) => break, Err(changed) => current = changed, // And retry with the changed value } } } fn dequeue(q: &AtomicU16) -> Option { let mut current = q.load(Ordering::Relaxed); loop { let val = current & MASK; // It's completely empty if val == 0 { break None; } let modified = current >> BITS; match q.compare_exchange_weak(current, modified, Ordering::Acquire, Ordering::Relaxed) { Ok(_) => break Some(val), Err(changed) => current = changed, } } } /// A restricted async-signal-safe channel /// /// This is a bit like the usual channel used for inter-thread communication, but with several /// restrictions: /// /// * There's a limited number of slots (currently 5). /// * There's no way to wait for a place in it or for a value. If value is not available, `None` is /// returned. If there's no space for a value, the value is silently dropped. /// /// In exchange for that, all the operations on that channel are async-signal-safe. That means it /// is possible to use it to communicate between a signal handler and the rest of the world with it /// (specifically, it's designed to send information from the handler to the rest of the /// application). The throwing out of values when full is in line with collating of the same type /// in kernel (you should not use the same channel for multiple different signals). /// /// Technically, this is a MPMC queue which preserves order, but it is expected to be used in MPSC /// mode mostly (in theory, multiple threads can be executing a signal handler for the same signal /// at the same time). The channel is not responsible for wakeups. /// /// While the channel is async-signal-safe, you still need to make sure *creating* of the values is /// too (it should not contain anything that allocates, for example ‒ so no `String`s inside, etc). /// /// The code was *not* tuned for performance (signals are not expected to happen often). pub struct Channel { storage: [UnsafeCell>; SLOTS], empty: AtomicU16, full: AtomicU16, } impl Channel { /// Creates a new channel with nothing in it. pub fn new() -> Self { let storage = Default::default(); let me = Self { storage, empty: AtomicU16::new(0), full: AtomicU16::new(0), }; for i in 1..SLOTS + 1 { enqueue(&me.empty, i as u16); } me } /// Inserts a value into the channel. /// /// If the value doesn't fit, it is silently dropped. Never blocks. pub fn send(&self, val: T) { if let Some(empty_idx) = dequeue(&self.empty) { unsafe { *self.storage[empty_idx as usize - 1].get() = Some(val) }; enqueue(&self.full, empty_idx); } } /// Takes a value from the channel. /// /// Or returns `None` if the channel is empty. Never blocks. pub fn recv(&self) -> Option { dequeue(&self.full).map(|idx| { let result = unsafe { &mut *self.storage[idx as usize - 1].get() } .take() .expect("Full slot with nothing in it"); enqueue(&self.empty, idx); result }) } } impl Default for Channel { fn default() -> Self { Self::new() } } unsafe impl Send for Channel {} // Yes, really Send -> Sync. Having a reference to Channel allows Sending Ts, but not having refs // on them. unsafe impl Sync for Channel {} #[cfg(test)] mod tests { use std::sync::Arc; use std::thread; use super::*; #[test] fn new_empty() { let channel = Channel::::new(); assert!(channel.recv().is_none()); assert!(channel.recv().is_none()); } #[test] fn pass_value() { let channel = Channel::new(); channel.send(42); assert_eq!(42, channel.recv().unwrap()); assert!(channel.recv().is_none()); } #[test] fn multiple() { let channel = Channel::new(); for i in 0..1000 { channel.send(i); assert_eq!(i, channel.recv().unwrap()); assert!(channel.recv().is_none()); } } #[test] fn overflow() { let channel = Channel::new(); for i in 0..10 { channel.send(i); } for i in 0..5 { assert_eq!(i, channel.recv().unwrap()); } assert!(channel.recv().is_none()); } #[test] fn multi_thread() { let channel = Arc::new(Channel::::new()); let sender = thread::spawn({ let channel = Arc::clone(&channel); move || { for i in 0..4 { channel.send(i); } } }); let mut results = Vec::new(); while results.len() < 4 { results.extend(channel.recv()); } assert_eq!(vec![0, 1, 2, 3], results); sender.join().unwrap(); } }