use std::collections::{HashMap, HashSet, VecDeque}; use std::error::Error as StdError; use std::fmt; use std::ops::{Deref, DerefMut}; use std::sync::{Arc, Mutex, Weak}; #[cfg(not(feature = "runtime"))] use std::time::{Duration, Instant}; use futures_channel::oneshot; #[cfg(feature = "runtime")] use tokio::time::{Duration, Instant, Interval}; use tracing::{debug, trace}; use super::client::Ver; use crate::common::{exec::Exec, task, Future, Pin, Poll, Unpin}; // FIXME: allow() required due to `impl Trait` leaking types to this lint #[allow(missing_debug_implementations)] pub(super) struct Pool { // If the pool is disabled, this is None. inner: Option>>>, } // Before using a pooled connection, make sure the sender is not dead. // // This is a trait to allow the `client::pool::tests` to work for `i32`. // // See https://github.com/hyperium/hyper/issues/1429 pub(super) trait Poolable: Unpin + Send + Sized + 'static { fn is_open(&self) -> bool; /// Reserve this connection. /// /// Allows for HTTP/2 to return a shared reservation. fn reserve(self) -> Reservation; fn can_share(&self) -> bool; } /// When checking out a pooled connection, it might be that the connection /// only supports a single reservation, or it might be usable for many. /// /// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be /// used for multiple requests. // FIXME: allow() required due to `impl Trait` leaking types to this lint #[allow(missing_debug_implementations)] pub(super) enum Reservation { /// This connection could be used multiple times, the first one will be /// reinserted into the `idle` pool, and the second will be given to /// the `Checkout`. #[cfg(feature = "http2")] Shared(T, T), /// This connection requires unique access. It will be returned after /// use is complete. Unique(T), } /// Simple type alias in case the key type needs to be adjusted. pub(super) type Key = (http::uri::Scheme, http::uri::Authority); //Arc; struct PoolInner { // A flag that a connection is being established, and the connection // should be shared. This prevents making multiple HTTP/2 connections // to the same host. connecting: HashSet, // These are internal Conns sitting in the event loop in the KeepAlive // state, waiting to receive a new Request to send on the socket. idle: HashMap>>, max_idle_per_host: usize, // These are outstanding Checkouts that are waiting for a socket to be // able to send a Request one. This is used when "racing" for a new // connection. // // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait // for the Pool to receive an idle Conn. When a Conn becomes idle, // this list is checked for any parked Checkouts, and tries to notify // them that the Conn could be used instead of waiting for a brand new // connection. waiters: HashMap>>, // A oneshot channel is used to allow the interval to be notified when // the Pool completely drops. That way, the interval can cancel immediately. #[cfg(feature = "runtime")] idle_interval_ref: Option>, #[cfg(feature = "runtime")] exec: Exec, timeout: Option, } // This is because `Weak::new()` *allocates* space for `T`, even if it // doesn't need it! struct WeakOpt(Option>); #[derive(Clone, Copy, Debug)] pub(super) struct Config { pub(super) idle_timeout: Option, pub(super) max_idle_per_host: usize, } impl Config { pub(super) fn is_enabled(&self) -> bool { self.max_idle_per_host > 0 } } impl Pool { pub(super) fn new(config: Config, __exec: &Exec) -> Pool { let inner = if config.is_enabled() { Some(Arc::new(Mutex::new(PoolInner { connecting: HashSet::new(), idle: HashMap::new(), #[cfg(feature = "runtime")] idle_interval_ref: None, max_idle_per_host: config.max_idle_per_host, waiters: HashMap::new(), #[cfg(feature = "runtime")] exec: __exec.clone(), timeout: config.idle_timeout, }))) } else { None }; Pool { inner } } fn is_enabled(&self) -> bool { self.inner.is_some() } #[cfg(test)] pub(super) fn no_timer(&self) { // Prevent an actual interval from being created for this pool... #[cfg(feature = "runtime")] { let mut inner = self.inner.as_ref().unwrap().lock().unwrap(); assert!(inner.idle_interval_ref.is_none(), "timer already spawned"); let (tx, _) = oneshot::channel(); inner.idle_interval_ref = Some(tx); } } } impl Pool { /// Returns a `Checkout` which is a future that resolves if an idle /// connection becomes available. pub(super) fn checkout(&self, key: Key) -> Checkout { Checkout { key, pool: self.clone(), waiter: None, } } /// Ensure that there is only ever 1 connecting task for HTTP/2 /// connections. This does nothing for HTTP/1. pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option> { if ver == Ver::Http2 { if let Some(ref enabled) = self.inner { let mut inner = enabled.lock().unwrap(); return if inner.connecting.insert(key.clone()) { let connecting = Connecting { key: key.clone(), pool: WeakOpt::downgrade(enabled), }; Some(connecting) } else { trace!("HTTP/2 connecting already in progress for {:?}", key); None }; } } // else Some(Connecting { key: key.clone(), // in HTTP/1's case, there is never a lock, so we don't // need to do anything in Drop. pool: WeakOpt::none(), }) } #[cfg(test)] fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner> { self.inner.as_ref().expect("enabled").lock().expect("lock") } /* Used in client/tests.rs... #[cfg(feature = "runtime")] #[cfg(test)] pub(super) fn h1_key(&self, s: &str) -> Key { Arc::new(s.to_string()) } #[cfg(feature = "runtime")] #[cfg(test)] pub(super) fn idle_count(&self, key: &Key) -> usize { self .locked() .idle .get(key) .map(|list| list.len()) .unwrap_or(0) } */ pub(super) fn pooled( &self, #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting, value: T, ) -> Pooled { let (value, pool_ref) = if let Some(ref enabled) = self.inner { match value.reserve() { #[cfg(feature = "http2")] Reservation::Shared(to_insert, to_return) => { let mut inner = enabled.lock().unwrap(); inner.put(connecting.key.clone(), to_insert, enabled); // Do this here instead of Drop for Connecting because we // already have a lock, no need to lock the mutex twice. inner.connected(&connecting.key); // prevent the Drop of Connecting from repeating inner.connected() connecting.pool = WeakOpt::none(); // Shared reservations don't need a reference to the pool, // since the pool always keeps a copy. (to_return, WeakOpt::none()) } Reservation::Unique(value) => { // Unique reservations must take a reference to the pool // since they hope to reinsert once the reservation is // completed (value, WeakOpt::downgrade(enabled)) } } } else { // If pool is not enabled, skip all the things... // The Connecting should have had no pool ref debug_assert!(connecting.pool.upgrade().is_none()); (value, WeakOpt::none()) }; Pooled { key: connecting.key.clone(), is_reused: false, pool: pool_ref, value: Some(value), } } fn reuse(&self, key: &Key, value: T) -> Pooled { debug!("reuse idle connection for {:?}", key); // TODO: unhack this // In Pool::pooled(), which is used for inserting brand new connections, // there's some code that adjusts the pool reference taken depending // on if the Reservation can be shared or is unique. By the time // reuse() is called, the reservation has already been made, and // we just have the final value, without knowledge of if this is // unique or shared. So, the hack is to just assume Ver::Http2 means // shared... :( let mut pool_ref = WeakOpt::none(); if !value.can_share() { if let Some(ref enabled) = self.inner { pool_ref = WeakOpt::downgrade(enabled); } } Pooled { is_reused: true, key: key.clone(), pool: pool_ref, value: Some(value), } } } /// Pop off this list, looking for a usable connection that hasn't expired. struct IdlePopper<'a, T> { key: &'a Key, list: &'a mut Vec>, } impl<'a, T: Poolable + 'a> IdlePopper<'a, T> { fn pop(self, expiration: &Expiration) -> Option> { while let Some(entry) = self.list.pop() { // If the connection has been closed, or is older than our idle // timeout, simply drop it and keep looking... if !entry.value.is_open() { trace!("removing closed connection for {:?}", self.key); continue; } // TODO: Actually, since the `idle` list is pushed to the end always, // that would imply that if *this* entry is expired, then anything // "earlier" in the list would *have* to be expired also... Right? // // In that case, we could just break out of the loop and drop the // whole list... if expiration.expires(entry.idle_at) { trace!("removing expired connection for {:?}", self.key); continue; } let value = match entry.value.reserve() { #[cfg(feature = "http2")] Reservation::Shared(to_reinsert, to_checkout) => { self.list.push(Idle { idle_at: Instant::now(), value: to_reinsert, }); to_checkout } Reservation::Unique(unique) => unique, }; return Some(Idle { idle_at: entry.idle_at, value, }); } None } } impl PoolInner { fn put(&mut self, key: Key, value: T, __pool_ref: &Arc>>) { if value.can_share() && self.idle.contains_key(&key) { trace!("put; existing idle HTTP/2 connection for {:?}", key); return; } trace!("put; add idle connection for {:?}", key); let mut remove_waiters = false; let mut value = Some(value); if let Some(waiters) = self.waiters.get_mut(&key) { while let Some(tx) = waiters.pop_front() { if !tx.is_canceled() { let reserved = value.take().expect("value already sent"); let reserved = match reserved.reserve() { #[cfg(feature = "http2")] Reservation::Shared(to_keep, to_send) => { value = Some(to_keep); to_send } Reservation::Unique(uniq) => uniq, }; match tx.send(reserved) { Ok(()) => { if value.is_none() { break; } else { continue; } } Err(e) => { value = Some(e); } } } trace!("put; removing canceled waiter for {:?}", key); } remove_waiters = waiters.is_empty(); } if remove_waiters { self.waiters.remove(&key); } match value { Some(value) => { // borrow-check scope... { let idle_list = self.idle.entry(key.clone()).or_insert_with(Vec::new); if self.max_idle_per_host <= idle_list.len() { trace!("max idle per host for {:?}, dropping connection", key); return; } debug!("pooling idle connection for {:?}", key); idle_list.push(Idle { value, idle_at: Instant::now(), }); } #[cfg(feature = "runtime")] { self.spawn_idle_interval(__pool_ref); } } None => trace!("put; found waiter for {:?}", key), } } /// A `Connecting` task is complete. Not necessarily successfully, /// but the lock is going away, so clean up. fn connected(&mut self, key: &Key) { let existed = self.connecting.remove(key); debug_assert!(existed, "Connecting dropped, key not in pool.connecting"); // cancel any waiters. if there are any, it's because // this Connecting task didn't complete successfully. // those waiters would never receive a connection. self.waiters.remove(key); } #[cfg(feature = "runtime")] fn spawn_idle_interval(&mut self, pool_ref: &Arc>>) { let (dur, rx) = { if self.idle_interval_ref.is_some() { return; } if let Some(dur) = self.timeout { let (tx, rx) = oneshot::channel(); self.idle_interval_ref = Some(tx); (dur, rx) } else { return; } }; let interval = IdleTask { interval: tokio::time::interval(dur), pool: WeakOpt::downgrade(pool_ref), pool_drop_notifier: rx, }; self.exec.execute(interval); } } impl PoolInner { /// Any `FutureResponse`s that were created will have made a `Checkout`, /// and possibly inserted into the pool that it is waiting for an idle /// connection. If a user ever dropped that future, we need to clean out /// those parked senders. fn clean_waiters(&mut self, key: &Key) { let mut remove_waiters = false; if let Some(waiters) = self.waiters.get_mut(key) { waiters.retain(|tx| !tx.is_canceled()); remove_waiters = waiters.is_empty(); } if remove_waiters { self.waiters.remove(key); } } } #[cfg(feature = "runtime")] impl PoolInner { /// This should *only* be called by the IdleTask fn clear_expired(&mut self) { let dur = self.timeout.expect("interval assumes timeout"); let now = Instant::now(); //self.last_idle_check_at = now; self.idle.retain(|key, values| { values.retain(|entry| { if !entry.value.is_open() { trace!("idle interval evicting closed for {:?}", key); return false; } // Avoid `Instant::sub` to avoid issues like rust-lang/rust#86470. if now.saturating_duration_since(entry.idle_at) > dur { trace!("idle interval evicting expired for {:?}", key); return false; } // Otherwise, keep this value... true }); // returning false evicts this key/val !values.is_empty() }); } } impl Clone for Pool { fn clone(&self) -> Pool { Pool { inner: self.inner.clone(), } } } /// A wrapped poolable value that tries to reinsert to the Pool on Drop. // Note: The bounds `T: Poolable` is needed for the Drop impl. pub(super) struct Pooled { value: Option, is_reused: bool, key: Key, pool: WeakOpt>>, } impl Pooled { pub(super) fn is_reused(&self) -> bool { self.is_reused } pub(super) fn is_pool_enabled(&self) -> bool { self.pool.0.is_some() } fn as_ref(&self) -> &T { self.value.as_ref().expect("not dropped") } fn as_mut(&mut self) -> &mut T { self.value.as_mut().expect("not dropped") } } impl Deref for Pooled { type Target = T; fn deref(&self) -> &T { self.as_ref() } } impl DerefMut for Pooled { fn deref_mut(&mut self) -> &mut T { self.as_mut() } } impl Drop for Pooled { fn drop(&mut self) { if let Some(value) = self.value.take() { if !value.is_open() { // If we *already* know the connection is done here, // it shouldn't be re-inserted back into the pool. return; } if let Some(pool) = self.pool.upgrade() { if let Ok(mut inner) = pool.lock() { inner.put(self.key.clone(), value, &pool); } } else if !value.can_share() { trace!("pool dropped, dropping pooled ({:?})", self.key); } // Ver::Http2 is already in the Pool (or dead), so we wouldn't // have an actual reference to the Pool. } } } impl fmt::Debug for Pooled { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Pooled").field("key", &self.key).finish() } } struct Idle { idle_at: Instant, value: T, } // FIXME: allow() required due to `impl Trait` leaking types to this lint #[allow(missing_debug_implementations)] pub(super) struct Checkout { key: Key, pool: Pool, waiter: Option>, } #[derive(Debug)] pub(super) struct CheckoutIsClosedError; impl StdError for CheckoutIsClosedError {} impl fmt::Display for CheckoutIsClosedError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("checked out connection was closed") } } impl Checkout { fn poll_waiter( &mut self, cx: &mut task::Context<'_>, ) -> Poll>>> { if let Some(mut rx) = self.waiter.take() { match Pin::new(&mut rx).poll(cx) { Poll::Ready(Ok(value)) => { if value.is_open() { Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value)))) } else { Poll::Ready(Some(Err( crate::Error::new_canceled().with(CheckoutIsClosedError) ))) } } Poll::Pending => { self.waiter = Some(rx); Poll::Pending } Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err( crate::Error::new_canceled().with("request has been canceled") ))), } } else { Poll::Ready(None) } } fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option> { let entry = { let mut inner = self.pool.inner.as_ref()?.lock().unwrap(); let expiration = Expiration::new(inner.timeout); let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| { trace!("take? {:?}: expiration = {:?}", self.key, expiration.0); // A block to end the mutable borrow on list, // so the map below can check is_empty() { let popper = IdlePopper { key: &self.key, list, }; popper.pop(&expiration) } .map(|e| (e, list.is_empty())) }); let (entry, empty) = if let Some((e, empty)) = maybe_entry { (Some(e), empty) } else { // No entry found means nuke the list for sure. (None, true) }; if empty { //TODO: This could be done with the HashMap::entry API instead. inner.idle.remove(&self.key); } if entry.is_none() && self.waiter.is_none() { let (tx, mut rx) = oneshot::channel(); trace!("checkout waiting for idle connection: {:?}", self.key); inner .waiters .entry(self.key.clone()) .or_insert_with(VecDeque::new) .push_back(tx); // register the waker with this oneshot assert!(Pin::new(&mut rx).poll(cx).is_pending()); self.waiter = Some(rx); } entry }; entry.map(|e| self.pool.reuse(&self.key, e.value)) } } impl Future for Checkout { type Output = crate::Result>; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { if let Some(pooled) = ready!(self.poll_waiter(cx)?) { return Poll::Ready(Ok(pooled)); } if let Some(pooled) = self.checkout(cx) { Poll::Ready(Ok(pooled)) } else if !self.pool.is_enabled() { Poll::Ready(Err(crate::Error::new_canceled().with("pool is disabled"))) } else { // There's a new waiter, already registered in self.checkout() debug_assert!(self.waiter.is_some()); Poll::Pending } } } impl Drop for Checkout { fn drop(&mut self) { if self.waiter.take().is_some() { trace!("checkout dropped for {:?}", self.key); if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) { inner.clean_waiters(&self.key); } } } } // FIXME: allow() required due to `impl Trait` leaking types to this lint #[allow(missing_debug_implementations)] pub(super) struct Connecting { key: Key, pool: WeakOpt>>, } impl Connecting { pub(super) fn alpn_h2(self, pool: &Pool) -> Option { debug_assert!( self.pool.0.is_none(), "Connecting::alpn_h2 but already Http2" ); pool.connecting(&self.key, Ver::Http2) } } impl Drop for Connecting { fn drop(&mut self) { if let Some(pool) = self.pool.upgrade() { // No need to panic on drop, that could abort! if let Ok(mut inner) = pool.lock() { inner.connected(&self.key); } } } } struct Expiration(Option); impl Expiration { fn new(dur: Option) -> Expiration { Expiration(dur) } fn expires(&self, instant: Instant) -> bool { match self.0 { // Avoid `Instant::elapsed` to avoid issues like rust-lang/rust#86470. Some(timeout) => Instant::now().saturating_duration_since(instant) > timeout, None => false, } } } #[cfg(feature = "runtime")] pin_project_lite::pin_project! { struct IdleTask { #[pin] interval: Interval, pool: WeakOpt>>, // This allows the IdleTask to be notified as soon as the entire // Pool is fully dropped, and shutdown. This channel is never sent on, // but Err(Canceled) will be received when the Pool is dropped. #[pin] pool_drop_notifier: oneshot::Receiver, } } #[cfg(feature = "runtime")] impl Future for IdleTask { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { let mut this = self.project(); loop { match this.pool_drop_notifier.as_mut().poll(cx) { Poll::Ready(Ok(n)) => match n {}, Poll::Pending => (), Poll::Ready(Err(_canceled)) => { trace!("pool closed, canceling idle interval"); return Poll::Ready(()); } } ready!(this.interval.as_mut().poll_tick(cx)); if let Some(inner) = this.pool.upgrade() { if let Ok(mut inner) = inner.lock() { trace!("idle interval checking for expired"); inner.clear_expired(); continue; } } return Poll::Ready(()); } } } impl WeakOpt { fn none() -> Self { WeakOpt(None) } fn downgrade(arc: &Arc) -> Self { WeakOpt(Some(Arc::downgrade(arc))) } fn upgrade(&self) -> Option> { self.0.as_ref().and_then(Weak::upgrade) } } #[cfg(test)] mod tests { use std::task::Poll; use std::time::Duration; use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt}; use crate::common::{exec::Exec, task, Future, Pin}; /// Test unique reservations. #[derive(Debug, PartialEq, Eq)] struct Uniq(T); impl Poolable for Uniq { fn is_open(&self) -> bool { true } fn reserve(self) -> Reservation { Reservation::Unique(self) } fn can_share(&self) -> bool { false } } fn c(key: Key) -> Connecting { Connecting { key, pool: WeakOpt::none(), } } fn host_key(s: &str) -> Key { (http::uri::Scheme::HTTP, s.parse().expect("host key")) } fn pool_no_timer() -> Pool { pool_max_idle_no_timer(::std::usize::MAX) } fn pool_max_idle_no_timer(max_idle: usize) -> Pool { let pool = Pool::new( super::Config { idle_timeout: Some(Duration::from_millis(100)), max_idle_per_host: max_idle, }, &Exec::Default, ); pool.no_timer(); pool } #[tokio::test] async fn test_pool_checkout_smoke() { let pool = pool_no_timer(); let key = host_key("foo"); let pooled = pool.pooled(c(key.clone()), Uniq(41)); drop(pooled); match pool.checkout(key).await { Ok(pooled) => assert_eq!(*pooled, Uniq(41)), Err(_) => panic!("not ready"), }; } /// Helper to check if the future is ready after polling once. struct PollOnce<'a, F>(&'a mut F); impl Future for PollOnce<'_, F> where F: Future> + Unpin, { type Output = Option<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { match Pin::new(&mut self.0).poll(cx) { Poll::Ready(Ok(_)) => Poll::Ready(Some(())), Poll::Ready(Err(_)) => Poll::Ready(Some(())), Poll::Pending => Poll::Ready(None), } } } #[tokio::test] async fn test_pool_checkout_returns_none_if_expired() { let pool = pool_no_timer(); let key = host_key("foo"); let pooled = pool.pooled(c(key.clone()), Uniq(41)); drop(pooled); tokio::time::sleep(pool.locked().timeout.unwrap()).await; let mut checkout = pool.checkout(key); let poll_once = PollOnce(&mut checkout); let is_not_ready = poll_once.await.is_none(); assert!(is_not_ready); } #[cfg(feature = "runtime")] #[tokio::test] async fn test_pool_checkout_removes_expired() { let pool = pool_no_timer(); let key = host_key("foo"); pool.pooled(c(key.clone()), Uniq(41)); pool.pooled(c(key.clone()), Uniq(5)); pool.pooled(c(key.clone()), Uniq(99)); assert_eq!( pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3) ); tokio::time::sleep(pool.locked().timeout.unwrap()).await; let mut checkout = pool.checkout(key.clone()); let poll_once = PollOnce(&mut checkout); // checkout.await should clean out the expired poll_once.await; assert!(pool.locked().idle.get(&key).is_none()); } #[test] fn test_pool_max_idle_per_host() { let pool = pool_max_idle_no_timer(2); let key = host_key("foo"); pool.pooled(c(key.clone()), Uniq(41)); pool.pooled(c(key.clone()), Uniq(5)); pool.pooled(c(key.clone()), Uniq(99)); // pooled and dropped 3, max_idle should only allow 2 assert_eq!( pool.locked().idle.get(&key).map(|entries| entries.len()), Some(2) ); } #[cfg(feature = "runtime")] #[tokio::test] async fn test_pool_timer_removes_expired() { let _ = pretty_env_logger::try_init(); tokio::time::pause(); let pool = Pool::new( super::Config { idle_timeout: Some(Duration::from_millis(10)), max_idle_per_host: std::usize::MAX, }, &Exec::Default, ); let key = host_key("foo"); pool.pooled(c(key.clone()), Uniq(41)); pool.pooled(c(key.clone()), Uniq(5)); pool.pooled(c(key.clone()), Uniq(99)); assert_eq!( pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3) ); // Let the timer tick passed the expiration... tokio::time::advance(Duration::from_millis(30)).await; // Yield so the Interval can reap... tokio::task::yield_now().await; assert!(pool.locked().idle.get(&key).is_none()); } #[tokio::test] async fn test_pool_checkout_task_unparked() { use futures_util::future::join; use futures_util::FutureExt; let pool = pool_no_timer(); let key = host_key("foo"); let pooled = pool.pooled(c(key.clone()), Uniq(41)); let checkout = join(pool.checkout(key), async { // the checkout future will park first, // and then this lazy future will be polled, which will insert // the pooled back into the pool // // this test makes sure that doing so will unpark the checkout drop(pooled); }) .map(|(entry, _)| entry); assert_eq!(*checkout.await.unwrap(), Uniq(41)); } #[tokio::test] async fn test_pool_checkout_drop_cleans_up_waiters() { let pool = pool_no_timer::>(); let key = host_key("foo"); let mut checkout1 = pool.checkout(key.clone()); let mut checkout2 = pool.checkout(key.clone()); let poll_once1 = PollOnce(&mut checkout1); let poll_once2 = PollOnce(&mut checkout2); // first poll needed to get into Pool's parked poll_once1.await; assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1); poll_once2.await; assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2); // on drop, clean up Pool drop(checkout1); assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1); drop(checkout2); assert!(pool.locked().waiters.get(&key).is_none()); } #[derive(Debug)] struct CanClose { #[allow(unused)] val: i32, closed: bool, } impl Poolable for CanClose { fn is_open(&self) -> bool { !self.closed } fn reserve(self) -> Reservation { Reservation::Unique(self) } fn can_share(&self) -> bool { false } } #[test] fn pooled_drop_if_closed_doesnt_reinsert() { let pool = pool_no_timer(); let key = host_key("foo"); pool.pooled( c(key.clone()), CanClose { val: 57, closed: true, }, ); assert!(!pool.locked().idle.contains_key(&key)); } }