summaryrefslogtreecommitdiffstats
path: root/third_party/rust/hyper/src/server
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
commit43a97878ce14b72f0981164f87f2e35e14151312 (patch)
tree620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/hyper/src/server
parentInitial commit. (diff)
downloadfirefox-upstream.tar.xz
firefox-upstream.zip
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--third_party/rust/hyper/src/server/accept.rs111
-rw-r--r--third_party/rust/hyper/src/server/conn.rs1045
-rw-r--r--third_party/rust/hyper/src/server/mod.rs172
-rw-r--r--third_party/rust/hyper/src/server/server.rs789
-rw-r--r--third_party/rust/hyper/src/server/server_stub.rs16
-rw-r--r--third_party/rust/hyper/src/server/shutdown.rs128
-rw-r--r--third_party/rust/hyper/src/server/tcp.rs484
7 files changed, 2745 insertions, 0 deletions
diff --git a/third_party/rust/hyper/src/server/accept.rs b/third_party/rust/hyper/src/server/accept.rs
new file mode 100644
index 0000000000..4b7a1487dd
--- /dev/null
+++ b/third_party/rust/hyper/src/server/accept.rs
@@ -0,0 +1,111 @@
+//! The `Accept` trait and supporting types.
+//!
+//! This module contains:
+//!
+//! - The [`Accept`](Accept) trait used to asynchronously accept incoming
+//! connections.
+//! - Utilities like `poll_fn` to ease creating a custom `Accept`.
+
+#[cfg(feature = "stream")]
+use futures_core::Stream;
+#[cfg(feature = "stream")]
+use pin_project_lite::pin_project;
+
+use crate::common::{
+ task::{self, Poll},
+ Pin,
+};
+
+/// Asynchronously accept incoming connections.
+pub trait Accept {
+ /// The connection type that can be accepted.
+ type Conn;
+ /// The error type that can occur when accepting a connection.
+ type Error;
+
+ /// Poll to accept the next connection.
+ fn poll_accept(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ ) -> Poll<Option<Result<Self::Conn, Self::Error>>>;
+}
+
+/// Create an `Accept` with a polling function.
+///
+/// # Example
+///
+/// ```
+/// use std::task::Poll;
+/// use hyper::server::{accept, Server};
+///
+/// # let mock_conn = ();
+/// // If we created some mocked connection...
+/// let mut conn = Some(mock_conn);
+///
+/// // And accept just the mocked conn once...
+/// let once = accept::poll_fn(move |cx| {
+/// Poll::Ready(conn.take().map(Ok::<_, ()>))
+/// });
+///
+/// let builder = Server::builder(once);
+/// ```
+pub fn poll_fn<F, IO, E>(func: F) -> impl Accept<Conn = IO, Error = E>
+where
+ F: FnMut(&mut task::Context<'_>) -> Poll<Option<Result<IO, E>>>,
+{
+ struct PollFn<F>(F);
+
+ // The closure `F` is never pinned
+ impl<F> Unpin for PollFn<F> {}
+
+ impl<F, IO, E> Accept for PollFn<F>
+ where
+ F: FnMut(&mut task::Context<'_>) -> Poll<Option<Result<IO, E>>>,
+ {
+ type Conn = IO;
+ type Error = E;
+ fn poll_accept(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
+ (self.get_mut().0)(cx)
+ }
+ }
+
+ PollFn(func)
+}
+
+/// Adapt a `Stream` of incoming connections into an `Accept`.
+///
+/// # Optional
+///
+/// This function requires enabling the `stream` feature in your
+/// `Cargo.toml`.
+#[cfg(feature = "stream")]
+pub fn from_stream<S, IO, E>(stream: S) -> impl Accept<Conn = IO, Error = E>
+where
+ S: Stream<Item = Result<IO, E>>,
+{
+ pin_project! {
+ struct FromStream<S> {
+ #[pin]
+ stream: S,
+ }
+ }
+
+ impl<S, IO, E> Accept for FromStream<S>
+ where
+ S: Stream<Item = Result<IO, E>>,
+ {
+ type Conn = IO;
+ type Error = E;
+ fn poll_accept(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
+ self.project().stream.poll_next(cx)
+ }
+ }
+
+ FromStream { stream }
+}
diff --git a/third_party/rust/hyper/src/server/conn.rs b/third_party/rust/hyper/src/server/conn.rs
new file mode 100644
index 0000000000..d5370b0f14
--- /dev/null
+++ b/third_party/rust/hyper/src/server/conn.rs
@@ -0,0 +1,1045 @@
+//! Lower-level Server connection API.
+//!
+//! The types in this module are to provide a lower-level API based around a
+//! single connection. Accepting a connection and binding it with a service
+//! are not handled at this level. This module provides the building blocks to
+//! customize those things externally.
+//!
+//! If you don't have need to manage connections yourself, consider using the
+//! higher-level [Server](super) API.
+//!
+//! ## Example
+//! A simple example that uses the `Http` struct to talk HTTP over a Tokio TCP stream
+//! ```no_run
+//! # #[cfg(all(feature = "http1", feature = "runtime"))]
+//! # mod rt {
+//! use http::{Request, Response, StatusCode};
+//! use hyper::{server::conn::Http, service::service_fn, Body};
+//! use std::{net::SocketAddr, convert::Infallible};
+//! use tokio::net::TcpListener;
+//!
+//! #[tokio::main]
+//! async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+//! let addr: SocketAddr = ([127, 0, 0, 1], 8080).into();
+//!
+//! let mut tcp_listener = TcpListener::bind(addr).await?;
+//! loop {
+//! let (tcp_stream, _) = tcp_listener.accept().await?;
+//! tokio::task::spawn(async move {
+//! if let Err(http_err) = Http::new()
+//! .http1_only(true)
+//! .http1_keep_alive(true)
+//! .serve_connection(tcp_stream, service_fn(hello))
+//! .await {
+//! eprintln!("Error while serving HTTP connection: {}", http_err);
+//! }
+//! });
+//! }
+//! }
+//!
+//! async fn hello(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
+//! Ok(Response::new(Body::from("Hello World!")))
+//! }
+//! # }
+//! ```
+
+#[cfg(all(
+ any(feature = "http1", feature = "http2"),
+ not(all(feature = "http1", feature = "http2"))
+))]
+use std::marker::PhantomData;
+#[cfg(all(any(feature = "http1", feature = "http2"), feature = "runtime"))]
+use std::time::Duration;
+
+#[cfg(feature = "http2")]
+use crate::common::io::Rewind;
+#[cfg(all(feature = "http1", feature = "http2"))]
+use crate::error::{Kind, Parse};
+#[cfg(feature = "http1")]
+use crate::upgrade::Upgraded;
+
+cfg_feature! {
+ #![any(feature = "http1", feature = "http2")]
+
+ use std::error::Error as StdError;
+ use std::fmt;
+
+ use bytes::Bytes;
+ use pin_project_lite::pin_project;
+ use tokio::io::{AsyncRead, AsyncWrite};
+ use tracing::trace;
+
+ pub use super::server::Connecting;
+ use crate::body::{Body, HttpBody};
+ use crate::common::{task, Future, Pin, Poll, Unpin};
+ #[cfg(not(all(feature = "http1", feature = "http2")))]
+ use crate::common::Never;
+ use crate::common::exec::{ConnStreamExec, Exec};
+ use crate::proto;
+ use crate::service::HttpService;
+
+ pub(super) use self::upgrades::UpgradeableConnection;
+}
+
+#[cfg(feature = "tcp")]
+pub use super::tcp::{AddrIncoming, AddrStream};
+
+/// A lower-level configuration of the HTTP protocol.
+///
+/// This structure is used to configure options for an HTTP server connection.
+///
+/// If you don't have need to manage connections yourself, consider using the
+/// higher-level [Server](super) API.
+#[derive(Clone, Debug)]
+#[cfg(any(feature = "http1", feature = "http2"))]
+#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
+pub struct Http<E = Exec> {
+ pub(crate) exec: E,
+ h1_half_close: bool,
+ h1_keep_alive: bool,
+ h1_title_case_headers: bool,
+ h1_preserve_header_case: bool,
+ #[cfg(all(feature = "http1", feature = "runtime"))]
+ h1_header_read_timeout: Option<Duration>,
+ h1_writev: Option<bool>,
+ #[cfg(feature = "http2")]
+ h2_builder: proto::h2::server::Config,
+ mode: ConnectionMode,
+ max_buf_size: Option<usize>,
+ pipeline_flush: bool,
+}
+
+/// The internal mode of HTTP protocol which indicates the behavior when a parse error occurs.
+#[cfg(any(feature = "http1", feature = "http2"))]
+#[derive(Clone, Debug, PartialEq)]
+enum ConnectionMode {
+ /// Always use HTTP/1 and do not upgrade when a parse error occurs.
+ #[cfg(feature = "http1")]
+ H1Only,
+ /// Always use HTTP/2.
+ #[cfg(feature = "http2")]
+ H2Only,
+ /// Use HTTP/1 and try to upgrade to h2 when a parse error occurs.
+ #[cfg(all(feature = "http1", feature = "http2"))]
+ Fallback,
+}
+
+#[cfg(any(feature = "http1", feature = "http2"))]
+pin_project! {
+ /// A future binding a connection with a Service.
+ ///
+ /// Polling this future will drive HTTP forward.
+ #[must_use = "futures do nothing unless polled"]
+ #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
+ pub struct Connection<T, S, E = Exec>
+ where
+ S: HttpService<Body>,
+ {
+ pub(super) conn: Option<ProtoServer<T, S::ResBody, S, E>>,
+ fallback: Fallback<E>,
+ }
+}
+
+#[cfg(feature = "http1")]
+type Http1Dispatcher<T, B, S> =
+ proto::h1::Dispatcher<proto::h1::dispatch::Server<S, Body>, B, T, proto::ServerTransaction>;
+
+#[cfg(all(not(feature = "http1"), feature = "http2"))]
+type Http1Dispatcher<T, B, S> = (Never, PhantomData<(T, Box<Pin<B>>, Box<Pin<S>>)>);
+
+#[cfg(feature = "http2")]
+type Http2Server<T, B, S, E> = proto::h2::Server<Rewind<T>, S, B, E>;
+
+#[cfg(all(not(feature = "http2"), feature = "http1"))]
+type Http2Server<T, B, S, E> = (
+ Never,
+ PhantomData<(T, Box<Pin<S>>, Box<Pin<B>>, Box<Pin<E>>)>,
+);
+
+#[cfg(any(feature = "http1", feature = "http2"))]
+pin_project! {
+ #[project = ProtoServerProj]
+ pub(super) enum ProtoServer<T, B, S, E = Exec>
+ where
+ S: HttpService<Body>,
+ B: HttpBody,
+ {
+ H1 {
+ #[pin]
+ h1: Http1Dispatcher<T, B, S>,
+ },
+ H2 {
+ #[pin]
+ h2: Http2Server<T, B, S, E>,
+ },
+ }
+}
+
+#[cfg(all(feature = "http1", feature = "http2"))]
+#[derive(Clone, Debug)]
+enum Fallback<E> {
+ ToHttp2(proto::h2::server::Config, E),
+ Http1Only,
+}
+
+#[cfg(all(
+ any(feature = "http1", feature = "http2"),
+ not(all(feature = "http1", feature = "http2"))
+))]
+type Fallback<E> = PhantomData<E>;
+
+#[cfg(all(feature = "http1", feature = "http2"))]
+impl<E> Fallback<E> {
+ fn to_h2(&self) -> bool {
+ match *self {
+ Fallback::ToHttp2(..) => true,
+ Fallback::Http1Only => false,
+ }
+ }
+}
+
+#[cfg(all(feature = "http1", feature = "http2"))]
+impl<E> Unpin for Fallback<E> {}
+
+/// Deconstructed parts of a `Connection`.
+///
+/// This allows taking apart a `Connection` at a later time, in order to
+/// reclaim the IO object, and additional related pieces.
+#[derive(Debug)]
+#[cfg(any(feature = "http1", feature = "http2"))]
+#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
+pub struct Parts<T, S> {
+ /// The original IO object used in the handshake.
+ pub io: T,
+ /// A buffer of bytes that have been read but not processed as HTTP.
+ ///
+ /// If the client sent additional bytes after its last request, and
+ /// this connection "ended" with an upgrade, the read buffer will contain
+ /// those bytes.
+ ///
+ /// You will want to check for any existing bytes if you plan to continue
+ /// communicating on the IO object.
+ pub read_buf: Bytes,
+ /// The `Service` used to serve this connection.
+ pub service: S,
+ _inner: (),
+}
+
+// ===== impl Http =====
+
+#[cfg(any(feature = "http1", feature = "http2"))]
+impl Http {
+ /// Creates a new instance of the HTTP protocol, ready to spawn a server or
+ /// start accepting connections.
+ pub fn new() -> Http {
+ Http {
+ exec: Exec::Default,
+ h1_half_close: false,
+ h1_keep_alive: true,
+ h1_title_case_headers: false,
+ h1_preserve_header_case: false,
+ #[cfg(all(feature = "http1", feature = "runtime"))]
+ h1_header_read_timeout: None,
+ h1_writev: None,
+ #[cfg(feature = "http2")]
+ h2_builder: Default::default(),
+ mode: ConnectionMode::default(),
+ max_buf_size: None,
+ pipeline_flush: false,
+ }
+ }
+}
+
+#[cfg(any(feature = "http1", feature = "http2"))]
+impl<E> Http<E> {
+ /// Sets whether HTTP1 is required.
+ ///
+ /// Default is false
+ #[cfg(feature = "http1")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
+ pub fn http1_only(&mut self, val: bool) -> &mut Self {
+ if val {
+ self.mode = ConnectionMode::H1Only;
+ } else {
+ #[cfg(feature = "http2")]
+ {
+ self.mode = ConnectionMode::Fallback;
+ }
+ }
+ self
+ }
+
+ /// Set whether HTTP/1 connections should support half-closures.
+ ///
+ /// Clients can chose to shutdown their write-side while waiting
+ /// for the server to respond. Setting this to `true` will
+ /// prevent closing the connection immediately if `read`
+ /// detects an EOF in the middle of a request.
+ ///
+ /// Default is `false`.
+ #[cfg(feature = "http1")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
+ pub fn http1_half_close(&mut self, val: bool) -> &mut Self {
+ self.h1_half_close = val;
+ self
+ }
+
+ /// Enables or disables HTTP/1 keep-alive.
+ ///
+ /// Default is true.
+ #[cfg(feature = "http1")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
+ pub fn http1_keep_alive(&mut self, val: bool) -> &mut Self {
+ self.h1_keep_alive = val;
+ self
+ }
+
+ /// Set whether HTTP/1 connections will write header names as title case at
+ /// the socket level.
+ ///
+ /// Note that this setting does not affect HTTP/2.
+ ///
+ /// Default is false.
+ #[cfg(feature = "http1")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
+ pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Self {
+ self.h1_title_case_headers = enabled;
+ self
+ }
+
+ /// Set whether to support preserving original header cases.
+ ///
+ /// Currently, this will record the original cases received, and store them
+ /// in a private extension on the `Request`. It will also look for and use
+ /// such an extension in any provided `Response`.
+ ///
+ /// Since the relevant extension is still private, there is no way to
+ /// interact with the original cases. The only effect this can have now is
+ /// to forward the cases in a proxy-like fashion.
+ ///
+ /// Note that this setting does not affect HTTP/2.
+ ///
+ /// Default is false.
+ #[cfg(feature = "http1")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
+ pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Self {
+ self.h1_preserve_header_case = enabled;
+ self
+ }
+
+ /// Set a timeout for reading client request headers. If a client does not
+ /// transmit the entire header within this time, the connection is closed.
+ ///
+ /// Default is None.
+ #[cfg(all(feature = "http1", feature = "runtime"))]
+ #[cfg_attr(docsrs, doc(cfg(all(feature = "http1", feature = "runtime"))))]
+ pub fn http1_header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self {
+ self.h1_header_read_timeout = Some(read_timeout);
+ self
+ }
+
+ /// Set whether HTTP/1 connections should try to use vectored writes,
+ /// or always flatten into a single buffer.
+ ///
+ /// Note that setting this to false may mean more copies of body data,
+ /// but may also improve performance when an IO transport doesn't
+ /// support vectored writes well, such as most TLS implementations.
+ ///
+ /// Setting this to true will force hyper to use queued strategy
+ /// which may eliminate unnecessary cloning on some TLS backends
+ ///
+ /// Default is `auto`. In this mode hyper will try to guess which
+ /// mode to use
+ #[inline]
+ #[cfg(feature = "http1")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
+ pub fn http1_writev(&mut self, val: bool) -> &mut Self {
+ self.h1_writev = Some(val);
+ self
+ }
+
+ /// Sets whether HTTP2 is required.
+ ///
+ /// Default is false
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_only(&mut self, val: bool) -> &mut Self {
+ if val {
+ self.mode = ConnectionMode::H2Only;
+ } else {
+ #[cfg(feature = "http1")]
+ {
+ self.mode = ConnectionMode::Fallback;
+ }
+ }
+ self
+ }
+
+ /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
+ /// stream-level flow control.
+ ///
+ /// Passing `None` will do nothing.
+ ///
+ /// If not set, hyper will use a default.
+ ///
+ /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
+ if let Some(sz) = sz.into() {
+ self.h2_builder.adaptive_window = false;
+ self.h2_builder.initial_stream_window_size = sz;
+ }
+ self
+ }
+
+ /// Sets the max connection-level flow control for HTTP2.
+ ///
+ /// Passing `None` will do nothing.
+ ///
+ /// If not set, hyper will use a default.
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_initial_connection_window_size(
+ &mut self,
+ sz: impl Into<Option<u32>>,
+ ) -> &mut Self {
+ if let Some(sz) = sz.into() {
+ self.h2_builder.adaptive_window = false;
+ self.h2_builder.initial_conn_window_size = sz;
+ }
+ self
+ }
+
+ /// Sets whether to use an adaptive flow control.
+ ///
+ /// Enabling this will override the limits set in
+ /// `http2_initial_stream_window_size` and
+ /// `http2_initial_connection_window_size`.
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
+ use proto::h2::SPEC_WINDOW_SIZE;
+
+ self.h2_builder.adaptive_window = enabled;
+ if enabled {
+ self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
+ self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
+ }
+ self
+ }
+
+ /// Sets the maximum frame size to use for HTTP2.
+ ///
+ /// Passing `None` will do nothing.
+ ///
+ /// If not set, hyper will use a default.
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
+ if let Some(sz) = sz.into() {
+ self.h2_builder.max_frame_size = sz;
+ }
+ self
+ }
+
+ /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
+ /// connections.
+ ///
+ /// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing.
+ ///
+ /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
+ self.h2_builder.max_concurrent_streams = max.into();
+ self
+ }
+
+ /// Sets an interval for HTTP2 Ping frames should be sent to keep a
+ /// connection alive.
+ ///
+ /// Pass `None` to disable HTTP2 keep-alive.
+ ///
+ /// Default is currently disabled.
+ ///
+ /// # Cargo Feature
+ ///
+ /// Requires the `runtime` cargo feature to be enabled.
+ #[cfg(feature = "runtime")]
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_keep_alive_interval(
+ &mut self,
+ interval: impl Into<Option<Duration>>,
+ ) -> &mut Self {
+ self.h2_builder.keep_alive_interval = interval.into();
+ self
+ }
+
+ /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
+ ///
+ /// If the ping is not acknowledged within the timeout, the connection will
+ /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
+ ///
+ /// Default is 20 seconds.
+ ///
+ /// # Cargo Feature
+ ///
+ /// Requires the `runtime` cargo feature to be enabled.
+ #[cfg(feature = "runtime")]
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
+ self.h2_builder.keep_alive_timeout = timeout;
+ self
+ }
+
+ /// Set the maximum write buffer size for each HTTP/2 stream.
+ ///
+ /// Default is currently ~400KB, but may change.
+ ///
+ /// # Panics
+ ///
+ /// The value must be no larger than `u32::MAX`.
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
+ assert!(max <= std::u32::MAX as usize);
+ self.h2_builder.max_send_buffer_size = max;
+ self
+ }
+
+ /// Enables the [extended CONNECT protocol].
+ ///
+ /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
+ #[cfg(feature = "http2")]
+ pub fn http2_enable_connect_protocol(&mut self) -> &mut Self {
+ self.h2_builder.enable_connect_protocol = true;
+ self
+ }
+
+ /// Sets the max size of received header frames.
+ ///
+ /// Default is currently ~16MB, but may change.
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_max_header_list_size(&mut self, max: u32) -> &mut Self {
+ self.h2_builder.max_header_list_size = max;
+ self
+ }
+
+ /// Set the maximum buffer size for the connection.
+ ///
+ /// Default is ~400kb.
+ ///
+ /// # Panics
+ ///
+ /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
+ #[cfg(feature = "http1")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
+ pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
+ assert!(
+ max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
+ "the max_buf_size cannot be smaller than the minimum that h1 specifies."
+ );
+ self.max_buf_size = Some(max);
+ self
+ }
+
+ /// Aggregates flushes to better support pipelined responses.
+ ///
+ /// Experimental, may have bugs.
+ ///
+ /// Default is false.
+ pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
+ self.pipeline_flush = enabled;
+ self
+ }
+
+ /// Set the executor used to spawn background tasks.
+ ///
+ /// Default uses implicit default (like `tokio::spawn`).
+ pub fn with_executor<E2>(self, exec: E2) -> Http<E2> {
+ Http {
+ exec,
+ h1_half_close: self.h1_half_close,
+ h1_keep_alive: self.h1_keep_alive,
+ h1_title_case_headers: self.h1_title_case_headers,
+ h1_preserve_header_case: self.h1_preserve_header_case,
+ #[cfg(all(feature = "http1", feature = "runtime"))]
+ h1_header_read_timeout: self.h1_header_read_timeout,
+ h1_writev: self.h1_writev,
+ #[cfg(feature = "http2")]
+ h2_builder: self.h2_builder,
+ mode: self.mode,
+ max_buf_size: self.max_buf_size,
+ pipeline_flush: self.pipeline_flush,
+ }
+ }
+
+ /// Bind a connection together with a [`Service`](crate::service::Service).
+ ///
+ /// This returns a Future that must be polled in order for HTTP to be
+ /// driven on the connection.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// # use hyper::{Body, Request, Response};
+ /// # use hyper::service::Service;
+ /// # use hyper::server::conn::Http;
+ /// # use tokio::io::{AsyncRead, AsyncWrite};
+ /// # async fn run<I, S>(some_io: I, some_service: S)
+ /// # where
+ /// # I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ /// # S: Service<hyper::Request<Body>, Response=hyper::Response<Body>> + Send + 'static,
+ /// # S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ /// # S::Future: Send,
+ /// # {
+ /// let http = Http::new();
+ /// let conn = http.serve_connection(some_io, some_service);
+ ///
+ /// if let Err(e) = conn.await {
+ /// eprintln!("server connection error: {}", e);
+ /// }
+ /// # }
+ /// # fn main() {}
+ /// ```
+ pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E>
+ where
+ S: HttpService<Body, ResBody = Bd>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ Bd: HttpBody + 'static,
+ Bd::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: AsyncRead + AsyncWrite + Unpin,
+ E: ConnStreamExec<S::Future, Bd>,
+ {
+ #[cfg(feature = "http1")]
+ macro_rules! h1 {
+ () => {{
+ let mut conn = proto::Conn::new(io);
+ if !self.h1_keep_alive {
+ conn.disable_keep_alive();
+ }
+ if self.h1_half_close {
+ conn.set_allow_half_close();
+ }
+ if self.h1_title_case_headers {
+ conn.set_title_case_headers();
+ }
+ if self.h1_preserve_header_case {
+ conn.set_preserve_header_case();
+ }
+ #[cfg(all(feature = "http1", feature = "runtime"))]
+ if let Some(header_read_timeout) = self.h1_header_read_timeout {
+ conn.set_http1_header_read_timeout(header_read_timeout);
+ }
+ if let Some(writev) = self.h1_writev {
+ if writev {
+ conn.set_write_strategy_queue();
+ } else {
+ conn.set_write_strategy_flatten();
+ }
+ }
+ conn.set_flush_pipeline(self.pipeline_flush);
+ if let Some(max) = self.max_buf_size {
+ conn.set_max_buf_size(max);
+ }
+ let sd = proto::h1::dispatch::Server::new(service);
+ ProtoServer::H1 {
+ h1: proto::h1::Dispatcher::new(sd, conn),
+ }
+ }};
+ }
+
+ let proto = match self.mode {
+ #[cfg(feature = "http1")]
+ #[cfg(not(feature = "http2"))]
+ ConnectionMode::H1Only => h1!(),
+ #[cfg(feature = "http2")]
+ #[cfg(feature = "http1")]
+ ConnectionMode::H1Only | ConnectionMode::Fallback => h1!(),
+ #[cfg(feature = "http2")]
+ ConnectionMode::H2Only => {
+ let rewind_io = Rewind::new(io);
+ let h2 =
+ proto::h2::Server::new(rewind_io, service, &self.h2_builder, self.exec.clone());
+ ProtoServer::H2 { h2 }
+ }
+ };
+
+ Connection {
+ conn: Some(proto),
+ #[cfg(all(feature = "http1", feature = "http2"))]
+ fallback: if self.mode == ConnectionMode::Fallback {
+ Fallback::ToHttp2(self.h2_builder.clone(), self.exec.clone())
+ } else {
+ Fallback::Http1Only
+ },
+ #[cfg(not(all(feature = "http1", feature = "http2")))]
+ fallback: PhantomData,
+ }
+ }
+}
+
+// ===== impl Connection =====
+
+#[cfg(any(feature = "http1", feature = "http2"))]
+impl<I, B, S, E> Connection<I, S, E>
+where
+ S: HttpService<Body, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: AsyncRead + AsyncWrite + Unpin,
+ B: HttpBody + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ E: ConnStreamExec<S::Future, B>,
+{
+ /// Start a graceful shutdown process for this connection.
+ ///
+ /// This `Connection` should continue to be polled until shutdown
+ /// can finish.
+ ///
+ /// # Note
+ ///
+ /// This should only be called while the `Connection` future is still
+ /// pending. If called after `Connection::poll` has resolved, this does
+ /// nothing.
+ pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
+ match self.conn {
+ #[cfg(feature = "http1")]
+ Some(ProtoServer::H1 { ref mut h1, .. }) => {
+ h1.disable_keep_alive();
+ }
+ #[cfg(feature = "http2")]
+ Some(ProtoServer::H2 { ref mut h2 }) => {
+ h2.graceful_shutdown();
+ }
+ None => (),
+
+ #[cfg(not(feature = "http1"))]
+ Some(ProtoServer::H1 { ref mut h1, .. }) => match h1.0 {},
+ #[cfg(not(feature = "http2"))]
+ Some(ProtoServer::H2 { ref mut h2 }) => match h2.0 {},
+ }
+ }
+
+ /// Return the inner IO object, and additional information.
+ ///
+ /// If the IO object has been "rewound" the io will not contain those bytes rewound.
+ /// This should only be called after `poll_without_shutdown` signals
+ /// that the connection is "done". Otherwise, it may not have finished
+ /// flushing all necessary HTTP bytes.
+ ///
+ /// # Panics
+ /// This method will panic if this connection is using an h2 protocol.
+ pub fn into_parts(self) -> Parts<I, S> {
+ self.try_into_parts()
+ .unwrap_or_else(|| panic!("h2 cannot into_inner"))
+ }
+
+ /// Return the inner IO object, and additional information, if available.
+ ///
+ /// This method will return a `None` if this connection is using an h2 protocol.
+ pub fn try_into_parts(self) -> Option<Parts<I, S>> {
+ match self.conn.unwrap() {
+ #[cfg(feature = "http1")]
+ ProtoServer::H1 { h1, .. } => {
+ let (io, read_buf, dispatch) = h1.into_inner();
+ Some(Parts {
+ io,
+ read_buf,
+ service: dispatch.into_service(),
+ _inner: (),
+ })
+ }
+ ProtoServer::H2 { .. } => None,
+
+ #[cfg(not(feature = "http1"))]
+ ProtoServer::H1 { h1, .. } => match h1.0 {},
+ }
+ }
+
+ /// Poll the connection for completion, but without calling `shutdown`
+ /// on the underlying IO.
+ ///
+ /// This is useful to allow running a connection while doing an HTTP
+ /// upgrade. Once the upgrade is completed, the connection would be "done",
+ /// but it is not desired to actually shutdown the IO object. Instead you
+ /// would take it back using `into_parts`.
+ pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>
+ where
+ S: Unpin,
+ S::Future: Unpin,
+ B: Unpin,
+ {
+ loop {
+ match *self.conn.as_mut().unwrap() {
+ #[cfg(feature = "http1")]
+ ProtoServer::H1 { ref mut h1, .. } => match ready!(h1.poll_without_shutdown(cx)) {
+ Ok(()) => return Poll::Ready(Ok(())),
+ Err(e) => {
+ #[cfg(feature = "http2")]
+ match *e.kind() {
+ Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => {
+ self.upgrade_h2();
+ continue;
+ }
+ _ => (),
+ }
+
+ return Poll::Ready(Err(e));
+ }
+ },
+ #[cfg(feature = "http2")]
+ ProtoServer::H2 { ref mut h2 } => return Pin::new(h2).poll(cx).map_ok(|_| ()),
+
+ #[cfg(not(feature = "http1"))]
+ ProtoServer::H1 { ref mut h1, .. } => match h1.0 {},
+ #[cfg(not(feature = "http2"))]
+ ProtoServer::H2 { ref mut h2 } => match h2.0 {},
+ };
+ }
+ }
+
+ /// Prevent shutdown of the underlying IO object at the end of service the request,
+ /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
+ ///
+ /// # Error
+ ///
+ /// This errors if the underlying connection protocol is not HTTP/1.
+ pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<I, S>>>
+ where
+ S: Unpin,
+ S::Future: Unpin,
+ B: Unpin,
+ {
+ let mut conn = Some(self);
+ futures_util::future::poll_fn(move |cx| {
+ ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
+ Poll::Ready(conn.take().unwrap().try_into_parts().ok_or_else(crate::Error::new_without_shutdown_not_h1))
+ })
+ }
+
+ #[cfg(all(feature = "http1", feature = "http2"))]
+ fn upgrade_h2(&mut self) {
+ trace!("Trying to upgrade connection to h2");
+ let conn = self.conn.take();
+
+ let (io, read_buf, dispatch) = match conn.unwrap() {
+ ProtoServer::H1 { h1, .. } => h1.into_inner(),
+ ProtoServer::H2 { .. } => {
+ panic!("h2 cannot into_inner");
+ }
+ };
+ let mut rewind_io = Rewind::new(io);
+ rewind_io.rewind(read_buf);
+ let (builder, exec) = match self.fallback {
+ Fallback::ToHttp2(ref builder, ref exec) => (builder, exec),
+ Fallback::Http1Only => unreachable!("upgrade_h2 with Fallback::Http1Only"),
+ };
+ let h2 = proto::h2::Server::new(rewind_io, dispatch.into_service(), builder, exec.clone());
+
+ debug_assert!(self.conn.is_none());
+ self.conn = Some(ProtoServer::H2 { h2 });
+ }
+
+ /// Enable this connection to support higher-level HTTP upgrades.
+ ///
+ /// See [the `upgrade` module](crate::upgrade) for more.
+ pub fn with_upgrades(self) -> UpgradeableConnection<I, S, E>
+ where
+ I: Send,
+ {
+ UpgradeableConnection { inner: self }
+ }
+}
+
+#[cfg(any(feature = "http1", feature = "http2"))]
+impl<I, B, S, E> Future for Connection<I, S, E>
+where
+ S: HttpService<Body, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: AsyncRead + AsyncWrite + Unpin + 'static,
+ B: HttpBody + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ E: ConnStreamExec<S::Future, B>,
+{
+ type Output = crate::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ loop {
+ match ready!(Pin::new(self.conn.as_mut().unwrap()).poll(cx)) {
+ Ok(done) => {
+ match done {
+ proto::Dispatched::Shutdown => {}
+ #[cfg(feature = "http1")]
+ proto::Dispatched::Upgrade(pending) => {
+ // With no `Send` bound on `I`, we can't try to do
+ // upgrades here. In case a user was trying to use
+ // `Body::on_upgrade` with this API, send a special
+ // error letting them know about that.
+ pending.manual();
+ }
+ };
+ return Poll::Ready(Ok(()));
+ }
+ Err(e) => {
+ #[cfg(feature = "http1")]
+ #[cfg(feature = "http2")]
+ match *e.kind() {
+ Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => {
+ self.upgrade_h2();
+ continue;
+ }
+ _ => (),
+ }
+
+ return Poll::Ready(Err(e));
+ }
+ }
+ }
+ }
+}
+
+#[cfg(any(feature = "http1", feature = "http2"))]
+impl<I, S> fmt::Debug for Connection<I, S>
+where
+ S: HttpService<Body>,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Connection").finish()
+ }
+}
+
+// ===== impl ConnectionMode =====
+
+#[cfg(any(feature = "http1", feature = "http2"))]
+impl Default for ConnectionMode {
+ #[cfg(all(feature = "http1", feature = "http2"))]
+ fn default() -> ConnectionMode {
+ ConnectionMode::Fallback
+ }
+
+ #[cfg(all(feature = "http1", not(feature = "http2")))]
+ fn default() -> ConnectionMode {
+ ConnectionMode::H1Only
+ }
+
+ #[cfg(all(not(feature = "http1"), feature = "http2"))]
+ fn default() -> ConnectionMode {
+ ConnectionMode::H2Only
+ }
+}
+
+// ===== impl ProtoServer =====
+
+#[cfg(any(feature = "http1", feature = "http2"))]
+impl<T, B, S, E> Future for ProtoServer<T, B, S, E>
+where
+ T: AsyncRead + AsyncWrite + Unpin,
+ S: HttpService<Body, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: HttpBody + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ E: ConnStreamExec<S::Future, B>,
+{
+ type Output = crate::Result<proto::Dispatched>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ match self.project() {
+ #[cfg(feature = "http1")]
+ ProtoServerProj::H1 { h1, .. } => h1.poll(cx),
+ #[cfg(feature = "http2")]
+ ProtoServerProj::H2 { h2 } => h2.poll(cx),
+
+ #[cfg(not(feature = "http1"))]
+ ProtoServerProj::H1 { h1, .. } => match h1.0 {},
+ #[cfg(not(feature = "http2"))]
+ ProtoServerProj::H2 { h2 } => match h2.0 {},
+ }
+ }
+}
+
+#[cfg(any(feature = "http1", feature = "http2"))]
+mod upgrades {
+ use super::*;
+
+ // A future binding a connection with a Service with Upgrade support.
+ //
+ // This type is unnameable outside the crate, and so basically just an
+ // `impl Future`, without requiring Rust 1.26.
+ #[must_use = "futures do nothing unless polled"]
+ #[allow(missing_debug_implementations)]
+ pub struct UpgradeableConnection<T, S, E>
+ where
+ S: HttpService<Body>,
+ {
+ pub(super) inner: Connection<T, S, E>,
+ }
+
+ impl<I, B, S, E> UpgradeableConnection<I, S, E>
+ where
+ S: HttpService<Body, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: AsyncRead + AsyncWrite + Unpin,
+ B: HttpBody + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ E: ConnStreamExec<S::Future, B>,
+ {
+ /// Start a graceful shutdown process for this connection.
+ ///
+ /// This `Connection` should continue to be polled until shutdown
+ /// can finish.
+ pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
+ Pin::new(&mut self.inner).graceful_shutdown()
+ }
+ }
+
+ impl<I, B, S, E> Future for UpgradeableConnection<I, S, E>
+ where
+ S: HttpService<Body, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ B: HttpBody + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ E: ConnStreamExec<S::Future, B>,
+ {
+ type Output = crate::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ loop {
+ match ready!(Pin::new(self.inner.conn.as_mut().unwrap()).poll(cx)) {
+ Ok(proto::Dispatched::Shutdown) => return Poll::Ready(Ok(())),
+ #[cfg(feature = "http1")]
+ Ok(proto::Dispatched::Upgrade(pending)) => {
+ match self.inner.conn.take() {
+ Some(ProtoServer::H1 { h1, .. }) => {
+ let (io, buf, _) = h1.into_inner();
+ pending.fulfill(Upgraded::new(io, buf));
+ return Poll::Ready(Ok(()));
+ }
+ _ => {
+ drop(pending);
+ unreachable!("Upgrade expects h1")
+ }
+ };
+ }
+ Err(e) => {
+ #[cfg(feature = "http1")]
+ #[cfg(feature = "http2")]
+ match *e.kind() {
+ Kind::Parse(Parse::VersionH2) if self.inner.fallback.to_h2() => {
+ self.inner.upgrade_h2();
+ continue;
+ }
+ _ => (),
+ }
+
+ return Poll::Ready(Err(e));
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/third_party/rust/hyper/src/server/mod.rs b/third_party/rust/hyper/src/server/mod.rs
new file mode 100644
index 0000000000..e763d0e7c0
--- /dev/null
+++ b/third_party/rust/hyper/src/server/mod.rs
@@ -0,0 +1,172 @@
+//! HTTP Server
+//!
+//! A `Server` is created to listen on a port, parse HTTP requests, and hand
+//! them off to a `Service`.
+//!
+//! There are two levels of APIs provide for constructing HTTP servers:
+//!
+//! - The higher-level [`Server`](Server) type.
+//! - The lower-level [`conn`](conn) module.
+//!
+//! # Server
+//!
+//! The [`Server`](Server) is main way to start listening for HTTP requests.
+//! It wraps a listener with a [`MakeService`](crate::service), and then should
+//! be executed to start serving requests.
+//!
+//! [`Server`](Server) accepts connections in both HTTP1 and HTTP2 by default.
+//!
+//! ## Examples
+//!
+//! ```no_run
+//! use std::convert::Infallible;
+//! use std::net::SocketAddr;
+//! use hyper::{Body, Request, Response, Server};
+//! use hyper::service::{make_service_fn, service_fn};
+//!
+//! async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
+//! Ok(Response::new(Body::from("Hello World")))
+//! }
+//!
+//! # #[cfg(feature = "runtime")]
+//! #[tokio::main]
+//! async fn main() {
+//! // Construct our SocketAddr to listen on...
+//! let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
+//!
+//! // And a MakeService to handle each connection...
+//! let make_service = make_service_fn(|_conn| async {
+//! Ok::<_, Infallible>(service_fn(handle))
+//! });
+//!
+//! // Then bind and serve...
+//! let server = Server::bind(&addr).serve(make_service);
+//!
+//! // And run forever...
+//! if let Err(e) = server.await {
+//! eprintln!("server error: {}", e);
+//! }
+//! }
+//! # #[cfg(not(feature = "runtime"))]
+//! # fn main() {}
+//! ```
+//!
+//! If you don't need the connection and your service implements `Clone` you can use
+//! [`tower::make::Shared`] instead of `make_service_fn` which is a bit simpler:
+//!
+//! ```no_run
+//! # use std::convert::Infallible;
+//! # use std::net::SocketAddr;
+//! # use hyper::{Body, Request, Response, Server};
+//! # use hyper::service::{make_service_fn, service_fn};
+//! # use tower::make::Shared;
+//! # async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
+//! # Ok(Response::new(Body::from("Hello World")))
+//! # }
+//! # #[cfg(feature = "runtime")]
+//! #[tokio::main]
+//! async fn main() {
+//! // Construct our SocketAddr to listen on...
+//! let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
+//!
+//! // Shared is a MakeService that produces services by cloning an inner service...
+//! let make_service = Shared::new(service_fn(handle));
+//!
+//! // Then bind and serve...
+//! let server = Server::bind(&addr).serve(make_service);
+//!
+//! // And run forever...
+//! if let Err(e) = server.await {
+//! eprintln!("server error: {}", e);
+//! }
+//! }
+//! # #[cfg(not(feature = "runtime"))]
+//! # fn main() {}
+//! ```
+//!
+//! Passing data to your request handler can be done like so:
+//!
+//! ```no_run
+//! use std::convert::Infallible;
+//! use std::net::SocketAddr;
+//! use hyper::{Body, Request, Response, Server};
+//! use hyper::service::{make_service_fn, service_fn};
+//! # #[cfg(feature = "runtime")]
+//! use hyper::server::conn::AddrStream;
+//!
+//! #[derive(Clone)]
+//! struct AppContext {
+//! // Whatever data your application needs can go here
+//! }
+//!
+//! async fn handle(
+//! context: AppContext,
+//! addr: SocketAddr,
+//! req: Request<Body>
+//! ) -> Result<Response<Body>, Infallible> {
+//! Ok(Response::new(Body::from("Hello World")))
+//! }
+//!
+//! # #[cfg(feature = "runtime")]
+//! #[tokio::main]
+//! async fn main() {
+//! let context = AppContext {
+//! // ...
+//! };
+//!
+//! // A `MakeService` that produces a `Service` to handle each connection.
+//! let make_service = make_service_fn(move |conn: &AddrStream| {
+//! // We have to clone the context to share it with each invocation of
+//! // `make_service`. If your data doesn't implement `Clone` consider using
+//! // an `std::sync::Arc`.
+//! let context = context.clone();
+//!
+//! // You can grab the address of the incoming connection like so.
+//! let addr = conn.remote_addr();
+//!
+//! // Create a `Service` for responding to the request.
+//! let service = service_fn(move |req| {
+//! handle(context.clone(), addr, req)
+//! });
+//!
+//! // Return the service to hyper.
+//! async move { Ok::<_, Infallible>(service) }
+//! });
+//!
+//! // Run the server like above...
+//! let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
+//!
+//! let server = Server::bind(&addr).serve(make_service);
+//!
+//! if let Err(e) = server.await {
+//! eprintln!("server error: {}", e);
+//! }
+//! }
+//! # #[cfg(not(feature = "runtime"))]
+//! # fn main() {}
+//! ```
+//!
+//! [`tower::make::Shared`]: https://docs.rs/tower/latest/tower/make/struct.Shared.html
+
+pub mod accept;
+pub mod conn;
+#[cfg(feature = "tcp")]
+mod tcp;
+
+pub use self::server::Server;
+
+cfg_feature! {
+ #![any(feature = "http1", feature = "http2")]
+
+ pub(crate) mod server;
+ pub use self::server::Builder;
+
+ mod shutdown;
+}
+
+cfg_feature! {
+ #![not(any(feature = "http1", feature = "http2"))]
+
+ mod server_stub;
+ use server_stub as server;
+}
diff --git a/third_party/rust/hyper/src/server/server.rs b/third_party/rust/hyper/src/server/server.rs
new file mode 100644
index 0000000000..caa4aebd14
--- /dev/null
+++ b/third_party/rust/hyper/src/server/server.rs
@@ -0,0 +1,789 @@
+use std::error::Error as StdError;
+use std::fmt;
+#[cfg(feature = "tcp")]
+use std::net::{SocketAddr, TcpListener as StdTcpListener};
+
+#[cfg(feature = "tcp")]
+use std::time::Duration;
+
+use pin_project_lite::pin_project;
+
+use tokio::io::{AsyncRead, AsyncWrite};
+use tracing::trace;
+
+use super::accept::Accept;
+#[cfg(all(feature = "tcp"))]
+use super::tcp::AddrIncoming;
+use crate::body::{Body, HttpBody};
+use crate::common::exec::Exec;
+use crate::common::exec::{ConnStreamExec, NewSvcExec};
+use crate::common::{task, Future, Pin, Poll, Unpin};
+// Renamed `Http` as `Http_` for now so that people upgrading don't see an
+// error that `hyper::server::Http` is private...
+use super::conn::{Connection, Http as Http_, UpgradeableConnection};
+use super::shutdown::{Graceful, GracefulWatcher};
+use crate::service::{HttpService, MakeServiceRef};
+
+use self::new_svc::NewSvcTask;
+
+pin_project! {
+ /// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default.
+ ///
+ /// `Server` is a `Future` mapping a bound listener with a set of service
+ /// handlers. It is built using the [`Builder`](Builder), and the future
+ /// completes when the server has been shutdown. It should be run by an
+ /// `Executor`.
+ pub struct Server<I, S, E = Exec> {
+ #[pin]
+ incoming: I,
+ make_service: S,
+ protocol: Http_<E>,
+ }
+}
+
+/// A builder for a [`Server`](Server).
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
+pub struct Builder<I, E = Exec> {
+ incoming: I,
+ protocol: Http_<E>,
+}
+
+// ===== impl Server =====
+
+#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
+impl<I> Server<I, ()> {
+ /// Starts a [`Builder`](Builder) with the provided incoming stream.
+ pub fn builder(incoming: I) -> Builder<I> {
+ Builder {
+ incoming,
+ protocol: Http_::new(),
+ }
+ }
+}
+
+#[cfg(feature = "tcp")]
+#[cfg_attr(
+ docsrs,
+ doc(cfg(all(feature = "tcp", any(feature = "http1", feature = "http2"))))
+)]
+impl Server<AddrIncoming, ()> {
+ /// Binds to the provided address, and returns a [`Builder`](Builder).
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if binding to the address fails. For a method
+ /// to bind to an address and return a `Result`, see `Server::try_bind`.
+ pub fn bind(addr: &SocketAddr) -> Builder<AddrIncoming> {
+ let incoming = AddrIncoming::new(addr).unwrap_or_else(|e| {
+ panic!("error binding to {}: {}", addr, e);
+ });
+ Server::builder(incoming)
+ }
+
+ /// Tries to bind to the provided address, and returns a [`Builder`](Builder).
+ pub fn try_bind(addr: &SocketAddr) -> crate::Result<Builder<AddrIncoming>> {
+ AddrIncoming::new(addr).map(Server::builder)
+ }
+
+ /// Create a new instance from a `std::net::TcpListener` instance.
+ pub fn from_tcp(listener: StdTcpListener) -> Result<Builder<AddrIncoming>, crate::Error> {
+ AddrIncoming::from_std(listener).map(Server::builder)
+ }
+}
+
+#[cfg(feature = "tcp")]
+#[cfg_attr(
+ docsrs,
+ doc(cfg(all(feature = "tcp", any(feature = "http1", feature = "http2"))))
+)]
+impl<S, E> Server<AddrIncoming, S, E> {
+ /// Returns the local address that this server is bound to.
+ pub fn local_addr(&self) -> SocketAddr {
+ self.incoming.local_addr()
+ }
+}
+
+#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
+impl<I, IO, IE, S, E, B> Server<I, S, E>
+where
+ I: Accept<Conn = IO, Error = IE>,
+ IE: Into<Box<dyn StdError + Send + Sync>>,
+ IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ S: MakeServiceRef<IO, Body, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: HttpBody + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
+{
+ /// Prepares a server to handle graceful shutdown when the provided future
+ /// completes.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// # fn main() {}
+ /// # #[cfg(feature = "tcp")]
+ /// # async fn run() {
+ /// # use hyper::{Body, Response, Server, Error};
+ /// # use hyper::service::{make_service_fn, service_fn};
+ /// # let make_service = make_service_fn(|_| async {
+ /// # Ok::<_, Error>(service_fn(|_req| async {
+ /// # Ok::<_, Error>(Response::new(Body::from("Hello World")))
+ /// # }))
+ /// # });
+ /// // Make a server from the previous examples...
+ /// let server = Server::bind(&([127, 0, 0, 1], 3000).into())
+ /// .serve(make_service);
+ ///
+ /// // Prepare some signal for when the server should start shutting down...
+ /// let (tx, rx) = tokio::sync::oneshot::channel::<()>();
+ /// let graceful = server
+ /// .with_graceful_shutdown(async {
+ /// rx.await.ok();
+ /// });
+ ///
+ /// // Await the `server` receiving the signal...
+ /// if let Err(e) = graceful.await {
+ /// eprintln!("server error: {}", e);
+ /// }
+ ///
+ /// // And later, trigger the signal by calling `tx.send(())`.
+ /// let _ = tx.send(());
+ /// # }
+ /// ```
+ pub fn with_graceful_shutdown<F>(self, signal: F) -> Graceful<I, S, F, E>
+ where
+ F: Future<Output = ()>,
+ E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
+ {
+ Graceful::new(self, signal)
+ }
+
+ fn poll_next_(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ ) -> Poll<Option<crate::Result<Connecting<IO, S::Future, E>>>> {
+ let me = self.project();
+ match ready!(me.make_service.poll_ready_ref(cx)) {
+ Ok(()) => (),
+ Err(e) => {
+ trace!("make_service closed");
+ return Poll::Ready(Some(Err(crate::Error::new_user_make_service(e))));
+ }
+ }
+
+ if let Some(item) = ready!(me.incoming.poll_accept(cx)) {
+ let io = item.map_err(crate::Error::new_accept)?;
+ let new_fut = me.make_service.make_service_ref(&io);
+ Poll::Ready(Some(Ok(Connecting {
+ future: new_fut,
+ io: Some(io),
+ protocol: me.protocol.clone(),
+ })))
+ } else {
+ Poll::Ready(None)
+ }
+ }
+
+ pub(super) fn poll_watch<W>(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ watcher: &W,
+ ) -> Poll<crate::Result<()>>
+ where
+ E: NewSvcExec<IO, S::Future, S::Service, E, W>,
+ W: Watcher<IO, S::Service, E>,
+ {
+ loop {
+ if let Some(connecting) = ready!(self.as_mut().poll_next_(cx)?) {
+ let fut = NewSvcTask::new(connecting, watcher.clone());
+ self.as_mut().project().protocol.exec.execute_new_svc(fut);
+ } else {
+ return Poll::Ready(Ok(()));
+ }
+ }
+ }
+}
+
+#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
+impl<I, IO, IE, S, B, E> Future for Server<I, S, E>
+where
+ I: Accept<Conn = IO, Error = IE>,
+ IE: Into<Box<dyn StdError + Send + Sync>>,
+ IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ S: MakeServiceRef<IO, Body, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: HttpBody + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
+ E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
+{
+ type Output = crate::Result<()>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ self.poll_watch(cx, &NoopWatcher)
+ }
+}
+
+impl<I: fmt::Debug, S: fmt::Debug> fmt::Debug for Server<I, S> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let mut st = f.debug_struct("Server");
+ st.field("listener", &self.incoming);
+ st.finish()
+ }
+}
+
+// ===== impl Builder =====
+
+#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
+impl<I, E> Builder<I, E> {
+ /// Start a new builder, wrapping an incoming stream and low-level options.
+ ///
+ /// For a more convenient constructor, see [`Server::bind`](Server::bind).
+ pub fn new(incoming: I, protocol: Http_<E>) -> Self {
+ Builder { incoming, protocol }
+ }
+
+ /// Sets whether to use keep-alive for HTTP/1 connections.
+ ///
+ /// Default is `true`.
+ #[cfg(feature = "http1")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
+ pub fn http1_keepalive(mut self, val: bool) -> Self {
+ self.protocol.http1_keep_alive(val);
+ self
+ }
+
+ /// Set whether HTTP/1 connections should support half-closures.
+ ///
+ /// Clients can chose to shutdown their write-side while waiting
+ /// for the server to respond. Setting this to `true` will
+ /// prevent closing the connection immediately if `read`
+ /// detects an EOF in the middle of a request.
+ ///
+ /// Default is `false`.
+ #[cfg(feature = "http1")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
+ pub fn http1_half_close(mut self, val: bool) -> Self {
+ self.protocol.http1_half_close(val);
+ self
+ }
+
+ /// Set the maximum buffer size.
+ ///
+ /// Default is ~ 400kb.
+ #[cfg(feature = "http1")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
+ pub fn http1_max_buf_size(mut self, val: usize) -> Self {
+ self.protocol.max_buf_size(val);
+ self
+ }
+
+ // Sets whether to bunch up HTTP/1 writes until the read buffer is empty.
+ //
+ // This isn't really desirable in most cases, only really being useful in
+ // silly pipeline benchmarks.
+ #[doc(hidden)]
+ #[cfg(feature = "http1")]
+ pub fn http1_pipeline_flush(mut self, val: bool) -> Self {
+ self.protocol.pipeline_flush(val);
+ self
+ }
+
+ /// Set whether HTTP/1 connections should try to use vectored writes,
+ /// or always flatten into a single buffer.
+ ///
+ /// Note that setting this to false may mean more copies of body data,
+ /// but may also improve performance when an IO transport doesn't
+ /// support vectored writes well, such as most TLS implementations.
+ ///
+ /// Setting this to true will force hyper to use queued strategy
+ /// which may eliminate unnecessary cloning on some TLS backends
+ ///
+ /// Default is `auto`. In this mode hyper will try to guess which
+ /// mode to use
+ #[cfg(feature = "http1")]
+ pub fn http1_writev(mut self, enabled: bool) -> Self {
+ self.protocol.http1_writev(enabled);
+ self
+ }
+
+ /// Set whether HTTP/1 connections will write header names as title case at
+ /// the socket level.
+ ///
+ /// Note that this setting does not affect HTTP/2.
+ ///
+ /// Default is false.
+ #[cfg(feature = "http1")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
+ pub fn http1_title_case_headers(mut self, val: bool) -> Self {
+ self.protocol.http1_title_case_headers(val);
+ self
+ }
+
+ /// Set whether to support preserving original header cases.
+ ///
+ /// Currently, this will record the original cases received, and store them
+ /// in a private extension on the `Request`. It will also look for and use
+ /// such an extension in any provided `Response`.
+ ///
+ /// Since the relevant extension is still private, there is no way to
+ /// interact with the original cases. The only effect this can have now is
+ /// to forward the cases in a proxy-like fashion.
+ ///
+ /// Note that this setting does not affect HTTP/2.
+ ///
+ /// Default is false.
+ #[cfg(feature = "http1")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
+ pub fn http1_preserve_header_case(mut self, val: bool) -> Self {
+ self.protocol.http1_preserve_header_case(val);
+ self
+ }
+
+ /// Set a timeout for reading client request headers. If a client does not
+ /// transmit the entire header within this time, the connection is closed.
+ ///
+ /// Default is None.
+ #[cfg(all(feature = "http1", feature = "runtime"))]
+ #[cfg_attr(docsrs, doc(cfg(all(feature = "http1", feature = "runtime"))))]
+ pub fn http1_header_read_timeout(mut self, read_timeout: Duration) -> Self {
+ self.protocol.http1_header_read_timeout(read_timeout);
+ self
+ }
+
+ /// Sets whether HTTP/1 is required.
+ ///
+ /// Default is `false`.
+ #[cfg(feature = "http1")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
+ pub fn http1_only(mut self, val: bool) -> Self {
+ self.protocol.http1_only(val);
+ self
+ }
+
+ /// Sets whether HTTP/2 is required.
+ ///
+ /// Default is `false`.
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_only(mut self, val: bool) -> Self {
+ self.protocol.http2_only(val);
+ self
+ }
+
+ /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
+ /// stream-level flow control.
+ ///
+ /// Passing `None` will do nothing.
+ ///
+ /// If not set, hyper will use a default.
+ ///
+ /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_initial_stream_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
+ self.protocol.http2_initial_stream_window_size(sz.into());
+ self
+ }
+
+ /// Sets the max connection-level flow control for HTTP2
+ ///
+ /// Passing `None` will do nothing.
+ ///
+ /// If not set, hyper will use a default.
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_initial_connection_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
+ self.protocol
+ .http2_initial_connection_window_size(sz.into());
+ self
+ }
+
+ /// Sets whether to use an adaptive flow control.
+ ///
+ /// Enabling this will override the limits set in
+ /// `http2_initial_stream_window_size` and
+ /// `http2_initial_connection_window_size`.
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_adaptive_window(mut self, enabled: bool) -> Self {
+ self.protocol.http2_adaptive_window(enabled);
+ self
+ }
+
+ /// Sets the maximum frame size to use for HTTP2.
+ ///
+ /// Passing `None` will do nothing.
+ ///
+ /// If not set, hyper will use a default.
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_max_frame_size(mut self, sz: impl Into<Option<u32>>) -> Self {
+ self.protocol.http2_max_frame_size(sz);
+ self
+ }
+
+ /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
+ /// connections.
+ ///
+ /// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing.
+ ///
+ /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_max_concurrent_streams(mut self, max: impl Into<Option<u32>>) -> Self {
+ self.protocol.http2_max_concurrent_streams(max.into());
+ self
+ }
+
+ /// Sets an interval for HTTP2 Ping frames should be sent to keep a
+ /// connection alive.
+ ///
+ /// Pass `None` to disable HTTP2 keep-alive.
+ ///
+ /// Default is currently disabled.
+ ///
+ /// # Cargo Feature
+ ///
+ /// Requires the `runtime` cargo feature to be enabled.
+ #[cfg(all(feature = "runtime", feature = "http2"))]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_keep_alive_interval(mut self, interval: impl Into<Option<Duration>>) -> Self {
+ self.protocol.http2_keep_alive_interval(interval);
+ self
+ }
+
+ /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
+ ///
+ /// If the ping is not acknowledged within the timeout, the connection will
+ /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
+ ///
+ /// Default is 20 seconds.
+ ///
+ /// # Cargo Feature
+ ///
+ /// Requires the `runtime` cargo feature to be enabled.
+ #[cfg(all(feature = "runtime", feature = "http2"))]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_keep_alive_timeout(mut self, timeout: Duration) -> Self {
+ self.protocol.http2_keep_alive_timeout(timeout);
+ self
+ }
+
+ /// Set the maximum write buffer size for each HTTP/2 stream.
+ ///
+ /// Default is currently ~400KB, but may change.
+ ///
+ /// # Panics
+ ///
+ /// The value must be no larger than `u32::MAX`.
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_max_send_buf_size(mut self, max: usize) -> Self {
+ self.protocol.http2_max_send_buf_size(max);
+ self
+ }
+
+ /// Enables the [extended CONNECT protocol].
+ ///
+ /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
+ #[cfg(feature = "http2")]
+ pub fn http2_enable_connect_protocol(mut self) -> Self {
+ self.protocol.http2_enable_connect_protocol();
+ self
+ }
+
+ /// Sets the `Executor` to deal with connection tasks.
+ ///
+ /// Default is `tokio::spawn`.
+ pub fn executor<E2>(self, executor: E2) -> Builder<I, E2> {
+ Builder {
+ incoming: self.incoming,
+ protocol: self.protocol.with_executor(executor),
+ }
+ }
+
+ /// Consume this `Builder`, creating a [`Server`](Server).
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// # #[cfg(feature = "tcp")]
+ /// # async fn run() {
+ /// use hyper::{Body, Error, Response, Server};
+ /// use hyper::service::{make_service_fn, service_fn};
+ ///
+ /// // Construct our SocketAddr to listen on...
+ /// let addr = ([127, 0, 0, 1], 3000).into();
+ ///
+ /// // And a MakeService to handle each connection...
+ /// let make_svc = make_service_fn(|_| async {
+ /// Ok::<_, Error>(service_fn(|_req| async {
+ /// Ok::<_, Error>(Response::new(Body::from("Hello World")))
+ /// }))
+ /// });
+ ///
+ /// // Then bind and serve...
+ /// let server = Server::bind(&addr)
+ /// .serve(make_svc);
+ ///
+ /// // Run forever-ish...
+ /// if let Err(err) = server.await {
+ /// eprintln!("server error: {}", err);
+ /// }
+ /// # }
+ /// ```
+ pub fn serve<S, B>(self, make_service: S) -> Server<I, S, E>
+ where
+ I: Accept,
+ I::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ S: MakeServiceRef<I::Conn, Body, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: HttpBody + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
+ E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
+ {
+ Server {
+ incoming: self.incoming,
+ make_service,
+ protocol: self.protocol.clone(),
+ }
+ }
+}
+
+#[cfg(feature = "tcp")]
+#[cfg_attr(
+ docsrs,
+ doc(cfg(all(feature = "tcp", any(feature = "http1", feature = "http2"))))
+)]
+impl<E> Builder<AddrIncoming, E> {
+ /// Set the duration to remain idle before sending TCP keepalive probes.
+ ///
+ /// If `None` is specified, keepalive is disabled.
+ pub fn tcp_keepalive(mut self, keepalive: Option<Duration>) -> Self {
+ self.incoming.set_keepalive(keepalive);
+ self
+ }
+
+ /// Set the duration between two successive TCP keepalive retransmissions,
+ /// if acknowledgement to the previous keepalive transmission is not received.
+ pub fn tcp_keepalive_interval(mut self, interval: Option<Duration>) -> Self {
+ self.incoming.set_keepalive_interval(interval);
+ self
+ }
+
+ /// Set the number of retransmissions to be carried out before declaring that remote end is not available.
+ pub fn tcp_keepalive_retries(mut self, retries: Option<u32>) -> Self {
+ self.incoming.set_keepalive_retries(retries);
+ self
+ }
+
+ /// Set the value of `TCP_NODELAY` option for accepted connections.
+ pub fn tcp_nodelay(mut self, enabled: bool) -> Self {
+ self.incoming.set_nodelay(enabled);
+ self
+ }
+
+ /// Set whether to sleep on accept errors.
+ ///
+ /// A possible scenario is that the process has hit the max open files
+ /// allowed, and so trying to accept a new connection will fail with
+ /// EMFILE. In some cases, it's preferable to just wait for some time, if
+ /// the application will likely close some files (or connections), and try
+ /// to accept the connection again. If this option is true, the error will
+ /// be logged at the error level, since it is still a big deal, and then
+ /// the listener will sleep for 1 second.
+ ///
+ /// In other cases, hitting the max open files should be treat similarly
+ /// to being out-of-memory, and simply error (and shutdown). Setting this
+ /// option to false will allow that.
+ ///
+ /// For more details see [`AddrIncoming::set_sleep_on_errors`]
+ pub fn tcp_sleep_on_accept_errors(mut self, val: bool) -> Self {
+ self.incoming.set_sleep_on_errors(val);
+ self
+ }
+}
+
+// Used by `Server` to optionally watch a `Connection` future.
+//
+// The regular `hyper::Server` just uses a `NoopWatcher`, which does
+// not need to watch anything, and so returns the `Connection` untouched.
+//
+// The `Server::with_graceful_shutdown` needs to keep track of all active
+// connections, and signal that they start to shutdown when prompted, so
+// it has a `GracefulWatcher` implementation to do that.
+pub trait Watcher<I, S: HttpService<Body>, E>: Clone {
+ type Future: Future<Output = crate::Result<()>>;
+
+ fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future;
+}
+
+#[allow(missing_debug_implementations)]
+#[derive(Copy, Clone)]
+pub struct NoopWatcher;
+
+impl<I, S, E> Watcher<I, S, E> for NoopWatcher
+where
+ I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ S: HttpService<Body>,
+ E: ConnStreamExec<S::Future, S::ResBody>,
+ S::ResBody: 'static,
+ <S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
+{
+ type Future = UpgradeableConnection<I, S, E>;
+
+ fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future {
+ conn
+ }
+}
+
+// used by exec.rs
+pub(crate) mod new_svc {
+ use std::error::Error as StdError;
+ use tokio::io::{AsyncRead, AsyncWrite};
+ use tracing::debug;
+
+ use super::{Connecting, Watcher};
+ use crate::body::{Body, HttpBody};
+ use crate::common::exec::ConnStreamExec;
+ use crate::common::{task, Future, Pin, Poll, Unpin};
+ use crate::service::HttpService;
+ use pin_project_lite::pin_project;
+
+ // This is a `Future<Item=(), Error=()>` spawned to an `Executor` inside
+ // the `Server`. By being a nameable type, we can be generic over the
+ // user's `Service::Future`, and thus an `Executor` can execute it.
+ //
+ // Doing this allows for the server to conditionally require `Send` futures,
+ // depending on the `Executor` configured.
+ //
+ // Users cannot import this type, nor the associated `NewSvcExec`. Instead,
+ // a blanket implementation for `Executor<impl Future>` is sufficient.
+
+ pin_project! {
+ #[allow(missing_debug_implementations)]
+ pub struct NewSvcTask<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> {
+ #[pin]
+ state: State<I, N, S, E, W>,
+ }
+ }
+
+ pin_project! {
+ #[project = StateProj]
+ pub(super) enum State<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> {
+ Connecting {
+ #[pin]
+ connecting: Connecting<I, N, E>,
+ watcher: W,
+ },
+ Connected {
+ #[pin]
+ future: W::Future,
+ },
+ }
+ }
+
+ impl<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> NewSvcTask<I, N, S, E, W> {
+ pub(super) fn new(connecting: Connecting<I, N, E>, watcher: W) -> Self {
+ NewSvcTask {
+ state: State::Connecting {
+ connecting,
+ watcher,
+ },
+ }
+ }
+ }
+
+ impl<I, N, S, NE, B, E, W> Future for NewSvcTask<I, N, S, E, W>
+ where
+ I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ N: Future<Output = Result<S, NE>>,
+ NE: Into<Box<dyn StdError + Send + Sync>>,
+ S: HttpService<Body, ResBody = B>,
+ B: HttpBody + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ E: ConnStreamExec<S::Future, B>,
+ W: Watcher<I, S, E>,
+ {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ // If it weren't for needing to name this type so the `Send` bounds
+ // could be projected to the `Serve` executor, this could just be
+ // an `async fn`, and much safer. Woe is me.
+
+ let mut me = self.project();
+ loop {
+ let next = {
+ match me.state.as_mut().project() {
+ StateProj::Connecting {
+ connecting,
+ watcher,
+ } => {
+ let res = ready!(connecting.poll(cx));
+ let conn = match res {
+ Ok(conn) => conn,
+ Err(err) => {
+ let err = crate::Error::new_user_make_service(err);
+ debug!("connecting error: {}", err);
+ return Poll::Ready(());
+ }
+ };
+ let future = watcher.watch(conn.with_upgrades());
+ State::Connected { future }
+ }
+ StateProj::Connected { future } => {
+ return future.poll(cx).map(|res| {
+ if let Err(err) = res {
+ debug!("connection error: {}", err);
+ }
+ });
+ }
+ }
+ };
+
+ me.state.set(next);
+ }
+ }
+ }
+}
+
+pin_project! {
+ /// A future building a new `Service` to a `Connection`.
+ ///
+ /// Wraps the future returned from `MakeService` into one that returns
+ /// a `Connection`.
+ #[must_use = "futures do nothing unless polled"]
+ #[derive(Debug)]
+ #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
+ pub struct Connecting<I, F, E = Exec> {
+ #[pin]
+ future: F,
+ io: Option<I>,
+ protocol: Http_<E>,
+ }
+}
+
+impl<I, F, S, FE, E, B> Future for Connecting<I, F, E>
+where
+ I: AsyncRead + AsyncWrite + Unpin,
+ F: Future<Output = Result<S, FE>>,
+ S: HttpService<Body, ResBody = B>,
+ B: HttpBody + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ E: ConnStreamExec<S::Future, B>,
+{
+ type Output = Result<Connection<I, S, E>, FE>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ let mut me = self.project();
+ let service = ready!(me.future.poll(cx))?;
+ let io = Option::take(&mut me.io).expect("polled after complete");
+ Poll::Ready(Ok(me.protocol.serve_connection(io, service)))
+ }
+}
diff --git a/third_party/rust/hyper/src/server/server_stub.rs b/third_party/rust/hyper/src/server/server_stub.rs
new file mode 100644
index 0000000000..87b1f5131f
--- /dev/null
+++ b/third_party/rust/hyper/src/server/server_stub.rs
@@ -0,0 +1,16 @@
+use std::fmt;
+
+use crate::common::exec::Exec;
+
+/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default.
+///
+/// Needs at least one of the `http1` and `http2` features to be activated to actually be useful.
+pub struct Server<I, S, E = Exec> {
+ _marker: std::marker::PhantomData<(I, S, E)>,
+}
+
+impl<I: fmt::Debug, S: fmt::Debug> fmt::Debug for Server<I, S> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Server").finish()
+ }
+}
diff --git a/third_party/rust/hyper/src/server/shutdown.rs b/third_party/rust/hyper/src/server/shutdown.rs
new file mode 100644
index 0000000000..96937d0827
--- /dev/null
+++ b/third_party/rust/hyper/src/server/shutdown.rs
@@ -0,0 +1,128 @@
+use std::error::Error as StdError;
+
+use pin_project_lite::pin_project;
+use tokio::io::{AsyncRead, AsyncWrite};
+use tracing::debug;
+
+use super::accept::Accept;
+use super::conn::UpgradeableConnection;
+use super::server::{Server, Watcher};
+use crate::body::{Body, HttpBody};
+use crate::common::drain::{self, Draining, Signal, Watch, Watching};
+use crate::common::exec::{ConnStreamExec, NewSvcExec};
+use crate::common::{task, Future, Pin, Poll, Unpin};
+use crate::service::{HttpService, MakeServiceRef};
+
+pin_project! {
+ #[allow(missing_debug_implementations)]
+ pub struct Graceful<I, S, F, E> {
+ #[pin]
+ state: State<I, S, F, E>,
+ }
+}
+
+pin_project! {
+ #[project = StateProj]
+ pub(super) enum State<I, S, F, E> {
+ Running {
+ drain: Option<(Signal, Watch)>,
+ #[pin]
+ server: Server<I, S, E>,
+ #[pin]
+ signal: F,
+ },
+ Draining { draining: Draining },
+ }
+}
+
+impl<I, S, F, E> Graceful<I, S, F, E> {
+ pub(super) fn new(server: Server<I, S, E>, signal: F) -> Self {
+ let drain = Some(drain::channel());
+ Graceful {
+ state: State::Running {
+ drain,
+ server,
+ signal,
+ },
+ }
+ }
+}
+
+impl<I, IO, IE, S, B, F, E> Future for Graceful<I, S, F, E>
+where
+ I: Accept<Conn = IO, Error = IE>,
+ IE: Into<Box<dyn StdError + Send + Sync>>,
+ IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ S: MakeServiceRef<IO, Body, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: HttpBody + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ F: Future<Output = ()>,
+ E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
+ E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
+{
+ type Output = crate::Result<()>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ let mut me = self.project();
+ loop {
+ let next = {
+ match me.state.as_mut().project() {
+ StateProj::Running {
+ drain,
+ server,
+ signal,
+ } => match signal.poll(cx) {
+ Poll::Ready(()) => {
+ debug!("signal received, starting graceful shutdown");
+ let sig = drain.take().expect("drain channel").0;
+ State::Draining {
+ draining: sig.drain(),
+ }
+ }
+ Poll::Pending => {
+ let watch = drain.as_ref().expect("drain channel").1.clone();
+ return server.poll_watch(cx, &GracefulWatcher(watch));
+ }
+ },
+ StateProj::Draining { ref mut draining } => {
+ return Pin::new(draining).poll(cx).map(Ok);
+ }
+ }
+ };
+ me.state.set(next);
+ }
+ }
+}
+
+#[allow(missing_debug_implementations)]
+#[derive(Clone)]
+pub struct GracefulWatcher(Watch);
+
+impl<I, S, E> Watcher<I, S, E> for GracefulWatcher
+where
+ I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ S: HttpService<Body>,
+ E: ConnStreamExec<S::Future, S::ResBody>,
+ S::ResBody: 'static,
+ <S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
+{
+ type Future =
+ Watching<UpgradeableConnection<I, S, E>, fn(Pin<&mut UpgradeableConnection<I, S, E>>)>;
+
+ fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future {
+ self.0.clone().watch(conn, on_drain)
+ }
+}
+
+fn on_drain<I, S, E>(conn: Pin<&mut UpgradeableConnection<I, S, E>>)
+where
+ S: HttpService<Body>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: AsyncRead + AsyncWrite + Unpin,
+ S::ResBody: HttpBody + 'static,
+ <S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
+ E: ConnStreamExec<S::Future, S::ResBody>,
+{
+ conn.graceful_shutdown()
+}
diff --git a/third_party/rust/hyper/src/server/tcp.rs b/third_party/rust/hyper/src/server/tcp.rs
new file mode 100644
index 0000000000..3f937154be
--- /dev/null
+++ b/third_party/rust/hyper/src/server/tcp.rs
@@ -0,0 +1,484 @@
+use std::fmt;
+use std::io;
+use std::net::{SocketAddr, TcpListener as StdTcpListener};
+use std::time::Duration;
+use socket2::TcpKeepalive;
+
+use tokio::net::TcpListener;
+use tokio::time::Sleep;
+use tracing::{debug, error, trace};
+
+use crate::common::{task, Future, Pin, Poll};
+
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::addr_stream::AddrStream;
+use super::accept::Accept;
+
+#[derive(Default, Debug, Clone, Copy)]
+struct TcpKeepaliveConfig {
+ time: Option<Duration>,
+ interval: Option<Duration>,
+ retries: Option<u32>,
+}
+
+impl TcpKeepaliveConfig {
+ /// Converts into a `socket2::TcpKeealive` if there is any keep alive configuration.
+ fn into_socket2(self) -> Option<TcpKeepalive> {
+ let mut dirty = false;
+ let mut ka = TcpKeepalive::new();
+ if let Some(time) = self.time {
+ ka = ka.with_time(time);
+ dirty = true
+ }
+ if let Some(interval) = self.interval {
+ ka = Self::ka_with_interval(ka, interval, &mut dirty)
+ };
+ if let Some(retries) = self.retries {
+ ka = Self::ka_with_retries(ka, retries, &mut dirty)
+ };
+ if dirty {
+ Some(ka)
+ } else {
+ None
+ }
+ }
+
+ #[cfg(any(
+ target_os = "android",
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "fuchsia",
+ target_os = "illumos",
+ target_os = "linux",
+ target_os = "netbsd",
+ target_vendor = "apple",
+ windows,
+ ))]
+ fn ka_with_interval(ka: TcpKeepalive, interval: Duration, dirty: &mut bool) -> TcpKeepalive {
+ *dirty = true;
+ ka.with_interval(interval)
+ }
+
+ #[cfg(not(any(
+ target_os = "android",
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "fuchsia",
+ target_os = "illumos",
+ target_os = "linux",
+ target_os = "netbsd",
+ target_vendor = "apple",
+ windows,
+ )))]
+ fn ka_with_interval(ka: TcpKeepalive, _: Duration, _: &mut bool) -> TcpKeepalive {
+ ka // no-op as keepalive interval is not supported on this platform
+ }
+
+ #[cfg(any(
+ target_os = "android",
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "fuchsia",
+ target_os = "illumos",
+ target_os = "linux",
+ target_os = "netbsd",
+ target_vendor = "apple",
+ ))]
+ fn ka_with_retries(ka: TcpKeepalive, retries: u32, dirty: &mut bool) -> TcpKeepalive {
+ *dirty = true;
+ ka.with_retries(retries)
+ }
+
+ #[cfg(not(any(
+ target_os = "android",
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "fuchsia",
+ target_os = "illumos",
+ target_os = "linux",
+ target_os = "netbsd",
+ target_vendor = "apple",
+ )))]
+ fn ka_with_retries(ka: TcpKeepalive, _: u32, _: &mut bool) -> TcpKeepalive {
+ ka // no-op as keepalive retries is not supported on this platform
+ }
+}
+
+/// A stream of connections from binding to an address.
+#[must_use = "streams do nothing unless polled"]
+pub struct AddrIncoming {
+ addr: SocketAddr,
+ listener: TcpListener,
+ sleep_on_errors: bool,
+ tcp_keepalive_config: TcpKeepaliveConfig,
+ tcp_nodelay: bool,
+ timeout: Option<Pin<Box<Sleep>>>,
+}
+
+impl AddrIncoming {
+ pub(super) fn new(addr: &SocketAddr) -> crate::Result<Self> {
+ let std_listener = StdTcpListener::bind(addr).map_err(crate::Error::new_listen)?;
+
+ AddrIncoming::from_std(std_listener)
+ }
+
+ pub(super) fn from_std(std_listener: StdTcpListener) -> crate::Result<Self> {
+ // TcpListener::from_std doesn't set O_NONBLOCK
+ std_listener
+ .set_nonblocking(true)
+ .map_err(crate::Error::new_listen)?;
+ let listener = TcpListener::from_std(std_listener).map_err(crate::Error::new_listen)?;
+ AddrIncoming::from_listener(listener)
+ }
+
+ /// Creates a new `AddrIncoming` binding to provided socket address.
+ pub fn bind(addr: &SocketAddr) -> crate::Result<Self> {
+ AddrIncoming::new(addr)
+ }
+
+ /// Creates a new `AddrIncoming` from an existing `tokio::net::TcpListener`.
+ pub fn from_listener(listener: TcpListener) -> crate::Result<Self> {
+ let addr = listener.local_addr().map_err(crate::Error::new_listen)?;
+ Ok(AddrIncoming {
+ listener,
+ addr,
+ sleep_on_errors: true,
+ tcp_keepalive_config: TcpKeepaliveConfig::default(),
+ tcp_nodelay: false,
+ timeout: None,
+ })
+ }
+
+ /// Get the local address bound to this listener.
+ pub fn local_addr(&self) -> SocketAddr {
+ self.addr
+ }
+
+ /// Set the duration to remain idle before sending TCP keepalive probes.
+ ///
+ /// If `None` is specified, keepalive is disabled.
+ pub fn set_keepalive(&mut self, time: Option<Duration>) -> &mut Self {
+ self.tcp_keepalive_config.time = time;
+ self
+ }
+
+ /// Set the duration between two successive TCP keepalive retransmissions,
+ /// if acknowledgement to the previous keepalive transmission is not received.
+ pub fn set_keepalive_interval(&mut self, interval: Option<Duration>) -> &mut Self {
+ self.tcp_keepalive_config.interval = interval;
+ self
+ }
+
+ /// Set the number of retransmissions to be carried out before declaring that remote end is not available.
+ pub fn set_keepalive_retries(&mut self, retries: Option<u32>) -> &mut Self {
+ self.tcp_keepalive_config.retries = retries;
+ self
+ }
+
+ /// Set the value of `TCP_NODELAY` option for accepted connections.
+ pub fn set_nodelay(&mut self, enabled: bool) -> &mut Self {
+ self.tcp_nodelay = enabled;
+ self
+ }
+
+ /// Set whether to sleep on accept errors.
+ ///
+ /// A possible scenario is that the process has hit the max open files
+ /// allowed, and so trying to accept a new connection will fail with
+ /// `EMFILE`. In some cases, it's preferable to just wait for some time, if
+ /// the application will likely close some files (or connections), and try
+ /// to accept the connection again. If this option is `true`, the error
+ /// will be logged at the `error` level, since it is still a big deal,
+ /// and then the listener will sleep for 1 second.
+ ///
+ /// In other cases, hitting the max open files should be treat similarly
+ /// to being out-of-memory, and simply error (and shutdown). Setting
+ /// this option to `false` will allow that.
+ ///
+ /// Default is `true`.
+ pub fn set_sleep_on_errors(&mut self, val: bool) {
+ self.sleep_on_errors = val;
+ }
+
+ fn poll_next_(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<AddrStream>> {
+ // Check if a previous timeout is active that was set by IO errors.
+ if let Some(ref mut to) = self.timeout {
+ ready!(Pin::new(to).poll(cx));
+ }
+ self.timeout = None;
+
+ loop {
+ match ready!(self.listener.poll_accept(cx)) {
+ Ok((socket, remote_addr)) => {
+ if let Some(tcp_keepalive) = &self.tcp_keepalive_config.into_socket2() {
+ let sock_ref = socket2::SockRef::from(&socket);
+ if let Err(e) = sock_ref.set_tcp_keepalive(tcp_keepalive) {
+ trace!("error trying to set TCP keepalive: {}", e);
+ }
+ }
+ if let Err(e) = socket.set_nodelay(self.tcp_nodelay) {
+ trace!("error trying to set TCP nodelay: {}", e);
+ }
+ let local_addr = socket.local_addr()?;
+ return Poll::Ready(Ok(AddrStream::new(socket, remote_addr, local_addr)));
+ }
+ Err(e) => {
+ // Connection errors can be ignored directly, continue by
+ // accepting the next request.
+ if is_connection_error(&e) {
+ debug!("accepted connection already errored: {}", e);
+ continue;
+ }
+
+ if self.sleep_on_errors {
+ error!("accept error: {}", e);
+
+ // Sleep 1s.
+ let mut timeout = Box::pin(tokio::time::sleep(Duration::from_secs(1)));
+
+ match timeout.as_mut().poll(cx) {
+ Poll::Ready(()) => {
+ // Wow, it's been a second already? Ok then...
+ continue;
+ }
+ Poll::Pending => {
+ self.timeout = Some(timeout);
+ return Poll::Pending;
+ }
+ }
+ } else {
+ return Poll::Ready(Err(e));
+ }
+ }
+ }
+ }
+ }
+}
+
+impl Accept for AddrIncoming {
+ type Conn = AddrStream;
+ type Error = io::Error;
+
+ fn poll_accept(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
+ let result = ready!(self.poll_next_(cx));
+ Poll::Ready(Some(result))
+ }
+}
+
+/// This function defines errors that are per-connection. Which basically
+/// means that if we get this error from `accept()` system call it means
+/// next connection might be ready to be accepted.
+///
+/// All other errors will incur a timeout before next `accept()` is performed.
+/// The timeout is useful to handle resource exhaustion errors like ENFILE
+/// and EMFILE. Otherwise, could enter into tight loop.
+fn is_connection_error(e: &io::Error) -> bool {
+ matches!(
+ e.kind(),
+ io::ErrorKind::ConnectionRefused
+ | io::ErrorKind::ConnectionAborted
+ | io::ErrorKind::ConnectionReset
+ )
+}
+
+impl fmt::Debug for AddrIncoming {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("AddrIncoming")
+ .field("addr", &self.addr)
+ .field("sleep_on_errors", &self.sleep_on_errors)
+ .field("tcp_keepalive_config", &self.tcp_keepalive_config)
+ .field("tcp_nodelay", &self.tcp_nodelay)
+ .finish()
+ }
+}
+
+mod addr_stream {
+ use std::io;
+ use std::net::SocketAddr;
+ #[cfg(unix)]
+ use std::os::unix::io::{AsRawFd, RawFd};
+ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+ use tokio::net::TcpStream;
+
+ use crate::common::{task, Pin, Poll};
+
+ pin_project_lite::pin_project! {
+ /// A transport returned yieled by `AddrIncoming`.
+ #[derive(Debug)]
+ pub struct AddrStream {
+ #[pin]
+ inner: TcpStream,
+ pub(super) remote_addr: SocketAddr,
+ pub(super) local_addr: SocketAddr
+ }
+ }
+
+ impl AddrStream {
+ pub(super) fn new(
+ tcp: TcpStream,
+ remote_addr: SocketAddr,
+ local_addr: SocketAddr,
+ ) -> AddrStream {
+ AddrStream {
+ inner: tcp,
+ remote_addr,
+ local_addr,
+ }
+ }
+
+ /// Returns the remote (peer) address of this connection.
+ #[inline]
+ pub fn remote_addr(&self) -> SocketAddr {
+ self.remote_addr
+ }
+
+ /// Returns the local address of this connection.
+ #[inline]
+ pub fn local_addr(&self) -> SocketAddr {
+ self.local_addr
+ }
+
+ /// Consumes the AddrStream and returns the underlying IO object
+ #[inline]
+ pub fn into_inner(self) -> TcpStream {
+ self.inner
+ }
+
+ /// Attempt to receive data on the socket, without removing that data
+ /// from the queue, registering the current task for wakeup if data is
+ /// not yet available.
+ pub fn poll_peek(
+ &mut self,
+ cx: &mut task::Context<'_>,
+ buf: &mut tokio::io::ReadBuf<'_>,
+ ) -> Poll<io::Result<usize>> {
+ self.inner.poll_peek(cx, buf)
+ }
+ }
+
+ impl AsyncRead for AddrStream {
+ #[inline]
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ self.project().inner.poll_read(cx, buf)
+ }
+ }
+
+ impl AsyncWrite for AddrStream {
+ #[inline]
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.project().inner.poll_write(cx, buf)
+ }
+
+ #[inline]
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ bufs: &[io::IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ self.project().inner.poll_write_vectored(cx, bufs)
+ }
+
+ #[inline]
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+ // TCP flush is a noop
+ Poll::Ready(Ok(()))
+ }
+
+ #[inline]
+ fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+ self.project().inner.poll_shutdown(cx)
+ }
+
+ #[inline]
+ fn is_write_vectored(&self) -> bool {
+ // Note that since `self.inner` is a `TcpStream`, this could
+ // *probably* be hard-coded to return `true`...but it seems more
+ // correct to ask it anyway (maybe we're on some platform without
+ // scatter-gather IO?)
+ self.inner.is_write_vectored()
+ }
+ }
+
+ #[cfg(unix)]
+ impl AsRawFd for AddrStream {
+ fn as_raw_fd(&self) -> RawFd {
+ self.inner.as_raw_fd()
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::time::Duration;
+ use crate::server::tcp::TcpKeepaliveConfig;
+
+ #[test]
+ fn no_tcp_keepalive_config() {
+ assert!(TcpKeepaliveConfig::default().into_socket2().is_none());
+ }
+
+ #[test]
+ fn tcp_keepalive_time_config() {
+ let mut kac = TcpKeepaliveConfig::default();
+ kac.time = Some(Duration::from_secs(60));
+ if let Some(tcp_keepalive) = kac.into_socket2() {
+ assert!(format!("{tcp_keepalive:?}").contains("time: Some(60s)"));
+ } else {
+ panic!("test failed");
+ }
+ }
+
+ #[cfg(any(
+ target_os = "android",
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "fuchsia",
+ target_os = "illumos",
+ target_os = "linux",
+ target_os = "netbsd",
+ target_vendor = "apple",
+ windows,
+ ))]
+ #[test]
+ fn tcp_keepalive_interval_config() {
+ let mut kac = TcpKeepaliveConfig::default();
+ kac.interval = Some(Duration::from_secs(1));
+ if let Some(tcp_keepalive) = kac.into_socket2() {
+ assert!(format!("{tcp_keepalive:?}").contains("interval: Some(1s)"));
+ } else {
+ panic!("test failed");
+ }
+ }
+
+ #[cfg(any(
+ target_os = "android",
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "fuchsia",
+ target_os = "illumos",
+ target_os = "linux",
+ target_os = "netbsd",
+ target_vendor = "apple",
+ ))]
+ #[test]
+ fn tcp_keepalive_retries_config() {
+ let mut kac = TcpKeepaliveConfig::default();
+ kac.retries = Some(3);
+ if let Some(tcp_keepalive) = kac.into_socket2() {
+ assert!(format!("{tcp_keepalive:?}").contains("retries: Some(3)"));
+ } else {
+ panic!("test failed");
+ }
+ }
+}