diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/hyper/src/server/shutdown.rs | |
parent | Initial commit. (diff) | |
download | firefox-esr-upstream.tar.xz firefox-esr-upstream.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/hyper/src/server/shutdown.rs')
-rw-r--r-- | third_party/rust/hyper/src/server/shutdown.rs | 128 |
1 files changed, 128 insertions, 0 deletions
diff --git a/third_party/rust/hyper/src/server/shutdown.rs b/third_party/rust/hyper/src/server/shutdown.rs new file mode 100644 index 0000000000..96937d0827 --- /dev/null +++ b/third_party/rust/hyper/src/server/shutdown.rs @@ -0,0 +1,128 @@ +use std::error::Error as StdError; + +use pin_project_lite::pin_project; +use tokio::io::{AsyncRead, AsyncWrite}; +use tracing::debug; + +use super::accept::Accept; +use super::conn::UpgradeableConnection; +use super::server::{Server, Watcher}; +use crate::body::{Body, HttpBody}; +use crate::common::drain::{self, Draining, Signal, Watch, Watching}; +use crate::common::exec::{ConnStreamExec, NewSvcExec}; +use crate::common::{task, Future, Pin, Poll, Unpin}; +use crate::service::{HttpService, MakeServiceRef}; + +pin_project! { + #[allow(missing_debug_implementations)] + pub struct Graceful<I, S, F, E> { + #[pin] + state: State<I, S, F, E>, + } +} + +pin_project! { + #[project = StateProj] + pub(super) enum State<I, S, F, E> { + Running { + drain: Option<(Signal, Watch)>, + #[pin] + server: Server<I, S, E>, + #[pin] + signal: F, + }, + Draining { draining: Draining }, + } +} + +impl<I, S, F, E> Graceful<I, S, F, E> { + pub(super) fn new(server: Server<I, S, E>, signal: F) -> Self { + let drain = Some(drain::channel()); + Graceful { + state: State::Running { + drain, + server, + signal, + }, + } + } +} + +impl<I, IO, IE, S, B, F, E> Future for Graceful<I, S, F, E> +where + I: Accept<Conn = IO, Error = IE>, + IE: Into<Box<dyn StdError + Send + Sync>>, + IO: AsyncRead + AsyncWrite + Unpin + Send + 'static, + S: MakeServiceRef<IO, Body, ResBody = B>, + S::Error: Into<Box<dyn StdError + Send + Sync>>, + B: HttpBody + 'static, + B::Error: Into<Box<dyn StdError + Send + Sync>>, + F: Future<Output = ()>, + E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>, + E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>, +{ + type Output = crate::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + let mut me = self.project(); + loop { + let next = { + match me.state.as_mut().project() { + StateProj::Running { + drain, + server, + signal, + } => match signal.poll(cx) { + Poll::Ready(()) => { + debug!("signal received, starting graceful shutdown"); + let sig = drain.take().expect("drain channel").0; + State::Draining { + draining: sig.drain(), + } + } + Poll::Pending => { + let watch = drain.as_ref().expect("drain channel").1.clone(); + return server.poll_watch(cx, &GracefulWatcher(watch)); + } + }, + StateProj::Draining { ref mut draining } => { + return Pin::new(draining).poll(cx).map(Ok); + } + } + }; + me.state.set(next); + } + } +} + +#[allow(missing_debug_implementations)] +#[derive(Clone)] +pub struct GracefulWatcher(Watch); + +impl<I, S, E> Watcher<I, S, E> for GracefulWatcher +where + I: AsyncRead + AsyncWrite + Unpin + Send + 'static, + S: HttpService<Body>, + E: ConnStreamExec<S::Future, S::ResBody>, + S::ResBody: 'static, + <S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + type Future = + Watching<UpgradeableConnection<I, S, E>, fn(Pin<&mut UpgradeableConnection<I, S, E>>)>; + + fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future { + self.0.clone().watch(conn, on_drain) + } +} + +fn on_drain<I, S, E>(conn: Pin<&mut UpgradeableConnection<I, S, E>>) +where + S: HttpService<Body>, + S::Error: Into<Box<dyn StdError + Send + Sync>>, + I: AsyncRead + AsyncWrite + Unpin, + S::ResBody: HttpBody + 'static, + <S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>, + E: ConnStreamExec<S::Future, S::ResBody>, +{ + conn.graceful_shutdown() +} |