use std::error::Error as StdError; use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::debug; use super::accept::Accept; use super::conn::UpgradeableConnection; use super::server::{Server, Watcher}; use crate::body::{Body, HttpBody}; use crate::common::drain::{self, Draining, Signal, Watch, Watching}; use crate::common::exec::{ConnStreamExec, NewSvcExec}; use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::service::{HttpService, MakeServiceRef}; pin_project! { #[allow(missing_debug_implementations)] pub struct Graceful { #[pin] state: State, } } pin_project! { #[project = StateProj] pub(super) enum State { Running { drain: Option<(Signal, Watch)>, #[pin] server: Server, #[pin] signal: F, }, Draining { draining: Draining }, } } impl Graceful { pub(super) fn new(server: Server, signal: F) -> Self { let drain = Some(drain::channel()); Graceful { state: State::Running { drain, server, signal, }, } } } impl Future for Graceful where I: Accept, IE: Into>, IO: AsyncRead + AsyncWrite + Unpin + Send + 'static, S: MakeServiceRef, S::Error: Into>, B: HttpBody + 'static, B::Error: Into>, F: Future, E: ConnStreamExec<>::Future, B>, E: NewSvcExec, { type Output = crate::Result<()>; fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { let mut me = self.project(); loop { let next = { match me.state.as_mut().project() { StateProj::Running { drain, server, signal, } => match signal.poll(cx) { Poll::Ready(()) => { debug!("signal received, starting graceful shutdown"); let sig = drain.take().expect("drain channel").0; State::Draining { draining: sig.drain(), } } Poll::Pending => { let watch = drain.as_ref().expect("drain channel").1.clone(); return server.poll_watch(cx, &GracefulWatcher(watch)); } }, StateProj::Draining { ref mut draining } => { return Pin::new(draining).poll(cx).map(Ok); } } }; me.state.set(next); } } } #[allow(missing_debug_implementations)] #[derive(Clone)] pub struct GracefulWatcher(Watch); impl Watcher for GracefulWatcher where I: AsyncRead + AsyncWrite + Unpin + Send + 'static, S: HttpService, E: ConnStreamExec, S::ResBody: 'static, ::Error: Into>, { type Future = Watching, fn(Pin<&mut UpgradeableConnection>)>; fn watch(&self, conn: UpgradeableConnection) -> Self::Future { self.0.clone().watch(conn, on_drain) } } fn on_drain(conn: Pin<&mut UpgradeableConnection>) where S: HttpService, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin, S::ResBody: HttpBody + 'static, ::Error: Into>, E: ConnStreamExec, { conn.graceful_shutdown() }