summaryrefslogtreecommitdiffstats
path: root/third_party/rust/hyper/src/server/shutdown.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
commit36d22d82aa202bb199967e9512281e9a53db42c9 (patch)
tree105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/hyper/src/server/shutdown.rs
parentInitial commit. (diff)
downloadfirefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz
firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/hyper/src/server/shutdown.rs')
-rw-r--r--third_party/rust/hyper/src/server/shutdown.rs128
1 files changed, 128 insertions, 0 deletions
diff --git a/third_party/rust/hyper/src/server/shutdown.rs b/third_party/rust/hyper/src/server/shutdown.rs
new file mode 100644
index 0000000000..96937d0827
--- /dev/null
+++ b/third_party/rust/hyper/src/server/shutdown.rs
@@ -0,0 +1,128 @@
+use std::error::Error as StdError;
+
+use pin_project_lite::pin_project;
+use tokio::io::{AsyncRead, AsyncWrite};
+use tracing::debug;
+
+use super::accept::Accept;
+use super::conn::UpgradeableConnection;
+use super::server::{Server, Watcher};
+use crate::body::{Body, HttpBody};
+use crate::common::drain::{self, Draining, Signal, Watch, Watching};
+use crate::common::exec::{ConnStreamExec, NewSvcExec};
+use crate::common::{task, Future, Pin, Poll, Unpin};
+use crate::service::{HttpService, MakeServiceRef};
+
+pin_project! {
+ #[allow(missing_debug_implementations)]
+ pub struct Graceful<I, S, F, E> {
+ #[pin]
+ state: State<I, S, F, E>,
+ }
+}
+
+pin_project! {
+ #[project = StateProj]
+ pub(super) enum State<I, S, F, E> {
+ Running {
+ drain: Option<(Signal, Watch)>,
+ #[pin]
+ server: Server<I, S, E>,
+ #[pin]
+ signal: F,
+ },
+ Draining { draining: Draining },
+ }
+}
+
+impl<I, S, F, E> Graceful<I, S, F, E> {
+ pub(super) fn new(server: Server<I, S, E>, signal: F) -> Self {
+ let drain = Some(drain::channel());
+ Graceful {
+ state: State::Running {
+ drain,
+ server,
+ signal,
+ },
+ }
+ }
+}
+
+impl<I, IO, IE, S, B, F, E> Future for Graceful<I, S, F, E>
+where
+ I: Accept<Conn = IO, Error = IE>,
+ IE: Into<Box<dyn StdError + Send + Sync>>,
+ IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ S: MakeServiceRef<IO, Body, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: HttpBody + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ F: Future<Output = ()>,
+ E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
+ E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
+{
+ type Output = crate::Result<()>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ let mut me = self.project();
+ loop {
+ let next = {
+ match me.state.as_mut().project() {
+ StateProj::Running {
+ drain,
+ server,
+ signal,
+ } => match signal.poll(cx) {
+ Poll::Ready(()) => {
+ debug!("signal received, starting graceful shutdown");
+ let sig = drain.take().expect("drain channel").0;
+ State::Draining {
+ draining: sig.drain(),
+ }
+ }
+ Poll::Pending => {
+ let watch = drain.as_ref().expect("drain channel").1.clone();
+ return server.poll_watch(cx, &GracefulWatcher(watch));
+ }
+ },
+ StateProj::Draining { ref mut draining } => {
+ return Pin::new(draining).poll(cx).map(Ok);
+ }
+ }
+ };
+ me.state.set(next);
+ }
+ }
+}
+
+#[allow(missing_debug_implementations)]
+#[derive(Clone)]
+pub struct GracefulWatcher(Watch);
+
+impl<I, S, E> Watcher<I, S, E> for GracefulWatcher
+where
+ I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ S: HttpService<Body>,
+ E: ConnStreamExec<S::Future, S::ResBody>,
+ S::ResBody: 'static,
+ <S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
+{
+ type Future =
+ Watching<UpgradeableConnection<I, S, E>, fn(Pin<&mut UpgradeableConnection<I, S, E>>)>;
+
+ fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future {
+ self.0.clone().watch(conn, on_drain)
+ }
+}
+
+fn on_drain<I, S, E>(conn: Pin<&mut UpgradeableConnection<I, S, E>>)
+where
+ S: HttpService<Body>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: AsyncRead + AsyncWrite + Unpin,
+ S::ResBody: HttpBody + 'static,
+ <S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
+ E: ConnStreamExec<S::Future, S::ResBody>,
+{
+ conn.graceful_shutdown()
+}