summaryrefslogtreecommitdiffstats
path: root/third_party/rust/hyper/src/client/pool.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/hyper/src/client/pool.rs')
-rw-r--r--third_party/rust/hyper/src/client/pool.rs1044
1 files changed, 1044 insertions, 0 deletions
diff --git a/third_party/rust/hyper/src/client/pool.rs b/third_party/rust/hyper/src/client/pool.rs
new file mode 100644
index 0000000000..b9772d688d
--- /dev/null
+++ b/third_party/rust/hyper/src/client/pool.rs
@@ -0,0 +1,1044 @@
+use std::collections::{HashMap, HashSet, VecDeque};
+use std::error::Error as StdError;
+use std::fmt;
+use std::ops::{Deref, DerefMut};
+use std::sync::{Arc, Mutex, Weak};
+
+#[cfg(not(feature = "runtime"))]
+use std::time::{Duration, Instant};
+
+use futures_channel::oneshot;
+#[cfg(feature = "runtime")]
+use tokio::time::{Duration, Instant, Interval};
+use tracing::{debug, trace};
+
+use super::client::Ver;
+use crate::common::{exec::Exec, task, Future, Pin, Poll, Unpin};
+
+// FIXME: allow() required due to `impl Trait` leaking types to this lint
+#[allow(missing_debug_implementations)]
+pub(super) struct Pool<T> {
+ // If the pool is disabled, this is None.
+ inner: Option<Arc<Mutex<PoolInner<T>>>>,
+}
+
+// Before using a pooled connection, make sure the sender is not dead.
+//
+// This is a trait to allow the `client::pool::tests` to work for `i32`.
+//
+// See https://github.com/hyperium/hyper/issues/1429
+pub(super) trait Poolable: Unpin + Send + Sized + 'static {
+ fn is_open(&self) -> bool;
+ /// Reserve this connection.
+ ///
+ /// Allows for HTTP/2 to return a shared reservation.
+ fn reserve(self) -> Reservation<Self>;
+ fn can_share(&self) -> bool;
+}
+
+/// When checking out a pooled connection, it might be that the connection
+/// only supports a single reservation, or it might be usable for many.
+///
+/// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be
+/// used for multiple requests.
+// FIXME: allow() required due to `impl Trait` leaking types to this lint
+#[allow(missing_debug_implementations)]
+pub(super) enum Reservation<T> {
+ /// This connection could be used multiple times, the first one will be
+ /// reinserted into the `idle` pool, and the second will be given to
+ /// the `Checkout`.
+ #[cfg(feature = "http2")]
+ Shared(T, T),
+ /// This connection requires unique access. It will be returned after
+ /// use is complete.
+ Unique(T),
+}
+
+/// Simple type alias in case the key type needs to be adjusted.
+pub(super) type Key = (http::uri::Scheme, http::uri::Authority); //Arc<String>;
+
+struct PoolInner<T> {
+ // A flag that a connection is being established, and the connection
+ // should be shared. This prevents making multiple HTTP/2 connections
+ // to the same host.
+ connecting: HashSet<Key>,
+ // These are internal Conns sitting in the event loop in the KeepAlive
+ // state, waiting to receive a new Request to send on the socket.
+ idle: HashMap<Key, Vec<Idle<T>>>,
+ max_idle_per_host: usize,
+ // These are outstanding Checkouts that are waiting for a socket to be
+ // able to send a Request one. This is used when "racing" for a new
+ // connection.
+ //
+ // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
+ // for the Pool to receive an idle Conn. When a Conn becomes idle,
+ // this list is checked for any parked Checkouts, and tries to notify
+ // them that the Conn could be used instead of waiting for a brand new
+ // connection.
+ waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>,
+ // A oneshot channel is used to allow the interval to be notified when
+ // the Pool completely drops. That way, the interval can cancel immediately.
+ #[cfg(feature = "runtime")]
+ idle_interval_ref: Option<oneshot::Sender<crate::common::Never>>,
+ #[cfg(feature = "runtime")]
+ exec: Exec,
+ timeout: Option<Duration>,
+}
+
+// This is because `Weak::new()` *allocates* space for `T`, even if it
+// doesn't need it!
+struct WeakOpt<T>(Option<Weak<T>>);
+
+#[derive(Clone, Copy, Debug)]
+pub(super) struct Config {
+ pub(super) idle_timeout: Option<Duration>,
+ pub(super) max_idle_per_host: usize,
+}
+
+impl Config {
+ pub(super) fn is_enabled(&self) -> bool {
+ self.max_idle_per_host > 0
+ }
+}
+
+impl<T> Pool<T> {
+ pub(super) fn new(config: Config, __exec: &Exec) -> Pool<T> {
+ let inner = if config.is_enabled() {
+ Some(Arc::new(Mutex::new(PoolInner {
+ connecting: HashSet::new(),
+ idle: HashMap::new(),
+ #[cfg(feature = "runtime")]
+ idle_interval_ref: None,
+ max_idle_per_host: config.max_idle_per_host,
+ waiters: HashMap::new(),
+ #[cfg(feature = "runtime")]
+ exec: __exec.clone(),
+ timeout: config.idle_timeout,
+ })))
+ } else {
+ None
+ };
+
+ Pool { inner }
+ }
+
+ fn is_enabled(&self) -> bool {
+ self.inner.is_some()
+ }
+
+ #[cfg(test)]
+ pub(super) fn no_timer(&self) {
+ // Prevent an actual interval from being created for this pool...
+ #[cfg(feature = "runtime")]
+ {
+ let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
+ assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
+ let (tx, _) = oneshot::channel();
+ inner.idle_interval_ref = Some(tx);
+ }
+ }
+}
+
+impl<T: Poolable> Pool<T> {
+ /// Returns a `Checkout` which is a future that resolves if an idle
+ /// connection becomes available.
+ pub(super) fn checkout(&self, key: Key) -> Checkout<T> {
+ Checkout {
+ key,
+ pool: self.clone(),
+ waiter: None,
+ }
+ }
+
+ /// Ensure that there is only ever 1 connecting task for HTTP/2
+ /// connections. This does nothing for HTTP/1.
+ pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>> {
+ if ver == Ver::Http2 {
+ if let Some(ref enabled) = self.inner {
+ let mut inner = enabled.lock().unwrap();
+ return if inner.connecting.insert(key.clone()) {
+ let connecting = Connecting {
+ key: key.clone(),
+ pool: WeakOpt::downgrade(enabled),
+ };
+ Some(connecting)
+ } else {
+ trace!("HTTP/2 connecting already in progress for {:?}", key);
+ None
+ };
+ }
+ }
+
+ // else
+ Some(Connecting {
+ key: key.clone(),
+ // in HTTP/1's case, there is never a lock, so we don't
+ // need to do anything in Drop.
+ pool: WeakOpt::none(),
+ })
+ }
+
+ #[cfg(test)]
+ fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>> {
+ self.inner.as_ref().expect("enabled").lock().expect("lock")
+ }
+
+ /* Used in client/tests.rs...
+ #[cfg(feature = "runtime")]
+ #[cfg(test)]
+ pub(super) fn h1_key(&self, s: &str) -> Key {
+ Arc::new(s.to_string())
+ }
+
+ #[cfg(feature = "runtime")]
+ #[cfg(test)]
+ pub(super) fn idle_count(&self, key: &Key) -> usize {
+ self
+ .locked()
+ .idle
+ .get(key)
+ .map(|list| list.len())
+ .unwrap_or(0)
+ }
+ */
+
+ pub(super) fn pooled(
+ &self,
+ #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T>,
+ value: T,
+ ) -> Pooled<T> {
+ let (value, pool_ref) = if let Some(ref enabled) = self.inner {
+ match value.reserve() {
+ #[cfg(feature = "http2")]
+ Reservation::Shared(to_insert, to_return) => {
+ let mut inner = enabled.lock().unwrap();
+ inner.put(connecting.key.clone(), to_insert, enabled);
+ // Do this here instead of Drop for Connecting because we
+ // already have a lock, no need to lock the mutex twice.
+ inner.connected(&connecting.key);
+ // prevent the Drop of Connecting from repeating inner.connected()
+ connecting.pool = WeakOpt::none();
+
+ // Shared reservations don't need a reference to the pool,
+ // since the pool always keeps a copy.
+ (to_return, WeakOpt::none())
+ }
+ Reservation::Unique(value) => {
+ // Unique reservations must take a reference to the pool
+ // since they hope to reinsert once the reservation is
+ // completed
+ (value, WeakOpt::downgrade(enabled))
+ }
+ }
+ } else {
+ // If pool is not enabled, skip all the things...
+
+ // The Connecting should have had no pool ref
+ debug_assert!(connecting.pool.upgrade().is_none());
+
+ (value, WeakOpt::none())
+ };
+ Pooled {
+ key: connecting.key.clone(),
+ is_reused: false,
+ pool: pool_ref,
+ value: Some(value),
+ }
+ }
+
+ fn reuse(&self, key: &Key, value: T) -> Pooled<T> {
+ debug!("reuse idle connection for {:?}", key);
+ // TODO: unhack this
+ // In Pool::pooled(), which is used for inserting brand new connections,
+ // there's some code that adjusts the pool reference taken depending
+ // on if the Reservation can be shared or is unique. By the time
+ // reuse() is called, the reservation has already been made, and
+ // we just have the final value, without knowledge of if this is
+ // unique or shared. So, the hack is to just assume Ver::Http2 means
+ // shared... :(
+ let mut pool_ref = WeakOpt::none();
+ if !value.can_share() {
+ if let Some(ref enabled) = self.inner {
+ pool_ref = WeakOpt::downgrade(enabled);
+ }
+ }
+
+ Pooled {
+ is_reused: true,
+ key: key.clone(),
+ pool: pool_ref,
+ value: Some(value),
+ }
+ }
+}
+
+/// Pop off this list, looking for a usable connection that hasn't expired.
+struct IdlePopper<'a, T> {
+ key: &'a Key,
+ list: &'a mut Vec<Idle<T>>,
+}
+
+impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
+ fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
+ while let Some(entry) = self.list.pop() {
+ // If the connection has been closed, or is older than our idle
+ // timeout, simply drop it and keep looking...
+ if !entry.value.is_open() {
+ trace!("removing closed connection for {:?}", self.key);
+ continue;
+ }
+ // TODO: Actually, since the `idle` list is pushed to the end always,
+ // that would imply that if *this* entry is expired, then anything
+ // "earlier" in the list would *have* to be expired also... Right?
+ //
+ // In that case, we could just break out of the loop and drop the
+ // whole list...
+ if expiration.expires(entry.idle_at) {
+ trace!("removing expired connection for {:?}", self.key);
+ continue;
+ }
+
+ let value = match entry.value.reserve() {
+ #[cfg(feature = "http2")]
+ Reservation::Shared(to_reinsert, to_checkout) => {
+ self.list.push(Idle {
+ idle_at: Instant::now(),
+ value: to_reinsert,
+ });
+ to_checkout
+ }
+ Reservation::Unique(unique) => unique,
+ };
+
+ return Some(Idle {
+ idle_at: entry.idle_at,
+ value,
+ });
+ }
+
+ None
+ }
+}
+
+impl<T: Poolable> PoolInner<T> {
+ fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) {
+ if value.can_share() && self.idle.contains_key(&key) {
+ trace!("put; existing idle HTTP/2 connection for {:?}", key);
+ return;
+ }
+ trace!("put; add idle connection for {:?}", key);
+ let mut remove_waiters = false;
+ let mut value = Some(value);
+ if let Some(waiters) = self.waiters.get_mut(&key) {
+ while let Some(tx) = waiters.pop_front() {
+ if !tx.is_canceled() {
+ let reserved = value.take().expect("value already sent");
+ let reserved = match reserved.reserve() {
+ #[cfg(feature = "http2")]
+ Reservation::Shared(to_keep, to_send) => {
+ value = Some(to_keep);
+ to_send
+ }
+ Reservation::Unique(uniq) => uniq,
+ };
+ match tx.send(reserved) {
+ Ok(()) => {
+ if value.is_none() {
+ break;
+ } else {
+ continue;
+ }
+ }
+ Err(e) => {
+ value = Some(e);
+ }
+ }
+ }
+
+ trace!("put; removing canceled waiter for {:?}", key);
+ }
+ remove_waiters = waiters.is_empty();
+ }
+ if remove_waiters {
+ self.waiters.remove(&key);
+ }
+
+ match value {
+ Some(value) => {
+ // borrow-check scope...
+ {
+ let idle_list = self.idle.entry(key.clone()).or_insert_with(Vec::new);
+ if self.max_idle_per_host <= idle_list.len() {
+ trace!("max idle per host for {:?}, dropping connection", key);
+ return;
+ }
+
+ debug!("pooling idle connection for {:?}", key);
+ idle_list.push(Idle {
+ value,
+ idle_at: Instant::now(),
+ });
+ }
+
+ #[cfg(feature = "runtime")]
+ {
+ self.spawn_idle_interval(__pool_ref);
+ }
+ }
+ None => trace!("put; found waiter for {:?}", key),
+ }
+ }
+
+ /// A `Connecting` task is complete. Not necessarily successfully,
+ /// but the lock is going away, so clean up.
+ fn connected(&mut self, key: &Key) {
+ let existed = self.connecting.remove(key);
+ debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
+ // cancel any waiters. if there are any, it's because
+ // this Connecting task didn't complete successfully.
+ // those waiters would never receive a connection.
+ self.waiters.remove(key);
+ }
+
+ #[cfg(feature = "runtime")]
+ fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>) {
+ let (dur, rx) = {
+ if self.idle_interval_ref.is_some() {
+ return;
+ }
+
+ if let Some(dur) = self.timeout {
+ let (tx, rx) = oneshot::channel();
+ self.idle_interval_ref = Some(tx);
+ (dur, rx)
+ } else {
+ return;
+ }
+ };
+
+ let interval = IdleTask {
+ interval: tokio::time::interval(dur),
+ pool: WeakOpt::downgrade(pool_ref),
+ pool_drop_notifier: rx,
+ };
+
+ self.exec.execute(interval);
+ }
+}
+
+impl<T> PoolInner<T> {
+ /// Any `FutureResponse`s that were created will have made a `Checkout`,
+ /// and possibly inserted into the pool that it is waiting for an idle
+ /// connection. If a user ever dropped that future, we need to clean out
+ /// those parked senders.
+ fn clean_waiters(&mut self, key: &Key) {
+ let mut remove_waiters = false;
+ if let Some(waiters) = self.waiters.get_mut(key) {
+ waiters.retain(|tx| !tx.is_canceled());
+ remove_waiters = waiters.is_empty();
+ }
+ if remove_waiters {
+ self.waiters.remove(key);
+ }
+ }
+}
+
+#[cfg(feature = "runtime")]
+impl<T: Poolable> PoolInner<T> {
+ /// This should *only* be called by the IdleTask
+ fn clear_expired(&mut self) {
+ let dur = self.timeout.expect("interval assumes timeout");
+
+ let now = Instant::now();
+ //self.last_idle_check_at = now;
+
+ self.idle.retain(|key, values| {
+ values.retain(|entry| {
+ if !entry.value.is_open() {
+ trace!("idle interval evicting closed for {:?}", key);
+ return false;
+ }
+
+ // Avoid `Instant::sub` to avoid issues like rust-lang/rust#86470.
+ if now.saturating_duration_since(entry.idle_at) > dur {
+ trace!("idle interval evicting expired for {:?}", key);
+ return false;
+ }
+
+ // Otherwise, keep this value...
+ true
+ });
+
+ // returning false evicts this key/val
+ !values.is_empty()
+ });
+ }
+}
+
+impl<T> Clone for Pool<T> {
+ fn clone(&self) -> Pool<T> {
+ Pool {
+ inner: self.inner.clone(),
+ }
+ }
+}
+
+/// A wrapped poolable value that tries to reinsert to the Pool on Drop.
+// Note: The bounds `T: Poolable` is needed for the Drop impl.
+pub(super) struct Pooled<T: Poolable> {
+ value: Option<T>,
+ is_reused: bool,
+ key: Key,
+ pool: WeakOpt<Mutex<PoolInner<T>>>,
+}
+
+impl<T: Poolable> Pooled<T> {
+ pub(super) fn is_reused(&self) -> bool {
+ self.is_reused
+ }
+
+ pub(super) fn is_pool_enabled(&self) -> bool {
+ self.pool.0.is_some()
+ }
+
+ fn as_ref(&self) -> &T {
+ self.value.as_ref().expect("not dropped")
+ }
+
+ fn as_mut(&mut self) -> &mut T {
+ self.value.as_mut().expect("not dropped")
+ }
+}
+
+impl<T: Poolable> Deref for Pooled<T> {
+ type Target = T;
+ fn deref(&self) -> &T {
+ self.as_ref()
+ }
+}
+
+impl<T: Poolable> DerefMut for Pooled<T> {
+ fn deref_mut(&mut self) -> &mut T {
+ self.as_mut()
+ }
+}
+
+impl<T: Poolable> Drop for Pooled<T> {
+ fn drop(&mut self) {
+ if let Some(value) = self.value.take() {
+ if !value.is_open() {
+ // If we *already* know the connection is done here,
+ // it shouldn't be re-inserted back into the pool.
+ return;
+ }
+
+ if let Some(pool) = self.pool.upgrade() {
+ if let Ok(mut inner) = pool.lock() {
+ inner.put(self.key.clone(), value, &pool);
+ }
+ } else if !value.can_share() {
+ trace!("pool dropped, dropping pooled ({:?})", self.key);
+ }
+ // Ver::Http2 is already in the Pool (or dead), so we wouldn't
+ // have an actual reference to the Pool.
+ }
+ }
+}
+
+impl<T: Poolable> fmt::Debug for Pooled<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Pooled").field("key", &self.key).finish()
+ }
+}
+
+struct Idle<T> {
+ idle_at: Instant,
+ value: T,
+}
+
+// FIXME: allow() required due to `impl Trait` leaking types to this lint
+#[allow(missing_debug_implementations)]
+pub(super) struct Checkout<T> {
+ key: Key,
+ pool: Pool<T>,
+ waiter: Option<oneshot::Receiver<T>>,
+}
+
+#[derive(Debug)]
+pub(super) struct CheckoutIsClosedError;
+
+impl StdError for CheckoutIsClosedError {}
+
+impl fmt::Display for CheckoutIsClosedError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.write_str("checked out connection was closed")
+ }
+}
+
+impl<T: Poolable> Checkout<T> {
+ fn poll_waiter(
+ &mut self,
+ cx: &mut task::Context<'_>,
+ ) -> Poll<Option<crate::Result<Pooled<T>>>> {
+ if let Some(mut rx) = self.waiter.take() {
+ match Pin::new(&mut rx).poll(cx) {
+ Poll::Ready(Ok(value)) => {
+ if value.is_open() {
+ Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
+ } else {
+ Poll::Ready(Some(Err(
+ crate::Error::new_canceled().with(CheckoutIsClosedError)
+ )))
+ }
+ }
+ Poll::Pending => {
+ self.waiter = Some(rx);
+ Poll::Pending
+ }
+ Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err(
+ crate::Error::new_canceled().with("request has been canceled")
+ ))),
+ }
+ } else {
+ Poll::Ready(None)
+ }
+ }
+
+ fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T>> {
+ let entry = {
+ let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
+ let expiration = Expiration::new(inner.timeout);
+ let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
+ trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
+ // A block to end the mutable borrow on list,
+ // so the map below can check is_empty()
+ {
+ let popper = IdlePopper {
+ key: &self.key,
+ list,
+ };
+ popper.pop(&expiration)
+ }
+ .map(|e| (e, list.is_empty()))
+ });
+
+ let (entry, empty) = if let Some((e, empty)) = maybe_entry {
+ (Some(e), empty)
+ } else {
+ // No entry found means nuke the list for sure.
+ (None, true)
+ };
+ if empty {
+ //TODO: This could be done with the HashMap::entry API instead.
+ inner.idle.remove(&self.key);
+ }
+
+ if entry.is_none() && self.waiter.is_none() {
+ let (tx, mut rx) = oneshot::channel();
+ trace!("checkout waiting for idle connection: {:?}", self.key);
+ inner
+ .waiters
+ .entry(self.key.clone())
+ .or_insert_with(VecDeque::new)
+ .push_back(tx);
+
+ // register the waker with this oneshot
+ assert!(Pin::new(&mut rx).poll(cx).is_pending());
+ self.waiter = Some(rx);
+ }
+
+ entry
+ };
+
+ entry.map(|e| self.pool.reuse(&self.key, e.value))
+ }
+}
+
+impl<T: Poolable> Future for Checkout<T> {
+ type Output = crate::Result<Pooled<T>>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
+ return Poll::Ready(Ok(pooled));
+ }
+
+ if let Some(pooled) = self.checkout(cx) {
+ Poll::Ready(Ok(pooled))
+ } else if !self.pool.is_enabled() {
+ Poll::Ready(Err(crate::Error::new_canceled().with("pool is disabled")))
+ } else {
+ // There's a new waiter, already registered in self.checkout()
+ debug_assert!(self.waiter.is_some());
+ Poll::Pending
+ }
+ }
+}
+
+impl<T> Drop for Checkout<T> {
+ fn drop(&mut self) {
+ if self.waiter.take().is_some() {
+ trace!("checkout dropped for {:?}", self.key);
+ if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
+ inner.clean_waiters(&self.key);
+ }
+ }
+ }
+}
+
+// FIXME: allow() required due to `impl Trait` leaking types to this lint
+#[allow(missing_debug_implementations)]
+pub(super) struct Connecting<T: Poolable> {
+ key: Key,
+ pool: WeakOpt<Mutex<PoolInner<T>>>,
+}
+
+impl<T: Poolable> Connecting<T> {
+ pub(super) fn alpn_h2(self, pool: &Pool<T>) -> Option<Self> {
+ debug_assert!(
+ self.pool.0.is_none(),
+ "Connecting::alpn_h2 but already Http2"
+ );
+
+ pool.connecting(&self.key, Ver::Http2)
+ }
+}
+
+impl<T: Poolable> Drop for Connecting<T> {
+ fn drop(&mut self) {
+ if let Some(pool) = self.pool.upgrade() {
+ // No need to panic on drop, that could abort!
+ if let Ok(mut inner) = pool.lock() {
+ inner.connected(&self.key);
+ }
+ }
+ }
+}
+
+struct Expiration(Option<Duration>);
+
+impl Expiration {
+ fn new(dur: Option<Duration>) -> Expiration {
+ Expiration(dur)
+ }
+
+ fn expires(&self, instant: Instant) -> bool {
+ match self.0 {
+ // Avoid `Instant::elapsed` to avoid issues like rust-lang/rust#86470.
+ Some(timeout) => Instant::now().saturating_duration_since(instant) > timeout,
+ None => false,
+ }
+ }
+}
+
+#[cfg(feature = "runtime")]
+pin_project_lite::pin_project! {
+ struct IdleTask<T> {
+ #[pin]
+ interval: Interval,
+ pool: WeakOpt<Mutex<PoolInner<T>>>,
+ // This allows the IdleTask to be notified as soon as the entire
+ // Pool is fully dropped, and shutdown. This channel is never sent on,
+ // but Err(Canceled) will be received when the Pool is dropped.
+ #[pin]
+ pool_drop_notifier: oneshot::Receiver<crate::common::Never>,
+ }
+}
+
+#[cfg(feature = "runtime")]
+impl<T: Poolable + 'static> Future for IdleTask<T> {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ let mut this = self.project();
+ loop {
+ match this.pool_drop_notifier.as_mut().poll(cx) {
+ Poll::Ready(Ok(n)) => match n {},
+ Poll::Pending => (),
+ Poll::Ready(Err(_canceled)) => {
+ trace!("pool closed, canceling idle interval");
+ return Poll::Ready(());
+ }
+ }
+
+ ready!(this.interval.as_mut().poll_tick(cx));
+
+ if let Some(inner) = this.pool.upgrade() {
+ if let Ok(mut inner) = inner.lock() {
+ trace!("idle interval checking for expired");
+ inner.clear_expired();
+ continue;
+ }
+ }
+ return Poll::Ready(());
+ }
+ }
+}
+
+impl<T> WeakOpt<T> {
+ fn none() -> Self {
+ WeakOpt(None)
+ }
+
+ fn downgrade(arc: &Arc<T>) -> Self {
+ WeakOpt(Some(Arc::downgrade(arc)))
+ }
+
+ fn upgrade(&self) -> Option<Arc<T>> {
+ self.0.as_ref().and_then(Weak::upgrade)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::task::Poll;
+ use std::time::Duration;
+
+ use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
+ use crate::common::{exec::Exec, task, Future, Pin};
+
+ /// Test unique reservations.
+ #[derive(Debug, PartialEq, Eq)]
+ struct Uniq<T>(T);
+
+ impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
+ fn is_open(&self) -> bool {
+ true
+ }
+
+ fn reserve(self) -> Reservation<Self> {
+ Reservation::Unique(self)
+ }
+
+ fn can_share(&self) -> bool {
+ false
+ }
+ }
+
+ fn c<T: Poolable>(key: Key) -> Connecting<T> {
+ Connecting {
+ key,
+ pool: WeakOpt::none(),
+ }
+ }
+
+ fn host_key(s: &str) -> Key {
+ (http::uri::Scheme::HTTP, s.parse().expect("host key"))
+ }
+
+ fn pool_no_timer<T>() -> Pool<T> {
+ pool_max_idle_no_timer(::std::usize::MAX)
+ }
+
+ fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> {
+ let pool = Pool::new(
+ super::Config {
+ idle_timeout: Some(Duration::from_millis(100)),
+ max_idle_per_host: max_idle,
+ },
+ &Exec::Default,
+ );
+ pool.no_timer();
+ pool
+ }
+
+ #[tokio::test]
+ async fn test_pool_checkout_smoke() {
+ let pool = pool_no_timer();
+ let key = host_key("foo");
+ let pooled = pool.pooled(c(key.clone()), Uniq(41));
+
+ drop(pooled);
+
+ match pool.checkout(key).await {
+ Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
+ Err(_) => panic!("not ready"),
+ };
+ }
+
+ /// Helper to check if the future is ready after polling once.
+ struct PollOnce<'a, F>(&'a mut F);
+
+ impl<F, T, U> Future for PollOnce<'_, F>
+ where
+ F: Future<Output = Result<T, U>> + Unpin,
+ {
+ type Output = Option<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ match Pin::new(&mut self.0).poll(cx) {
+ Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
+ Poll::Ready(Err(_)) => Poll::Ready(Some(())),
+ Poll::Pending => Poll::Ready(None),
+ }
+ }
+ }
+
+ #[tokio::test]
+ async fn test_pool_checkout_returns_none_if_expired() {
+ let pool = pool_no_timer();
+ let key = host_key("foo");
+ let pooled = pool.pooled(c(key.clone()), Uniq(41));
+
+ drop(pooled);
+ tokio::time::sleep(pool.locked().timeout.unwrap()).await;
+ let mut checkout = pool.checkout(key);
+ let poll_once = PollOnce(&mut checkout);
+ let is_not_ready = poll_once.await.is_none();
+ assert!(is_not_ready);
+ }
+
+ #[cfg(feature = "runtime")]
+ #[tokio::test]
+ async fn test_pool_checkout_removes_expired() {
+ let pool = pool_no_timer();
+ let key = host_key("foo");
+
+ pool.pooled(c(key.clone()), Uniq(41));
+ pool.pooled(c(key.clone()), Uniq(5));
+ pool.pooled(c(key.clone()), Uniq(99));
+
+ assert_eq!(
+ pool.locked().idle.get(&key).map(|entries| entries.len()),
+ Some(3)
+ );
+ tokio::time::sleep(pool.locked().timeout.unwrap()).await;
+
+ let mut checkout = pool.checkout(key.clone());
+ let poll_once = PollOnce(&mut checkout);
+ // checkout.await should clean out the expired
+ poll_once.await;
+ assert!(pool.locked().idle.get(&key).is_none());
+ }
+
+ #[test]
+ fn test_pool_max_idle_per_host() {
+ let pool = pool_max_idle_no_timer(2);
+ let key = host_key("foo");
+
+ pool.pooled(c(key.clone()), Uniq(41));
+ pool.pooled(c(key.clone()), Uniq(5));
+ pool.pooled(c(key.clone()), Uniq(99));
+
+ // pooled and dropped 3, max_idle should only allow 2
+ assert_eq!(
+ pool.locked().idle.get(&key).map(|entries| entries.len()),
+ Some(2)
+ );
+ }
+
+ #[cfg(feature = "runtime")]
+ #[tokio::test]
+ async fn test_pool_timer_removes_expired() {
+ let _ = pretty_env_logger::try_init();
+ tokio::time::pause();
+
+ let pool = Pool::new(
+ super::Config {
+ idle_timeout: Some(Duration::from_millis(10)),
+ max_idle_per_host: std::usize::MAX,
+ },
+ &Exec::Default,
+ );
+
+ let key = host_key("foo");
+
+ pool.pooled(c(key.clone()), Uniq(41));
+ pool.pooled(c(key.clone()), Uniq(5));
+ pool.pooled(c(key.clone()), Uniq(99));
+
+ assert_eq!(
+ pool.locked().idle.get(&key).map(|entries| entries.len()),
+ Some(3)
+ );
+
+ // Let the timer tick passed the expiration...
+ tokio::time::advance(Duration::from_millis(30)).await;
+ // Yield so the Interval can reap...
+ tokio::task::yield_now().await;
+
+ assert!(pool.locked().idle.get(&key).is_none());
+ }
+
+ #[tokio::test]
+ async fn test_pool_checkout_task_unparked() {
+ use futures_util::future::join;
+ use futures_util::FutureExt;
+
+ let pool = pool_no_timer();
+ let key = host_key("foo");
+ let pooled = pool.pooled(c(key.clone()), Uniq(41));
+
+ let checkout = join(pool.checkout(key), async {
+ // the checkout future will park first,
+ // and then this lazy future will be polled, which will insert
+ // the pooled back into the pool
+ //
+ // this test makes sure that doing so will unpark the checkout
+ drop(pooled);
+ })
+ .map(|(entry, _)| entry);
+
+ assert_eq!(*checkout.await.unwrap(), Uniq(41));
+ }
+
+ #[tokio::test]
+ async fn test_pool_checkout_drop_cleans_up_waiters() {
+ let pool = pool_no_timer::<Uniq<i32>>();
+ let key = host_key("foo");
+
+ let mut checkout1 = pool.checkout(key.clone());
+ let mut checkout2 = pool.checkout(key.clone());
+
+ let poll_once1 = PollOnce(&mut checkout1);
+ let poll_once2 = PollOnce(&mut checkout2);
+
+ // first poll needed to get into Pool's parked
+ poll_once1.await;
+ assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
+ poll_once2.await;
+ assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);
+
+ // on drop, clean up Pool
+ drop(checkout1);
+ assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
+
+ drop(checkout2);
+ assert!(pool.locked().waiters.get(&key).is_none());
+ }
+
+ #[derive(Debug)]
+ struct CanClose {
+ #[allow(unused)]
+ val: i32,
+ closed: bool,
+ }
+
+ impl Poolable for CanClose {
+ fn is_open(&self) -> bool {
+ !self.closed
+ }
+
+ fn reserve(self) -> Reservation<Self> {
+ Reservation::Unique(self)
+ }
+
+ fn can_share(&self) -> bool {
+ false
+ }
+ }
+
+ #[test]
+ fn pooled_drop_if_closed_doesnt_reinsert() {
+ let pool = pool_no_timer();
+ let key = host_key("foo");
+ pool.pooled(
+ c(key.clone()),
+ CanClose {
+ val: 57,
+ closed: true,
+ },
+ );
+
+ assert!(!pool.locked().idle.contains_key(&key));
+ }
+}