use super::*; use slab; use indexmap::{self, IndexMap}; use std::convert::Infallible; use std::fmt; use std::marker::PhantomData; use std::ops; /// Storage for streams #[derive(Debug)] pub(super) struct Store { slab: slab::Slab, ids: IndexMap, } /// "Pointer" to an entry in the store pub(super) struct Ptr<'a> { key: Key, store: &'a mut Store, } /// References an entry in the store. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) struct Key { index: SlabIndex, /// Keep the stream ID in the key as an ABA guard, since slab indices /// could be re-used with a new stream. stream_id: StreamId, } // We can never have more than `StreamId::MAX` streams in the store, // so we can save a smaller index (u32 vs usize). #[derive(Debug, Clone, Copy, PartialEq, Eq)] struct SlabIndex(u32); #[derive(Debug)] pub(super) struct Queue { indices: Option, _p: PhantomData, } pub(super) trait Next { fn next(stream: &Stream) -> Option; fn set_next(stream: &mut Stream, key: Option); fn take_next(stream: &mut Stream) -> Option; fn is_queued(stream: &Stream) -> bool; fn set_queued(stream: &mut Stream, val: bool); } /// A linked list #[derive(Debug, Clone, Copy)] struct Indices { pub head: Key, pub tail: Key, } pub(super) enum Entry<'a> { Occupied(OccupiedEntry<'a>), Vacant(VacantEntry<'a>), } pub(super) struct OccupiedEntry<'a> { ids: indexmap::map::OccupiedEntry<'a, StreamId, SlabIndex>, } pub(super) struct VacantEntry<'a> { ids: indexmap::map::VacantEntry<'a, StreamId, SlabIndex>, slab: &'a mut slab::Slab, } pub(super) trait Resolve { fn resolve(&mut self, key: Key) -> Ptr; } // ===== impl Store ===== impl Store { pub fn new() -> Self { Store { slab: slab::Slab::new(), ids: IndexMap::new(), } } pub fn find_mut(&mut self, id: &StreamId) -> Option { let index = match self.ids.get(id) { Some(key) => *key, None => return None, }; Some(Ptr { key: Key { index, stream_id: *id, }, store: self, }) } pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr { let index = SlabIndex(self.slab.insert(val) as u32); assert!(self.ids.insert(id, index).is_none()); Ptr { key: Key { index, stream_id: id, }, store: self, } } pub fn find_entry(&mut self, id: StreamId) -> Entry { use self::indexmap::map::Entry::*; match self.ids.entry(id) { Occupied(e) => Entry::Occupied(OccupiedEntry { ids: e }), Vacant(e) => Entry::Vacant(VacantEntry { ids: e, slab: &mut self.slab, }), } } pub(crate) fn for_each(&mut self, mut f: F) where F: FnMut(Ptr), { match self.try_for_each(|ptr| { f(ptr); Ok::<_, Infallible>(()) }) { Ok(()) => (), Err(infallible) => match infallible {}, } } pub fn try_for_each(&mut self, mut f: F) -> Result<(), E> where F: FnMut(Ptr) -> Result<(), E>, { let mut len = self.ids.len(); let mut i = 0; while i < len { // Get the key by index, this makes the borrow checker happy let (stream_id, index) = { let entry = self.ids.get_index(i).unwrap(); (*entry.0, *entry.1) }; f(Ptr { key: Key { index, stream_id }, store: self, })?; // TODO: This logic probably could be better... let new_len = self.ids.len(); if new_len < len { debug_assert!(new_len == len - 1); len -= 1; } else { i += 1; } } Ok(()) } } impl Resolve for Store { fn resolve(&mut self, key: Key) -> Ptr { Ptr { key, store: self } } } impl ops::Index for Store { type Output = Stream; fn index(&self, key: Key) -> &Self::Output { self.slab .get(key.index.0 as usize) .filter(|s| s.id == key.stream_id) .unwrap_or_else(|| { panic!("dangling store key for stream_id={:?}", key.stream_id); }) } } impl ops::IndexMut for Store { fn index_mut(&mut self, key: Key) -> &mut Self::Output { self.slab .get_mut(key.index.0 as usize) .filter(|s| s.id == key.stream_id) .unwrap_or_else(|| { panic!("dangling store key for stream_id={:?}", key.stream_id); }) } } impl Store { #[cfg(feature = "unstable")] pub fn num_active_streams(&self) -> usize { self.ids.len() } #[cfg(feature = "unstable")] pub fn num_wired_streams(&self) -> usize { self.slab.len() } } // While running h2 unit/integration tests, enable this debug assertion. // // In practice, we don't need to ensure this. But the integration tests // help to make sure we've cleaned up in cases where we could (like, the // runtime isn't suddenly dropping the task for unknown reasons). #[cfg(feature = "unstable")] impl Drop for Store { fn drop(&mut self) { use std::thread; if !thread::panicking() { debug_assert!(self.slab.is_empty()); } } } // ===== impl Queue ===== impl Queue where N: Next, { pub fn new() -> Self { Queue { indices: None, _p: PhantomData, } } pub fn take(&mut self) -> Self { Queue { indices: self.indices.take(), _p: PhantomData, } } /// Queue the stream. /// /// If the stream is already contained by the list, return `false`. pub fn push(&mut self, stream: &mut store::Ptr) -> bool { tracing::trace!("Queue::push"); if N::is_queued(stream) { tracing::trace!(" -> already queued"); return false; } N::set_queued(stream, true); // The next pointer shouldn't be set debug_assert!(N::next(stream).is_none()); // Queue the stream match self.indices { Some(ref mut idxs) => { tracing::trace!(" -> existing entries"); // Update the current tail node to point to `stream` let key = stream.key(); N::set_next(&mut stream.resolve(idxs.tail), Some(key)); // Update the tail pointer idxs.tail = stream.key(); } None => { tracing::trace!(" -> first entry"); self.indices = Some(store::Indices { head: stream.key(), tail: stream.key(), }); } } true } pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option> where R: Resolve, { if let Some(mut idxs) = self.indices { let mut stream = store.resolve(idxs.head); if idxs.head == idxs.tail { assert!(N::next(&*stream).is_none()); self.indices = None; } else { idxs.head = N::take_next(&mut *stream).unwrap(); self.indices = Some(idxs); } debug_assert!(N::is_queued(&*stream)); N::set_queued(&mut *stream, false); return Some(stream); } None } pub fn is_empty(&self) -> bool { self.indices.is_none() } pub fn pop_if<'a, R, F>(&mut self, store: &'a mut R, f: F) -> Option> where R: Resolve, F: Fn(&Stream) -> bool, { if let Some(idxs) = self.indices { let should_pop = f(&store.resolve(idxs.head)); if should_pop { return self.pop(store); } } None } } // ===== impl Ptr ===== impl<'a> Ptr<'a> { /// Returns the Key associated with the stream pub fn key(&self) -> Key { self.key } pub fn store_mut(&mut self) -> &mut Store { &mut self.store } /// Remove the stream from the store pub fn remove(self) -> StreamId { // The stream must have been unlinked before this point debug_assert!(!self.store.ids.contains_key(&self.key.stream_id)); // Remove the stream state let stream = self.store.slab.remove(self.key.index.0 as usize); assert_eq!(stream.id, self.key.stream_id); stream.id } /// Remove the StreamId -> stream state association. /// /// This will effectively remove the stream as far as the H2 protocol is /// concerned. pub fn unlink(&mut self) { let id = self.key.stream_id; self.store.ids.swap_remove(&id); } } impl<'a> Resolve for Ptr<'a> { fn resolve(&mut self, key: Key) -> Ptr { Ptr { key, store: &mut *self.store, } } } impl<'a> ops::Deref for Ptr<'a> { type Target = Stream; fn deref(&self) -> &Stream { &self.store[self.key] } } impl<'a> ops::DerefMut for Ptr<'a> { fn deref_mut(&mut self) -> &mut Stream { &mut self.store[self.key] } } impl<'a> fmt::Debug for Ptr<'a> { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { (**self).fmt(fmt) } } // ===== impl OccupiedEntry ===== impl<'a> OccupiedEntry<'a> { pub fn key(&self) -> Key { let stream_id = *self.ids.key(); let index = *self.ids.get(); Key { index, stream_id } } } // ===== impl VacantEntry ===== impl<'a> VacantEntry<'a> { pub fn insert(self, value: Stream) -> Key { // Insert the value in the slab let stream_id = value.id; let index = SlabIndex(self.slab.insert(value) as u32); // Insert the handle in the ID map self.ids.insert(index); Key { index, stream_id } } }