use crate::Stream; use std::borrow::Borrow; use std::hash::Hash; use std::pin::Pin; use std::task::{Context, Poll}; /// Combine many streams into one, indexing each source stream with a unique /// key. /// /// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source /// streams into a single merged stream that yields values in the order that /// they arrive from the source streams. However, `StreamMap` has a lot more /// flexibility in usage patterns. /// /// `StreamMap` can: /// /// * Merge an arbitrary number of streams. /// * Track which source stream the value was received from. /// * Handle inserting and removing streams from the set of managed streams at /// any point during iteration. /// /// All source streams held by `StreamMap` are indexed using a key. This key is /// included with the value when a source stream yields a value. The key is also /// used to remove the stream from the `StreamMap` before the stream has /// completed streaming. /// /// # `Unpin` /// /// Because the `StreamMap` API moves streams during runtime, both streams and /// keys must be `Unpin`. In order to insert a `!Unpin` stream into a /// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to /// pin the stream in the heap. /// /// # Implementation /// /// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this /// internal implementation detail will persist in future versions, but it is /// important to know the runtime implications. In general, `StreamMap` works /// best with a "smallish" number of streams as all entries are scanned on /// insert, remove, and polling. In cases where a large number of streams need /// to be merged, it may be advisable to use tasks sending values on a shared /// [`mpsc`] channel. /// /// [`StreamExt::merge`]: crate::StreamExt::merge /// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html /// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html /// [`Box::pin`]: std::boxed::Box::pin /// /// # Examples /// /// Merging two streams, then remove them after receiving the first value /// /// ``` /// use tokio_stream::{StreamExt, StreamMap, Stream}; /// use tokio::sync::mpsc; /// use std::pin::Pin; /// /// #[tokio::main] /// async fn main() { /// let (tx1, mut rx1) = mpsc::channel::(10); /// let (tx2, mut rx2) = mpsc::channel::(10); /// /// // Convert the channels to a `Stream`. /// let rx1 = Box::pin(async_stream::stream! { /// while let Some(item) = rx1.recv().await { /// yield item; /// } /// }) as Pin + Send>>; /// /// let rx2 = Box::pin(async_stream::stream! { /// while let Some(item) = rx2.recv().await { /// yield item; /// } /// }) as Pin + Send>>; /// /// tokio::spawn(async move { /// tx1.send(1).await.unwrap(); /// /// // This value will never be received. The send may or may not return /// // `Err` depending on if the remote end closed first or not. /// let _ = tx1.send(2).await; /// }); /// /// tokio::spawn(async move { /// tx2.send(3).await.unwrap(); /// let _ = tx2.send(4).await; /// }); /// /// let mut map = StreamMap::new(); /// /// // Insert both streams /// map.insert("one", rx1); /// map.insert("two", rx2); /// /// // Read twice /// for _ in 0..2 { /// let (key, val) = map.next().await.unwrap(); /// /// if key == "one" { /// assert_eq!(val, 1); /// } else { /// assert_eq!(val, 3); /// } /// /// // Remove the stream to prevent reading the next value /// map.remove(key); /// } /// } /// ``` /// /// This example models a read-only client to a chat system with channels. The /// client sends commands to join and leave channels. `StreamMap` is used to /// manage active channel subscriptions. /// /// For simplicity, messages are displayed with `println!`, but they could be /// sent to the client over a socket. /// /// ```no_run /// use tokio_stream::{Stream, StreamExt, StreamMap}; /// /// enum Command { /// Join(String), /// Leave(String), /// } /// /// fn commands() -> impl Stream { /// // Streams in user commands by parsing `stdin`. /// # tokio_stream::pending() /// } /// /// // Join a channel, returns a stream of messages received on the channel. /// fn join(channel: &str) -> impl Stream + Unpin { /// // left as an exercise to the reader /// # tokio_stream::pending() /// } /// /// #[tokio::main] /// async fn main() { /// let mut channels = StreamMap::new(); /// /// // Input commands (join / leave channels). /// let cmds = commands(); /// tokio::pin!(cmds); /// /// loop { /// tokio::select! { /// Some(cmd) = cmds.next() => { /// match cmd { /// Command::Join(chan) => { /// // Join the channel and add it to the `channels` /// // stream map /// let msgs = join(&chan); /// channels.insert(chan, msgs); /// } /// Command::Leave(chan) => { /// channels.remove(&chan); /// } /// } /// } /// Some((chan, msg)) = channels.next() => { /// // Received a message, display it on stdout with the channel /// // it originated from. /// println!("{}: {}", chan, msg); /// } /// // Both the `commands` stream and the `channels` stream are /// // complete. There is no more work to do, so leave the loop. /// else => break, /// } /// } /// } /// ``` #[derive(Debug)] pub struct StreamMap { /// Streams stored in the map entries: Vec<(K, V)>, } impl StreamMap { /// An iterator visiting all key-value pairs in arbitrary order. /// /// The iterator element type is &'a (K, V). /// /// # Examples /// /// ``` /// use tokio_stream::{StreamMap, pending}; /// /// let mut map = StreamMap::new(); /// /// map.insert("a", pending::()); /// map.insert("b", pending()); /// map.insert("c", pending()); /// /// for (key, stream) in map.iter() { /// println!("({}, {:?})", key, stream); /// } /// ``` pub fn iter(&self) -> impl Iterator { self.entries.iter() } /// An iterator visiting all key-value pairs mutably in arbitrary order. /// /// The iterator element type is &'a mut (K, V). /// /// # Examples /// /// ``` /// use tokio_stream::{StreamMap, pending}; /// /// let mut map = StreamMap::new(); /// /// map.insert("a", pending::()); /// map.insert("b", pending()); /// map.insert("c", pending()); /// /// for (key, stream) in map.iter_mut() { /// println!("({}, {:?})", key, stream); /// } /// ``` pub fn iter_mut(&mut self) -> impl Iterator { self.entries.iter_mut() } /// Creates an empty `StreamMap`. /// /// The stream map is initially created with a capacity of `0`, so it will /// not allocate until it is first inserted into. /// /// # Examples /// /// ``` /// use tokio_stream::{StreamMap, Pending}; /// /// let map: StreamMap<&str, Pending<()>> = StreamMap::new(); /// ``` pub fn new() -> StreamMap { StreamMap { entries: vec![] } } /// Creates an empty `StreamMap` with the specified capacity. /// /// The stream map will be able to hold at least `capacity` elements without /// reallocating. If `capacity` is 0, the stream map will not allocate. /// /// # Examples /// /// ``` /// use tokio_stream::{StreamMap, Pending}; /// /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10); /// ``` pub fn with_capacity(capacity: usize) -> StreamMap { StreamMap { entries: Vec::with_capacity(capacity), } } /// Returns an iterator visiting all keys in arbitrary order. /// /// The iterator element type is &'a K. /// /// # Examples /// /// ``` /// use tokio_stream::{StreamMap, pending}; /// /// let mut map = StreamMap::new(); /// /// map.insert("a", pending::()); /// map.insert("b", pending()); /// map.insert("c", pending()); /// /// for key in map.keys() { /// println!("{}", key); /// } /// ``` pub fn keys(&self) -> impl Iterator { self.iter().map(|(k, _)| k) } /// An iterator visiting all values in arbitrary order. /// /// The iterator element type is &'a V. /// /// # Examples /// /// ``` /// use tokio_stream::{StreamMap, pending}; /// /// let mut map = StreamMap::new(); /// /// map.insert("a", pending::()); /// map.insert("b", pending()); /// map.insert("c", pending()); /// /// for stream in map.values() { /// println!("{:?}", stream); /// } /// ``` pub fn values(&self) -> impl Iterator { self.iter().map(|(_, v)| v) } /// An iterator visiting all values mutably in arbitrary order. /// /// The iterator element type is &'a mut V. /// /// # Examples /// /// ``` /// use tokio_stream::{StreamMap, pending}; /// /// let mut map = StreamMap::new(); /// /// map.insert("a", pending::()); /// map.insert("b", pending()); /// map.insert("c", pending()); /// /// for stream in map.values_mut() { /// println!("{:?}", stream); /// } /// ``` pub fn values_mut(&mut self) -> impl Iterator { self.iter_mut().map(|(_, v)| v) } /// Returns the number of streams the map can hold without reallocating. /// /// This number is a lower bound; the `StreamMap` might be able to hold /// more, but is guaranteed to be able to hold at least this many. /// /// # Examples /// /// ``` /// use tokio_stream::{StreamMap, Pending}; /// /// let map: StreamMap> = StreamMap::with_capacity(100); /// assert!(map.capacity() >= 100); /// ``` pub fn capacity(&self) -> usize { self.entries.capacity() } /// Returns the number of streams in the map. /// /// # Examples /// /// ``` /// use tokio_stream::{StreamMap, pending}; /// /// let mut a = StreamMap::new(); /// assert_eq!(a.len(), 0); /// a.insert(1, pending::()); /// assert_eq!(a.len(), 1); /// ``` pub fn len(&self) -> usize { self.entries.len() } /// Returns `true` if the map contains no elements. /// /// # Examples /// /// ``` /// use tokio_stream::{StreamMap, pending}; /// /// let mut a = StreamMap::new(); /// assert!(a.is_empty()); /// a.insert(1, pending::()); /// assert!(!a.is_empty()); /// ``` pub fn is_empty(&self) -> bool { self.entries.is_empty() } /// Clears the map, removing all key-stream pairs. Keeps the allocated /// memory for reuse. /// /// # Examples /// /// ``` /// use tokio_stream::{StreamMap, pending}; /// /// let mut a = StreamMap::new(); /// a.insert(1, pending::()); /// a.clear(); /// assert!(a.is_empty()); /// ``` pub fn clear(&mut self) { self.entries.clear(); } /// Insert a key-stream pair into the map. /// /// If the map did not have this key present, `None` is returned. /// /// If the map did have this key present, the new `stream` replaces the old /// one and the old stream is returned. /// /// # Examples /// /// ``` /// use tokio_stream::{StreamMap, pending}; /// /// let mut map = StreamMap::new(); /// /// assert!(map.insert(37, pending::()).is_none()); /// assert!(!map.is_empty()); /// /// map.insert(37, pending()); /// assert!(map.insert(37, pending()).is_some()); /// ``` pub fn insert(&mut self, k: K, stream: V) -> Option where K: Hash + Eq, { let ret = self.remove(&k); self.entries.push((k, stream)); ret } /// Removes a key from the map, returning the stream at the key if the key was previously in the map. /// /// The key may be any borrowed form of the map's key type, but `Hash` and /// `Eq` on the borrowed form must match those for the key type. /// /// # Examples /// /// ``` /// use tokio_stream::{StreamMap, pending}; /// /// let mut map = StreamMap::new(); /// map.insert(1, pending::()); /// assert!(map.remove(&1).is_some()); /// assert!(map.remove(&1).is_none()); /// ``` pub fn remove(&mut self, k: &Q) -> Option where K: Borrow, Q: Hash + Eq, { for i in 0..self.entries.len() { if self.entries[i].0.borrow() == k { return Some(self.entries.swap_remove(i).1); } } None } /// Returns `true` if the map contains a stream for the specified key. /// /// The key may be any borrowed form of the map's key type, but `Hash` and /// `Eq` on the borrowed form must match those for the key type. /// /// # Examples /// /// ``` /// use tokio_stream::{StreamMap, pending}; /// /// let mut map = StreamMap::new(); /// map.insert(1, pending::()); /// assert_eq!(map.contains_key(&1), true); /// assert_eq!(map.contains_key(&2), false); /// ``` pub fn contains_key(&self, k: &Q) -> bool where K: Borrow, Q: Hash + Eq, { for i in 0..self.entries.len() { if self.entries[i].0.borrow() == k { return true; } } false } } impl StreamMap where K: Unpin, V: Stream + Unpin, { /// Polls the next value, includes the vec entry index fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll> { use Poll::*; let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize; let mut idx = start; for _ in 0..self.entries.len() { let (_, stream) = &mut self.entries[idx]; match Pin::new(stream).poll_next(cx) { Ready(Some(val)) => return Ready(Some((idx, val))), Ready(None) => { // Remove the entry self.entries.swap_remove(idx); // Check if this was the last entry, if so the cursor needs // to wrap if idx == self.entries.len() { idx = 0; } else if idx < start && start <= self.entries.len() { // The stream being swapped into the current index has // already been polled, so skip it. idx = idx.wrapping_add(1) % self.entries.len(); } } Pending => { idx = idx.wrapping_add(1) % self.entries.len(); } } } // If the map is empty, then the stream is complete. if self.entries.is_empty() { Ready(None) } else { Pending } } } impl Default for StreamMap { fn default() -> Self { Self::new() } } impl Stream for StreamMap where K: Clone + Unpin, V: Stream + Unpin, { type Item = (K, V::Item); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) { let key = self.entries[idx].0.clone(); Poll::Ready(Some((key, val))) } else { Poll::Ready(None) } } fn size_hint(&self) -> (usize, Option) { let mut ret = (0, Some(0)); for (_, stream) in &self.entries { let hint = stream.size_hint(); ret.0 += hint.0; match (ret.1, hint.1) { (Some(a), Some(b)) => ret.1 = Some(a + b), (Some(_), None) => ret.1 = None, _ => {} } } ret } } impl std::iter::FromIterator<(K, V)> for StreamMap where K: Hash + Eq, { fn from_iter>(iter: T) -> Self { let iterator = iter.into_iter(); let (lower_bound, _) = iterator.size_hint(); let mut stream_map = Self::with_capacity(lower_bound); for (key, value) in iterator { stream_map.insert(key, value); } stream_map } } impl Extend<(K, V)> for StreamMap { fn extend(&mut self, iter: T) where T: IntoIterator, { self.entries.extend(iter); } } mod rand { use std::cell::Cell; mod loom { #[cfg(not(loom))] pub(crate) mod rand { use std::collections::hash_map::RandomState; use std::hash::{BuildHasher, Hash, Hasher}; use std::sync::atomic::AtomicU32; use std::sync::atomic::Ordering::Relaxed; static COUNTER: AtomicU32 = AtomicU32::new(1); pub(crate) fn seed() -> u64 { let rand_state = RandomState::new(); let mut hasher = rand_state.build_hasher(); // Hash some unique-ish data to generate some new state COUNTER.fetch_add(1, Relaxed).hash(&mut hasher); // Get the seed hasher.finish() } } #[cfg(loom)] pub(crate) mod rand { pub(crate) fn seed() -> u64 { 1 } } } /// Fast random number generate /// /// Implement xorshift64+: 2 32-bit xorshift sequences added together. /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's /// Xorshift paper: /// This generator passes the SmallCrush suite, part of TestU01 framework: /// #[derive(Debug)] pub(crate) struct FastRand { one: Cell, two: Cell, } impl FastRand { /// Initialize a new, thread-local, fast random number generator. pub(crate) fn new(seed: u64) -> FastRand { let one = (seed >> 32) as u32; let mut two = seed as u32; if two == 0 { // This value cannot be zero two = 1; } FastRand { one: Cell::new(one), two: Cell::new(two), } } pub(crate) fn fastrand_n(&self, n: u32) -> u32 { // This is similar to fastrand() % n, but faster. // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ let mul = (self.fastrand() as u64).wrapping_mul(n as u64); (mul >> 32) as u32 } fn fastrand(&self) -> u32 { let mut s1 = self.one.get(); let s0 = self.two.get(); s1 ^= s1 << 17; s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16; self.one.set(s0); self.two.set(s1); s0.wrapping_add(s1) } } // Used by `StreamMap` pub(crate) fn thread_rng_n(n: u32) -> u32 { thread_local! { static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed()); } THREAD_RNG.with(|rng| rng.fastrand_n(n)) } }