summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-timer/src/timeout.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
commit2aa4a82499d4becd2284cdb482213d541b8804dd (patch)
treeb80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/tokio-timer/src/timeout.rs
parentInitial commit. (diff)
downloadfirefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz
firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-timer/src/timeout.rs')
-rw-r--r--third_party/rust/tokio-timer/src/timeout.rs311
1 files changed, 311 insertions, 0 deletions
diff --git a/third_party/rust/tokio-timer/src/timeout.rs b/third_party/rust/tokio-timer/src/timeout.rs
new file mode 100644
index 0000000000..1c02a68675
--- /dev/null
+++ b/third_party/rust/tokio-timer/src/timeout.rs
@@ -0,0 +1,311 @@
+//! Allows a future or stream to execute for a maximum amount of time.
+//!
+//! See [`Timeout`] documentation for more details.
+//!
+//! [`Timeout`]: struct.Timeout.html
+
+use clock::now;
+use Delay;
+
+use futures::{Async, Future, Poll, Stream};
+
+use std::error;
+use std::fmt;
+use std::time::{Duration, Instant};
+
+/// Allows a `Future` or `Stream` to execute for a limited amount of time.
+///
+/// If the future or stream completes before the timeout has expired, then
+/// `Timeout` returns the completed value. Otherwise, `Timeout` returns an
+/// [`Error`].
+///
+/// # Futures and Streams
+///
+/// The exact behavor depends on if the inner value is a `Future` or a `Stream`.
+/// In the case of a `Future`, `Timeout` will require the future to complete by
+/// a fixed deadline. In the case of a `Stream`, `Timeout` will allow each item
+/// to take the entire timeout before returning an error.
+///
+/// In order to set an upper bound on the processing of the *entire* stream,
+/// then a timeout should be set on the future that processes the stream. For
+/// example:
+///
+/// ```rust
+/// # extern crate futures;
+/// # extern crate tokio;
+/// // import the `timeout` function, usually this is done
+/// // with `use tokio::prelude::*`
+/// use tokio::prelude::FutureExt;
+/// use futures::Stream;
+/// use futures::sync::mpsc;
+/// use std::time::Duration;
+///
+/// # fn main() {
+/// let (tx, rx) = mpsc::unbounded();
+/// # tx.unbounded_send(()).unwrap();
+/// # drop(tx);
+///
+/// let process = rx.for_each(|item| {
+/// // do something with `item`
+/// # drop(item);
+/// # Ok(())
+/// });
+///
+/// # tokio::runtime::current_thread::block_on_all(
+/// // Wrap the future with a `Timeout` set to expire in 10 milliseconds.
+/// process.timeout(Duration::from_millis(10))
+/// # ).unwrap();
+/// # }
+/// ```
+///
+/// # Cancelation
+///
+/// Cancelling a `Timeout` is done by dropping the value. No additional cleanup
+/// or other work is required.
+///
+/// The original future or stream may be obtained by calling [`Timeout::into_inner`]. This
+/// consumes the `Timeout`.
+///
+/// [`Error`]: struct.Error.html
+/// [`Timeout::into_inner`]: struct.Timeout.html#method.into_iter
+#[must_use = "futures do nothing unless polled"]
+#[derive(Debug)]
+pub struct Timeout<T> {
+ value: T,
+ delay: Delay,
+}
+
+/// Error returned by `Timeout`.
+#[derive(Debug)]
+pub struct Error<T>(Kind<T>);
+
+/// Timeout error variants
+#[derive(Debug)]
+enum Kind<T> {
+ /// Inner value returned an error
+ Inner(T),
+
+ /// The timeout elapsed.
+ Elapsed,
+
+ /// Timer returned an error.
+ Timer(::Error),
+}
+
+impl<T> Timeout<T> {
+ /// Create a new `Timeout` that allows `value` to execute for a duration of
+ /// at most `timeout`.
+ ///
+ /// The exact behavior depends on if `value` is a `Future` or a `Stream`.
+ ///
+ /// See [type] level documentation for more details.
+ ///
+ /// [type]: #
+ ///
+ /// # Examples
+ ///
+ /// Create a new `Timeout` set to expire in 10 milliseconds.
+ ///
+ /// ```rust
+ /// # extern crate futures;
+ /// # extern crate tokio;
+ /// use tokio::timer::Timeout;
+ /// use futures::Future;
+ /// use futures::sync::oneshot;
+ /// use std::time::Duration;
+ ///
+ /// # fn main() {
+ /// let (tx, rx) = oneshot::channel();
+ /// # tx.send(()).unwrap();
+ ///
+ /// # tokio::runtime::current_thread::block_on_all(
+ /// // Wrap the future with a `Timeout` set to expire in 10 milliseconds.
+ /// Timeout::new(rx, Duration::from_millis(10))
+ /// # ).unwrap();
+ /// # }
+ /// ```
+ pub fn new(value: T, timeout: Duration) -> Timeout<T> {
+ let delay = Delay::new_timeout(now() + timeout, timeout);
+ Timeout::new_with_delay(value, delay)
+ }
+
+ pub(crate) fn new_with_delay(value: T, delay: Delay) -> Timeout<T> {
+ Timeout { value, delay }
+ }
+
+ /// Gets a reference to the underlying value in this timeout.
+ pub fn get_ref(&self) -> &T {
+ &self.value
+ }
+
+ /// Gets a mutable reference to the underlying value in this timeout.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.value
+ }
+
+ /// Consumes this timeout, returning the underlying value.
+ pub fn into_inner(self) -> T {
+ self.value
+ }
+}
+
+impl<T: Future> Timeout<T> {
+ /// Create a new `Timeout` that completes when `future` completes or when
+ /// `deadline` is reached.
+ ///
+ /// This function differs from `new` in that:
+ ///
+ /// * It only accepts `Future` arguments.
+ /// * It sets an explicit `Instant` at which the timeout expires.
+ pub fn new_at(future: T, deadline: Instant) -> Timeout<T> {
+ let delay = Delay::new(deadline);
+
+ Timeout {
+ value: future,
+ delay,
+ }
+ }
+}
+
+impl<T> Future for Timeout<T>
+where
+ T: Future,
+{
+ type Item = T::Item;
+ type Error = Error<T::Error>;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ // First, try polling the future
+ match self.value.poll() {
+ Ok(Async::Ready(v)) => return Ok(Async::Ready(v)),
+ Ok(Async::NotReady) => {}
+ Err(e) => return Err(Error::inner(e)),
+ }
+
+ // Now check the timer
+ match self.delay.poll() {
+ Ok(Async::NotReady) => Ok(Async::NotReady),
+ Ok(Async::Ready(_)) => Err(Error::elapsed()),
+ Err(e) => Err(Error::timer(e)),
+ }
+ }
+}
+
+impl<T> Stream for Timeout<T>
+where
+ T: Stream,
+{
+ type Item = T::Item;
+ type Error = Error<T::Error>;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ // First, try polling the future
+ match self.value.poll() {
+ Ok(Async::Ready(v)) => {
+ if v.is_some() {
+ self.delay.reset_timeout();
+ }
+ return Ok(Async::Ready(v));
+ }
+ Ok(Async::NotReady) => {}
+ Err(e) => return Err(Error::inner(e)),
+ }
+
+ // Now check the timer
+ match self.delay.poll() {
+ Ok(Async::NotReady) => Ok(Async::NotReady),
+ Ok(Async::Ready(_)) => {
+ self.delay.reset_timeout();
+ Err(Error::elapsed())
+ }
+ Err(e) => Err(Error::timer(e)),
+ }
+ }
+}
+
+// ===== impl Error =====
+
+impl<T> Error<T> {
+ /// Create a new `Error` representing the inner value completing with `Err`.
+ pub fn inner(err: T) -> Error<T> {
+ Error(Kind::Inner(err))
+ }
+
+ /// Returns `true` if the error was caused by the inner value completing
+ /// with `Err`.
+ pub fn is_inner(&self) -> bool {
+ match self.0 {
+ Kind::Inner(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Consumes `self`, returning the inner future error.
+ pub fn into_inner(self) -> Option<T> {
+ match self.0 {
+ Kind::Inner(err) => Some(err),
+ _ => None,
+ }
+ }
+
+ /// Create a new `Error` representing the inner value not completing before
+ /// the deadline is reached.
+ pub fn elapsed() -> Error<T> {
+ Error(Kind::Elapsed)
+ }
+
+ /// Returns `true` if the error was caused by the inner value not completing
+ /// before the deadline is reached.
+ pub fn is_elapsed(&self) -> bool {
+ match self.0 {
+ Kind::Elapsed => true,
+ _ => false,
+ }
+ }
+
+ /// Creates a new `Error` representing an error encountered by the timer
+ /// implementation
+ pub fn timer(err: ::Error) -> Error<T> {
+ Error(Kind::Timer(err))
+ }
+
+ /// Returns `true` if the error was caused by the timer.
+ pub fn is_timer(&self) -> bool {
+ match self.0 {
+ Kind::Timer(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Consumes `self`, returning the error raised by the timer implementation.
+ pub fn into_timer(self) -> Option<::Error> {
+ match self.0 {
+ Kind::Timer(err) => Some(err),
+ _ => None,
+ }
+ }
+}
+
+impl<T: error::Error> error::Error for Error<T> {
+ fn description(&self) -> &str {
+ use self::Kind::*;
+
+ match self.0 {
+ Inner(ref e) => e.description(),
+ Elapsed => "deadline has elapsed",
+ Timer(ref e) => e.description(),
+ }
+ }
+}
+
+impl<T: fmt::Display> fmt::Display for Error<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ use self::Kind::*;
+
+ match self.0 {
+ Inner(ref e) => e.fmt(fmt),
+ Elapsed => "deadline has elapsed".fmt(fmt),
+ Timer(ref e) => e.fmt(fmt),
+ }
+ }
+}