author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-28 14:29:10 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-28 14:29:10 +0000
commit     2aa4a82499d4becd2284cdb482213d541b8804dd (patch)
tree       b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/hyper/src/server
parent     Initial commit. (diff)
Adding upstream version 86.0.1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/hyper/src/server')
-rw-r--r--  third_party/rust/hyper/src/server/accept.rs     101
-rw-r--r--  third_party/rust/hyper/src/server/conn.rs       1025
-rw-r--r--  third_party/rust/hyper/src/server/mod.rs        480
-rw-r--r--  third_party/rust/hyper/src/server/shutdown.rs   119
-rw-r--r--  third_party/rust/hyper/src/server/tcp.rs        299
5 files changed, 2024 insertions, 0 deletions
diff --git a/third_party/rust/hyper/src/server/accept.rs b/third_party/rust/hyper/src/server/accept.rs
new file mode 100644
index 0000000000..e56e3acf84
--- /dev/null
+++ b/third_party/rust/hyper/src/server/accept.rs
@@ -0,0 +1,101 @@
+//! The `Accept` trait and supporting types.
+//!
+//! This module contains:
+//!
+//! - The [`Accept`](Accept) trait used to asynchronously accept incoming
+//! connections.
+//! - Utilities like `poll_fn` to ease creating a custom `Accept`.
+
+#[cfg(feature = "stream")]
+use futures_core::Stream;
+
+use crate::common::{
+ task::{self, Poll},
+ Pin,
+};
+
+/// Asynchronously accept incoming connections.
+pub trait Accept {
+ /// The connection type that can be accepted.
+ type Conn;
+ /// The error type that can occur when accepting a connection.
+ type Error;
+
+ /// Poll to accept the next connection.
+ fn poll_accept(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ ) -> Poll<Option<Result<Self::Conn, Self::Error>>>;
+}
+
+/// Create an `Accept` with a polling function.
+///
+/// # Example
+///
+/// ```
+/// use std::task::Poll;
+/// use hyper::server::{accept, Server};
+///
+/// # let mock_conn = ();
+/// // If we created some mocked connection...
+/// let mut conn = Some(mock_conn);
+///
+/// // And accept just the mocked conn once...
+/// let once = accept::poll_fn(move |cx| {
+/// Poll::Ready(conn.take().map(Ok::<_, ()>))
+/// });
+///
+/// let builder = Server::builder(once);
+/// ```
+pub fn poll_fn<F, IO, E>(func: F) -> impl Accept<Conn = IO, Error = E>
+where
+ F: FnMut(&mut task::Context<'_>) -> Poll<Option<Result<IO, E>>>,
+{
+ struct PollFn<F>(F);
+
+ impl<F, IO, E> Accept for PollFn<F>
+ where
+ F: FnMut(&mut task::Context<'_>) -> Poll<Option<Result<IO, E>>>,
+ {
+ type Conn = IO;
+ type Error = E;
+ fn poll_accept(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
+ unsafe { (self.get_unchecked_mut().0)(cx) }
+ }
+ }
+
+ PollFn(func)
+}
+
+/// Adapt a `Stream` of incoming connections into an `Accept`.
+///
+/// # Optional
+///
+/// This function requires enabling the `stream` feature in your
+/// `Cargo.toml`.
+#[cfg(feature = "stream")]
+pub fn from_stream<S, IO, E>(stream: S) -> impl Accept<Conn = IO, Error = E>
+where
+ S: Stream<Item = Result<IO, E>>,
+{
+ struct FromStream<S>(S);
+
+ impl<S, IO, E> Accept for FromStream<S>
+ where
+ S: Stream<Item = Result<IO, E>>,
+ {
+ type Conn = IO;
+ type Error = E;
+ fn poll_accept(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
+ unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().0).poll_next(cx) }
+ }
+ }
+
+ FromStream(stream)
+}
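
A minimal usage sketch for `from_stream` (not part of the vendored file; it assumes tokio 0.2 with its `tcp` and `stream` features, plus hyper's `stream` feature, and the address is illustrative):

use std::convert::Infallible;
use hyper::server::{accept, Server};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Response};
use tokio::net::TcpListener;

async fn run() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Sketch only: `incoming()` yields `io::Result<TcpStream>`, which matches
    // the `Stream<Item = Result<IO, E>>` shape that `from_stream` expects.
    let mut listener = TcpListener::bind("127.0.0.1:3000").await?;
    let acceptor = accept::from_stream(listener.incoming());

    let make_svc = make_service_fn(|_conn| async {
        Ok::<_, Infallible>(service_fn(|_req| async {
            Ok::<_, Infallible>(Response::new(Body::from("hello")))
        }))
    });

    Server::builder(acceptor).serve(make_svc).await?;
    Ok(())
}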
diff --git a/third_party/rust/hyper/src/server/conn.rs b/third_party/rust/hyper/src/server/conn.rs
new file mode 100644
index 0000000000..aa8233da46
--- /dev/null
+++ b/third_party/rust/hyper/src/server/conn.rs
@@ -0,0 +1,1025 @@
+//! Lower-level Server connection API.
+//!
+//! The types in this module provide a lower-level API based around a
+//! single connection. Accepting a connection and binding it with a service
+//! are not handled at this level. This module provides the building blocks to
+//! customize those things externally.
+//!
+//! If you don't need to manage connections yourself, consider using the
+//! higher-level [Server](super) API.
+
+use std::error::Error as StdError;
+use std::fmt;
+use std::mem;
+#[cfg(feature = "tcp")]
+use std::net::SocketAddr;
+#[cfg(feature = "runtime")]
+use std::time::Duration;
+
+use bytes::Bytes;
+use pin_project::{pin_project, project};
+use tokio::io::{AsyncRead, AsyncWrite};
+
+use super::Accept;
+use crate::body::{Body, Payload};
+use crate::common::exec::{Exec, H2Exec, NewSvcExec};
+use crate::common::io::Rewind;
+use crate::common::{task, Future, Pin, Poll, Unpin};
+use crate::error::{Kind, Parse};
+use crate::proto;
+use crate::service::{HttpService, MakeServiceRef};
+use crate::upgrade::Upgraded;
+
+use self::spawn_all::NewSvcTask;
+pub(super) use self::spawn_all::NoopWatcher;
+pub(super) use self::spawn_all::Watcher;
+pub(super) use self::upgrades::UpgradeableConnection;
+
+#[cfg(feature = "tcp")]
+pub use super::tcp::{AddrIncoming, AddrStream};
+
+/// A lower-level configuration of the HTTP protocol.
+///
+/// This structure is used to configure options for an HTTP server connection.
+///
+/// If you don't need to manage connections yourself, consider using the
+/// higher-level [Server](super) API.
+#[derive(Clone, Debug)]
+pub struct Http<E = Exec> {
+ exec: E,
+ h1_half_close: bool,
+ h1_keep_alive: bool,
+ h1_writev: bool,
+ h2_builder: proto::h2::server::Config,
+ mode: ConnectionMode,
+ max_buf_size: Option<usize>,
+ pipeline_flush: bool,
+}
+
+/// The internal mode of the HTTP protocol, which indicates the behavior when a parse error occurs.
+#[derive(Clone, Debug, PartialEq)]
+enum ConnectionMode {
+ /// Always use HTTP/1 and do not upgrade when a parse error occurs.
+ H1Only,
+ /// Always use HTTP/2.
+ H2Only,
+ /// Use HTTP/1 and try to upgrade to h2 when a parse error occurs.
+ Fallback,
+}
+
+/// A stream mapping incoming IOs to new services.
+///
+/// Yields `Connecting`s that are futures that should be put on a reactor.
+#[must_use = "streams do nothing unless polled"]
+#[pin_project]
+#[derive(Debug)]
+pub(super) struct Serve<I, S, E = Exec> {
+ #[pin]
+ incoming: I,
+ make_service: S,
+ protocol: Http<E>,
+}
+
+/// A future building a new `Service` to a `Connection`.
+///
+/// Wraps the future returned from `MakeService` into one that returns
+/// a `Connection`.
+#[must_use = "futures do nothing unless polled"]
+#[pin_project]
+#[derive(Debug)]
+pub struct Connecting<I, F, E = Exec> {
+ #[pin]
+ future: F,
+ io: Option<I>,
+ protocol: Http<E>,
+}
+
+#[must_use = "futures do nothing unless polled"]
+#[pin_project]
+#[derive(Debug)]
+pub(super) struct SpawnAll<I, S, E> {
+ // TODO: re-add `pub(super)` once rustdoc can handle this.
+ //
+ // See https://github.com/rust-lang/rust/issues/64705
+ #[pin]
+ pub serve: Serve<I, S, E>,
+}
+
+/// A future binding a connection with a Service.
+///
+/// Polling this future will drive HTTP forward.
+#[must_use = "futures do nothing unless polled"]
+#[pin_project]
+pub struct Connection<T, S, E = Exec>
+where
+ S: HttpService<Body>,
+{
+ pub(super) conn: Option<ProtoServer<T, S::ResBody, S, E>>,
+ fallback: Fallback<E>,
+}
+
+#[pin_project]
+pub(super) enum ProtoServer<T, B, S, E = Exec>
+where
+ S: HttpService<Body>,
+ B: Payload,
+{
+ H1(
+ #[pin]
+ proto::h1::Dispatcher<
+ proto::h1::dispatch::Server<S, Body>,
+ B,
+ T,
+ proto::ServerTransaction,
+ >,
+ ),
+ H2(#[pin] proto::h2::Server<Rewind<T>, S, B, E>),
+}
+
+#[derive(Clone, Debug)]
+enum Fallback<E> {
+ ToHttp2(proto::h2::server::Config, E),
+ Http1Only,
+}
+
+impl<E> Fallback<E> {
+ fn to_h2(&self) -> bool {
+ match *self {
+ Fallback::ToHttp2(..) => true,
+ Fallback::Http1Only => false,
+ }
+ }
+}
+
+impl<E> Unpin for Fallback<E> {}
+
+/// Deconstructed parts of a `Connection`.
+///
+/// This allows taking apart a `Connection` at a later time, in order to
+/// reclaim the IO object, and additional related pieces.
+#[derive(Debug)]
+pub struct Parts<T, S> {
+ /// The original IO object used in the handshake.
+ pub io: T,
+ /// A buffer of bytes that have been read but not processed as HTTP.
+ ///
+ /// If the client sent additional bytes after its last request, and
+ /// this connection "ended" with an upgrade, the read buffer will contain
+ /// those bytes.
+ ///
+ /// You will want to check for any existing bytes if you plan to continue
+ /// communicating on the IO object.
+ pub read_buf: Bytes,
+ /// The `Service` used to serve this connection.
+ pub service: S,
+ _inner: (),
+}
+
+// ===== impl Http =====
+
+impl Http {
+ /// Creates a new instance of the HTTP protocol, ready to spawn a server or
+ /// start accepting connections.
+ pub fn new() -> Http {
+ Http {
+ exec: Exec::Default,
+ h1_half_close: false,
+ h1_keep_alive: true,
+ h1_writev: true,
+ h2_builder: Default::default(),
+ mode: ConnectionMode::Fallback,
+ max_buf_size: None,
+ pipeline_flush: false,
+ }
+ }
+}
+
+impl<E> Http<E> {
+ /// Sets whether HTTP1 is required.
+ ///
+ /// Default is false
+ pub fn http1_only(&mut self, val: bool) -> &mut Self {
+ if val {
+ self.mode = ConnectionMode::H1Only;
+ } else {
+ self.mode = ConnectionMode::Fallback;
+ }
+ self
+ }
+
+ /// Set whether HTTP/1 connections should support half-closures.
+ ///
+ /// Clients can choose to shut down their write-side while waiting
+ /// for the server to respond. Setting this to `true` will
+ /// prevent closing the connection immediately if `read`
+ /// detects an EOF in the middle of a request.
+ ///
+ /// Default is `false`.
+ pub fn http1_half_close(&mut self, val: bool) -> &mut Self {
+ self.h1_half_close = val;
+ self
+ }
+
+ /// Enables or disables HTTP/1 keep-alive.
+ ///
+ /// Default is true.
+ pub fn http1_keep_alive(&mut self, val: bool) -> &mut Self {
+ self.h1_keep_alive = val;
+ self
+ }
+
+ // renamed due to the different semantics of http2 keep-alive
+ #[doc(hidden)]
+ #[deprecated(note = "renamed to `http1_keep_alive`")]
+ pub fn keep_alive(&mut self, val: bool) -> &mut Self {
+ self.http1_keep_alive(val)
+ }
+
+ /// Set whether HTTP/1 connections should try to use vectored writes,
+ /// or always flatten into a single buffer.
+ ///
+ /// Note that setting this to false may mean more copies of body data,
+ /// but may also improve performance when an IO transport doesn't
+ /// support vectored writes well, such as most TLS implementations.
+ ///
+ /// Default is `true`.
+ #[inline]
+ pub fn http1_writev(&mut self, val: bool) -> &mut Self {
+ self.h1_writev = val;
+ self
+ }
+
+ /// Sets whether HTTP2 is required.
+ ///
+ /// Default is false
+ pub fn http2_only(&mut self, val: bool) -> &mut Self {
+ if val {
+ self.mode = ConnectionMode::H2Only;
+ } else {
+ self.mode = ConnectionMode::Fallback;
+ }
+ self
+ }
+
+ /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
+ /// stream-level flow control.
+ ///
+ /// Passing `None` will do nothing.
+ ///
+ /// If not set, hyper will use a default.
+ ///
+ /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
+ pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
+ if let Some(sz) = sz.into() {
+ self.h2_builder.adaptive_window = false;
+ self.h2_builder.initial_stream_window_size = sz;
+ }
+ self
+ }
+
+ /// Sets the max connection-level flow control for HTTP2.
+ ///
+ /// Passing `None` will do nothing.
+ ///
+ /// If not set, hyper will use a default.
+ pub fn http2_initial_connection_window_size(
+ &mut self,
+ sz: impl Into<Option<u32>>,
+ ) -> &mut Self {
+ if let Some(sz) = sz.into() {
+ self.h2_builder.adaptive_window = false;
+ self.h2_builder.initial_conn_window_size = sz;
+ }
+ self
+ }
+
+ /// Sets whether to use an adaptive flow control.
+ ///
+ /// Enabling this will override the limits set in
+ /// `http2_initial_stream_window_size` and
+ /// `http2_initial_connection_window_size`.
+ pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
+ use proto::h2::SPEC_WINDOW_SIZE;
+
+ self.h2_builder.adaptive_window = enabled;
+ if enabled {
+ self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
+ self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
+ }
+ self
+ }
+
+ /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
+ /// connections.
+ ///
+ /// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing.
+ ///
+ /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
+ pub fn http2_max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
+ self.h2_builder.max_concurrent_streams = max.into();
+ self
+ }
+
+ /// Sets the interval at which HTTP2 Ping frames are sent to keep a
+ /// connection alive.
+ ///
+ /// Pass `None` to disable HTTP2 keep-alive.
+ ///
+ /// Default is currently disabled.
+ ///
+ /// # Cargo Feature
+ ///
+ /// Requires the `runtime` cargo feature to be enabled.
+ #[cfg(feature = "runtime")]
+ pub fn http2_keep_alive_interval(
+ &mut self,
+ interval: impl Into<Option<Duration>>,
+ ) -> &mut Self {
+ self.h2_builder.keep_alive_interval = interval.into();
+ self
+ }
+
+ /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
+ ///
+ /// If the ping is not acknowledged within the timeout, the connection will
+ /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
+ ///
+ /// Default is 20 seconds.
+ ///
+ /// # Cargo Feature
+ ///
+ /// Requires the `runtime` cargo feature to be enabled.
+ #[cfg(feature = "runtime")]
+ pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
+ self.h2_builder.keep_alive_timeout = timeout;
+ self
+ }
+
+ /// Set the maximum buffer size for the connection.
+ ///
+ /// Default is ~400kb.
+ ///
+ /// # Panics
+ ///
+ /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
+ pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
+ assert!(
+ max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
+ "the max_buf_size cannot be smaller than the minimum that h1 specifies."
+ );
+ self.max_buf_size = Some(max);
+ self
+ }
+
+ /// Aggregates flushes to better support pipelined responses.
+ ///
+ /// Experimental, may have bugs.
+ ///
+ /// Default is false.
+ pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
+ self.pipeline_flush = enabled;
+ self
+ }
+
+ /// Set the executor used to spawn background tasks.
+ ///
+ /// By default, an implicit executor (like `tokio::spawn`) is used.
+ pub fn with_executor<E2>(self, exec: E2) -> Http<E2> {
+ Http {
+ exec,
+ h1_half_close: self.h1_half_close,
+ h1_keep_alive: self.h1_keep_alive,
+ h1_writev: self.h1_writev,
+ h2_builder: self.h2_builder,
+ mode: self.mode,
+ max_buf_size: self.max_buf_size,
+ pipeline_flush: self.pipeline_flush,
+ }
+ }
+
+ /// Bind a connection together with a [`Service`](crate::service::Service).
+ ///
+ /// This returns a Future that must be polled in order for HTTP to be
+ /// driven on the connection.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// # use hyper::{Body, Request, Response};
+ /// # use hyper::service::Service;
+ /// # use hyper::server::conn::Http;
+ /// # use tokio::io::{AsyncRead, AsyncWrite};
+ /// # async fn run<I, S>(some_io: I, some_service: S)
+ /// # where
+ /// # I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ /// # S: Service<hyper::Request<Body>, Response=hyper::Response<Body>> + Send + 'static,
+ /// # S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ /// # S::Future: Send,
+ /// # {
+ /// let http = Http::new();
+ /// let conn = http.serve_connection(some_io, some_service);
+ ///
+ /// if let Err(e) = conn.await {
+ /// eprintln!("server connection error: {}", e);
+ /// }
+ /// # }
+ /// # fn main() {}
+ /// ```
+ pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E>
+ where
+ S: HttpService<Body, ResBody = Bd>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ Bd: Payload,
+ I: AsyncRead + AsyncWrite + Unpin,
+ E: H2Exec<S::Future, Bd>,
+ {
+ let proto = match self.mode {
+ ConnectionMode::H1Only | ConnectionMode::Fallback => {
+ let mut conn = proto::Conn::new(io);
+ if !self.h1_keep_alive {
+ conn.disable_keep_alive();
+ }
+ if self.h1_half_close {
+ conn.set_allow_half_close();
+ }
+ if !self.h1_writev {
+ conn.set_write_strategy_flatten();
+ }
+ conn.set_flush_pipeline(self.pipeline_flush);
+ if let Some(max) = self.max_buf_size {
+ conn.set_max_buf_size(max);
+ }
+ let sd = proto::h1::dispatch::Server::new(service);
+ ProtoServer::H1(proto::h1::Dispatcher::new(sd, conn))
+ }
+ ConnectionMode::H2Only => {
+ let rewind_io = Rewind::new(io);
+ let h2 =
+ proto::h2::Server::new(rewind_io, service, &self.h2_builder, self.exec.clone());
+ ProtoServer::H2(h2)
+ }
+ };
+
+ Connection {
+ conn: Some(proto),
+ fallback: if self.mode == ConnectionMode::Fallback {
+ Fallback::ToHttp2(self.h2_builder.clone(), self.exec.clone())
+ } else {
+ Fallback::Http1Only
+ },
+ }
+ }
+
+ pub(super) fn serve<I, IO, IE, S, Bd>(&self, incoming: I, make_service: S) -> Serve<I, S, E>
+ where
+ I: Accept<Conn = IO, Error = IE>,
+ IE: Into<Box<dyn StdError + Send + Sync>>,
+ IO: AsyncRead + AsyncWrite + Unpin,
+ S: MakeServiceRef<IO, Body, ResBody = Bd>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ Bd: Payload,
+ E: H2Exec<<S::Service as HttpService<Body>>::Future, Bd>,
+ {
+ Serve {
+ incoming,
+ make_service,
+ protocol: self.clone(),
+ }
+ }
+}
+
+// ===== impl Connection =====
+
+impl<I, B, S, E> Connection<I, S, E>
+where
+ S: HttpService<Body, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: AsyncRead + AsyncWrite + Unpin,
+ B: Payload + 'static,
+ E: H2Exec<S::Future, B>,
+{
+ /// Start a graceful shutdown process for this connection.
+ ///
+ /// This `Connection` should continue to be polled until shutdown
+ /// can finish.
+ ///
+ /// # Note
+ ///
+ /// This should only be called while the `Connection` future is still
+ /// pending. If called after `Connection::poll` has resolved, this does
+ /// nothing.
+ pub fn graceful_shutdown(self: Pin<&mut Self>) {
+ match self.project().conn {
+ Some(ProtoServer::H1(ref mut h1)) => {
+ h1.disable_keep_alive();
+ }
+ Some(ProtoServer::H2(ref mut h2)) => {
+ h2.graceful_shutdown();
+ }
+ None => (),
+ }
+ }
+
+ /// Return the inner IO object, and additional information.
+ ///
+ /// If the IO object has been "rewound", the returned IO will not contain those rewound bytes.
+ /// This should only be called after `poll_without_shutdown` signals
+ /// that the connection is "done". Otherwise, it may not have finished
+ /// flushing all necessary HTTP bytes.
+ ///
+ /// # Panics
+ /// This method will panic if this connection is using an h2 protocol.
+ pub fn into_parts(self) -> Parts<I, S> {
+ self.try_into_parts()
+ .unwrap_or_else(|| panic!("h2 cannot into_inner"))
+ }
+
+ /// Return the inner IO object, and additional information, if available.
+ ///
+ /// This method will return a `None` if this connection is using an h2 protocol.
+ pub fn try_into_parts(self) -> Option<Parts<I, S>> {
+ match self.conn.unwrap() {
+ ProtoServer::H1(h1) => {
+ let (io, read_buf, dispatch) = h1.into_inner();
+ Some(Parts {
+ io,
+ read_buf,
+ service: dispatch.into_service(),
+ _inner: (),
+ })
+ }
+ ProtoServer::H2(_h2) => None,
+ }
+ }
+
+ /// Poll the connection for completion, but without calling `shutdown`
+ /// on the underlying IO.
+ ///
+ /// This is useful to allow running a connection while doing an HTTP
+ /// upgrade. Once the upgrade is completed, the connection would be "done",
+ /// but it is not desired to actually shutdown the IO object. Instead you
+ /// would take it back using `into_parts`.
+ ///
+ /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
+ /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
+ /// to work with this function; or use the `without_shutdown` wrapper.
+ pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>
+ where
+ S: Unpin,
+ S::Future: Unpin,
+ B: Unpin,
+ {
+ loop {
+ let polled = match *self.conn.as_mut().unwrap() {
+ ProtoServer::H1(ref mut h1) => h1.poll_without_shutdown(cx),
+ ProtoServer::H2(ref mut h2) => return Pin::new(h2).poll(cx).map_ok(|_| ()),
+ };
+ match ready!(polled) {
+ Ok(()) => return Poll::Ready(Ok(())),
+ Err(e) => match *e.kind() {
+ Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => {
+ self.upgrade_h2();
+ continue;
+ }
+ _ => return Poll::Ready(Err(e)),
+ },
+ }
+ }
+ }
+
+ /// Prevents shutdown of the underlying IO object at the end of servicing the request,
+ /// instead returning the result of `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
+ pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<I, S>>>
+ where
+ S: Unpin,
+ S::Future: Unpin,
+ B: Unpin,
+ {
+ let mut conn = Some(self);
+ futures_util::future::poll_fn(move |cx| {
+ ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
+ Poll::Ready(Ok(conn.take().unwrap().into_parts()))
+ })
+ }
+
+ fn upgrade_h2(&mut self) {
+ trace!("Trying to upgrade connection to h2");
+ let conn = self.conn.take();
+
+ let (io, read_buf, dispatch) = match conn.unwrap() {
+ ProtoServer::H1(h1) => h1.into_inner(),
+ ProtoServer::H2(_h2) => {
+ panic!("h2 cannot into_inner");
+ }
+ };
+ let mut rewind_io = Rewind::new(io);
+ rewind_io.rewind(read_buf);
+ let (builder, exec) = match self.fallback {
+ Fallback::ToHttp2(ref builder, ref exec) => (builder, exec),
+ Fallback::Http1Only => unreachable!("upgrade_h2 with Fallback::Http1Only"),
+ };
+ let h2 = proto::h2::Server::new(rewind_io, dispatch.into_service(), builder, exec.clone());
+
+ debug_assert!(self.conn.is_none());
+ self.conn = Some(ProtoServer::H2(h2));
+ }
+
+ /// Enable this connection to support higher-level HTTP upgrades.
+ ///
+ /// See [the `upgrade` module](crate::upgrade) for more.
+ pub fn with_upgrades(self) -> UpgradeableConnection<I, S, E>
+ where
+ I: Send,
+ {
+ UpgradeableConnection { inner: self }
+ }
+}
+
+impl<I, B, S, E> Future for Connection<I, S, E>
+where
+ S: HttpService<Body, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: AsyncRead + AsyncWrite + Unpin + 'static,
+ B: Payload + 'static,
+ E: H2Exec<S::Future, B>,
+{
+ type Output = crate::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ loop {
+ match ready!(Pin::new(self.conn.as_mut().unwrap()).poll(cx)) {
+ Ok(done) => {
+ if let proto::Dispatched::Upgrade(pending) = done {
+ // With no `Send` bound on `I`, we can't try to do
+ // upgrades here. In case a user was trying to use
+ // `Body::on_upgrade` with this API, send a special
+ // error letting them know about that.
+ pending.manual();
+ }
+ return Poll::Ready(Ok(()));
+ }
+ Err(e) => match *e.kind() {
+ Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => {
+ self.upgrade_h2();
+ continue;
+ }
+ _ => return Poll::Ready(Err(e)),
+ },
+ }
+ }
+ }
+}
+
+impl<I, S> fmt::Debug for Connection<I, S>
+where
+ S: HttpService<Body>,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Connection").finish()
+ }
+}
+// ===== impl Serve =====
+
+impl<I, S, E> Serve<I, S, E> {
+ /// Get a reference to the incoming stream.
+ #[inline]
+ pub fn incoming_ref(&self) -> &I {
+ &self.incoming
+ }
+
+ /*
+ /// Get a mutable reference to the incoming stream.
+ #[inline]
+ pub fn incoming_mut(&mut self) -> &mut I {
+ &mut self.incoming
+ }
+ */
+
+ /// Spawn all incoming connections onto the executor in `Http`.
+ pub(super) fn spawn_all(self) -> SpawnAll<I, S, E> {
+ SpawnAll { serve: self }
+ }
+}
+
+impl<I, IO, IE, S, B, E> Serve<I, S, E>
+where
+ I: Accept<Conn = IO, Error = IE>,
+ IO: AsyncRead + AsyncWrite + Unpin,
+ IE: Into<Box<dyn StdError + Send + Sync>>,
+ S: MakeServiceRef<IO, Body, ResBody = B>,
+ B: Payload,
+ E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
+{
+ fn poll_next_(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ ) -> Poll<Option<crate::Result<Connecting<IO, S::Future, E>>>> {
+ let me = self.project();
+ match ready!(me.make_service.poll_ready_ref(cx)) {
+ Ok(()) => (),
+ Err(e) => {
+ trace!("make_service closed");
+ return Poll::Ready(Some(Err(crate::Error::new_user_make_service(e))));
+ }
+ }
+
+ if let Some(item) = ready!(me.incoming.poll_accept(cx)) {
+ let io = item.map_err(crate::Error::new_accept)?;
+ let new_fut = me.make_service.make_service_ref(&io);
+ Poll::Ready(Some(Ok(Connecting {
+ future: new_fut,
+ io: Some(io),
+ protocol: me.protocol.clone(),
+ })))
+ } else {
+ Poll::Ready(None)
+ }
+ }
+}
+
+// ===== impl Connecting =====
+
+impl<I, F, S, FE, E, B> Future for Connecting<I, F, E>
+where
+ I: AsyncRead + AsyncWrite + Unpin,
+ F: Future<Output = Result<S, FE>>,
+ S: HttpService<Body, ResBody = B>,
+ B: Payload,
+ E: H2Exec<S::Future, B>,
+{
+ type Output = Result<Connection<I, S, E>, FE>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
+ let service = ready!(me.future.poll(cx))?;
+ let io = me.io.take().expect("polled after complete");
+ Poll::Ready(Ok(me.protocol.serve_connection(io, service)))
+ }
+}
+
+// ===== impl SpawnAll =====
+
+#[cfg(feature = "tcp")]
+impl<S, E> SpawnAll<AddrIncoming, S, E> {
+ pub(super) fn local_addr(&self) -> SocketAddr {
+ self.serve.incoming.local_addr()
+ }
+}
+
+impl<I, S, E> SpawnAll<I, S, E> {
+ pub(super) fn incoming_ref(&self) -> &I {
+ self.serve.incoming_ref()
+ }
+}
+
+impl<I, IO, IE, S, B, E> SpawnAll<I, S, E>
+where
+ I: Accept<Conn = IO, Error = IE>,
+ IE: Into<Box<dyn StdError + Send + Sync>>,
+ IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ S: MakeServiceRef<IO, Body, ResBody = B>,
+ B: Payload,
+ E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
+{
+ pub(super) fn poll_watch<W>(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ watcher: &W,
+ ) -> Poll<crate::Result<()>>
+ where
+ E: NewSvcExec<IO, S::Future, S::Service, E, W>,
+ W: Watcher<IO, S::Service, E>,
+ {
+ let mut me = self.project();
+ loop {
+ if let Some(connecting) = ready!(me.serve.as_mut().poll_next_(cx)?) {
+ let fut = NewSvcTask::new(connecting, watcher.clone());
+ me.serve
+ .as_mut()
+ .project()
+ .protocol
+ .exec
+ .execute_new_svc(fut);
+ } else {
+ return Poll::Ready(Ok(()));
+ }
+ }
+ }
+}
+
+// ===== impl ProtoServer =====
+
+impl<T, B, S, E> Future for ProtoServer<T, B, S, E>
+where
+ T: AsyncRead + AsyncWrite + Unpin,
+ S: HttpService<Body, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: Payload,
+ E: H2Exec<S::Future, B>,
+{
+ type Output = crate::Result<proto::Dispatched>;
+
+ #[project]
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ #[project]
+ match self.project() {
+ ProtoServer::H1(s) => s.poll(cx),
+ ProtoServer::H2(s) => s.poll(cx),
+ }
+ }
+}
+
+pub(crate) mod spawn_all {
+ use std::error::Error as StdError;
+ use tokio::io::{AsyncRead, AsyncWrite};
+
+ use super::{Connecting, UpgradeableConnection};
+ use crate::body::{Body, Payload};
+ use crate::common::exec::H2Exec;
+ use crate::common::{task, Future, Pin, Poll, Unpin};
+ use crate::service::HttpService;
+ use pin_project::{pin_project, project};
+
+ // Used by `SpawnAll` to optionally watch a `Connection` future.
+ //
+ // The regular `hyper::Server` just uses a `NoopWatcher`, which does
+ // not need to watch anything, and so returns the `Connection` untouched.
+ //
+ // The `Server::with_graceful_shutdown` needs to keep track of all active
+ // connections, and signal that they start to shutdown when prompted, so
+ // it has a `GracefulWatcher` implementation to do that.
+ pub trait Watcher<I, S: HttpService<Body>, E>: Clone {
+ type Future: Future<Output = crate::Result<()>>;
+
+ fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future;
+ }
+
+ #[allow(missing_debug_implementations)]
+ #[derive(Copy, Clone)]
+ pub struct NoopWatcher;
+
+ impl<I, S, E> Watcher<I, S, E> for NoopWatcher
+ where
+ I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ S: HttpService<Body>,
+ E: H2Exec<S::Future, S::ResBody>,
+ {
+ type Future = UpgradeableConnection<I, S, E>;
+
+ fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future {
+ conn
+ }
+ }
+
+ // This is a `Future<Item=(), Error=()>` spawned to an `Executor` inside
+ // the `SpawnAll`. By being a nameable type, we can be generic over the
+ // user's `Service::Future`, and thus an `Executor` can execute it.
+ //
+ // Doing this allows for the server to conditionally require `Send` futures,
+ // depending on the `Executor` configured.
+ //
+ // Users cannot import this type, nor the associated `NewSvcExec`. Instead,
+ // a blanket implementation for `Executor<impl Future>` is sufficient.
+
+ #[pin_project]
+ #[allow(missing_debug_implementations)]
+ pub struct NewSvcTask<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> {
+ #[pin]
+ state: State<I, N, S, E, W>,
+ }
+
+ #[pin_project]
+ pub enum State<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> {
+ Connecting(#[pin] Connecting<I, N, E>, W),
+ Connected(#[pin] W::Future),
+ }
+
+ impl<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> NewSvcTask<I, N, S, E, W> {
+ pub(super) fn new(connecting: Connecting<I, N, E>, watcher: W) -> Self {
+ NewSvcTask {
+ state: State::Connecting(connecting, watcher),
+ }
+ }
+ }
+
+ impl<I, N, S, NE, B, E, W> Future for NewSvcTask<I, N, S, E, W>
+ where
+ I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ N: Future<Output = Result<S, NE>>,
+ NE: Into<Box<dyn StdError + Send + Sync>>,
+ S: HttpService<Body, ResBody = B>,
+ B: Payload,
+ E: H2Exec<S::Future, B>,
+ W: Watcher<I, S, E>,
+ {
+ type Output = ();
+
+ #[project]
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ // If it weren't for needing to name this type so the `Send` bounds
+ // could be projected to the `Serve` executor, this could just be
+ // an `async fn`, and much safer. Woe is me.
+
+ let mut me = self.project();
+ loop {
+ let next = {
+ #[project]
+ match me.state.as_mut().project() {
+ State::Connecting(connecting, watcher) => {
+ let res = ready!(connecting.poll(cx));
+ let conn = match res {
+ Ok(conn) => conn,
+ Err(err) => {
+ let err = crate::Error::new_user_make_service(err);
+ debug!("connecting error: {}", err);
+ return Poll::Ready(());
+ }
+ };
+ let connected = watcher.watch(conn.with_upgrades());
+ State::Connected(connected)
+ }
+ State::Connected(future) => {
+ return future.poll(cx).map(|res| {
+ if let Err(err) = res {
+ debug!("connection error: {}", err);
+ }
+ });
+ }
+ }
+ };
+
+ me.state.set(next);
+ }
+ }
+ }
+}
+
+mod upgrades {
+ use super::*;
+
+ // A future binding a connection with a Service with Upgrade support.
+ //
+ // This type is unnameable outside the crate, and so basically just an
+ // `impl Future`, without requiring Rust 1.26.
+ #[must_use = "futures do nothing unless polled"]
+ #[allow(missing_debug_implementations)]
+ pub struct UpgradeableConnection<T, S, E>
+ where
+ S: HttpService<Body>,
+ {
+ pub(super) inner: Connection<T, S, E>,
+ }
+
+ impl<I, B, S, E> UpgradeableConnection<I, S, E>
+ where
+ S: HttpService<Body, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: AsyncRead + AsyncWrite + Unpin,
+ B: Payload + 'static,
+ E: H2Exec<S::Future, B>,
+ {
+ /// Start a graceful shutdown process for this connection.
+ ///
+ /// This `Connection` should continue to be polled until shutdown
+ /// can finish.
+ pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
+ Pin::new(&mut self.inner).graceful_shutdown()
+ }
+ }
+
+ impl<I, B, S, E> Future for UpgradeableConnection<I, S, E>
+ where
+ S: HttpService<Body, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ B: Payload + 'static,
+ E: super::H2Exec<S::Future, B>,
+ {
+ type Output = crate::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ loop {
+ match ready!(Pin::new(self.inner.conn.as_mut().unwrap()).poll(cx)) {
+ Ok(proto::Dispatched::Shutdown) => return Poll::Ready(Ok(())),
+ Ok(proto::Dispatched::Upgrade(pending)) => {
+ let h1 = match mem::replace(&mut self.inner.conn, None) {
+ Some(ProtoServer::H1(h1)) => h1,
+ _ => unreachable!("Upgrade expects h1"),
+ };
+
+ let (io, buf, _) = h1.into_inner();
+ pending.fulfill(Upgraded::new(io, buf));
+ return Poll::Ready(Ok(()));
+ }
+ Err(e) => match *e.kind() {
+ Kind::Parse(Parse::VersionH2) if self.inner.fallback.to_h2() => {
+ self.inner.upgrade_h2();
+ continue;
+ }
+ _ => return Poll::Ready(Err(e)),
+ },
+ }
+ }
+ }
+ }
+}
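
A brief sketch of driving this lower-level connection API by hand (not part of the vendored file; assumes tokio 0.2, and the address is illustrative): accept TCP connections yourself and serve each one with `Http::serve_connection`.

use std::convert::Infallible;
use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Request, Response};
use tokio::net::TcpListener;

async fn serve_loop() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let mut listener = TcpListener::bind("127.0.0.1:3000").await?;
    loop {
        let (stream, _remote_addr) = listener.accept().await?;
        tokio::spawn(async move {
            // Sketch only: one `Connection` future per accepted socket;
            // polling it drives HTTP on that socket until it is done.
            let service = service_fn(|_req: Request<Body>| async {
                Ok::<_, Infallible>(Response::new(Body::from("hello")))
            });
            if let Err(e) = Http::new().serve_connection(stream, service).await {
                eprintln!("connection error: {}", e);
            }
        });
    }
}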
diff --git a/third_party/rust/hyper/src/server/mod.rs b/third_party/rust/hyper/src/server/mod.rs
new file mode 100644
index 0000000000..ed6068c867
--- /dev/null
+++ b/third_party/rust/hyper/src/server/mod.rs
@@ -0,0 +1,480 @@
+//! HTTP Server
+//!
+//! A `Server` is created to listen on a port, parse HTTP requests, and hand
+//! them off to a `Service`.
+//!
+//! There are two levels of APIs provided for constructing HTTP servers:
+//!
+//! - The higher-level [`Server`](Server) type.
+//! - The lower-level [`conn`](conn) module.
+//!
+//! # Server
+//!
+//! The [`Server`](Server) is the main way to start listening for HTTP requests.
+//! It wraps a listener with a [`MakeService`](crate::service), and then should
+//! be executed to start serving requests.
+//!
+//! [`Server`](Server) accepts connections in both HTTP1 and HTTP2 by default.
+//!
+//! ## Example
+//!
+//! ```no_run
+//! use std::convert::Infallible;
+//! use std::net::SocketAddr;
+//! use hyper::{Body, Request, Response, Server};
+//! use hyper::service::{make_service_fn, service_fn};
+//!
+//! async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
+//! Ok(Response::new(Body::from("Hello World")))
+//! }
+//!
+//! # #[cfg(feature = "runtime")]
+//! #[tokio::main]
+//! async fn main() {
+//! // Construct our SocketAddr to listen on...
+//! let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
+//!
+//! // And a MakeService to handle each connection...
+//! let make_service = make_service_fn(|_conn| async {
+//! Ok::<_, Infallible>(service_fn(handle))
+//! });
+//!
+//! // Then bind and serve...
+//! let server = Server::bind(&addr).serve(make_service);
+//!
+//! // And run forever...
+//! if let Err(e) = server.await {
+//! eprintln!("server error: {}", e);
+//! }
+//! }
+//! # #[cfg(not(feature = "runtime"))]
+//! # fn main() {}
+//! ```
+
+pub mod accept;
+pub mod conn;
+mod shutdown;
+#[cfg(feature = "tcp")]
+mod tcp;
+
+use std::error::Error as StdError;
+use std::fmt;
+#[cfg(feature = "tcp")]
+use std::net::{SocketAddr, TcpListener as StdTcpListener};
+
+#[cfg(feature = "tcp")]
+use std::time::Duration;
+
+use pin_project::pin_project;
+use tokio::io::{AsyncRead, AsyncWrite};
+
+use self::accept::Accept;
+use crate::body::{Body, Payload};
+use crate::common::exec::{Exec, H2Exec, NewSvcExec};
+use crate::common::{task, Future, Pin, Poll, Unpin};
+use crate::service::{HttpService, MakeServiceRef};
+// Renamed `Http` as `Http_` for now so that people upgrading don't see an
+// error that `hyper::server::Http` is private...
+use self::conn::{Http as Http_, NoopWatcher, SpawnAll};
+use self::shutdown::{Graceful, GracefulWatcher};
+#[cfg(feature = "tcp")]
+use self::tcp::AddrIncoming;
+
+/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default.
+///
+/// `Server` is a `Future` mapping a bound listener with a set of service
+/// handlers. It is built using the [`Builder`](Builder), and the future
+/// completes when the server has been shutdown. It should be run by an
+/// `Executor`.
+#[pin_project]
+pub struct Server<I, S, E = Exec> {
+ #[pin]
+ spawn_all: SpawnAll<I, S, E>,
+}
+
+/// A builder for a [`Server`](Server).
+#[derive(Debug)]
+pub struct Builder<I, E = Exec> {
+ incoming: I,
+ protocol: Http_<E>,
+}
+
+// ===== impl Server =====
+
+impl<I> Server<I, ()> {
+ /// Starts a [`Builder`](Builder) with the provided incoming stream.
+ pub fn builder(incoming: I) -> Builder<I> {
+ Builder {
+ incoming,
+ protocol: Http_::new(),
+ }
+ }
+}
+
+#[cfg(feature = "tcp")]
+impl Server<AddrIncoming, ()> {
+ /// Binds to the provided address, and returns a [`Builder`](Builder).
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if binding to the address fails. For a method
+ /// to bind to an address and return a `Result`, see `Server::try_bind`.
+ pub fn bind(addr: &SocketAddr) -> Builder<AddrIncoming> {
+ let incoming = AddrIncoming::new(addr).unwrap_or_else(|e| {
+ panic!("error binding to {}: {}", addr, e);
+ });
+ Server::builder(incoming)
+ }
+
+ /// Tries to bind to the provided address, and returns a [`Builder`](Builder).
+ pub fn try_bind(addr: &SocketAddr) -> crate::Result<Builder<AddrIncoming>> {
+ AddrIncoming::new(addr).map(Server::builder)
+ }
+
+ /// Create a new instance from a `std::net::TcpListener` instance.
+ pub fn from_tcp(listener: StdTcpListener) -> Result<Builder<AddrIncoming>, crate::Error> {
+ AddrIncoming::from_std(listener).map(Server::builder)
+ }
+}
+
+#[cfg(feature = "tcp")]
+impl<S, E> Server<AddrIncoming, S, E> {
+ /// Returns the local address that this server is bound to.
+ pub fn local_addr(&self) -> SocketAddr {
+ self.spawn_all.local_addr()
+ }
+}
+
+impl<I, IO, IE, S, E, B> Server<I, S, E>
+where
+ I: Accept<Conn = IO, Error = IE>,
+ IE: Into<Box<dyn StdError + Send + Sync>>,
+ IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ S: MakeServiceRef<IO, Body, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: Payload,
+ E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
+ E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
+{
+ /// Prepares a server to handle graceful shutdown when the provided future
+ /// completes.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// # fn main() {}
+ /// # #[cfg(feature = "tcp")]
+ /// # async fn run() {
+ /// # use hyper::{Body, Response, Server, Error};
+ /// # use hyper::service::{make_service_fn, service_fn};
+ /// # let make_service = make_service_fn(|_| async {
+ /// # Ok::<_, Error>(service_fn(|_req| async {
+ /// # Ok::<_, Error>(Response::new(Body::from("Hello World")))
+ /// # }))
+ /// # });
+ /// // Make a server from the previous examples...
+ /// let server = Server::bind(&([127, 0, 0, 1], 3000).into())
+ /// .serve(make_service);
+ ///
+ /// // Prepare some signal for when the server should start shutting down...
+ /// let (tx, rx) = tokio::sync::oneshot::channel::<()>();
+ /// let graceful = server
+ /// .with_graceful_shutdown(async {
+ /// rx.await.ok();
+ /// });
+ ///
+ /// // Somewhere, trigger the signal by calling `tx.send(())`; here we do it
+ /// // right away for the sake of the example.
+ /// let _ = tx.send(());
+ ///
+ /// // Await the `server` receiving the signal...
+ /// if let Err(e) = graceful.await {
+ /// eprintln!("server error: {}", e);
+ /// }
+ /// # }
+ /// ```
+ pub fn with_graceful_shutdown<F>(self, signal: F) -> Graceful<I, S, F, E>
+ where
+ F: Future<Output = ()>,
+ {
+ Graceful::new(self.spawn_all, signal)
+ }
+}
+
+impl<I, IO, IE, S, B, E> Future for Server<I, S, E>
+where
+ I: Accept<Conn = IO, Error = IE>,
+ IE: Into<Box<dyn StdError + Send + Sync>>,
+ IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ S: MakeServiceRef<IO, Body, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: Payload,
+ E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
+ E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
+{
+ type Output = crate::Result<()>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ self.project().spawn_all.poll_watch(cx, &NoopWatcher)
+ }
+}
+
+impl<I: fmt::Debug, S: fmt::Debug> fmt::Debug for Server<I, S> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Server")
+ .field("listener", &self.spawn_all.incoming_ref())
+ .finish()
+ }
+}
+
+// ===== impl Builder =====
+
+impl<I, E> Builder<I, E> {
+ /// Start a new builder, wrapping an incoming stream and low-level options.
+ ///
+ /// For a more convenient constructor, see [`Server::bind`](Server::bind).
+ pub fn new(incoming: I, protocol: Http_<E>) -> Self {
+ Builder { incoming, protocol }
+ }
+
+ /// Sets whether to use keep-alive for HTTP/1 connections.
+ ///
+ /// Default is `true`.
+ pub fn http1_keepalive(mut self, val: bool) -> Self {
+ self.protocol.http1_keep_alive(val);
+ self
+ }
+
+ /// Set whether HTTP/1 connections should support half-closures.
+ ///
+ /// Clients can choose to shut down their write-side while waiting
+ /// for the server to respond. Setting this to `true` will
+ /// prevent closing the connection immediately if `read`
+ /// detects an EOF in the middle of a request.
+ ///
+ /// Default is `false`.
+ pub fn http1_half_close(mut self, val: bool) -> Self {
+ self.protocol.http1_half_close(val);
+ self
+ }
+
+ /// Set the maximum buffer size.
+ ///
+ /// Default is ~ 400kb.
+ pub fn http1_max_buf_size(mut self, val: usize) -> Self {
+ self.protocol.max_buf_size(val);
+ self
+ }
+
+ // Sets whether to bunch up HTTP/1 writes until the read buffer is empty.
+ //
+ // This isn't really desirable in most cases, only really being useful in
+ // silly pipeline benchmarks.
+ #[doc(hidden)]
+ pub fn http1_pipeline_flush(mut self, val: bool) -> Self {
+ self.protocol.pipeline_flush(val);
+ self
+ }
+
+ /// Set whether HTTP/1 connections should try to use vectored writes,
+ /// or always flatten into a single buffer.
+ ///
+ /// # Note
+ ///
+ /// Setting this to `false` may mean more copies of body data,
+ /// but may also improve performance when an IO transport doesn't
+ /// support vectored writes well, such as most TLS implementations.
+ ///
+ /// Default is `true`.
+ pub fn http1_writev(mut self, val: bool) -> Self {
+ self.protocol.http1_writev(val);
+ self
+ }
+
+ /// Sets whether HTTP/1 is required.
+ ///
+ /// Default is `false`.
+ pub fn http1_only(mut self, val: bool) -> Self {
+ self.protocol.http1_only(val);
+ self
+ }
+
+ /// Sets whether HTTP/2 is required.
+ ///
+ /// Default is `false`.
+ pub fn http2_only(mut self, val: bool) -> Self {
+ self.protocol.http2_only(val);
+ self
+ }
+
+ /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
+ /// stream-level flow control.
+ ///
+ /// Passing `None` will do nothing.
+ ///
+ /// If not set, hyper will use a default.
+ ///
+ /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
+ pub fn http2_initial_stream_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
+ self.protocol.http2_initial_stream_window_size(sz.into());
+ self
+ }
+
+ /// Sets the max connection-level flow control for HTTP2.
+ ///
+ /// Passing `None` will do nothing.
+ ///
+ /// If not set, hyper will use a default.
+ pub fn http2_initial_connection_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
+ self.protocol
+ .http2_initial_connection_window_size(sz.into());
+ self
+ }
+
+ /// Sets whether to use an adaptive flow control.
+ ///
+ /// Enabling this will override the limits set in
+ /// `http2_initial_stream_window_size` and
+ /// `http2_initial_connection_window_size`.
+ pub fn http2_adaptive_window(mut self, enabled: bool) -> Self {
+ self.protocol.http2_adaptive_window(enabled);
+ self
+ }
+
+ /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
+ /// connections.
+ ///
+ /// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing.
+ ///
+ /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
+ pub fn http2_max_concurrent_streams(mut self, max: impl Into<Option<u32>>) -> Self {
+ self.protocol.http2_max_concurrent_streams(max.into());
+ self
+ }
+
+ /// Sets the interval at which HTTP2 Ping frames are sent to keep a
+ /// connection alive.
+ ///
+ /// Pass `None` to disable HTTP2 keep-alive.
+ ///
+ /// Default is currently disabled.
+ ///
+ /// # Cargo Feature
+ ///
+ /// Requires the `runtime` cargo feature to be enabled.
+ #[cfg(feature = "runtime")]
+ pub fn http2_keep_alive_interval(mut self, interval: impl Into<Option<Duration>>) -> Self {
+ self.protocol.http2_keep_alive_interval(interval);
+ self
+ }
+
+ /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
+ ///
+ /// If the ping is not acknowledged within the timeout, the connection will
+ /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
+ ///
+ /// Default is 20 seconds.
+ ///
+ /// # Cargo Feature
+ ///
+ /// Requires the `runtime` cargo feature to be enabled.
+ #[cfg(feature = "runtime")]
+ pub fn http2_keep_alive_timeout(mut self, timeout: Duration) -> Self {
+ self.protocol.http2_keep_alive_timeout(timeout);
+ self
+ }
+
+ /// Sets the `Executor` to deal with connection tasks.
+ ///
+ /// Default is `tokio::spawn`.
+ pub fn executor<E2>(self, executor: E2) -> Builder<I, E2> {
+ Builder {
+ incoming: self.incoming,
+ protocol: self.protocol.with_executor(executor),
+ }
+ }
+
+ /// Consume this `Builder`, creating a [`Server`](Server).
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// # #[cfg(feature = "tcp")]
+ /// # async fn run() {
+ /// use hyper::{Body, Error, Response, Server};
+ /// use hyper::service::{make_service_fn, service_fn};
+ ///
+ /// // Construct our SocketAddr to listen on...
+ /// let addr = ([127, 0, 0, 1], 3000).into();
+ ///
+ /// // And a MakeService to handle each connection...
+ /// let make_svc = make_service_fn(|_| async {
+ /// Ok::<_, Error>(service_fn(|_req| async {
+ /// Ok::<_, Error>(Response::new(Body::from("Hello World")))
+ /// }))
+ /// });
+ ///
+ /// // Then bind and serve...
+ /// let server = Server::bind(&addr)
+ /// .serve(make_svc);
+ ///
+ /// // Run forever-ish...
+ /// if let Err(err) = server.await {
+ /// eprintln!("server error: {}", err);
+ /// }
+ /// # }
+ /// ```
+ pub fn serve<S, B>(self, new_service: S) -> Server<I, S, E>
+ where
+ I: Accept,
+ I::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ S: MakeServiceRef<I::Conn, Body, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: Payload,
+ E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
+ E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
+ {
+ let serve = self.protocol.serve(self.incoming, new_service);
+ let spawn_all = serve.spawn_all();
+ Server { spawn_all }
+ }
+}
+
+#[cfg(feature = "tcp")]
+impl<E> Builder<AddrIncoming, E> {
+ /// Set whether TCP keepalive messages are enabled on accepted connections.
+ ///
+ /// If `None` is specified, keepalive is disabled, otherwise the duration
+ /// specified will be the time to remain idle before sending TCP keepalive
+ /// probes.
+ pub fn tcp_keepalive(mut self, keepalive: Option<Duration>) -> Self {
+ self.incoming.set_keepalive(keepalive);
+ self
+ }
+
+ /// Set the value of `TCP_NODELAY` option for accepted connections.
+ pub fn tcp_nodelay(mut self, enabled: bool) -> Self {
+ self.incoming.set_nodelay(enabled);
+ self
+ }
+
+ /// Set whether to sleep on accept errors.
+ ///
+ /// A possible scenario is that the process has hit the max open files
+ /// allowed, and so trying to accept a new connection will fail with
+ /// EMFILE. In some cases, it's preferable to just wait for some time, if
+ /// the application will likely close some files (or connections), and try
+ /// to accept the connection again. If this option is true, the error will
+ /// be logged at the error level, since it is still a big deal, and then
+ /// the listener will sleep for 1 second.
+ ///
+ /// In other cases, hitting the max open files should be treated similarly
+ /// to being out-of-memory, and simply error (and shutdown). Setting this
+ /// option to false will allow that.
+ ///
+ /// For more details see [`AddrIncoming::set_sleep_on_errors`]
+ pub fn tcp_sleep_on_accept_errors(mut self, val: bool) -> Self {
+ self.incoming.set_sleep_on_errors(val);
+ self
+ }
+}
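
A short sketch tying the `Builder` options above together (not part of the vendored file; requires the `tcp` and `runtime` features, and the address and duration are illustrative):

use std::convert::Infallible;
use std::time::Duration;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Response, Server};

async fn run() -> Result<(), hyper::Error> {
    let make_svc = make_service_fn(|_conn| async {
        Ok::<_, Infallible>(service_fn(|_req| async {
            Ok::<_, Infallible>(Response::new(Body::from("hello")))
        }))
    });

    // Illustrative values only: tune the TCP and HTTP/1 knobs, then serve.
    Server::bind(&([127, 0, 0, 1], 3000).into())
        .tcp_nodelay(true)
        .tcp_keepalive(Some(Duration::from_secs(60)))
        .http1_half_close(false)
        .serve(make_svc)
        .await
}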
diff --git a/third_party/rust/hyper/src/server/shutdown.rs b/third_party/rust/hyper/src/server/shutdown.rs
new file mode 100644
index 0000000000..1dc668ce40
--- /dev/null
+++ b/third_party/rust/hyper/src/server/shutdown.rs
@@ -0,0 +1,119 @@
+use std::error::Error as StdError;
+
+use pin_project::{pin_project, project};
+use tokio::io::{AsyncRead, AsyncWrite};
+
+use super::conn::{SpawnAll, UpgradeableConnection, Watcher};
+use super::Accept;
+use crate::body::{Body, Payload};
+use crate::common::drain::{self, Draining, Signal, Watch, Watching};
+use crate::common::exec::{H2Exec, NewSvcExec};
+use crate::common::{task, Future, Pin, Poll, Unpin};
+use crate::service::{HttpService, MakeServiceRef};
+
+#[allow(missing_debug_implementations)]
+#[pin_project]
+pub struct Graceful<I, S, F, E> {
+ #[pin]
+ state: State<I, S, F, E>,
+}
+
+#[pin_project]
+pub(super) enum State<I, S, F, E> {
+ Running {
+ drain: Option<(Signal, Watch)>,
+ #[pin]
+ spawn_all: SpawnAll<I, S, E>,
+ #[pin]
+ signal: F,
+ },
+ Draining(Draining),
+}
+
+impl<I, S, F, E> Graceful<I, S, F, E> {
+ pub(super) fn new(spawn_all: SpawnAll<I, S, E>, signal: F) -> Self {
+ let drain = Some(drain::channel());
+ Graceful {
+ state: State::Running {
+ drain,
+ spawn_all,
+ signal,
+ },
+ }
+ }
+}
+
+impl<I, IO, IE, S, B, F, E> Future for Graceful<I, S, F, E>
+where
+ I: Accept<Conn = IO, Error = IE>,
+ IE: Into<Box<dyn StdError + Send + Sync>>,
+ IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ S: MakeServiceRef<IO, Body, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: Payload,
+ F: Future<Output = ()>,
+ E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
+ E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
+{
+ type Output = crate::Result<()>;
+
+ #[project]
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ let mut me = self.project();
+ loop {
+ let next = {
+ #[project]
+ match me.state.as_mut().project() {
+ State::Running {
+ drain,
+ spawn_all,
+ signal,
+ } => match signal.poll(cx) {
+ Poll::Ready(()) => {
+ debug!("signal received, starting graceful shutdown");
+ let sig = drain.take().expect("drain channel").0;
+ State::Draining(sig.drain())
+ }
+ Poll::Pending => {
+ let watch = drain.as_ref().expect("drain channel").1.clone();
+ return spawn_all.poll_watch(cx, &GracefulWatcher(watch));
+ }
+ },
+ State::Draining(ref mut draining) => {
+ return Pin::new(draining).poll(cx).map(Ok);
+ }
+ }
+ };
+ me.state.set(next);
+ }
+ }
+}
+
+#[allow(missing_debug_implementations)]
+#[derive(Clone)]
+pub struct GracefulWatcher(Watch);
+
+impl<I, S, E> Watcher<I, S, E> for GracefulWatcher
+where
+ I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ S: HttpService<Body>,
+ E: H2Exec<S::Future, S::ResBody>,
+{
+ type Future =
+ Watching<UpgradeableConnection<I, S, E>, fn(Pin<&mut UpgradeableConnection<I, S, E>>)>;
+
+ fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future {
+ self.0.clone().watch(conn, on_drain)
+ }
+}
+
+fn on_drain<I, S, E>(conn: Pin<&mut UpgradeableConnection<I, S, E>>)
+where
+ S: HttpService<Body>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: AsyncRead + AsyncWrite + Unpin,
+ S::ResBody: Payload + 'static,
+ E: H2Exec<S::Future, S::ResBody>,
+{
+ conn.graceful_shutdown()
+}
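
For context, a sketch of how this graceful-shutdown plumbing is reached from the public API (not part of the vendored file; assumes tokio 0.2 with its `signal` feature, and any `Future<Output = ()>` can serve as the signal):

use std::convert::Infallible;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Response, Server};

async fn run() -> Result<(), hyper::Error> {
    let make_svc = make_service_fn(|_conn| async {
        Ok::<_, Infallible>(service_fn(|_req| async {
            Ok::<_, Infallible>(Response::new(Body::from("hello")))
        }))
    });

    let server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(make_svc);

    // Sketch only: `with_graceful_shutdown` wraps the server in the `Graceful`
    // future above; once the signal resolves, in-flight connections are
    // drained (via `GracefulWatcher`) before the future completes.
    server
        .with_graceful_shutdown(async {
            tokio::signal::ctrl_c().await.ok();
        })
        .await
}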
diff --git a/third_party/rust/hyper/src/server/tcp.rs b/third_party/rust/hyper/src/server/tcp.rs
new file mode 100644
index 0000000000..b823818693
--- /dev/null
+++ b/third_party/rust/hyper/src/server/tcp.rs
@@ -0,0 +1,299 @@
+use std::fmt;
+use std::io;
+use std::net::{SocketAddr, TcpListener as StdTcpListener};
+use std::time::Duration;
+
+use futures_util::FutureExt as _;
+use tokio::net::TcpListener;
+use tokio::time::Delay;
+
+use crate::common::{task, Future, Pin, Poll};
+
+pub use self::addr_stream::AddrStream;
+use super::Accept;
+
+/// A stream of connections from binding to an address.
+#[must_use = "streams do nothing unless polled"]
+pub struct AddrIncoming {
+ addr: SocketAddr,
+ listener: TcpListener,
+ sleep_on_errors: bool,
+ tcp_keepalive_timeout: Option<Duration>,
+ tcp_nodelay: bool,
+ timeout: Option<Delay>,
+}
+
+impl AddrIncoming {
+ pub(super) fn new(addr: &SocketAddr) -> crate::Result<Self> {
+ let std_listener = StdTcpListener::bind(addr).map_err(crate::Error::new_listen)?;
+
+ AddrIncoming::from_std(std_listener)
+ }
+
+ pub(super) fn from_std(std_listener: StdTcpListener) -> crate::Result<Self> {
+ let listener = TcpListener::from_std(std_listener).map_err(crate::Error::new_listen)?;
+ let addr = listener.local_addr().map_err(crate::Error::new_listen)?;
+ Ok(AddrIncoming {
+ listener,
+ addr,
+ sleep_on_errors: true,
+ tcp_keepalive_timeout: None,
+ tcp_nodelay: false,
+ timeout: None,
+ })
+ }
+
+ /// Creates a new `AddrIncoming` binding to the provided socket address.
+ pub fn bind(addr: &SocketAddr) -> crate::Result<Self> {
+ AddrIncoming::new(addr)
+ }
+
+ /// Get the local address bound to this listener.
+ pub fn local_addr(&self) -> SocketAddr {
+ self.addr
+ }
+
+ /// Set whether TCP keepalive messages are enabled on accepted connections.
+ ///
+ /// If `None` is specified, keepalive is disabled, otherwise the duration
+ /// specified will be the time to remain idle before sending TCP keepalive
+ /// probes.
+ pub fn set_keepalive(&mut self, keepalive: Option<Duration>) -> &mut Self {
+ self.tcp_keepalive_timeout = keepalive;
+ self
+ }
+
+ /// Set the value of `TCP_NODELAY` option for accepted connections.
+ pub fn set_nodelay(&mut self, enabled: bool) -> &mut Self {
+ self.tcp_nodelay = enabled;
+ self
+ }
+
+ /// Set whether to sleep on accept errors.
+ ///
+ /// A possible scenario is that the process has hit the max open files
+ /// allowed, and so trying to accept a new connection will fail with
+ /// `EMFILE`. In some cases, it's preferable to just wait for some time, if
+ /// the application will likely close some files (or connections), and try
+ /// to accept the connection again. If this option is `true`, the error
+ /// will be logged at the `error` level, since it is still a big deal,
+ /// and then the listener will sleep for 1 second.
+ ///
+ /// In other cases, hitting the max open files should be treated similarly
+ /// to being out-of-memory, and simply error (and shutdown). Setting
+ /// this option to `false` will allow that.
+ ///
+ /// Default is `true`.
+ pub fn set_sleep_on_errors(&mut self, val: bool) {
+ self.sleep_on_errors = val;
+ }
+
+ fn poll_next_(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<AddrStream>> {
+ // Check if a previous timeout is active that was set by IO errors.
+ if let Some(ref mut to) = self.timeout {
+ match Pin::new(to).poll(cx) {
+ Poll::Ready(()) => {}
+ Poll::Pending => return Poll::Pending,
+ }
+ }
+ self.timeout = None;
+
+ let accept = self.listener.accept();
+ futures_util::pin_mut!(accept);
+
+ loop {
+ match accept.poll_unpin(cx) {
+ Poll::Ready(Ok((socket, addr))) => {
+ if let Some(dur) = self.tcp_keepalive_timeout {
+ if let Err(e) = socket.set_keepalive(Some(dur)) {
+ trace!("error trying to set TCP keepalive: {}", e);
+ }
+ }
+ if let Err(e) = socket.set_nodelay(self.tcp_nodelay) {
+ trace!("error trying to set TCP nodelay: {}", e);
+ }
+ return Poll::Ready(Ok(AddrStream::new(socket, addr)));
+ }
+ Poll::Pending => return Poll::Pending,
+ Poll::Ready(Err(e)) => {
+ // Connection errors can be ignored directly, continue by
+ // accepting the next request.
+ if is_connection_error(&e) {
+ debug!("accepted connection already errored: {}", e);
+ continue;
+ }
+
+ if self.sleep_on_errors {
+ error!("accept error: {}", e);
+
+ // Sleep 1s.
+ let mut timeout = tokio::time::delay_for(Duration::from_secs(1));
+
+ match Pin::new(&mut timeout).poll(cx) {
+ Poll::Ready(()) => {
+ // Wow, it's been a second already? Ok then...
+ continue;
+ }
+ Poll::Pending => {
+ self.timeout = Some(timeout);
+ return Poll::Pending;
+ }
+ }
+ } else {
+ return Poll::Ready(Err(e));
+ }
+ }
+ }
+ }
+ }
+}
+
+impl Accept for AddrIncoming {
+ type Conn = AddrStream;
+ type Error = io::Error;
+
+ fn poll_accept(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
+ let result = ready!(self.poll_next_(cx));
+ Poll::Ready(Some(result))
+ }
+}
+
+/// This function defines errors that are per-connection, which basically
+/// means that if we get this error from the `accept()` system call, the
+/// next connection might be ready to be accepted.
+///
+/// All other errors will incur a timeout before the next `accept()` is performed.
+/// The timeout is useful to handle resource exhaustion errors like ENFILE
+/// and EMFILE. Otherwise, we could enter a tight loop.
+fn is_connection_error(e: &io::Error) -> bool {
+ match e.kind() {
+ io::ErrorKind::ConnectionRefused
+ | io::ErrorKind::ConnectionAborted
+ | io::ErrorKind::ConnectionReset => true,
+ _ => false,
+ }
+}
+
+impl fmt::Debug for AddrIncoming {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("AddrIncoming")
+ .field("addr", &self.addr)
+ .field("sleep_on_errors", &self.sleep_on_errors)
+ .field("tcp_keepalive_timeout", &self.tcp_keepalive_timeout)
+ .field("tcp_nodelay", &self.tcp_nodelay)
+ .finish()
+ }
+}
+
+mod addr_stream {
+ use bytes::{Buf, BufMut};
+ use std::io;
+ use std::net::SocketAddr;
+ use tokio::io::{AsyncRead, AsyncWrite};
+ use tokio::net::TcpStream;
+
+ use crate::common::{task, Pin, Poll};
+
+ /// A transport yielded by `AddrIncoming`.
+ #[derive(Debug)]
+ pub struct AddrStream {
+ inner: TcpStream,
+ pub(super) remote_addr: SocketAddr,
+ }
+
+ impl AddrStream {
+ pub(super) fn new(tcp: TcpStream, addr: SocketAddr) -> AddrStream {
+ AddrStream {
+ inner: tcp,
+ remote_addr: addr,
+ }
+ }
+
+ /// Returns the remote (peer) address of this connection.
+ #[inline]
+ pub fn remote_addr(&self) -> SocketAddr {
+ self.remote_addr
+ }
+
+ /// Consumes the AddrStream and returns the underlying IO object
+ #[inline]
+ pub fn into_inner(self) -> TcpStream {
+ self.inner
+ }
+
+ /// Attempt to receive data on the socket, without removing that data
+ /// from the queue, registering the current task for wakeup if data is
+ /// not yet available.
+ pub fn poll_peek(
+ &mut self,
+ cx: &mut task::Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ self.inner.poll_peek(cx, buf)
+ }
+ }
+
+ impl AsyncRead for AddrStream {
+ unsafe fn prepare_uninitialized_buffer(
+ &self,
+ buf: &mut [std::mem::MaybeUninit<u8>],
+ ) -> bool {
+ self.inner.prepare_uninitialized_buffer(buf)
+ }
+
+ #[inline]
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ Pin::new(&mut self.inner).poll_read(cx, buf)
+ }
+
+ #[inline]
+ fn poll_read_buf<B: BufMut>(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &mut B,
+ ) -> Poll<io::Result<usize>> {
+ Pin::new(&mut self.inner).poll_read_buf(cx, buf)
+ }
+ }
+
+ impl AsyncWrite for AddrStream {
+ #[inline]
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ Pin::new(&mut self.inner).poll_write(cx, buf)
+ }
+
+ #[inline]
+ fn poll_write_buf<B: Buf>(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &mut B,
+ ) -> Poll<io::Result<usize>> {
+ Pin::new(&mut self.inner).poll_write_buf(cx, buf)
+ }
+
+ #[inline]
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+ // TCP flush is a noop
+ Poll::Ready(Ok(()))
+ }
+
+ #[inline]
+ fn poll_shutdown(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ ) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_shutdown(cx)
+ }
+ }
+}
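
Finally, a brief sketch using the types in this file directly (not part of the vendored file; requires the `tcp` feature, and the address is illustrative): build an `AddrIncoming` by hand, tweak it, and read the peer address off each `AddrStream` in the `MakeService`.

use std::convert::Infallible;
use hyper::server::conn::{AddrIncoming, AddrStream};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Response, Server};

async fn run() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let mut incoming = AddrIncoming::bind(&([127, 0, 0, 1], 3000).into())?;
    incoming.set_nodelay(true);

    // Sketch only: each connection is handed to the `MakeService` as an
    // `&AddrStream`, so per-connection data like the remote address can be
    // captured here.
    let make_svc = make_service_fn(|conn: &AddrStream| {
        let remote = conn.remote_addr();
        async move {
            Ok::<_, Infallible>(service_fn(move |_req| async move {
                Ok::<_, Infallible>(Response::new(Body::from(format!("hello, {}", remote))))
            }))
        }
    });

    Server::builder(incoming).serve(make_svc).await?;
    Ok(())
}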