summaryrefslogtreecommitdiffstats
path: root/third_party/rust/hyper/src/server/tcp.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
commit2aa4a82499d4becd2284cdb482213d541b8804dd (patch)
treeb80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/hyper/src/server/tcp.rs
parentInitial commit. (diff)
downloadfirefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz
firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/hyper/src/server/tcp.rs')
-rw-r--r--third_party/rust/hyper/src/server/tcp.rs299
1 files changed, 299 insertions, 0 deletions
diff --git a/third_party/rust/hyper/src/server/tcp.rs b/third_party/rust/hyper/src/server/tcp.rs
new file mode 100644
index 0000000000..b823818693
--- /dev/null
+++ b/third_party/rust/hyper/src/server/tcp.rs
@@ -0,0 +1,299 @@
+use std::fmt;
+use std::io;
+use std::net::{SocketAddr, TcpListener as StdTcpListener};
+use std::time::Duration;
+
+use futures_util::FutureExt as _;
+use tokio::net::TcpListener;
+use tokio::time::Delay;
+
+use crate::common::{task, Future, Pin, Poll};
+
+pub use self::addr_stream::AddrStream;
+use super::Accept;
+
+/// A stream of connections from binding to an address.
+#[must_use = "streams do nothing unless polled"]
+pub struct AddrIncoming {
+ addr: SocketAddr,
+ listener: TcpListener,
+ sleep_on_errors: bool,
+ tcp_keepalive_timeout: Option<Duration>,
+ tcp_nodelay: bool,
+ timeout: Option<Delay>,
+}
+
+impl AddrIncoming {
+ pub(super) fn new(addr: &SocketAddr) -> crate::Result<Self> {
+ let std_listener = StdTcpListener::bind(addr).map_err(crate::Error::new_listen)?;
+
+ AddrIncoming::from_std(std_listener)
+ }
+
+ pub(super) fn from_std(std_listener: StdTcpListener) -> crate::Result<Self> {
+ let listener = TcpListener::from_std(std_listener).map_err(crate::Error::new_listen)?;
+ let addr = listener.local_addr().map_err(crate::Error::new_listen)?;
+ Ok(AddrIncoming {
+ listener,
+ addr,
+ sleep_on_errors: true,
+ tcp_keepalive_timeout: None,
+ tcp_nodelay: false,
+ timeout: None,
+ })
+ }
+
+ /// Creates a new `AddrIncoming` binding to provided socket address.
+ pub fn bind(addr: &SocketAddr) -> crate::Result<Self> {
+ AddrIncoming::new(addr)
+ }
+
+ /// Get the local address bound to this listener.
+ pub fn local_addr(&self) -> SocketAddr {
+ self.addr
+ }
+
+ /// Set whether TCP keepalive messages are enabled on accepted connections.
+ ///
+ /// If `None` is specified, keepalive is disabled, otherwise the duration
+ /// specified will be the time to remain idle before sending TCP keepalive
+ /// probes.
+ pub fn set_keepalive(&mut self, keepalive: Option<Duration>) -> &mut Self {
+ self.tcp_keepalive_timeout = keepalive;
+ self
+ }
+
+ /// Set the value of `TCP_NODELAY` option for accepted connections.
+ pub fn set_nodelay(&mut self, enabled: bool) -> &mut Self {
+ self.tcp_nodelay = enabled;
+ self
+ }
+
+ /// Set whether to sleep on accept errors.
+ ///
+ /// A possible scenario is that the process has hit the max open files
+ /// allowed, and so trying to accept a new connection will fail with
+ /// `EMFILE`. In some cases, it's preferable to just wait for some time, if
+ /// the application will likely close some files (or connections), and try
+ /// to accept the connection again. If this option is `true`, the error
+ /// will be logged at the `error` level, since it is still a big deal,
+ /// and then the listener will sleep for 1 second.
+ ///
+ /// In other cases, hitting the max open files should be treat similarly
+ /// to being out-of-memory, and simply error (and shutdown). Setting
+ /// this option to `false` will allow that.
+ ///
+ /// Default is `true`.
+ pub fn set_sleep_on_errors(&mut self, val: bool) {
+ self.sleep_on_errors = val;
+ }
+
+ fn poll_next_(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<AddrStream>> {
+ // Check if a previous timeout is active that was set by IO errors.
+ if let Some(ref mut to) = self.timeout {
+ match Pin::new(to).poll(cx) {
+ Poll::Ready(()) => {}
+ Poll::Pending => return Poll::Pending,
+ }
+ }
+ self.timeout = None;
+
+ let accept = self.listener.accept();
+ futures_util::pin_mut!(accept);
+
+ loop {
+ match accept.poll_unpin(cx) {
+ Poll::Ready(Ok((socket, addr))) => {
+ if let Some(dur) = self.tcp_keepalive_timeout {
+ if let Err(e) = socket.set_keepalive(Some(dur)) {
+ trace!("error trying to set TCP keepalive: {}", e);
+ }
+ }
+ if let Err(e) = socket.set_nodelay(self.tcp_nodelay) {
+ trace!("error trying to set TCP nodelay: {}", e);
+ }
+ return Poll::Ready(Ok(AddrStream::new(socket, addr)));
+ }
+ Poll::Pending => return Poll::Pending,
+ Poll::Ready(Err(e)) => {
+ // Connection errors can be ignored directly, continue by
+ // accepting the next request.
+ if is_connection_error(&e) {
+ debug!("accepted connection already errored: {}", e);
+ continue;
+ }
+
+ if self.sleep_on_errors {
+ error!("accept error: {}", e);
+
+ // Sleep 1s.
+ let mut timeout = tokio::time::delay_for(Duration::from_secs(1));
+
+ match Pin::new(&mut timeout).poll(cx) {
+ Poll::Ready(()) => {
+ // Wow, it's been a second already? Ok then...
+ continue;
+ }
+ Poll::Pending => {
+ self.timeout = Some(timeout);
+ return Poll::Pending;
+ }
+ }
+ } else {
+ return Poll::Ready(Err(e));
+ }
+ }
+ }
+ }
+ }
+}
+
+impl Accept for AddrIncoming {
+ type Conn = AddrStream;
+ type Error = io::Error;
+
+ fn poll_accept(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
+ let result = ready!(self.poll_next_(cx));
+ Poll::Ready(Some(result))
+ }
+}
+
+/// This function defines errors that are per-connection. Which basically
+/// means that if we get this error from `accept()` system call it means
+/// next connection might be ready to be accepted.
+///
+/// All other errors will incur a timeout before next `accept()` is performed.
+/// The timeout is useful to handle resource exhaustion errors like ENFILE
+/// and EMFILE. Otherwise, could enter into tight loop.
+fn is_connection_error(e: &io::Error) -> bool {
+ match e.kind() {
+ io::ErrorKind::ConnectionRefused
+ | io::ErrorKind::ConnectionAborted
+ | io::ErrorKind::ConnectionReset => true,
+ _ => false,
+ }
+}
+
+impl fmt::Debug for AddrIncoming {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("AddrIncoming")
+ .field("addr", &self.addr)
+ .field("sleep_on_errors", &self.sleep_on_errors)
+ .field("tcp_keepalive_timeout", &self.tcp_keepalive_timeout)
+ .field("tcp_nodelay", &self.tcp_nodelay)
+ .finish()
+ }
+}
+
+mod addr_stream {
+ use bytes::{Buf, BufMut};
+ use std::io;
+ use std::net::SocketAddr;
+ use tokio::io::{AsyncRead, AsyncWrite};
+ use tokio::net::TcpStream;
+
+ use crate::common::{task, Pin, Poll};
+
+ /// A transport returned yieled by `AddrIncoming`.
+ #[derive(Debug)]
+ pub struct AddrStream {
+ inner: TcpStream,
+ pub(super) remote_addr: SocketAddr,
+ }
+
+ impl AddrStream {
+ pub(super) fn new(tcp: TcpStream, addr: SocketAddr) -> AddrStream {
+ AddrStream {
+ inner: tcp,
+ remote_addr: addr,
+ }
+ }
+
+ /// Returns the remote (peer) address of this connection.
+ #[inline]
+ pub fn remote_addr(&self) -> SocketAddr {
+ self.remote_addr
+ }
+
+ /// Consumes the AddrStream and returns the underlying IO object
+ #[inline]
+ pub fn into_inner(self) -> TcpStream {
+ self.inner
+ }
+
+ /// Attempt to receive data on the socket, without removing that data
+ /// from the queue, registering the current task for wakeup if data is
+ /// not yet available.
+ pub fn poll_peek(
+ &mut self,
+ cx: &mut task::Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ self.inner.poll_peek(cx, buf)
+ }
+ }
+
+ impl AsyncRead for AddrStream {
+ unsafe fn prepare_uninitialized_buffer(
+ &self,
+ buf: &mut [std::mem::MaybeUninit<u8>],
+ ) -> bool {
+ self.inner.prepare_uninitialized_buffer(buf)
+ }
+
+ #[inline]
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ Pin::new(&mut self.inner).poll_read(cx, buf)
+ }
+
+ #[inline]
+ fn poll_read_buf<B: BufMut>(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &mut B,
+ ) -> Poll<io::Result<usize>> {
+ Pin::new(&mut self.inner).poll_read_buf(cx, buf)
+ }
+ }
+
+ impl AsyncWrite for AddrStream {
+ #[inline]
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ Pin::new(&mut self.inner).poll_write(cx, buf)
+ }
+
+ #[inline]
+ fn poll_write_buf<B: Buf>(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ buf: &mut B,
+ ) -> Poll<io::Result<usize>> {
+ Pin::new(&mut self.inner).poll_write_buf(cx, buf)
+ }
+
+ #[inline]
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+ // TCP flush is a noop
+ Poll::Ready(Ok(()))
+ }
+
+ #[inline]
+ fn poll_shutdown(
+ mut self: Pin<&mut Self>,
+ cx: &mut task::Context<'_>,
+ ) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_shutdown(cx)
+ }
+ }
+}