diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/tokio-stream/src/stream_map.rs | |
parent | Initial commit. (diff) | |
download | firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esr
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-stream/src/stream_map.rs')
-rw-r--r-- | third_party/rust/tokio-stream/src/stream_map.rs | 690 |
1 files changed, 690 insertions, 0 deletions
diff --git a/third_party/rust/tokio-stream/src/stream_map.rs b/third_party/rust/tokio-stream/src/stream_map.rs new file mode 100644 index 0000000000..215980474b --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_map.rs @@ -0,0 +1,690 @@ +use crate::Stream; + +use std::borrow::Borrow; +use std::hash::Hash; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Combine many streams into one, indexing each source stream with a unique +/// key. +/// +/// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source +/// streams into a single merged stream that yields values in the order that +/// they arrive from the source streams. However, `StreamMap` has a lot more +/// flexibility in usage patterns. +/// +/// `StreamMap` can: +/// +/// * Merge an arbitrary number of streams. +/// * Track which source stream the value was received from. +/// * Handle inserting and removing streams from the set of managed streams at +/// any point during iteration. +/// +/// All source streams held by `StreamMap` are indexed using a key. This key is +/// included with the value when a source stream yields a value. The key is also +/// used to remove the stream from the `StreamMap` before the stream has +/// completed streaming. +/// +/// # `Unpin` +/// +/// Because the `StreamMap` API moves streams during runtime, both streams and +/// keys must be `Unpin`. In order to insert a `!Unpin` stream into a +/// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to +/// pin the stream in the heap. +/// +/// # Implementation +/// +/// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this +/// internal implementation detail will persist in future versions, but it is +/// important to know the runtime implications. In general, `StreamMap` works +/// best with a "smallish" number of streams as all entries are scanned on +/// insert, remove, and polling. In cases where a large number of streams need +/// to be merged, it may be advisable to use tasks sending values on a shared +/// [`mpsc`] channel. +/// +/// [`StreamExt::merge`]: crate::StreamExt::merge +/// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html +/// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html +/// [`Box::pin`]: std::boxed::Box::pin +/// +/// # Examples +/// +/// Merging two streams, then remove them after receiving the first value +/// +/// ``` +/// use tokio_stream::{StreamExt, StreamMap, Stream}; +/// use tokio::sync::mpsc; +/// use std::pin::Pin; +/// +/// #[tokio::main] +/// async fn main() { +/// let (tx1, mut rx1) = mpsc::channel::<usize>(10); +/// let (tx2, mut rx2) = mpsc::channel::<usize>(10); +/// +/// // Convert the channels to a `Stream`. +/// let rx1 = Box::pin(async_stream::stream! { +/// while let Some(item) = rx1.recv().await { +/// yield item; +/// } +/// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; +/// +/// let rx2 = Box::pin(async_stream::stream! { +/// while let Some(item) = rx2.recv().await { +/// yield item; +/// } +/// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; +/// +/// tokio::spawn(async move { +/// tx1.send(1).await.unwrap(); +/// +/// // This value will never be received. The send may or may not return +/// // `Err` depending on if the remote end closed first or not. +/// let _ = tx1.send(2).await; +/// }); +/// +/// tokio::spawn(async move { +/// tx2.send(3).await.unwrap(); +/// let _ = tx2.send(4).await; +/// }); +/// +/// let mut map = StreamMap::new(); +/// +/// // Insert both streams +/// map.insert("one", rx1); +/// map.insert("two", rx2); +/// +/// // Read twice +/// for _ in 0..2 { +/// let (key, val) = map.next().await.unwrap(); +/// +/// if key == "one" { +/// assert_eq!(val, 1); +/// } else { +/// assert_eq!(val, 3); +/// } +/// +/// // Remove the stream to prevent reading the next value +/// map.remove(key); +/// } +/// } +/// ``` +/// +/// This example models a read-only client to a chat system with channels. The +/// client sends commands to join and leave channels. `StreamMap` is used to +/// manage active channel subscriptions. +/// +/// For simplicity, messages are displayed with `println!`, but they could be +/// sent to the client over a socket. +/// +/// ```no_run +/// use tokio_stream::{Stream, StreamExt, StreamMap}; +/// +/// enum Command { +/// Join(String), +/// Leave(String), +/// } +/// +/// fn commands() -> impl Stream<Item = Command> { +/// // Streams in user commands by parsing `stdin`. +/// # tokio_stream::pending() +/// } +/// +/// // Join a channel, returns a stream of messages received on the channel. +/// fn join(channel: &str) -> impl Stream<Item = String> + Unpin { +/// // left as an exercise to the reader +/// # tokio_stream::pending() +/// } +/// +/// #[tokio::main] +/// async fn main() { +/// let mut channels = StreamMap::new(); +/// +/// // Input commands (join / leave channels). +/// let cmds = commands(); +/// tokio::pin!(cmds); +/// +/// loop { +/// tokio::select! { +/// Some(cmd) = cmds.next() => { +/// match cmd { +/// Command::Join(chan) => { +/// // Join the channel and add it to the `channels` +/// // stream map +/// let msgs = join(&chan); +/// channels.insert(chan, msgs); +/// } +/// Command::Leave(chan) => { +/// channels.remove(&chan); +/// } +/// } +/// } +/// Some((chan, msg)) = channels.next() => { +/// // Received a message, display it on stdout with the channel +/// // it originated from. +/// println!("{}: {}", chan, msg); +/// } +/// // Both the `commands` stream and the `channels` stream are +/// // complete. There is no more work to do, so leave the loop. +/// else => break, +/// } +/// } +/// } +/// ``` +#[derive(Debug)] +pub struct StreamMap<K, V> { + /// Streams stored in the map + entries: Vec<(K, V)>, +} + +impl<K, V> StreamMap<K, V> { + /// An iterator visiting all key-value pairs in arbitrary order. + /// + /// The iterator element type is &'a (K, V). + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// + /// map.insert("a", pending::<i32>()); + /// map.insert("b", pending()); + /// map.insert("c", pending()); + /// + /// for (key, stream) in map.iter() { + /// println!("({}, {:?})", key, stream); + /// } + /// ``` + pub fn iter(&self) -> impl Iterator<Item = &(K, V)> { + self.entries.iter() + } + + /// An iterator visiting all key-value pairs mutably in arbitrary order. + /// + /// The iterator element type is &'a mut (K, V). + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// + /// map.insert("a", pending::<i32>()); + /// map.insert("b", pending()); + /// map.insert("c", pending()); + /// + /// for (key, stream) in map.iter_mut() { + /// println!("({}, {:?})", key, stream); + /// } + /// ``` + pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> { + self.entries.iter_mut() + } + + /// Creates an empty `StreamMap`. + /// + /// The stream map is initially created with a capacity of `0`, so it will + /// not allocate until it is first inserted into. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, Pending}; + /// + /// let map: StreamMap<&str, Pending<()>> = StreamMap::new(); + /// ``` + pub fn new() -> StreamMap<K, V> { + StreamMap { entries: vec![] } + } + + /// Creates an empty `StreamMap` with the specified capacity. + /// + /// The stream map will be able to hold at least `capacity` elements without + /// reallocating. If `capacity` is 0, the stream map will not allocate. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, Pending}; + /// + /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10); + /// ``` + pub fn with_capacity(capacity: usize) -> StreamMap<K, V> { + StreamMap { + entries: Vec::with_capacity(capacity), + } + } + + /// Returns an iterator visiting all keys in arbitrary order. + /// + /// The iterator element type is &'a K. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// + /// map.insert("a", pending::<i32>()); + /// map.insert("b", pending()); + /// map.insert("c", pending()); + /// + /// for key in map.keys() { + /// println!("{}", key); + /// } + /// ``` + pub fn keys(&self) -> impl Iterator<Item = &K> { + self.iter().map(|(k, _)| k) + } + + /// An iterator visiting all values in arbitrary order. + /// + /// The iterator element type is &'a V. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// + /// map.insert("a", pending::<i32>()); + /// map.insert("b", pending()); + /// map.insert("c", pending()); + /// + /// for stream in map.values() { + /// println!("{:?}", stream); + /// } + /// ``` + pub fn values(&self) -> impl Iterator<Item = &V> { + self.iter().map(|(_, v)| v) + } + + /// An iterator visiting all values mutably in arbitrary order. + /// + /// The iterator element type is &'a mut V. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// + /// map.insert("a", pending::<i32>()); + /// map.insert("b", pending()); + /// map.insert("c", pending()); + /// + /// for stream in map.values_mut() { + /// println!("{:?}", stream); + /// } + /// ``` + pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> { + self.iter_mut().map(|(_, v)| v) + } + + /// Returns the number of streams the map can hold without reallocating. + /// + /// This number is a lower bound; the `StreamMap` might be able to hold + /// more, but is guaranteed to be able to hold at least this many. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, Pending}; + /// + /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100); + /// assert!(map.capacity() >= 100); + /// ``` + pub fn capacity(&self) -> usize { + self.entries.capacity() + } + + /// Returns the number of streams in the map. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut a = StreamMap::new(); + /// assert_eq!(a.len(), 0); + /// a.insert(1, pending::<i32>()); + /// assert_eq!(a.len(), 1); + /// ``` + pub fn len(&self) -> usize { + self.entries.len() + } + + /// Returns `true` if the map contains no elements. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut a = StreamMap::new(); + /// assert!(a.is_empty()); + /// a.insert(1, pending::<i32>()); + /// assert!(!a.is_empty()); + /// ``` + pub fn is_empty(&self) -> bool { + self.entries.is_empty() + } + + /// Clears the map, removing all key-stream pairs. Keeps the allocated + /// memory for reuse. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut a = StreamMap::new(); + /// a.insert(1, pending::<i32>()); + /// a.clear(); + /// assert!(a.is_empty()); + /// ``` + pub fn clear(&mut self) { + self.entries.clear(); + } + + /// Insert a key-stream pair into the map. + /// + /// If the map did not have this key present, `None` is returned. + /// + /// If the map did have this key present, the new `stream` replaces the old + /// one and the old stream is returned. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// + /// assert!(map.insert(37, pending::<i32>()).is_none()); + /// assert!(!map.is_empty()); + /// + /// map.insert(37, pending()); + /// assert!(map.insert(37, pending()).is_some()); + /// ``` + pub fn insert(&mut self, k: K, stream: V) -> Option<V> + where + K: Hash + Eq, + { + let ret = self.remove(&k); + self.entries.push((k, stream)); + + ret + } + + /// Removes a key from the map, returning the stream at the key if the key was previously in the map. + /// + /// The key may be any borrowed form of the map's key type, but `Hash` and + /// `Eq` on the borrowed form must match those for the key type. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// map.insert(1, pending::<i32>()); + /// assert!(map.remove(&1).is_some()); + /// assert!(map.remove(&1).is_none()); + /// ``` + pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V> + where + K: Borrow<Q>, + Q: Hash + Eq, + { + for i in 0..self.entries.len() { + if self.entries[i].0.borrow() == k { + return Some(self.entries.swap_remove(i).1); + } + } + + None + } + + /// Returns `true` if the map contains a stream for the specified key. + /// + /// The key may be any borrowed form of the map's key type, but `Hash` and + /// `Eq` on the borrowed form must match those for the key type. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// map.insert(1, pending::<i32>()); + /// assert_eq!(map.contains_key(&1), true); + /// assert_eq!(map.contains_key(&2), false); + /// ``` + pub fn contains_key<Q: ?Sized>(&self, k: &Q) -> bool + where + K: Borrow<Q>, + Q: Hash + Eq, + { + for i in 0..self.entries.len() { + if self.entries[i].0.borrow() == k { + return true; + } + } + + false + } +} + +impl<K, V> StreamMap<K, V> +where + K: Unpin, + V: Stream + Unpin, +{ + /// Polls the next value, includes the vec entry index + fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> { + use Poll::*; + + let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize; + let mut idx = start; + + for _ in 0..self.entries.len() { + let (_, stream) = &mut self.entries[idx]; + + match Pin::new(stream).poll_next(cx) { + Ready(Some(val)) => return Ready(Some((idx, val))), + Ready(None) => { + // Remove the entry + self.entries.swap_remove(idx); + + // Check if this was the last entry, if so the cursor needs + // to wrap + if idx == self.entries.len() { + idx = 0; + } else if idx < start && start <= self.entries.len() { + // The stream being swapped into the current index has + // already been polled, so skip it. + idx = idx.wrapping_add(1) % self.entries.len(); + } + } + Pending => { + idx = idx.wrapping_add(1) % self.entries.len(); + } + } + } + + // If the map is empty, then the stream is complete. + if self.entries.is_empty() { + Ready(None) + } else { + Pending + } + } +} + +impl<K, V> Default for StreamMap<K, V> { + fn default() -> Self { + Self::new() + } +} + +impl<K, V> Stream for StreamMap<K, V> +where + K: Clone + Unpin, + V: Stream + Unpin, +{ + type Item = (K, V::Item); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) { + let key = self.entries[idx].0.clone(); + Poll::Ready(Some((key, val))) + } else { + Poll::Ready(None) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let mut ret = (0, Some(0)); + + for (_, stream) in &self.entries { + let hint = stream.size_hint(); + + ret.0 += hint.0; + + match (ret.1, hint.1) { + (Some(a), Some(b)) => ret.1 = Some(a + b), + (Some(_), None) => ret.1 = None, + _ => {} + } + } + + ret + } +} + +impl<K, V> std::iter::FromIterator<(K, V)> for StreamMap<K, V> +where + K: Hash + Eq, +{ + fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self { + let iterator = iter.into_iter(); + let (lower_bound, _) = iterator.size_hint(); + let mut stream_map = Self::with_capacity(lower_bound); + + for (key, value) in iterator { + stream_map.insert(key, value); + } + + stream_map + } +} + +impl<K, V> Extend<(K, V)> for StreamMap<K, V> { + fn extend<T>(&mut self, iter: T) + where + T: IntoIterator<Item = (K, V)>, + { + self.entries.extend(iter); + } +} + +mod rand { + use std::cell::Cell; + + mod loom { + #[cfg(not(loom))] + pub(crate) mod rand { + use std::collections::hash_map::RandomState; + use std::hash::{BuildHasher, Hash, Hasher}; + use std::sync::atomic::AtomicU32; + use std::sync::atomic::Ordering::Relaxed; + + static COUNTER: AtomicU32 = AtomicU32::new(1); + + pub(crate) fn seed() -> u64 { + let rand_state = RandomState::new(); + + let mut hasher = rand_state.build_hasher(); + + // Hash some unique-ish data to generate some new state + COUNTER.fetch_add(1, Relaxed).hash(&mut hasher); + + // Get the seed + hasher.finish() + } + } + + #[cfg(loom)] + pub(crate) mod rand { + pub(crate) fn seed() -> u64 { + 1 + } + } + } + + /// Fast random number generate + /// + /// Implement xorshift64+: 2 32-bit xorshift sequences added together. + /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's + /// Xorshift paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf> + /// This generator passes the SmallCrush suite, part of TestU01 framework: + /// <http://simul.iro.umontreal.ca/testu01/tu01.html> + #[derive(Debug)] + pub(crate) struct FastRand { + one: Cell<u32>, + two: Cell<u32>, + } + + impl FastRand { + /// Initialize a new, thread-local, fast random number generator. + pub(crate) fn new(seed: u64) -> FastRand { + let one = (seed >> 32) as u32; + let mut two = seed as u32; + + if two == 0 { + // This value cannot be zero + two = 1; + } + + FastRand { + one: Cell::new(one), + two: Cell::new(two), + } + } + + pub(crate) fn fastrand_n(&self, n: u32) -> u32 { + // This is similar to fastrand() % n, but faster. + // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ + let mul = (self.fastrand() as u64).wrapping_mul(n as u64); + (mul >> 32) as u32 + } + + fn fastrand(&self) -> u32 { + let mut s1 = self.one.get(); + let s0 = self.two.get(); + + s1 ^= s1 << 17; + s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16; + + self.one.set(s0); + self.two.set(s1); + + s0.wrapping_add(s1) + } + } + + // Used by `StreamMap` + pub(crate) fn thread_rng_n(n: u32) -> u32 { + thread_local! { + static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed()); + } + + THREAD_RNG.with(|rng| rng.fastrand_n(n)) + } +} |