diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
commit | 2aa4a82499d4becd2284cdb482213d541b8804dd (patch) | |
tree | b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/tokio-0.1.11/src | |
parent | Initial commit. (diff) | |
download | firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip |
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-0.1.11/src')
24 files changed, 3962 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.11/src/async_await.rs b/third_party/rust/tokio-0.1.11/src/async_await.rs new file mode 100644 index 0000000000..88903643ff --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/async_await.rs @@ -0,0 +1,26 @@ +use std::future::{Future as StdFuture}; + +async fn map_ok<T: StdFuture>(future: T) -> Result<(), ()> { + let _ = await!(future); + Ok(()) +} + +/// Like `tokio::run`, but takes an `async` block +pub fn run_async<F>(future: F) +where F: StdFuture<Output = ()> + Send + 'static, +{ + use tokio_async_await::compat::backward; + let future = backward::Compat::new(map_ok(future)); + + ::run(future); +} + +/// Like `tokio::spawn`, but takes an `async` block +pub fn spawn_async<F>(future: F) +where F: StdFuture<Output = ()> + Send + 'static, +{ + use tokio_async_await::compat::backward; + let future = backward::Compat::new(map_ok(future)); + + ::spawn(future); +} diff --git a/third_party/rust/tokio-0.1.11/src/clock.rs b/third_party/rust/tokio-0.1.11/src/clock.rs new file mode 100644 index 0000000000..7ddbbf37fe --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/clock.rs @@ -0,0 +1,15 @@ +//! A configurable source of time. +//! +//! This module provides the [`now`][n] function, which returns an `Instant` +//! representing "now". The source of time used by this function is configurable +//! (via the [`tokio-timer`] crate) and allows mocking out the source of time in +//! tests or performing caching operations to reduce the number of syscalls. +//! +//! Note that, because the source of time is configurable, it is possible to +//! observe non-monotonic behavior when calling [`now`][n] from different +//! executors. +//! +//! [n]: fn.now.html +//! [`tokio-timer`]: https://docs.rs/tokio-timer/0.2/tokio_timer/clock/index.html + +pub use tokio_timer::clock::now; diff --git a/third_party/rust/tokio-0.1.11/src/codec/length_delimited.rs b/third_party/rust/tokio-0.1.11/src/codec/length_delimited.rs new file mode 100644 index 0000000000..54ec202bb1 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/codec/length_delimited.rs @@ -0,0 +1,971 @@ +//! Frame a stream of bytes based on a length prefix +//! +//! Many protocols delimit their frames by prefacing frame data with a +//! frame head that specifies the length of the frame. The +//! `length_delimited` module provides utilities for handling the length +//! based framing. This allows the consumer to work with entire frames +//! without having to worry about buffering or other framing logic. +//! +//! # Getting started +//! +//! If implementing a protocol from scratch, using length delimited framing +//! is an easy way to get started. [`Codec::new()`] will return a length +//! delimited codec using default configuration values. This can then be +//! used to construct a framer to adapt a full-duplex byte stream into a +//! stream of frames. +//! +//! ``` +//! # extern crate tokio; +//! use tokio::io::{AsyncRead, AsyncWrite}; +//! use tokio::codec::*; +//! +//! fn bind_transport<T: AsyncRead + AsyncWrite>(io: T) +//! -> Framed<T, LengthDelimitedCodec> +//! { +//! Framed::new(io, LengthDelimitedCodec::new()) +//! } +//! # pub fn main() {} +//! ``` +//! +//! The returned transport implements `Sink + Stream` for `BytesMut`. It +//! encodes the frame with a big-endian `u32` header denoting the frame +//! payload length: +//! +//! ```text +//! +----------+--------------------------------+ +//! | len: u32 | frame payload | +//! +----------+--------------------------------+ +//! ``` +//! +//! Specifically, given the following: +//! +//! ``` +//! # extern crate tokio; +//! # extern crate bytes; +//! # extern crate futures; +//! # +//! use tokio::io::{AsyncRead, AsyncWrite}; +//! use tokio::codec::*; +//! use bytes::Bytes; +//! use futures::{Sink, Future}; +//! +//! fn write_frame<T: AsyncRead + AsyncWrite>(io: T) { +//! let mut transport = Framed::new(io, LengthDelimitedCodec::new()); +//! let frame = Bytes::from("hello world"); +//! +//! transport.send(frame).wait().unwrap(); +//! } +//! # +//! # pub fn main() {} +//! ``` +//! +//! The encoded frame will look like this: +//! +//! ```text +//! +---- len: u32 ----+---- data ----+ +//! | \x00\x00\x00\x0b | hello world | +//! +------------------+--------------+ +//! ``` +//! +//! # Decoding +//! +//! [`FramedRead`] adapts an [`AsyncRead`] into a `Stream` of [`BytesMut`], +//! such that each yielded [`BytesMut`] value contains the contents of an +//! entire frame. There are many configuration parameters enabling +//! [`FramedRead`] to handle a wide range of protocols. Here are some +//! examples that will cover the various options at a high level. +//! +//! ## Example 1 +//! +//! The following will parse a `u16` length field at offset 0, including the +//! frame head in the yielded `BytesMut`. +//! +//! ``` +//! # extern crate tokio; +//! # use tokio::io::AsyncRead; +//! # use tokio::codec::length_delimited; +//! # fn bind_read<T: AsyncRead>(io: T) { +//! length_delimited::Builder::new() +//! .length_field_offset(0) // default value +//! .length_field_length(2) +//! .length_adjustment(0) // default value +//! .num_skip(0) // Do not strip frame header +//! .new_read(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! The following frame will be decoded as such: +//! +//! ```text +//! INPUT DECODED +//! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+ +//! | \x00\x0B | Hello world | --> | \x00\x0B | Hello world | +//! +----------+---------------+ +----------+---------------+ +//! ``` +//! +//! The value of the length field is 11 (`\x0B`) which represents the length +//! of the payload, `hello world`. By default, [`FramedRead`] assumes that +//! the length field represents the number of bytes that **follows** the +//! length field. Thus, the entire frame has a length of 13: 2 bytes for the +//! frame head + 11 bytes for the payload. +//! +//! ## Example 2 +//! +//! The following will parse a `u16` length field at offset 0, omitting the +//! frame head in the yielded `BytesMut`. +//! +//! ``` +//! # extern crate tokio; +//! # use tokio::io::AsyncRead; +//! # use tokio::codec::length_delimited; +//! # fn bind_read<T: AsyncRead>(io: T) { +//! length_delimited::Builder::new() +//! .length_field_offset(0) // default value +//! .length_field_length(2) +//! .length_adjustment(0) // default value +//! // `num_skip` is not needed, the default is to skip +//! .new_read(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! The following frame will be decoded as such: +//! +//! ```text +//! INPUT DECODED +//! +-- len ---+--- Payload ---+ +--- Payload ---+ +//! | \x00\x0B | Hello world | --> | Hello world | +//! +----------+---------------+ +---------------+ +//! ``` +//! +//! This is similar to the first example, the only difference is that the +//! frame head is **not** included in the yielded `BytesMut` value. +//! +//! ## Example 3 +//! +//! The following will parse a `u16` length field at offset 0, including the +//! frame head in the yielded `BytesMut`. In this case, the length field +//! **includes** the frame head length. +//! +//! ``` +//! # extern crate tokio; +//! # use tokio::io::AsyncRead; +//! # use tokio::codec::length_delimited; +//! # fn bind_read<T: AsyncRead>(io: T) { +//! length_delimited::Builder::new() +//! .length_field_offset(0) // default value +//! .length_field_length(2) +//! .length_adjustment(-2) // size of head +//! .num_skip(0) +//! .new_read(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! The following frame will be decoded as such: +//! +//! ```text +//! INPUT DECODED +//! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+ +//! | \x00\x0D | Hello world | --> | \x00\x0D | Hello world | +//! +----------+---------------+ +----------+---------------+ +//! ``` +//! +//! In most cases, the length field represents the length of the payload +//! only, as shown in the previous examples. However, in some protocols the +//! length field represents the length of the whole frame, including the +//! head. In such cases, we specify a negative `length_adjustment` to adjust +//! the value provided in the frame head to represent the payload length. +//! +//! ## Example 4 +//! +//! The following will parse a 3 byte length field at offset 0 in a 5 byte +//! frame head, including the frame head in the yielded `BytesMut`. +//! +//! ``` +//! # extern crate tokio; +//! # use tokio::io::AsyncRead; +//! # use tokio::codec::length_delimited; +//! # fn bind_read<T: AsyncRead>(io: T) { +//! length_delimited::Builder::new() +//! .length_field_offset(0) // default value +//! .length_field_length(3) +//! .length_adjustment(2) // remaining head +//! .num_skip(0) +//! .new_read(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! The following frame will be decoded as such: +//! +//! ```text +//! INPUT +//! +---- len -----+- head -+--- Payload ---+ +//! | \x00\x00\x0B | \xCAFE | Hello world | +//! +--------------+--------+---------------+ +//! +//! DECODED +//! +---- len -----+- head -+--- Payload ---+ +//! | \x00\x00\x0B | \xCAFE | Hello world | +//! +--------------+--------+---------------+ +//! ``` +//! +//! A more advanced example that shows a case where there is extra frame +//! head data between the length field and the payload. In such cases, it is +//! usually desirable to include the frame head as part of the yielded +//! `BytesMut`. This lets consumers of the length delimited framer to +//! process the frame head as needed. +//! +//! The positive `length_adjustment` value lets `FramedRead` factor in the +//! additional head into the frame length calculation. +//! +//! ## Example 5 +//! +//! The following will parse a `u16` length field at offset 1 of a 4 byte +//! frame head. The first byte and the length field will be omitted from the +//! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be +//! included. +//! +//! ``` +//! # extern crate tokio; +//! # use tokio::io::AsyncRead; +//! # use tokio::codec::length_delimited; +//! # fn bind_read<T: AsyncRead>(io: T) { +//! length_delimited::Builder::new() +//! .length_field_offset(1) // length of hdr1 +//! .length_field_length(2) +//! .length_adjustment(1) // length of hdr2 +//! .num_skip(3) // length of hdr1 + LEN +//! .new_read(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! The following frame will be decoded as such: +//! +//! ```text +//! INPUT +//! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+ +//! | \xCA | \x00\x0B | \xFE | Hello world | +//! +--------+----------+--------+---------------+ +//! +//! DECODED +//! +- hdr2 -+--- Payload ---+ +//! | \xFE | Hello world | +//! +--------+---------------+ +//! ``` +//! +//! The length field is situated in the middle of the frame head. In this +//! case, the first byte in the frame head could be a version or some other +//! identifier that is not needed for processing. On the other hand, the +//! second half of the head is needed. +//! +//! `length_field_offset` indicates how many bytes to skip before starting +//! to read the length field. `length_adjustment` is the number of bytes to +//! skip starting at the end of the length field. In this case, it is the +//! second half of the head. +//! +//! ## Example 6 +//! +//! The following will parse a `u16` length field at offset 1 of a 4 byte +//! frame head. The first byte and the length field will be omitted from the +//! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be +//! included. In this case, the length field **includes** the frame head +//! length. +//! +//! ``` +//! # extern crate tokio; +//! # use tokio::io::AsyncRead; +//! # use tokio::codec::length_delimited; +//! # fn bind_read<T: AsyncRead>(io: T) { +//! length_delimited::Builder::new() +//! .length_field_offset(1) // length of hdr1 +//! .length_field_length(2) +//! .length_adjustment(-3) // length of hdr1 + LEN, negative +//! .num_skip(3) +//! .new_read(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! The following frame will be decoded as such: +//! +//! ```text +//! INPUT +//! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+ +//! | \xCA | \x00\x0F | \xFE | Hello world | +//! +--------+----------+--------+---------------+ +//! +//! DECODED +//! +- hdr2 -+--- Payload ---+ +//! | \xFE | Hello world | +//! +--------+---------------+ +//! ``` +//! +//! Similar to the example above, the difference is that the length field +//! represents the length of the entire frame instead of just the payload. +//! The length of `hdr1` and `len` must be counted in `length_adjustment`. +//! Note that the length of `hdr2` does **not** need to be explicitly set +//! anywhere because it already is factored into the total frame length that +//! is read from the byte stream. +//! +//! # Encoding +//! +//! [`FramedWrite`] adapts an [`AsyncWrite`] into a `Sink` of [`BytesMut`], +//! such that each submitted [`BytesMut`] is prefaced by a length field. +//! There are fewer configuration options than [`FramedRead`]. Given +//! protocols that have more complex frame heads, an encoder should probably +//! be written by hand using [`Encoder`]. +//! +//! Here is a simple example, given a `FramedWrite` with the following +//! configuration: +//! +//! ``` +//! # extern crate tokio; +//! # extern crate bytes; +//! # use tokio::io::AsyncWrite; +//! # use tokio::codec::length_delimited; +//! # use bytes::BytesMut; +//! # fn write_frame<T: AsyncWrite>(io: T) { +//! # let _ = +//! length_delimited::Builder::new() +//! .length_field_length(2) +//! .new_write(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! A payload of `hello world` will be encoded as: +//! +//! ```text +//! +- len: u16 -+---- data ----+ +//! | \x00\x0b | hello world | +//! +------------+--------------+ +//! ``` +//! +//! [`FramedRead`]: struct.FramedRead.html +//! [`FramedWrite`]: struct.FramedWrite.html +//! [`AsyncRead`]: ../../trait.AsyncRead.html +//! [`AsyncWrite`]: ../../trait.AsyncWrite.html +//! [`Encoder`]: ../trait.Encoder.html +//! [`BytesMut`]: https://docs.rs/bytes/0.4/bytes/struct.BytesMut.html + +use { + codec::{ + Decoder, Encoder, FramedRead, FramedWrite, Framed + }, + io::{ + AsyncRead, AsyncWrite + }, +}; + +use bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf}; + +use std::{cmp, fmt}; +use std::error::Error as StdError; +use std::io::{self, Cursor}; + +/// Configure length delimited `LengthDelimitedCodec`s. +/// +/// `Builder` enables constructing configured length delimited codecs. Note +/// that not all configuration settings apply to both encoding and decoding. See +/// the documentation for specific methods for more detail. +#[derive(Debug, Clone, Copy)] +pub struct Builder { + // Maximum frame length + max_frame_len: usize, + + // Number of bytes representing the field length + length_field_len: usize, + + // Number of bytes in the header before the length field + length_field_offset: usize, + + // Adjust the length specified in the header field by this amount + length_adjustment: isize, + + // Total number of bytes to skip before reading the payload, if not set, + // `length_field_len + length_field_offset` + num_skip: Option<usize>, + + // Length field byte order (little or big endian) + length_field_is_big_endian: bool, +} + +/// An error when the number of bytes read is more than max frame length. +pub struct FrameTooBig { + _priv: (), +} + +/// A codec for frames delimited by a frame head specifying their lengths. +/// +/// This allows the consumer to work with entire frames without having to worry +/// about buffering or other framing logic. +/// +/// See [module level] documentation for more detail. +/// +/// [module level]: index.html +#[derive(Debug)] +pub struct LengthDelimitedCodec { + // Configuration values + builder: Builder, + + // Read state + state: DecodeState, +} + +#[derive(Debug, Clone, Copy)] +enum DecodeState { + Head, + Data(usize), +} + +// ===== impl LengthDelimitedCodec ====== + +impl LengthDelimitedCodec { + /// Creates a new `LengthDelimitedCodec` with the default configuration values. + pub fn new() -> Self { + Self { + builder: Builder::new(), + state: DecodeState::Head, + } + } + + /// Returns the current max frame setting + /// + /// This is the largest size this codec will accept from the wire. Larger + /// frames will be rejected. + pub fn max_frame_length(&self) -> usize { + self.builder.max_frame_len + } + + /// Updates the max frame setting. + /// + /// The change takes effect the next time a frame is decoded. In other + /// words, if a frame is currently in process of being decoded with a frame + /// size greater than `val` but less than the max frame length in effect + /// before calling this function, then the frame will be allowed. + pub fn set_max_frame_length(&mut self, val: usize) { + self.builder.max_frame_length(val); + } + + fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> { + let head_len = self.builder.num_head_bytes(); + let field_len = self.builder.length_field_len; + + if src.len() < head_len { + // Not enough data + return Ok(None); + } + + let n = { + let mut src = Cursor::new(&mut *src); + + // Skip the required bytes + src.advance(self.builder.length_field_offset); + + // match endianess + let n = if self.builder.length_field_is_big_endian { + src.get_uint_be(field_len) + } else { + src.get_uint_le(field_len) + }; + + if n > self.builder.max_frame_len as u64 { + return Err(io::Error::new(io::ErrorKind::InvalidData, FrameTooBig { + _priv: (), + })); + } + + // The check above ensures there is no overflow + let n = n as usize; + + // Adjust `n` with bounds checking + let n = if self.builder.length_adjustment < 0 { + n.checked_sub(-self.builder.length_adjustment as usize) + } else { + n.checked_add(self.builder.length_adjustment as usize) + }; + + // Error handling + match n { + Some(n) => n, + None => return Err(io::Error::new(io::ErrorKind::InvalidInput, "provided length would overflow after adjustment")), + } + }; + + let num_skip = self.builder.get_num_skip(); + + if num_skip > 0 { + let _ = src.split_to(num_skip); + } + + // Ensure that the buffer has enough space to read the incoming + // payload + src.reserve(n); + + return Ok(Some(n)); + } + + fn decode_data(&self, n: usize, src: &mut BytesMut) -> io::Result<Option<BytesMut>> { + // At this point, the buffer has already had the required capacity + // reserved. All there is to do is read. + if src.len() < n { + return Ok(None); + } + + Ok(Some(src.split_to(n))) + } +} + +impl Decoder for LengthDelimitedCodec { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<BytesMut>> { + let n = match self.state { + DecodeState::Head => { + match try!(self.decode_head(src)) { + Some(n) => { + self.state = DecodeState::Data(n); + n + } + None => return Ok(None), + } + } + DecodeState::Data(n) => n, + }; + + match try!(self.decode_data(n, src)) { + Some(data) => { + // Update the decode state + self.state = DecodeState::Head; + + // Make sure the buffer has enough space to read the next head + src.reserve(self.builder.num_head_bytes()); + + Ok(Some(data)) + } + None => Ok(None), + } + } +} + +impl Encoder for LengthDelimitedCodec { + type Item = Bytes; + type Error = io::Error; + + fn encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error> { + let n = (&data).into_buf().remaining(); + + if n > self.builder.max_frame_len { + return Err(io::Error::new(io::ErrorKind::InvalidInput, FrameTooBig { + _priv: (), + })); + } + + // Adjust `n` with bounds checking + let n = if self.builder.length_adjustment < 0 { + n.checked_add(-self.builder.length_adjustment as usize) + } else { + n.checked_sub(self.builder.length_adjustment as usize) + }; + + let n = n.ok_or_else(|| io::Error::new( + io::ErrorKind::InvalidInput, + "provided length would overflow after adjustment", + ))?; + + if self.builder.length_field_is_big_endian { + dst.put_uint_be(n as u64, self.builder.length_field_len); + } else { + dst.put_uint_le(n as u64, self.builder.length_field_len); + } + + // Write the frame to the buffer + dst.extend_from_slice(&data[..]); + + Ok(()) + } +} + +// ===== impl Builder ===== + +impl Builder { + /// Creates a new length delimited codec builder with default configuration + /// values. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_field_offset(0) + /// .length_field_length(2) + /// .length_adjustment(0) + /// .num_skip(0) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn new() -> Builder { + Builder { + // Default max frame length of 8MB + max_frame_len: 8 * 1_024 * 1_024, + + // Default byte length of 4 + length_field_len: 4, + + // Default to the header field being at the start of the header. + length_field_offset: 0, + + length_adjustment: 0, + + // Total number of bytes to skip before reading the payload, if not set, + // `length_field_len + length_field_offset` + num_skip: None, + + // Default to reading the length field in network (big) endian. + length_field_is_big_endian: true, + } + } + + /// Read the length field as a big endian integer + /// + /// This is the default setting. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .big_endian() + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn big_endian(&mut self) -> &mut Self { + self.length_field_is_big_endian = true; + self + } + + /// Read the length field as a little endian integer + /// + /// The default setting is big endian. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .little_endian() + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn little_endian(&mut self) -> &mut Self { + self.length_field_is_big_endian = false; + self + } + + /// Read the length field as a native endian integer + /// + /// The default setting is big endian. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .native_endian() + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn native_endian(&mut self) -> &mut Self { + if cfg!(target_endian = "big") { + self.big_endian() + } else { + self.little_endian() + } + } + + /// Sets the max frame length + /// + /// This configuration option applies to both encoding and decoding. The + /// default value is 8MB. + /// + /// When decoding, the length field read from the byte stream is checked + /// against this setting **before** any adjustments are applied. When + /// encoding, the length of the submitted payload is checked against this + /// setting. + /// + /// When frames exceed the max length, an `io::Error` with the custom value + /// of the `FrameTooBig` type will be returned. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .max_frame_length(8 * 1024) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn max_frame_length(&mut self, val: usize) -> &mut Self { + self.max_frame_len = val; + self + } + + /// Sets the number of bytes used to represent the length field + /// + /// The default value is `4`. The max value is `8`. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_field_length(4) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn length_field_length(&mut self, val: usize) -> &mut Self { + assert!(val > 0 && val <= 8, "invalid length field length"); + self.length_field_len = val; + self + } + + /// Sets the number of bytes in the header before the length field + /// + /// This configuration option only applies to decoding. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_field_offset(1) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn length_field_offset(&mut self, val: usize) -> &mut Self { + self.length_field_offset = val; + self + } + + /// Delta between the payload length specified in the header and the real + /// payload length + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_adjustment(-2) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn length_adjustment(&mut self, val: isize) -> &mut Self { + self.length_adjustment = val; + self + } + + /// Sets the number of bytes to skip before reading the payload + /// + /// Default value is `length_field_len + length_field_offset` + /// + /// This configuration option only applies to decoding + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .num_skip(4) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn num_skip(&mut self, val: usize) -> &mut Self { + self.num_skip = Some(val); + self + } + + /// Create a configured length delimited `LengthDelimitedCodec` + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// # pub fn main() { + /// Builder::new() + /// .length_field_offset(0) + /// .length_field_length(2) + /// .length_adjustment(0) + /// .num_skip(0) + /// .new_codec(); + /// # } + /// ``` + pub fn new_codec(&self) -> LengthDelimitedCodec { + LengthDelimitedCodec { + builder: *self, + state: DecodeState::Head, + } + } + + /// Create a configured length delimited `FramedRead` + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_field_offset(0) + /// .length_field_length(2) + /// .length_adjustment(0) + /// .num_skip(0) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn new_read<T>(&self, upstream: T) -> FramedRead<T, LengthDelimitedCodec> + where T: AsyncRead, + { + FramedRead::new(upstream, self.new_codec()) + } + + /// Create a configured length delimited `FramedWrite` + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate bytes; + /// # use tokio::io::AsyncWrite; + /// # use tokio::codec::length_delimited; + /// # use bytes::BytesMut; + /// # fn write_frame<T: AsyncWrite>(io: T) { + /// length_delimited::Builder::new() + /// .length_field_length(2) + /// .new_write(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn new_write<T>(&self, inner: T) -> FramedWrite<T, LengthDelimitedCodec> + where T: AsyncWrite, + { + FramedWrite::new(inner, self.new_codec()) + } + + /// Create a configured length delimited `Framed` + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate bytes; + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use tokio::codec::length_delimited; + /// # use bytes::BytesMut; + /// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) { + /// # let _ = + /// length_delimited::Builder::new() + /// .length_field_length(2) + /// .new_framed(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn new_framed<T>(&self, inner: T) -> Framed<T, LengthDelimitedCodec> + where T: AsyncRead + AsyncWrite, + { + Framed::new(inner, self.new_codec()) + } + + fn num_head_bytes(&self) -> usize { + let num = self.length_field_offset + self.length_field_len; + cmp::max(num, self.num_skip.unwrap_or(0)) + } + + fn get_num_skip(&self) -> usize { + self.num_skip.unwrap_or(self.length_field_offset + self.length_field_len) + } +} + + +// ===== impl FrameTooBig ===== + +impl fmt::Debug for FrameTooBig { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FrameTooBig") + .finish() + } +} + +impl fmt::Display for FrameTooBig { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(self.description()) + } +} + +impl StdError for FrameTooBig { + fn description(&self) -> &str { + "frame size too big" + } +} diff --git a/third_party/rust/tokio-0.1.11/src/codec/mod.rs b/third_party/rust/tokio-0.1.11/src/codec/mod.rs new file mode 100644 index 0000000000..cb0fc922de --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/codec/mod.rs @@ -0,0 +1,26 @@ +//! Utilities for encoding and decoding frames. +//! +//! Contains adapters to go from streams of bytes, [`AsyncRead`] and +//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`]. +//! Framed streams are also known as [transports]. +//! +//! [`AsyncRead`]: ../io/trait.AsyncRead.html +//! [`AsyncWrite`]: ../io/trait.AsyncWrite.html +//! [`Sink`]: https://docs.rs/futures/0.1/futures/sink/trait.Sink.html +//! [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html +//! [transports]: https://tokio.rs/docs/going-deeper/frames/ + +pub use tokio_codec::{ + Decoder, + Encoder, + Framed, + FramedParts, + FramedRead, + FramedWrite, + BytesCodec, + LinesCodec, +}; + +pub mod length_delimited; + +pub use self::length_delimited::LengthDelimitedCodec; diff --git a/third_party/rust/tokio-0.1.11/src/executor/current_thread/mod.rs b/third_party/rust/tokio-0.1.11/src/executor/current_thread/mod.rs new file mode 100644 index 0000000000..6036aa997b --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/executor/current_thread/mod.rs @@ -0,0 +1,170 @@ +#![allow(deprecated)] + +//! Execute many tasks concurrently on the current thread. +//! +//! [`CurrentThread`] is an executor that keeps tasks on the same thread that +//! they were spawned from. This allows it to execute futures that are not +//! `Send`. +//! +//! A single [`CurrentThread`] instance is able to efficiently manage a large +//! number of tasks and will attempt to schedule all tasks fairly. +//! +//! All tasks that are being managed by a [`CurrentThread`] executor are able to +//! spawn additional tasks by calling [`spawn`]. This function only works from +//! within the context of a running [`CurrentThread`] instance. +//! +//! The easiest way to start a new [`CurrentThread`] executor is to call +//! [`block_on_all`] with an initial task to seed the executor. +//! +//! For example: +//! +//! ``` +//! # extern crate tokio; +//! # extern crate futures; +//! # use tokio::executor::current_thread; +//! use futures::future::lazy; +//! +//! // Calling execute here results in a panic +//! // current_thread::spawn(my_future); +//! +//! # pub fn main() { +//! current_thread::block_on_all(lazy(|| { +//! // The execution context is setup, futures may be executed. +//! current_thread::spawn(lazy(|| { +//! println!("called from the current thread executor"); +//! Ok(()) +//! })); +//! +//! Ok::<_, ()>(()) +//! })); +//! # } +//! ``` +//! +//! The `block_on_all` function will block the current thread until **all** +//! tasks that have been spawned onto the [`CurrentThread`] instance have +//! completed. +//! +//! More fine-grain control can be achieved by using [`CurrentThread`] directly. +//! +//! ``` +//! # extern crate tokio; +//! # extern crate futures; +//! # use tokio::executor::current_thread::CurrentThread; +//! use futures::future::{lazy, empty}; +//! use std::time::Duration; +//! +//! // Calling execute here results in a panic +//! // current_thread::spawn(my_future); +//! +//! # pub fn main() { +//! let mut current_thread = CurrentThread::new(); +//! +//! // Spawn a task, the task is not executed yet. +//! current_thread.spawn(lazy(|| { +//! println!("Spawning a task"); +//! Ok(()) +//! })); +//! +//! // Spawn a task that never completes +//! current_thread.spawn(empty()); +//! +//! // Run the executor, but only until the provided future completes. This +//! // provides the opportunity to start executing previously spawned tasks. +//! let res = current_thread.block_on(lazy(|| { +//! Ok::<_, ()>("Hello") +//! })).unwrap(); +//! +//! // Now, run the executor for *at most* 1 second. Since a task was spawned +//! // that never completes, this function will return with an error. +//! current_thread.run_timeout(Duration::from_secs(1)).unwrap_err(); +//! # } +//! ``` +//! +//! # Execution model +//! +//! Internally, [`CurrentThread`] maintains a queue. When one of its tasks is +//! notified, the task gets added to the queue. The executor will pop tasks from +//! the queue and call [`Future::poll`]. If the task gets notified while it is +//! being executed, it won't get re-executed until all other tasks currently in +//! the queue get polled. +//! +//! Before the task is polled, a thread-local variable referencing the current +//! [`CurrentThread`] instance is set. This enables [`spawn`] to spawn new tasks +//! onto the same executor without having to thread through a handle value. +//! +//! If the [`CurrentThread`] instance still has uncompleted tasks, but none of +//! these tasks are ready to be polled, the current thread is put to sleep. When +//! a task is notified, the thread is woken up and processing resumes. +//! +//! All tasks managed by [`CurrentThread`] remain on the current thread. When a +//! task completes, it is dropped. +//! +//! [`spawn`]: fn.spawn.html +//! [`block_on_all`]: fn.block_on_all.html +//! [`CurrentThread`]: struct.CurrentThread.html +//! [`Future::poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll + +pub use tokio_current_thread::{ + BlockError, + CurrentThread, + Entered, + Handle, + RunError, + RunTimeoutError, + TaskExecutor, + Turn, + TurnError, + block_on_all, + spawn, +}; + +use std::cell::Cell; +use std::marker::PhantomData; + +use futures::future::{self}; + +#[deprecated(since = "0.1.2", note = "use block_on_all instead")] +#[doc(hidden)] +#[derive(Debug)] +pub struct Context<'a> { + cancel: Cell<bool>, + _p: PhantomData<&'a ()>, +} + +impl<'a> Context<'a> { + /// Cancels *all* executing futures. + pub fn cancel_all_spawned(&self) { + self.cancel.set(true); + } +} + +#[deprecated(since = "0.1.2", note = "use block_on_all instead")] +#[doc(hidden)] +pub fn run<F, R>(f: F) -> R + where F: FnOnce(&mut Context) -> R +{ + let mut context = Context { + cancel: Cell::new(false), + _p: PhantomData, + }; + + let mut current_thread = CurrentThread::new(); + + let ret = current_thread + .block_on(future::lazy(|| Ok::<_, ()>(f(&mut context)))) + .unwrap(); + + if context.cancel.get() { + return ret; + } + + current_thread.run().unwrap(); + ret +} + +#[deprecated(since = "0.1.2", note = "use TaskExecutor::current instead")] +#[doc(hidden)] +pub fn task_executor() -> TaskExecutor { + TaskExecutor::current() +} + diff --git a/third_party/rust/tokio-0.1.11/src/executor/mod.rs b/third_party/rust/tokio-0.1.11/src/executor/mod.rs new file mode 100644 index 0000000000..e1e47ae1d6 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/executor/mod.rs @@ -0,0 +1,145 @@ +//! Task execution utilities. +//! +//! In the Tokio execution model, futures are lazy. When a future is created, no +//! work is performed. In order for the work defined by the future to happen, +//! the future must be submitted to an executor. A future that is submitted to +//! an executor is called a "task". +//! +//! The executor is responsible for ensuring that [`Future::poll`] is +//! called whenever the task is [notified]. Notification happens when the +//! internal state of a task transitions from "not ready" to ready. For +//! example, a socket might have received data and a call to `read` will now be +//! able to succeed. +//! +//! The specific strategy used to manage the tasks is left up to the +//! executor. There are two main flavors of executors: single-threaded and +//! multi-threaded. Tokio provides implementation for both of these in the +//! [`runtime`] module. +//! +//! # `Executor` trait. +//! +//! This module provides the [`Executor`] trait (re-exported from +//! [`tokio-executor`]), which describes the API that all executors must +//! implement. +//! +//! A free [`spawn`] function is provided that allows spawning futures onto the +//! default executor (tracked via a thread-local variable) without referencing a +//! handle. It is expected that all executors will set a value for the default +//! executor. This value will often be set to the executor itself, but it is +//! possible that the default executor might be set to a different executor. +//! +//! For example, a single threaded executor might set the default executor to a +//! thread pool instead of itself, allowing futures to spawn new tasks onto the +//! thread pool when those tasks are `Send`. +//! +//! [`Future::poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll +//! [notified]: https://docs.rs/futures/0.1/futures/executor/trait.Notify.html#tymethod.notify +//! [`runtime`]: ../runtime/index.html +//! [`tokio-executor`]: https://docs.rs/tokio-executor/0.1 +//! [`Executor`]: trait.Executor.html +//! [`spawn`]: fn.spawn.html + +#[deprecated( + since = "0.1.8", + note = "use tokio-current-thread crate or functions in tokio::runtime::current_thread instead", +)] +#[doc(hidden)] +pub mod current_thread; + +#[deprecated(since = "0.1.8", note = "use tokio-threadpool crate instead")] +#[doc(hidden)] +/// Re-exports of [`tokio-threadpool`], deprecated in favor of the crate. +/// +/// [`tokio-threadpool`]: https://docs.rs/tokio-threadpool/0.1 +pub mod thread_pool { + pub use tokio_threadpool::{ + Builder, + Sender, + Shutdown, + ThreadPool, + }; +} + +pub use tokio_executor::{Executor, DefaultExecutor, SpawnError}; + +use futures::{Future, IntoFuture}; +use futures::future::{self, FutureResult}; + +/// Return value from the `spawn` function. +/// +/// Currently this value doesn't actually provide any functionality. However, it +/// provides a way to add functionality later without breaking backwards +/// compatibility. +/// +/// This also implements `IntoFuture` so that it can be used as the return value +/// in a `for_each` loop. +/// +/// See [`spawn`] for more details. +/// +/// [`spawn`]: fn.spawn.html +#[derive(Debug)] +pub struct Spawn(()); + +/// Spawns a future on the default executor. +/// +/// In order for a future to do work, it must be spawned on an executor. The +/// `spawn` function is the easiest way to do this. It spawns a future on the +/// [default executor] for the current execution context (tracked using a +/// thread-local variable). +/// +/// The default executor is **usually** a thread pool. +/// +/// # Examples +/// +/// In this example, a server is started and `spawn` is used to start a new task +/// that processes each received connection. +/// +/// ```rust +/// # extern crate tokio; +/// # extern crate futures; +/// # use futures::{Future, Stream}; +/// use tokio::net::TcpListener; +/// +/// # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> { +/// # unimplemented!(); +/// # } +/// # fn dox() { +/// # let addr = "127.0.0.1:8080".parse().unwrap(); +/// let listener = TcpListener::bind(&addr).unwrap(); +/// +/// let server = listener.incoming() +/// .map_err(|e| println!("error = {:?}", e)) +/// .for_each(|socket| { +/// tokio::spawn(process(socket)) +/// }); +/// +/// tokio::run(server); +/// # } +/// # pub fn main() {} +/// ``` +/// +/// [default executor]: struct.DefaultExecutor.html +/// +/// # Panics +/// +/// This function will panic if the default executor is not set or if spawning +/// onto the default executor returns an error. To avoid the panic, use +/// [`DefaultExecutor`]. +/// +/// [`DefaultExecutor`]: struct.DefaultExecutor.html +pub fn spawn<F>(f: F) -> Spawn +where F: Future<Item = (), Error = ()> + 'static + Send +{ + ::tokio_executor::spawn(f); + Spawn(()) +} + +impl IntoFuture for Spawn { + type Future = FutureResult<(), ()>; + type Item = (); + type Error = (); + + fn into_future(self) -> Self::Future { + future::ok(()) + } +} diff --git a/third_party/rust/tokio-0.1.11/src/fs.rs b/third_party/rust/tokio-0.1.11/src/fs.rs new file mode 100644 index 0000000000..689a601368 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/fs.rs @@ -0,0 +1,12 @@ +//! Asynchronous filesystem manipulation operations. +//! +//! This module contains basic methods and types for manipulating the contents +//! of the local filesystem from within the context of the Tokio runtime. +//! +//! Unlike *most* other Tokio APIs, the filesystem APIs **must** be used from +//! the context of the Tokio runtime as they require Tokio specific features to +//! function. + +pub use tokio_fs::{create_dir, create_dir_all, file, hard_link, metadata, os, read_dir, read_link}; +pub use tokio_fs::{remove_dir, remove_file, rename, set_permissions, symlink_metadata, File}; +pub use tokio_fs::OpenOptions; diff --git a/third_party/rust/tokio-0.1.11/src/io.rs b/third_party/rust/tokio-0.1.11/src/io.rs new file mode 100644 index 0000000000..1d6bfd3a70 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/io.rs @@ -0,0 +1,93 @@ +//! Asynchronous I/O. +//! +//! This module is the asynchronous version of `std::io`. Primarily, it +//! defines two traits, [`AsyncRead`] and [`AsyncWrite`], which extend the +//! `Read` and `Write` traits of the standard library. +//! +//! # AsyncRead and AsyncWrite +//! +//! [`AsyncRead`] and [`AsyncWrite`] must only be implemented for +//! non-blocking I/O types that integrate with the futures type system. In +//! other words, these types must never block the thread, and instead the +//! current task is notified when the I/O resource is ready. +//! +//! # Standard input and output +//! +//! Tokio provides asynchronous APIs to standard [input], [output], and [error]. +//! These APIs are very similar to the ones provided by `std`, but they also +//! implement [`AsyncRead`] and [`AsyncWrite`]. +//! +//! Unlike *most* other Tokio APIs, the standard input / output APIs +//! **must** be used from the context of the Tokio runtime as they require +//! Tokio specific features to function. +//! +//! [input]: fn.stdin.html +//! [output]: fn.stdout.html +//! [error]: fn.stderr.html +//! +//! # Utility functions +//! +//! Utilities functions are provided for working with [`AsyncRead`] / +//! [`AsyncWrite`] types. For example, [`copy`] asynchronously copies all +//! data from a source to a destination. +//! +//! # `std` re-exports +//! +//! Additionally, [`Read`], [`Write`], [`Error`], [`ErrorKind`], and +//! [`Result`] are re-exported from `std::io` for ease of use. +//! +//! [`AsyncRead`]: trait.AsyncRead.html +//! [`AsyncWrite`]: trait.AsyncWrite.html +//! [`copy`]: fn.copy.html +//! [`Read`]: trait.Read.html +//! [`Write`]: trait.Write.html +//! [`Error`]: struct.Error.html +//! [`ErrorKind`]: enum.ErrorKind.html +//! [`Result`]: type.Result.html + +pub use tokio_io::{ + AsyncRead, + AsyncWrite, +}; + +// standard input, output, and error +pub use tokio_fs::{ + stdin, + Stdin, + stdout, + Stdout, + stderr, + Stderr, +}; + +// Utils +pub use tokio_io::io::{ + copy, + Copy, + flush, + Flush, + lines, + Lines, + read_exact, + ReadExact, + read_to_end, + ReadToEnd, + read_until, + ReadUntil, + ReadHalf, + shutdown, + Shutdown, + write_all, + WriteAll, + WriteHalf, +}; + +// Re-export io::Error so that users don't have to deal +// with conflicts when `use`ing `futures::io` and `std::io`. +pub use ::std::io::{ + Error, + ErrorKind, + Result, + Read, + Write, +}; diff --git a/third_party/rust/tokio-0.1.11/src/lib.rs b/third_party/rust/tokio-0.1.11/src/lib.rs new file mode 100644 index 0000000000..f652e53a90 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/lib.rs @@ -0,0 +1,120 @@ +#![doc(html_root_url = "https://docs.rs/tokio/0.1.11")] +#![deny(missing_docs, warnings, missing_debug_implementations)] +#![cfg_attr(feature = "async-await-preview", feature( + async_await, + await_macro, + futures_api, + ))] + +//! A runtime for writing reliable, asynchronous, and slim applications. +//! +//! Tokio is an event-driven, non-blocking I/O platform for writing asynchronous +//! applications with the Rust programming language. At a high level, it +//! provides a few major components: +//! +//! * A multi threaded, work-stealing based task [scheduler][runtime]. +//! * A [reactor] backed by the operating system's event queue (epoll, kqueue, +//! IOCP, etc...). +//! * Asynchronous [TCP and UDP][net] sockets. +//! * Asynchronous [filesystem][fs] operations. +//! * [Timer][timer] API for scheduling work in the future. +//! +//! Tokio is built using [futures] as the abstraction for managing the +//! complexity of asynchronous programming. +//! +//! Guide level documentation is found on the [website]. +//! +//! [website]: https://tokio.rs/docs/getting-started/hello-world/ +//! [futures]: http://docs.rs/futures/0.1 +//! +//! # Examples +//! +//! A simple TCP echo server: +//! +//! ```no_run +//! extern crate tokio; +//! +//! use tokio::prelude::*; +//! use tokio::io::copy; +//! use tokio::net::TcpListener; +//! +//! fn main() { +//! // Bind the server's socket. +//! let addr = "127.0.0.1:12345".parse().unwrap(); +//! let listener = TcpListener::bind(&addr) +//! .expect("unable to bind TCP listener"); +//! +//! // Pull out a stream of sockets for incoming connections +//! let server = listener.incoming() +//! .map_err(|e| eprintln!("accept failed = {:?}", e)) +//! .for_each(|sock| { +//! // Split up the reading and writing parts of the +//! // socket. +//! let (reader, writer) = sock.split(); +//! +//! // A future that echos the data and returns how +//! // many bytes were copied... +//! let bytes_copied = copy(reader, writer); +//! +//! // ... after which we'll print what happened. +//! let handle_conn = bytes_copied.map(|amt| { +//! println!("wrote {:?} bytes", amt) +//! }).map_err(|err| { +//! eprintln!("IO error {:?}", err) +//! }); +//! +//! // Spawn the future as a concurrent task. +//! tokio::spawn(handle_conn) +//! }); +//! +//! // Start the Tokio runtime +//! tokio::run(server); +//! } +//! ``` + +extern crate bytes; +#[macro_use] +extern crate futures; +extern crate mio; +extern crate tokio_current_thread; +extern crate tokio_io; +extern crate tokio_executor; +extern crate tokio_codec; +extern crate tokio_fs; +extern crate tokio_reactor; +extern crate tokio_threadpool; +extern crate tokio_timer; +extern crate tokio_tcp; +extern crate tokio_udp; + +#[cfg(feature = "async-await-preview")] +extern crate tokio_async_await; + +#[cfg(unix)] +extern crate tokio_uds; + +pub mod clock; +pub mod codec; +pub mod executor; +pub mod fs; +pub mod io; +pub mod net; +pub mod prelude; +pub mod reactor; +pub mod runtime; +pub mod timer; +pub mod util; + +pub use executor::spawn; +pub use runtime::run; + +// ===== Experimental async/await support ===== + +#[cfg(feature = "async-await-preview")] +mod async_await; + +#[cfg(feature = "async-await-preview")] +pub use async_await::{run_async, spawn_async}; + +#[cfg(feature = "async-await-preview")] +pub use tokio_async_await::await; diff --git a/third_party/rust/tokio-0.1.11/src/net.rs b/third_party/rust/tokio-0.1.11/src/net.rs new file mode 100644 index 0000000000..79810b6a73 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/net.rs @@ -0,0 +1,85 @@ +//! TCP/UDP/Unix bindings for `tokio`. +//! +//! This module contains the TCP/UDP/Unix networking types, similar to the standard +//! library, which can be used to implement networking protocols. +//! +//! # Organization +//! +//! * [`TcpListener`] and [`TcpStream`] provide functionality for communication over TCP +//! * [`UdpSocket`] and [`UdpFramed`] provide functionality for communication over UDP +//! * [`UnixListener`] and [`UnixStream`] provide functionality for communication over a +//! Unix Domain Socket **(available on Unix only)** +//! +//! [`TcpListener`]: struct.TcpListener.html +//! [`TcpStream`]: struct.TcpStream.html +//! [`UdpSocket`]: struct.UdpSocket.html +//! [`UdpFramed`]: struct.UdpFramed.html +//! [`UnixListener`]: struct.UnixListener.html +//! [`UnixStream`]: struct.UnixStream.html + +pub mod tcp { + //! TCP bindings for `tokio`. + //! + //! Connecting to an address, via TCP, can be done using [`TcpStream`]'s + //! [`connect`] method, which returns [`ConnectFuture`]. `ConnectFuture` + //! implements a future which returns a `TcpStream`. + //! + //! To listen on an address [`TcpListener`] can be used. `TcpListener`'s + //! [`incoming`][incoming_method] method can be used to accept new connections. + //! It return the [`Incoming`] struct, which implements a stream which returns + //! `TcpStream`s. + //! + //! [`TcpStream`]: struct.TcpStream.html + //! [`connect`]: struct.TcpStream.html#method.connect + //! [`ConnectFuture`]: struct.ConnectFuture.html + //! [`TcpListener`]: struct.TcpListener.html + //! [incoming_method]: struct.TcpListener.html#method.incoming + //! [`Incoming`]: struct.Incoming.html + pub use tokio_tcp::{ConnectFuture, Incoming, TcpListener, TcpStream}; +} +pub use self::tcp::{TcpListener, TcpStream}; + +#[deprecated(note = "use `tokio::net::tcp::ConnectFuture` instead")] +#[doc(hidden)] +pub type ConnectFuture = self::tcp::ConnectFuture; +#[deprecated(note = "use `tokio::net::tcp::Incoming` instead")] +#[doc(hidden)] +pub type Incoming = self::tcp::Incoming; + +pub mod udp { + //! UDP bindings for `tokio`. + //! + //! The main struct for UDP is the [`UdpSocket`], which represents a UDP socket. + //! Reading and writing to it can be done using futures, which return the + //! [`RecvDgram`] and [`SendDgram`] structs respectively. + //! + //! For convenience it's also possible to convert raw datagrams into higher-level + //! frames. + //! + //! [`UdpSocket`]: struct.UdpSocket.html + //! [`RecvDgram`]: struct.RecvDgram.html + //! [`SendDgram`]: struct.SendDgram.html + //! [`UdpFramed`]: struct.UdpFramed.html + //! [`framed`]: struct.UdpSocket.html#method.framed + pub use tokio_udp::{RecvDgram, SendDgram, UdpFramed, UdpSocket}; +} +pub use self::udp::{UdpFramed, UdpSocket}; + +#[deprecated(note = "use `tokio::net::udp::RecvDgram` instead")] +#[doc(hidden)] +pub type RecvDgram<T> = self::udp::RecvDgram<T>; +#[deprecated(note = "use `tokio::net::udp::SendDgram` instead")] +#[doc(hidden)] +pub type SendDgram<T> = self::udp::SendDgram<T>; + +#[cfg(unix)] +pub mod unix { + //! Unix domain socket bindings for `tokio` (only available on unix systems). + + pub use tokio_uds::{ + ConnectFuture, Incoming, RecvDgram, SendDgram, UCred, UnixDatagram, UnixListener, + UnixStream, + }; +} +#[cfg(unix)] +pub use self::unix::{UnixListener, UnixStream}; diff --git a/third_party/rust/tokio-0.1.11/src/prelude.rs b/third_party/rust/tokio-0.1.11/src/prelude.rs new file mode 100644 index 0000000000..ecd82fe40c --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/prelude.rs @@ -0,0 +1,54 @@ +//! A "prelude" for users of the `tokio` crate. +//! +//! This prelude is similar to the standard library's prelude in that you'll +//! almost always want to import its entire contents, but unlike the standard +//! library's prelude you'll have to do so manually: +//! +//! ``` +//! use tokio::prelude::*; +//! ``` +//! +//! The prelude may grow over time as additional items see ubiquitous use. + +pub use tokio_io::{ + AsyncRead, + AsyncWrite, +}; + +pub use util::{ + FutureExt, + StreamExt, +}; + +pub use ::std::io::{ + Read, + Write, +}; + +pub use futures::{ + Future, + future, + Stream, + stream, + Sink, + IntoFuture, + Async, + AsyncSink, + Poll, + task, +}; + +#[cfg(feature = "async-await-preview")] +#[doc(inline)] +pub use tokio_async_await::{ + io::{ + AsyncReadExt, + AsyncWriteExt, + }, + sink::{ + SinkExt, + }, + stream::{ + StreamExt as StreamAsyncExt, + }, +}; diff --git a/third_party/rust/tokio-0.1.11/src/reactor/mod.rs b/third_party/rust/tokio-0.1.11/src/reactor/mod.rs new file mode 100644 index 0000000000..a7263fd83c --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/reactor/mod.rs @@ -0,0 +1,149 @@ +//! Event loop that drives Tokio I/O resources. +//! +//! This module contains [`Reactor`], which is the event loop that drives all +//! Tokio I/O resources. It is the reactor's job to receive events from the +//! operating system ([epoll], [kqueue], [IOCP], etc...) and forward them to +//! waiting tasks. It is the bridge between operating system and the futures +//! model. +//! +//! # Overview +//! +//! When using Tokio, all operations are asynchronous and represented by +//! futures. These futures, representing the application logic, are scheduled by +//! an executor (see [runtime model] for more details). Executors wait for +//! notifications before scheduling the future for execution time, i.e., nothing +//! happens until an event is received indicating that the task can make +//! progress. +//! +//! The reactor receives events from the operating system and notifies the +//! executor. +//! +//! Let's start with a basic example, establishing a TCP connection. +//! +//! ```rust +//! # extern crate tokio; +//! # fn dox() { +//! use tokio::prelude::*; +//! use tokio::net::TcpStream; +//! +//! let addr = "93.184.216.34:9243".parse().unwrap(); +//! +//! let connect_future = TcpStream::connect(&addr); +//! +//! let task = connect_future +//! .and_then(|socket| { +//! println!("successfully connected"); +//! Ok(()) +//! }) +//! .map_err(|e| println!("failed to connect; err={:?}", e)); +//! +//! tokio::run(task); +//! # } +//! # fn main() {} +//! ``` +//! +//! Establishing a TCP connection usually cannot be completed immediately. +//! [`TcpStream::connect`] does not block the current thread. Instead, it +//! returns a [future][connect-future] that resolves once the TCP connection has +//! been established. The connect future itself has no way of knowing when the +//! TCP connection has been established. +//! +//! Before returning the future, [`TcpStream::connect`] registers the socket +//! with a reactor. This registration process, handled by [`Registration`], is +//! what links the [`TcpStream`] with the [`Reactor`] instance. At this point, +//! the reactor starts listening for connection events from the operating system +//! for that socket. +//! +//! Once the connect future is passed to [`tokio::run`], it is spawned onto a +//! thread pool. The thread pool waits until it is notified that the connection +//! has completed. +//! +//! When the TCP connection is established, the reactor receives an event from +//! the operating system. It then notifies the thread pool, telling it that the +//! connect future can complete. At this point, the thread pool will schedule +//! the task to run on one of its worker threads. This results in the `and_then` +//! closure to get executed. +//! +//! ## Lazy registration +//! +//! Notice how the snippet above does not explicitly reference a reactor. When +//! [`TcpStream::connect`] is called, it registers the socket with a reactor, +//! but no reactor is specified. This works because the registration process +//! mentioned above is actually lazy. It doesn't *actually* happen in the +//! [`connect`] function. Instead, the registration is established the first +//! time that the task is polled (again, see [runtime model]). +//! +//! A reactor instance is automatically made available when using the Tokio +//! [runtime], which is done using [`tokio::run`]. The Tokio runtime's executor +//! sets a thread-local variable referencing the associated [`Reactor`] instance +//! and [`Handle::current`] (used by [`Registration`]) returns the reference. +//! +//! ## Implementation +//! +//! The reactor implementation uses [`mio`] to interface with the operating +//! system's event queue. A call to [`Reactor::poll`] results in a single +//! call to [`Poll::poll`] which in turn results in a single call to the +//! operating system's selector. +//! +//! The reactor maintains state for each registered I/O resource. This tracks +//! the executor task to notify when events are provided by the operating +//! system's selector. This state is stored in a `Sync` data structure and +//! referenced by [`Registration`]. When the [`Registration`] instance is +//! dropped, this state is cleaned up. Because the state is stored in a `Sync` +//! data structure, the [`Registration`] instance is able to be moved to other +//! threads. +//! +//! By default, a runtime's default reactor runs on a background thread. This +//! ensures that application code cannot significantly impact the reactor's +//! responsiveness. +//! +//! ## Integrating with the reactor +//! +//! Tokio comes with a number of I/O resources, like TCP and UDP sockets, that +//! automatically integrate with the reactor. However, library authors or +//! applications may wish to implement their own resources that are also backed +//! by the reactor. +//! +//! There are a couple of ways to do this. +//! +//! If the custom I/O resource implements [`mio::Evented`] and implements +//! [`std::io::Read`] and / or [`std::io::Write`], then [`PollEvented`] is the +//! most suited. +//! +//! Otherwise, [`Registration`] can be used directly. This provides the lowest +//! level primitive needed for integrating with the reactor: a stream of +//! readiness events. +//! +//! [`Reactor`]: struct.Reactor.html +//! [`Registration`]: struct.Registration.html +//! [runtime model]: https://tokio.rs/docs/getting-started/runtime-model/ +//! [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html +//! [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2 +//! [IOCP]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx +//! [`TcpStream::connect`]: ../net/struct.TcpStream.html#method.connect +//! [`connect`]: ../net/struct.TcpStream.html#method.connect +//! [connect-future]: ../net/struct.ConnectFuture.html +//! [`tokio::run`]: ../runtime/fn.run.html +//! [`TcpStream`]: ../net/struct.TcpStream.html +//! [runtime]: ../runtime +//! [`Handle::current`]: struct.Handle.html#method.current +//! [`mio`]: https://github.com/carllerche/mio +//! [`Reactor::poll`]: struct.Reactor.html#method.poll +//! [`Poll::poll`]: https://docs.rs/mio/0.6/mio/struct.Poll.html#method.poll +//! [`mio::Evented`]: https://docs.rs/mio/0.6/mio/trait.Evented.html +//! [`PollEvented`]: struct.PollEvented.html +//! [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html +//! [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html + +pub use tokio_reactor::{ + Reactor, + Handle, + Background, + Turn, + Registration, + PollEvented as PollEvented2, +}; + +mod poll_evented; +#[allow(deprecated)] +pub use self::poll_evented::PollEvented; diff --git a/third_party/rust/tokio-0.1.11/src/reactor/poll_evented.rs b/third_party/rust/tokio-0.1.11/src/reactor/poll_evented.rs new file mode 100644 index 0000000000..d5f6750b6b --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/reactor/poll_evented.rs @@ -0,0 +1,539 @@ +//! Readiness tracking streams, backing I/O objects. +//! +//! This module contains the core type which is used to back all I/O on object +//! in `tokio-core`. The `PollEvented` type is the implementation detail of +//! all I/O. Each `PollEvented` manages registration with a reactor, +//! acquisition of a token, and tracking of the readiness state on the +//! underlying I/O primitive. + +#![allow(deprecated, warnings)] + +use std::fmt; +use std::io::{self, Read, Write}; +use std::sync::Mutex; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; + +use futures::{task, Async, Poll}; +use mio::event::Evented; +use mio::Ready; +use tokio_io::{AsyncRead, AsyncWrite}; + +use reactor::{Handle, Registration}; + +#[deprecated(since = "0.1.2", note = "PollEvented2 instead")] +#[doc(hidden)] +pub struct PollEvented<E> { + io: E, + inner: Inner, + handle: Handle, +} + +struct Inner { + registration: Mutex<Registration>, + + /// Currently visible read readiness + read_readiness: AtomicUsize, + + /// Currently visible write readiness + write_readiness: AtomicUsize, +} + +impl<E: fmt::Debug> fmt::Debug for PollEvented<E> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("PollEvented") + .field("io", &self.io) + .finish() + } +} + +impl<E> PollEvented<E> { + /// Creates a new readiness stream associated with the provided + /// `loop_handle` and for the given `source`. + pub fn new(io: E, handle: &Handle) -> io::Result<PollEvented<E>> + where E: Evented, + { + let registration = Registration::new(); + registration.register(&io)?; + + Ok(PollEvented { + io: io, + inner: Inner { + registration: Mutex::new(registration), + read_readiness: AtomicUsize::new(0), + write_readiness: AtomicUsize::new(0), + }, + handle: handle.clone(), + }) + } + + /// Tests to see if this source is ready to be read from or not. + /// + /// If this stream is not ready for a read then `Async::NotReady` will be + /// returned and the current task will be scheduled to receive a + /// notification when the stream is readable again. In other words, this + /// method is only safe to call from within the context of a future's task, + /// typically done in a `Future::poll` method. + /// + /// This is mostly equivalent to `self.poll_ready(Ready::readable())`. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn poll_read(&mut self) -> Async<()> { + if self.poll_read2().is_ready() { + return ().into(); + } + + Async::NotReady + } + + fn poll_read2(&self) -> Async<Ready> { + let r = self.inner.registration.lock().unwrap(); + + // Load the cached readiness + match self.inner.read_readiness.load(Relaxed) { + 0 => {} + mut n => { + // Check what's new with the reactor. + if let Some(ready) = r.take_read_ready().unwrap() { + n |= ready2usize(ready); + self.inner.read_readiness.store(n, Relaxed); + } + + return usize2ready(n).into(); + } + } + + let ready = match r.poll_read_ready().unwrap() { + Async::Ready(r) => r, + _ => return Async::NotReady, + }; + + // Cache the value + self.inner.read_readiness.store(ready2usize(ready), Relaxed); + + ready.into() + } + + /// Tests to see if this source is ready to be written to or not. + /// + /// If this stream is not ready for a write then `Async::NotReady` will be returned + /// and the current task will be scheduled to receive a notification when + /// the stream is writable again. In other words, this method is only safe + /// to call from within the context of a future's task, typically done in a + /// `Future::poll` method. + /// + /// This is mostly equivalent to `self.poll_ready(Ready::writable())`. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn poll_write(&mut self) -> Async<()> { + let r = self.inner.registration.lock().unwrap(); + + match self.inner.write_readiness.load(Relaxed) { + 0 => {} + mut n => { + // Check what's new with the reactor. + if let Some(ready) = r.take_write_ready().unwrap() { + n |= ready2usize(ready); + self.inner.write_readiness.store(n, Relaxed); + } + + return ().into(); + } + } + + let ready = match r.poll_write_ready().unwrap() { + Async::Ready(r) => r, + _ => return Async::NotReady, + }; + + // Cache the value + self.inner.write_readiness.store(ready2usize(ready), Relaxed); + + ().into() + } + + /// Test to see whether this source fulfills any condition listed in `mask` + /// provided. + /// + /// The `mask` given here is a mio `Ready` set of possible events. This can + /// contain any events like read/write but also platform-specific events + /// such as hup and error. The `mask` indicates events that are interested + /// in being ready. + /// + /// If any event in `mask` is ready then it is returned through + /// `Async::Ready`. The `Ready` set returned is guaranteed to not be empty + /// and contains all events that are currently ready in the `mask` provided. + /// + /// If no events are ready in the `mask` provided then the current task is + /// scheduled to receive a notification when any of them become ready. If + /// the `writable` event is contained within `mask` then this + /// `PollEvented`'s `write` task will be blocked and otherwise the `read` + /// task will be blocked. This is generally only relevant if you're working + /// with this `PollEvented` object on multiple tasks. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn poll_ready(&mut self, mask: Ready) -> Async<Ready> { + let mut ret = Ready::empty(); + + if mask.is_empty() { + return ret.into(); + } + + if mask.is_writable() { + if self.poll_write().is_ready() { + ret = Ready::writable(); + } + } + + let mask = mask - Ready::writable(); + + if !mask.is_empty() { + if let Async::Ready(v) = self.poll_read2() { + ret |= v & mask; + } + } + + if ret.is_empty() { + if mask.is_writable() { + let _ = self.need_write(); + } + + if mask.is_readable() { + let _ = self.need_read(); + } + + Async::NotReady + } else { + ret.into() + } + } + + /// Indicates to this source of events that the corresponding I/O object is + /// no longer readable, but it needs to be. + /// + /// This function, like `poll_read`, is only safe to call from the context + /// of a future's task (typically in a `Future::poll` implementation). It + /// informs this readiness stream that the underlying object is no longer + /// readable, typically because a "would block" error was seen. + /// + /// *All* readiness bits associated with this stream except the writable bit + /// will be reset when this method is called. The current task is then + /// scheduled to receive a notification whenever anything changes other than + /// the writable bit. Note that this typically just means the readable bit + /// is used here, but if you're using a custom I/O object for events like + /// hup/error this may also be relevant. + /// + /// Note that it is also only valid to call this method if `poll_read` + /// previously indicated that the object is readable. That is, this function + /// must always be paired with calls to `poll_read` previously. + /// + /// # Errors + /// + /// This function will return an error if the `Reactor` that this `PollEvented` + /// is associated with has gone away (been destroyed). The error means that + /// the ambient futures task could not be scheduled to receive a + /// notification and typically means that the error should be propagated + /// outwards. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn need_read(&mut self) -> io::Result<()> { + self.inner.read_readiness.store(0, Relaxed); + + if self.poll_read().is_ready() { + // Notify the current task + task::current().notify(); + } + + Ok(()) + } + + /// Indicates to this source of events that the corresponding I/O object is + /// no longer writable, but it needs to be. + /// + /// This function, like `poll_write`, is only safe to call from the context + /// of a future's task (typically in a `Future::poll` implementation). It + /// informs this readiness stream that the underlying object is no longer + /// writable, typically because a "would block" error was seen. + /// + /// The flag indicating that this stream is writable is unset and the + /// current task is scheduled to receive a notification when the stream is + /// then again writable. + /// + /// Note that it is also only valid to call this method if `poll_write` + /// previously indicated that the object is writable. That is, this function + /// must always be paired with calls to `poll_write` previously. + /// + /// # Errors + /// + /// This function will return an error if the `Reactor` that this `PollEvented` + /// is associated with has gone away (been destroyed). The error means that + /// the ambient futures task could not be scheduled to receive a + /// notification and typically means that the error should be propagated + /// outwards. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn need_write(&mut self) -> io::Result<()> { + self.inner.write_readiness.store(0, Relaxed); + + if self.poll_write().is_ready() { + // Notify the current task + task::current().notify(); + } + + Ok(()) + } + + /// Returns a reference to the event loop handle that this readiness stream + /// is associated with. + pub fn handle(&self) -> &Handle { + &self.handle + } + + /// Returns a shared reference to the underlying I/O object this readiness + /// stream is wrapping. + pub fn get_ref(&self) -> &E { + &self.io + } + + /// Returns a mutable reference to the underlying I/O object this readiness + /// stream is wrapping. + pub fn get_mut(&mut self) -> &mut E { + &mut self.io + } + + /// Consumes the `PollEvented` and returns the underlying I/O object + pub fn into_inner(self) -> E { + self.io + } + + /// Deregisters this source of events from the reactor core specified. + /// + /// This method can optionally be called to unregister the underlying I/O + /// object with the event loop that the `handle` provided points to. + /// Typically this method is not required as this automatically happens when + /// `E` is dropped, but for some use cases the `E` object doesn't represent + /// an owned reference, so dropping it won't automatically unregister with + /// the event loop. + /// + /// This consumes `self` as it will no longer provide events after the + /// method is called, and will likely return an error if this `PollEvented` + /// was created on a separate event loop from the `handle` specified. + pub fn deregister(&self) -> io::Result<()> + where E: Evented, + { + self.inner.registration.lock().unwrap() + .deregister(&self.io) + } +} + +impl<E: Read> Read for PollEvented<E> { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + if let Async::NotReady = self.poll_read() { + return Err(io::ErrorKind::WouldBlock.into()) + } + + let r = self.get_mut().read(buf); + + if is_wouldblock(&r) { + self.need_read()?; + } + + return r + } +} + +impl<E: Write> Write for PollEvented<E> { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + if let Async::NotReady = self.poll_write() { + return Err(io::ErrorKind::WouldBlock.into()) + } + + let r = self.get_mut().write(buf); + + if is_wouldblock(&r) { + self.need_write()?; + } + + return r + } + + fn flush(&mut self) -> io::Result<()> { + if let Async::NotReady = self.poll_write() { + return Err(io::ErrorKind::WouldBlock.into()) + } + + let r = self.get_mut().flush(); + + if is_wouldblock(&r) { + self.need_write()?; + } + + return r + } +} + +impl<E: Read> AsyncRead for PollEvented<E> { +} + +impl<E: Write> AsyncWrite for PollEvented<E> { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) + } +} + +fn is_wouldblock<T>(r: &io::Result<T>) -> bool { + match *r { + Ok(_) => false, + Err(ref e) => e.kind() == io::ErrorKind::WouldBlock, + } +} + +const READ: usize = 1 << 0; +const WRITE: usize = 1 << 1; + +fn ready2usize(ready: Ready) -> usize { + let mut bits = 0; + if ready.is_readable() { + bits |= READ; + } + if ready.is_writable() { + bits |= WRITE; + } + bits | platform::ready2usize(ready) +} + +fn usize2ready(bits: usize) -> Ready { + let mut ready = Ready::empty(); + if bits & READ != 0 { + ready.insert(Ready::readable()); + } + if bits & WRITE != 0 { + ready.insert(Ready::writable()); + } + ready | platform::usize2ready(bits) +} + +#[cfg(unix)] +mod platform { + use mio::Ready; + use mio::unix::UnixReady; + + const HUP: usize = 1 << 2; + const ERROR: usize = 1 << 3; + const AIO: usize = 1 << 4; + const LIO: usize = 1 << 5; + + #[cfg(any(target_os = "dragonfly", target_os = "freebsd"))] + fn is_aio(ready: &Ready) -> bool { + UnixReady::from(*ready).is_aio() + } + + #[cfg(not(any(target_os = "dragonfly", target_os = "freebsd")))] + fn is_aio(_ready: &Ready) -> bool { + false + } + + #[cfg(target_os = "freebsd")] + fn is_lio(ready: &Ready) -> bool { + UnixReady::from(*ready).is_lio() + } + + #[cfg(not(target_os = "freebsd"))] + fn is_lio(_ready: &Ready) -> bool { + false + } + + pub fn ready2usize(ready: Ready) -> usize { + let ready = UnixReady::from(ready); + let mut bits = 0; + if is_aio(&ready) { + bits |= AIO; + } + if is_lio(&ready) { + bits |= LIO; + } + if ready.is_error() { + bits |= ERROR; + } + if ready.is_hup() { + bits |= HUP; + } + bits + } + + #[cfg(any(target_os = "dragonfly", target_os = "freebsd", target_os = "ios", + target_os = "macos"))] + fn usize2ready_aio(ready: &mut UnixReady) { + ready.insert(UnixReady::aio()); + } + + #[cfg(not(any(target_os = "dragonfly", + target_os = "freebsd", target_os = "ios", target_os = "macos")))] + fn usize2ready_aio(_ready: &mut UnixReady) { + // aio not available here → empty + } + + #[cfg(target_os = "freebsd")] + fn usize2ready_lio(ready: &mut UnixReady) { + ready.insert(UnixReady::lio()); + } + + #[cfg(not(target_os = "freebsd"))] + fn usize2ready_lio(_ready: &mut UnixReady) { + // lio not available here → empty + } + + pub fn usize2ready(bits: usize) -> Ready { + let mut ready = UnixReady::from(Ready::empty()); + if bits & AIO != 0 { + usize2ready_aio(&mut ready); + } + if bits & LIO != 0 { + usize2ready_lio(&mut ready); + } + if bits & HUP != 0 { + ready.insert(UnixReady::hup()); + } + if bits & ERROR != 0 { + ready.insert(UnixReady::error()); + } + ready.into() + } +} + +#[cfg(windows)] +mod platform { + use mio::Ready; + + pub fn all() -> Ready { + // No platform-specific Readinesses for Windows + Ready::empty() + } + + pub fn hup() -> Ready { + Ready::empty() + } + + pub fn ready2usize(_r: Ready) -> usize { + 0 + } + + pub fn usize2ready(_r: usize) -> Ready { + Ready::empty() + } +} diff --git a/third_party/rust/tokio-0.1.11/src/runtime/builder.rs b/third_party/rust/tokio-0.1.11/src/runtime/builder.rs new file mode 100644 index 0000000000..43eb5ddee1 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/builder.rs @@ -0,0 +1,261 @@ +use runtime::{Inner, Runtime}; + +use reactor::Reactor; + +use std::io; + +use tokio_reactor; +use tokio_threadpool::Builder as ThreadPoolBuilder; +use tokio_threadpool::park::DefaultPark; +use tokio_timer::clock::{self, Clock}; +use tokio_timer::timer::{self, Timer}; + +/// Builds Tokio Runtime with custom configuration values. +/// +/// Methods can be chained in order to set the configuration values. The +/// Runtime is constructed by calling [`build`]. +/// +/// New instances of `Builder` are obtained via [`Builder::new`]. +/// +/// See function level documentation for details on the various configuration +/// settings. +/// +/// [`build`]: #method.build +/// [`Builder::new`]: #method.new +/// +/// # Examples +/// +/// ``` +/// # extern crate tokio; +/// # extern crate tokio_threadpool; +/// # use tokio::runtime::Builder; +/// +/// # pub fn main() { +/// // create and configure ThreadPool +/// let mut threadpool_builder = tokio_threadpool::Builder::new(); +/// threadpool_builder +/// .name_prefix("my-runtime-worker-") +/// .pool_size(4); +/// +/// // build Runtime +/// let runtime = Builder::new() +/// .threadpool_builder(threadpool_builder) +/// .build(); +/// // ... call runtime.run(...) +/// # let _ = runtime; +/// # } +/// ``` +#[derive(Debug)] +pub struct Builder { + /// Thread pool specific builder + threadpool_builder: ThreadPoolBuilder, + + /// The clock to use + clock: Clock, +} + +impl Builder { + /// Returns a new runtime builder initialized with default configuration + /// values. + /// + /// Configuration methods can be chained on the return value. + pub fn new() -> Builder { + let mut threadpool_builder = ThreadPoolBuilder::new(); + threadpool_builder.name_prefix("tokio-runtime-worker-"); + + Builder { + threadpool_builder, + clock: Clock::new(), + } + } + + /// Set the `Clock` instance that will be used by the runtime. + pub fn clock(&mut self, clock: Clock) -> &mut Self { + self.clock = clock; + self + } + + /// Set builder to set up the thread pool instance. + #[deprecated( + since="0.1.9", + note="use the `core_threads`, `blocking_threads`, `name_prefix`, \ + and `stack_size` functions on `runtime::Builder`, instead")] + #[doc(hidden)] + pub fn threadpool_builder(&mut self, val: ThreadPoolBuilder) -> &mut Self { + self.threadpool_builder = val; + self + } + + /// Set the maximum number of worker threads for the `Runtime`'s thread pool. + /// + /// This must be a number between 1 and 32,768 though it is advised to keep + /// this value on the smaller side. + /// + /// The default value is the number of cores available to the system. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let mut rt = runtime::Builder::new() + /// .core_threads(4) + /// .build() + /// .unwrap(); + /// # } + /// ``` + pub fn core_threads(&mut self, val: usize) -> &mut Self { + self.threadpool_builder.pool_size(val); + self + } + + /// Set the maximum number of concurrent blocking sections in the `Runtime`'s + /// thread pool. + /// + /// When the maximum concurrent `blocking` calls is reached, any further + /// calls to `blocking` will return `NotReady` and the task is notified once + /// previously in-flight calls to `blocking` return. + /// + /// This must be a number between 1 and 32,768 though it is advised to keep + /// this value on the smaller side. + /// + /// The default value is 100. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let mut rt = runtime::Builder::new() + /// .blocking_threads(200) + /// .build(); + /// # } + /// ``` + pub fn blocking_threads(&mut self, val: usize) -> &mut Self { + self.threadpool_builder.max_blocking(val); + self + } + + /// Set name prefix of threads spawned by the `Runtime`'s thread pool. + /// + /// Thread name prefix is used for generating thread names. For example, if + /// prefix is `my-pool-`, then threads in the pool will get names like + /// `my-pool-1` etc. + /// + /// The default prefix is "tokio-runtime-worker-". + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let mut rt = runtime::Builder::new() + /// .name_prefix("my-pool-") + /// .build(); + /// # } + /// ``` + pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self { + self.threadpool_builder.name_prefix(val); + self + } + + /// Set the stack size (in bytes) for worker threads. + /// + /// The actual stack size may be greater than this value if the platform + /// specifies minimal stack size. + /// + /// The default stack size for spawned threads is 2 MiB, though this + /// particular stack size is subject to change in the future. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let mut rt = runtime::Builder::new() + /// .stack_size(32 * 1024) + /// .build(); + /// # } + /// ``` + pub fn stack_size(&mut self, val: usize) -> &mut Self { + self.threadpool_builder.stack_size(val); + self + } + + /// Create the configured `Runtime`. + /// + /// The returned `ThreadPool` instance is ready to spawn tasks. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::runtime::Builder; + /// # pub fn main() { + /// let runtime = Builder::new().build().unwrap(); + /// // ... call runtime.run(...) + /// # let _ = runtime; + /// # } + /// ``` + pub fn build(&mut self) -> io::Result<Runtime> { + use std::collections::HashMap; + use std::sync::{Arc, Mutex}; + + // Get a handle to the clock for the runtime. + let clock1 = self.clock.clone(); + let clock2 = clock1.clone(); + + let timers = Arc::new(Mutex::new(HashMap::<_, timer::Handle>::new())); + let t1 = timers.clone(); + + // Spawn a reactor on a background thread. + let reactor = Reactor::new()?.background()?; + + // Get a handle to the reactor. + let reactor_handle = reactor.handle().clone(); + + let pool = self.threadpool_builder + .around_worker(move |w, enter| { + let timer_handle = t1.lock().unwrap() + .get(w.id()).unwrap() + .clone(); + + tokio_reactor::with_default(&reactor_handle, enter, |enter| { + clock::with_default(&clock1, enter, |enter| { + timer::with_default(&timer_handle, enter, |_| { + w.run(); + }); + }) + }); + }) + .custom_park(move |worker_id| { + // Create a new timer + let timer = Timer::new_with_now(DefaultPark::new(), clock2.clone()); + + timers.lock().unwrap() + .insert(worker_id.clone(), timer.handle()); + + timer + }) + .build(); + + Ok(Runtime { + inner: Some(Inner { + reactor, + pool, + }), + }) + } +} diff --git a/third_party/rust/tokio-0.1.11/src/runtime/current_thread/builder.rs b/third_party/rust/tokio-0.1.11/src/runtime/current_thread/builder.rs new file mode 100644 index 0000000000..72960fadf2 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/current_thread/builder.rs @@ -0,0 +1,88 @@ +use executor::current_thread::CurrentThread; +use runtime::current_thread::Runtime; + +use tokio_reactor::Reactor; +use tokio_timer::clock::Clock; +use tokio_timer::timer::Timer; + +use std::io; + +/// Builds a Single-threaded runtime with custom configuration values. +/// +/// Methods can be chained in order to set the configuration values. The +/// Runtime is constructed by calling [`build`]. +/// +/// New instances of `Builder` are obtained via [`Builder::new`]. +/// +/// See function level documentation for details on the various configuration +/// settings. +/// +/// [`build`]: #method.build +/// [`Builder::new`]: #method.new +/// +/// # Examples +/// +/// ``` +/// extern crate tokio; +/// extern crate tokio_timer; +/// +/// use tokio::runtime::current_thread::Builder; +/// use tokio_timer::clock::Clock; +/// +/// # pub fn main() { +/// // build Runtime +/// let runtime = Builder::new() +/// .clock(Clock::new()) +/// .build(); +/// // ... call runtime.run(...) +/// # let _ = runtime; +/// # } +/// ``` +#[derive(Debug)] +pub struct Builder { + /// The clock to use + clock: Clock, +} + +impl Builder { + /// Returns a new runtime builder initialized with default configuration + /// values. + /// + /// Configuration methods can be chained on the return value. + pub fn new() -> Builder { + Builder { + clock: Clock::new(), + } + } + + /// Set the `Clock` instance that will be used by the runtime. + pub fn clock(&mut self, clock: Clock) -> &mut Self { + self.clock = clock; + self + } + + /// Create the configured `Runtime`. + pub fn build(&mut self) -> io::Result<Runtime> { + // We need a reactor to receive events about IO objects from kernel + let reactor = Reactor::new()?; + let reactor_handle = reactor.handle(); + + // Place a timer wheel on top of the reactor. If there are no timeouts to fire, it'll let the + // reactor pick up some new external events. + let timer = Timer::new_with_now(reactor, self.clock.clone()); + let timer_handle = timer.handle(); + + // And now put a single-threaded executor on top of the timer. When there are no futures ready + // to do something, it'll let the timer or the reactor to generate some new stimuli for the + // futures to continue in their life. + let executor = CurrentThread::new_with_park(timer); + + let runtime = Runtime::new2( + reactor_handle, + timer_handle, + self.clock.clone(), + executor); + + Ok(runtime) + } +} diff --git a/third_party/rust/tokio-0.1.11/src/runtime/current_thread/mod.rs b/third_party/rust/tokio-0.1.11/src/runtime/current_thread/mod.rs new file mode 100644 index 0000000000..dca41711e8 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/current_thread/mod.rs @@ -0,0 +1,92 @@ +//! A runtime implementation that runs everything on the current thread. +//! +//! [`current_thread::Runtime`][rt] is similar to the primary +//! [`Runtime`][concurrent-rt] except that it runs all components on the current +//! thread instead of using a thread pool. This means that it is able to spawn +//! futures that do not implement `Send`. +//! +//! Same as the default [`Runtime`][concurrent-rt], the +//! [`current_thread::Runtime`][rt] includes: +//! +//! * A [reactor] to drive I/O resources. +//! * An [executor] to execute tasks that use these I/O resources. +//! * A [timer] for scheduling work to run after a set period of time. +//! +//! Note that [`current_thread::Runtime`][rt] does not implement `Send` itself +//! and cannot be safely moved to other threads. +//! +//! # Spawning from other threads +//! +//! While [`current_thread::Runtime`][rt] does not implement `Send` and cannot +//! safely be moved to other threads, it provides a `Handle` that can be sent +//! to other threads and allows to spawn new tasks from there. +//! +//! For example: +//! +//! ``` +//! # extern crate tokio; +//! # extern crate futures; +//! use tokio::runtime::current_thread::Runtime; +//! use tokio::prelude::*; +//! use std::thread; +//! +//! # fn main() { +//! let mut runtime = Runtime::new().unwrap(); +//! let handle = runtime.handle(); +//! +//! thread::spawn(move || { +//! handle.spawn(future::ok(())); +//! }).join().unwrap(); +//! +//! # /* +//! runtime.run().unwrap(); +//! # */ +//! # } +//! ``` +//! +//! # Examples +//! +//! Creating a new `Runtime` and running a future `f` until its completion and +//! returning its result. +//! +//! ``` +//! use tokio::runtime::current_thread::Runtime; +//! use tokio::prelude::*; +//! +//! let mut runtime = Runtime::new().unwrap(); +//! +//! // Use the runtime... +//! // runtime.block_on(f); // where f is a future +//! ``` +//! +//! [rt]: struct.Runtime.html +//! [concurrent-rt]: ../struct.Runtime.html +//! [chan]: https://docs.rs/futures/0.1/futures/sync/mpsc/fn.channel.html +//! [reactor]: ../../reactor/struct.Reactor.html +//! [executor]: https://tokio.rs/docs/getting-started/runtime-model/#executors +//! [timer]: ../../timer/index.html + +mod builder; +mod runtime; + +pub use self::builder::Builder; +pub use self::runtime::{Runtime, Handle}; +pub use tokio_current_thread::spawn; +pub use tokio_current_thread::TaskExecutor; + +use futures::Future; + +/// Run the provided future to completion using a runtime running on the current thread. +/// +/// This first creates a new [`Runtime`], and calls [`Runtime::block_on`] with the provided future, +/// which blocks the current thread until the provided future completes. It then calls +/// [`Runtime::run`] to wait for any other spawned futures to resolve. +pub fn block_on_all<F>(future: F) -> Result<F::Item, F::Error> +where + F: Future, +{ + let mut r = Runtime::new().expect("failed to start runtime on current thread"); + let v = r.block_on(future)?; + r.run().expect("failed to resolve remaining futures"); + Ok(v) +} diff --git a/third_party/rust/tokio-0.1.11/src/runtime/current_thread/runtime.rs b/third_party/rust/tokio-0.1.11/src/runtime/current_thread/runtime.rs new file mode 100644 index 0000000000..262cb1e72d --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/current_thread/runtime.rs @@ -0,0 +1,234 @@ +use tokio_current_thread::{self as current_thread, CurrentThread}; +use tokio_current_thread::Handle as ExecutorHandle; +use runtime::current_thread::Builder; + +use tokio_reactor::{self, Reactor}; +use tokio_timer::clock::{self, Clock}; +use tokio_timer::timer::{self, Timer}; +use tokio_executor; + +use futures::{future, Future}; + +use std::fmt; +use std::error::Error; +use std::io; + +/// Single-threaded runtime provides a way to start reactor +/// and executor on the current thread. +/// +/// See [module level][mod] documentation for more details. +/// +/// [mod]: index.html +#[derive(Debug)] +pub struct Runtime { + reactor_handle: tokio_reactor::Handle, + timer_handle: timer::Handle, + clock: Clock, + executor: CurrentThread<Timer<Reactor>>, +} + +/// Handle to spawn a future on the corresponding `CurrentThread` runtime instance +#[derive(Debug, Clone)] +pub struct Handle(ExecutorHandle); + +impl Handle { + /// Spawn a future onto the `CurrentThread` runtime instance corresponding to this handle + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the `CurrentThread` + /// instance of the `Handle` does not exist anymore. + pub fn spawn<F>(&self, future: F) -> Result<(), tokio_executor::SpawnError> + where F: Future<Item = (), Error = ()> + Send + 'static { + self.0.spawn(future) + } + + /// Provides a best effort **hint** to whether or not `spawn` will succeed. + /// + /// This function may return both false positives **and** false negatives. + /// If `status` returns `Ok`, then a call to `spawn` will *probably* + /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will + /// *probably* fail, but may succeed. + /// + /// This allows a caller to avoid creating the task if the call to `spawn` + /// has a high likelihood of failing. + pub fn status(&self) -> Result<(), tokio_executor::SpawnError> { + self.0.status() + } +} + +impl<T> future::Executor<T> for Handle +where T: Future<Item = (), Error = ()> + Send + 'static, +{ + fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> { + if let Err(e) = self.status() { + let kind = if e.is_at_capacity() { + future::ExecuteErrorKind::NoCapacity + } else { + future::ExecuteErrorKind::Shutdown + }; + + return Err(future::ExecuteError::new(kind, future)); + } + + let _ = self.spawn(future); + Ok(()) + } +} + +/// Error returned by the `run` function. +#[derive(Debug)] +pub struct RunError { + inner: current_thread::RunError, +} + +impl fmt::Display for RunError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.inner) + } +} + +impl Error for RunError { + fn description(&self) -> &str { + self.inner.description() + } + fn cause(&self) -> Option<&Error> { + self.inner.cause() + } +} + +impl Runtime { + /// Returns a new runtime initialized with default configuration values. + pub fn new() -> io::Result<Runtime> { + Builder::new().build() + } + + pub(super) fn new2( + reactor_handle: tokio_reactor::Handle, + timer_handle: timer::Handle, + clock: Clock, + executor: CurrentThread<Timer<Reactor>>) -> Runtime + { + Runtime { + reactor_handle, + timer_handle, + clock, + executor, + } + } + + /// Get a new handle to spawn futures on the single-threaded Tokio runtime + /// + /// Different to the runtime itself, the handle can be sent to different + /// threads. + pub fn handle(&self) -> Handle { + Handle(self.executor.handle().clone()) + } + + /// Spawn a future onto the single-threaded Tokio runtime. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ```rust + /// # extern crate tokio; + /// # extern crate futures; + /// # use futures::{future, Future, Stream}; + /// use tokio::runtime::current_thread::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let mut rt = Runtime::new().unwrap(); + /// + /// // Spawn a future onto the runtime + /// rt.spawn(future::lazy(|| { + /// println!("running on the runtime"); + /// Ok(()) + /// })); + /// # } + /// # pub fn main() {} + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the executor + /// is currently at capacity and is unable to spawn a new future. + pub fn spawn<F>(&mut self, future: F) -> &mut Self + where F: Future<Item = (), Error = ()> + 'static, + { + self.executor.spawn(future); + self + } + + /// Runs the provided future, blocking the current thread until the future + /// completes. + /// + /// This function can be used to synchronously block the current thread + /// until the provided `future` has resolved either successfully or with an + /// error. The result of the future is then returned from this function + /// call. + /// + /// Note that this function will **also** execute any spawned futures on the + /// current thread, but will **not** block until these other spawned futures + /// have completed. Once the function returns, any uncompleted futures + /// remain pending in the `Runtime` instance. These futures will not run + /// until `block_on` or `run` is called again. + /// + /// The caller is responsible for ensuring that other spawned futures + /// complete execution by calling `block_on` or `run`. + pub fn block_on<F>(&mut self, f: F) -> Result<F::Item, F::Error> + where F: Future + { + self.enter(|executor| { + // Run the provided future + let ret = executor.block_on(f); + ret.map_err(|e| e.into_inner().expect("unexpected execution error")) + }) + } + + /// Run the executor to completion, blocking the thread until **all** + /// spawned futures have completed. + pub fn run(&mut self) -> Result<(), RunError> { + self.enter(|executor| executor.run()) + .map_err(|e| RunError { + inner: e, + }) + } + + fn enter<F, R>(&mut self, f: F) -> R + where F: FnOnce(&mut current_thread::Entered<Timer<Reactor>>) -> R + { + let Runtime { + ref reactor_handle, + ref timer_handle, + ref clock, + ref mut executor, + .. + } = *self; + + // Binds an executor to this thread + let mut enter = tokio_executor::enter().expect("Multiple executors at once"); + + // This will set the default handle and timer to use inside the closure + // and run the future. + tokio_reactor::with_default(&reactor_handle, &mut enter, |enter| { + clock::with_default(clock, enter, |enter| { + timer::with_default(&timer_handle, enter, |enter| { + // The TaskExecutor is a fake executor that looks into the + // current single-threaded executor when used. This is a trick, + // because we need two mutable references to the executor (one + // to run the provided future, another to install as the default + // one). We use the fake one here as the default one. + let mut default_executor = current_thread::TaskExecutor::current(); + tokio_executor::with_default(&mut default_executor, enter, |enter| { + let mut executor = executor.enter(enter); + f(&mut executor) + }) + }) + }) + }) + } +} diff --git a/third_party/rust/tokio-0.1.11/src/runtime/mod.rs b/third_party/rust/tokio-0.1.11/src/runtime/mod.rs new file mode 100644 index 0000000000..9ff0cc4c2f --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/mod.rs @@ -0,0 +1,496 @@ +//! A batteries included runtime for applications using Tokio. +//! +//! Applications using Tokio require some runtime support in order to work: +//! +//! * A [reactor] to drive I/O resources. +//! * An [executor] to execute tasks that use these I/O resources. +//! * A [timer] for scheduling work to run after a set period of time. +//! +//! While it is possible to setup each component manually, this involves a bunch +//! of boilerplate. +//! +//! [`Runtime`] bundles all of these various runtime components into a single +//! handle that can be started and shutdown together, eliminating the necessary +//! boilerplate to run a Tokio application. +//! +//! Most applications wont need to use [`Runtime`] directly. Instead, they will +//! use the [`run`] function, which uses [`Runtime`] under the hood. +//! +//! Creating a [`Runtime`] does the following: +//! +//! * Spawn a background thread running a [`Reactor`] instance. +//! * Start a [`ThreadPool`] for executing futures. +//! * Run an instance of [`Timer`] **per** thread pool worker thread. +//! +//! The thread pool uses a work-stealing strategy and is configured to start a +//! worker thread for each CPU core available on the system. This tends to be +//! the ideal setup for Tokio applications. +//! +//! A timer per thread pool worker thread is used to minimize the amount of +//! synchronization that is required for working with the timer. +//! +//! # Usage +//! +//! Most applications will use the [`run`] function. This takes a future to +//! "seed" the application, blocking the thread until the runtime becomes +//! [idle]. +//! +//! ```rust +//! # extern crate tokio; +//! # extern crate futures; +//! # use futures::{Future, Stream}; +//! use tokio::net::TcpListener; +//! +//! # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> { +//! # unimplemented!(); +//! # } +//! # fn dox() { +//! # let addr = "127.0.0.1:8080".parse().unwrap(); +//! let listener = TcpListener::bind(&addr).unwrap(); +//! +//! let server = listener.incoming() +//! .map_err(|e| println!("error = {:?}", e)) +//! .for_each(|socket| { +//! tokio::spawn(process(socket)) +//! }); +//! +//! tokio::run(server); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! In this function, the `run` function blocks until the runtime becomes idle. +//! See [`shutdown_on_idle`][idle] for more shutdown details. +//! +//! From within the context of the runtime, additional tasks are spawned using +//! the [`tokio::spawn`] function. Futures spawned using this function will be +//! executed on the same thread pool used by the [`Runtime`]. +//! +//! A [`Runtime`] instance can also be used directly. +//! +//! ```rust +//! # extern crate tokio; +//! # extern crate futures; +//! # use futures::{Future, Stream}; +//! use tokio::runtime::Runtime; +//! use tokio::net::TcpListener; +//! +//! # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> { +//! # unimplemented!(); +//! # } +//! # fn dox() { +//! # let addr = "127.0.0.1:8080".parse().unwrap(); +//! let listener = TcpListener::bind(&addr).unwrap(); +//! +//! let server = listener.incoming() +//! .map_err(|e| println!("error = {:?}", e)) +//! .for_each(|socket| { +//! tokio::spawn(process(socket)) +//! }); +//! +//! // Create the runtime +//! let mut rt = Runtime::new().unwrap(); +//! +//! // Spawn the server task +//! rt.spawn(server); +//! +//! // Wait until the runtime becomes idle and shut it down. +//! rt.shutdown_on_idle() +//! .wait().unwrap(); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! [reactor]: ../reactor/struct.Reactor.html +//! [executor]: https://tokio.rs/docs/getting-started/runtime-model/#executors +//! [timer]: ../timer/index.html +//! [`Runtime`]: struct.Runtime.html +//! [`Reactor`]: ../reactor/struct.Reactor.html +//! [`ThreadPool`]: ../executor/thread_pool/struct.ThreadPool.html +//! [`run`]: fn.run.html +//! [idle]: struct.Runtime.html#method.shutdown_on_idle +//! [`tokio::spawn`]: ../executor/fn.spawn.html +//! [`Timer`]: https://docs.rs/tokio-timer/0.2/tokio_timer/timer/struct.Timer.html + +mod builder; +pub mod current_thread; +mod shutdown; +mod task_executor; + +pub use self::builder::Builder; +pub use self::shutdown::Shutdown; +pub use self::task_executor::TaskExecutor; + +use reactor::{Background, Handle}; + +use std::io; + +use tokio_executor::enter; +use tokio_threadpool as threadpool; + +use futures; +use futures::future::Future; + +/// Handle to the Tokio runtime. +/// +/// The Tokio runtime includes a reactor as well as an executor for running +/// tasks. +/// +/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However, +/// most users will use [`tokio::run`], which uses a `Runtime` internally. +/// +/// See [module level][mod] documentation for more details. +/// +/// [mod]: index.html +/// [`new`]: #method.new +/// [`Builder`]: struct.Builder.html +/// [`tokio::run`]: fn.run.html +#[derive(Debug)] +pub struct Runtime { + inner: Option<Inner>, +} + +#[derive(Debug)] +struct Inner { + /// Reactor running on a background thread. + reactor: Background, + + /// Task execution pool. + pool: threadpool::ThreadPool, +} + +// ===== impl Runtime ===== + +/// Start the Tokio runtime using the supplied future to bootstrap execution. +/// +/// This function is used to bootstrap the execution of a Tokio application. It +/// does the following: +/// +/// * Start the Tokio runtime using a default configuration. +/// * Spawn the given future onto the thread pool. +/// * Block the current thread until the runtime shuts down. +/// +/// Note that the function will not return immediately once `future` has +/// completed. Instead it waits for the entire runtime to become idle. +/// +/// See the [module level][mod] documentation for more details. +/// +/// # Examples +/// +/// ```rust +/// # extern crate tokio; +/// # extern crate futures; +/// # use futures::{Future, Stream}; +/// use tokio::net::TcpListener; +/// +/// # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> { +/// # unimplemented!(); +/// # } +/// # fn dox() { +/// # let addr = "127.0.0.1:8080".parse().unwrap(); +/// let listener = TcpListener::bind(&addr).unwrap(); +/// +/// let server = listener.incoming() +/// .map_err(|e| println!("error = {:?}", e)) +/// .for_each(|socket| { +/// tokio::spawn(process(socket)) +/// }); +/// +/// tokio::run(server); +/// # } +/// # pub fn main() {} +/// ``` +/// +/// # Panics +/// +/// This function panics if called from the context of an executor. +/// +/// [mod]: ../index.html +pub fn run<F>(future: F) +where F: Future<Item = (), Error = ()> + Send + 'static, +{ + let mut runtime = Runtime::new().unwrap(); + runtime.spawn(future); + enter().expect("nested tokio::run") + .block_on(runtime.shutdown_on_idle()) + .unwrap(); +} + +impl Runtime { + /// Create a new runtime instance with default configuration values. + /// + /// This results in a reactor, thread pool, and timer being initialized. The + /// thread pool will not spawn any worker threads until it needs to, i.e. + /// tasks are scheduled to run. + /// + /// Most users will not need to call this function directly, instead they + /// will use [`tokio::run`](fn.run.html). + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// Creating a new `Runtime` with default configuration values. + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::prelude::*; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// + /// // Shutdown the runtime + /// rt.shutdown_now() + /// .wait().unwrap(); + /// ``` + /// + /// [mod]: index.html + pub fn new() -> io::Result<Self> { + Builder::new().build() + } + + #[deprecated(since = "0.1.5", note = "use `reactor` instead")] + #[doc(hidden)] + pub fn handle(&self) -> &Handle { + self.reactor() + } + + /// Return a reference to the reactor handle for this runtime instance. + /// + /// The returned handle reference can be cloned in order to get an owned + /// value of the handle. This handle can be used to initialize I/O resources + /// (like TCP or UDP sockets) that will not be used on the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// let reactor_handle = rt.reactor().clone(); + /// + /// // use `reactor_handle` + /// ``` + pub fn reactor(&self) -> &Handle { + self.inner().reactor.handle() + } + + /// Return a handle to the runtime's executor. + /// + /// The returned handle can be used to spawn tasks that run on this runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// let executor_handle = rt.executor(); + /// + /// // use `executor_handle` + /// ``` + pub fn executor(&self) -> TaskExecutor { + let inner = self.inner().pool.sender().clone(); + TaskExecutor { inner } + } + + /// Spawn a future onto the Tokio runtime. + /// + /// This spawns the given future onto the runtime's executor, usually a + /// thread pool. The thread pool is then responsible for polling the future + /// until it completes. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ```rust + /// # extern crate tokio; + /// # extern crate futures; + /// # use futures::{future, Future, Stream}; + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let mut rt = Runtime::new().unwrap(); + /// + /// // Spawn a future onto the runtime + /// rt.spawn(future::lazy(|| { + /// println!("now running on a worker thread"); + /// Ok(()) + /// })); + /// # } + /// # pub fn main() {} + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the executor + /// is currently at capacity and is unable to spawn a new future. + pub fn spawn<F>(&mut self, future: F) -> &mut Self + where F: Future<Item = (), Error = ()> + Send + 'static, + { + self.inner_mut().pool.sender().spawn(future).unwrap(); + self + } + + /// Run a future to completion on the Tokio runtime. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers which + /// the future spawns internally will be executed on the runtime. + /// + /// This method should not be called from an asynchronous context. + /// + /// # Panics + /// + /// This function panics if the executor is at capacity, if the provided + /// future panics, or if called within an asynchronous execution context. + pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E> + where + F: Send + 'static + Future<Item = R, Error = E>, + R: Send + 'static, + E: Send + 'static, + { + let (tx, rx) = futures::sync::oneshot::channel(); + self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!()))); + rx.wait().unwrap() + } + + /// Run a future to completion on the Tokio runtime, then wait for all + /// background futures to complete too. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, waiting for background futures to complete, and yielding + /// its resolved result. Any tasks or timers which the future spawns + /// internally will be executed on the runtime and waited for completion. + /// + /// This method should not be called from an asynchronous context. + /// + /// # Panics + /// + /// This function panics if the executor is at capacity, if the provided + /// future panics, or if called within an asynchronous execution context. + pub fn block_on_all<F, R, E>(mut self, future: F) -> Result<R, E> + where + F: Send + 'static + Future<Item = R, Error = E>, + R: Send + 'static, + E: Send + 'static, + { + let res = self.block_on(future); + self.shutdown_on_idle().wait().unwrap(); + res + } + + /// Signals the runtime to shutdown once it becomes idle. + /// + /// Returns a future that completes once the shutdown operation has + /// completed. + /// + /// This function can be used to perform a graceful shutdown of the runtime. + /// + /// The runtime enters an idle state once **all** of the following occur. + /// + /// * The thread pool has no tasks to execute, i.e., all tasks that were + /// spawned have completed. + /// * The reactor is not managing any I/O resources. + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::prelude::*; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// + /// // Shutdown the runtime + /// rt.shutdown_on_idle() + /// .wait().unwrap(); + /// ``` + /// + /// [mod]: index.html + pub fn shutdown_on_idle(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + + let inner = Box::new({ + let pool = inner.pool; + let reactor = inner.reactor; + + pool.shutdown_on_idle().and_then(|_| { + reactor.shutdown_on_idle() + }) + }); + + Shutdown { inner } + } + + /// Signals the runtime to shutdown immediately. + /// + /// Returns a future that completes once the shutdown operation has + /// completed. + /// + /// This function will forcibly shutdown the runtime, causing any + /// in-progress work to become canceled. The shutdown steps are: + /// + /// * Drain any scheduled work queues. + /// * Drop any futures that have not yet completed. + /// * Drop the reactor. + /// + /// Once the reactor has dropped, any outstanding I/O resources bound to + /// that reactor will no longer function. Calling any method on them will + /// result in an error. + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::prelude::*; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// + /// // Shutdown the runtime + /// rt.shutdown_now() + /// .wait().unwrap(); + /// ``` + /// + /// [mod]: index.html + pub fn shutdown_now(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + Shutdown::shutdown_now(inner) + } + + fn inner(&self) -> &Inner { + self.inner.as_ref().unwrap() + } + + fn inner_mut(&mut self) -> &mut Inner { + self.inner.as_mut().unwrap() + } +} + +impl Drop for Runtime { + fn drop(&mut self) { + if let Some(inner) = self.inner.take() { + let shutdown = Shutdown::shutdown_now(inner); + let _ = shutdown.wait(); + } + } +} diff --git a/third_party/rust/tokio-0.1.11/src/runtime/shutdown.rs b/third_party/rust/tokio-0.1.11/src/runtime/shutdown.rs new file mode 100644 index 0000000000..1aca557277 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/shutdown.rs @@ -0,0 +1,46 @@ +use runtime::Inner; + +use std::fmt; + +use futures::{Future, Poll}; + +/// A future that resolves when the Tokio `Runtime` is shut down. +pub struct Shutdown { + pub(super) inner: Box<Future<Item = (), Error = ()> + Send>, +} + +impl Shutdown { + pub(super) fn shutdown_now(inner: Inner) -> Self { + let inner = Box::new({ + let pool = inner.pool; + let reactor = inner.reactor; + + pool.shutdown_now().and_then(|_| { + reactor.shutdown_now() + .then(|_| { + Ok(()) + }) + }) + }); + + Shutdown { inner } + } +} + +impl Future for Shutdown { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + try_ready!(self.inner.poll()); + Ok(().into()) + } +} + +impl fmt::Debug for Shutdown { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Shutdown") + .field("inner", &"Box<Future<Item = (), Error = ()>>") + .finish() + } +} diff --git a/third_party/rust/tokio-0.1.11/src/runtime/task_executor.rs b/third_party/rust/tokio-0.1.11/src/runtime/task_executor.rs new file mode 100644 index 0000000000..e213201ab0 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/task_executor.rs @@ -0,0 +1,75 @@ + +use tokio_threadpool::Sender; + +use futures::future::{self, Future}; + +/// Executes futures on the runtime +/// +/// All futures spawned using this executor will be submitted to the associated +/// Runtime's executor. This executor is usually a thread pool. +/// +/// For more details, see the [module level](index.html) documentation. +#[derive(Debug, Clone)] +pub struct TaskExecutor { + pub(super) inner: Sender, +} + +impl TaskExecutor { + /// Spawn a future onto the Tokio runtime. + /// + /// This spawns the given future onto the runtime's executor, usually a + /// thread pool. The thread pool is then responsible for polling the future + /// until it completes. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ```rust + /// # extern crate tokio; + /// # extern crate futures; + /// # use futures::{future, Future, Stream}; + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let mut rt = Runtime::new().unwrap(); + /// let executor = rt.executor(); + /// + /// // Spawn a future onto the runtime + /// executor.spawn(future::lazy(|| { + /// println!("now running on a worker thread"); + /// Ok(()) + /// })); + /// # } + /// # pub fn main() {} + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the executor + /// is currently at capacity and is unable to spawn a new future. + pub fn spawn<F>(&self, future: F) + where F: Future<Item = (), Error = ()> + Send + 'static, + { + self.inner.spawn(future).unwrap(); + } +} + +impl<T> future::Executor<T> for TaskExecutor +where T: Future<Item = (), Error = ()> + Send + 'static, +{ + fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> { + self.inner.execute(future) + } +} + +impl ::executor::Executor for TaskExecutor { + fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>) + -> Result<(), ::executor::SpawnError> + { + self.inner.spawn(future) + } +} diff --git a/third_party/rust/tokio-0.1.11/src/timer.rs b/third_party/rust/tokio-0.1.11/src/timer.rs new file mode 100644 index 0000000000..fc85a2a724 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/timer.rs @@ -0,0 +1,102 @@ +//! Utilities for tracking time. +//! +//! This module provides a number of types for executing code after a set period +//! of time. +//! +//! * [`Delay`][Delay] is a future that does no work and completes at a specific `Instant` +//! in time. +//! +//! * [`Interval`][Interval] is a stream yielding a value at a fixed period. It +//! is initialized with a `Duration` and repeatedly yields each time the +//! duration elapses. +//! +//! * [`Timeout`][Timeout]: Wraps a future or stream, setting an upper bound to the +//! amount of time it is allowed to execute. If the future or stream does not +//! complete in time, then it is canceled and an error is returned. +//! +//! * [`DelayQueue`]: A queue where items are returned once the requested delay +//! has expired. +//! +//! These types are sufficient for handling a large number of scenarios +//! involving time. +//! +//! These types must be used from within the context of the +//! [`Runtime`][runtime] or a timer context must be setup explicitly. See the +//! [`tokio-timer`][tokio-timer] crate for more details on how to setup a timer +//! context. +//! +//! # Examples +//! +//! Wait 100ms and print "Hello World!" +//! +//! ``` +//! use tokio::prelude::*; +//! use tokio::timer::Delay; +//! +//! use std::time::{Duration, Instant}; +//! +//! let when = Instant::now() + Duration::from_millis(100); +//! +//! tokio::run({ +//! Delay::new(when) +//! .map_err(|e| panic!("timer failed; err={:?}", e)) +//! .and_then(|_| { +//! println!("Hello world!"); +//! Ok(()) +//! }) +//! }) +//! ``` +//! +//! Require that an operation takes no more than 300ms. Note that this uses the +//! [`timeout`][ext] function on the [`FutureExt`][ext] trait. This trait is +//! included in the prelude. +//! +//! ``` +//! # extern crate futures; +//! # extern crate tokio; +//! use tokio::prelude::*; +//! +//! use std::time::{Duration, Instant}; +//! +//! fn long_op() -> Box<Future<Item = (), Error = ()> + Send> { +//! // ... +//! # Box::new(futures::future::ok(())) +//! } +//! +//! # fn main() { +//! tokio::run({ +//! long_op() +//! .timeout(Duration::from_millis(300)) +//! .map_err(|e| { +//! println!("operation timed out"); +//! }) +//! }) +//! # } +//! ``` +//! +//! [runtime]: ../runtime/struct.Runtime.html +//! [tokio-timer]: https://docs.rs/tokio-timer +//! [ext]: ../util/trait.FutureExt.html#method.timeout +//! [Timeout]: struct.Timeout.html +//! [Delay]: struct.Delay.html +//! [Interval]: struct.Interval.html +//! [`DelayQueue`]: struct.DelayQueue.html + +pub use tokio_timer::{ + delay_queue, + DelayQueue, + Error, + Interval, + Delay, + Timeout, + timeout, +}; + +#[deprecated(since = "0.1.8", note = "use Timeout instead")] +#[allow(deprecated)] +#[doc(hidden)] +pub type Deadline<T> = ::tokio_timer::Deadline<T>; +#[deprecated(since = "0.1.8", note = "use Timeout instead")] +#[allow(deprecated)] +#[doc(hidden)] +pub type DeadlineError<T> = ::tokio_timer::DeadlineError<T>; diff --git a/third_party/rust/tokio-0.1.11/src/util/future.rs b/third_party/rust/tokio-0.1.11/src/util/future.rs new file mode 100644 index 0000000000..92097ad9ea --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/util/future.rs @@ -0,0 +1,87 @@ +#[allow(deprecated)] +use tokio_timer::Deadline; +use tokio_timer::Timeout; + +use futures::Future; + +use std::time::{Instant, Duration}; + + +/// An extension trait for `Future` that provides a variety of convenient +/// combinator functions. +/// +/// Currently, there only is a [`timeout`] function, but this will increase +/// over time. +/// +/// Users are not expected to implement this trait. All types that implement +/// `Future` already implement `FutureExt`. +/// +/// This trait can be imported directly or via the Tokio prelude: `use +/// tokio::prelude::*`. +/// +/// [`timeout`]: #method.timeout +pub trait FutureExt: Future { + + /// Creates a new future which allows `self` until `timeout`. + /// + /// This combinator creates a new future which wraps the receiving future + /// with a timeout. The returned future is allowed to execute until it + /// completes or `timeout` has elapsed, whichever happens first. + /// + /// If the future completes before `timeout` then the future will resolve + /// with that item. Otherwise the future will resolve to an error. + /// + /// The future is guaranteed to be polled at least once, even if `timeout` + /// is set to zero. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// use tokio::prelude::*; + /// use std::time::Duration; + /// # use futures::future::{self, FutureResult}; + /// + /// # fn long_future() -> FutureResult<(), ()> { + /// # future::ok(()) + /// # } + /// # + /// # fn main() { + /// let future = long_future() + /// .timeout(Duration::from_secs(1)) + /// .map_err(|e| println!("error = {:?}", e)); + /// + /// tokio::run(future); + /// # } + /// ``` + fn timeout(self, timeout: Duration) -> Timeout<Self> + where Self: Sized, + { + Timeout::new(self, timeout) + } + + #[deprecated(since = "0.1.8", note = "use `timeout` instead")] + #[allow(deprecated)] + #[doc(hidden)] + fn deadline(self, deadline: Instant) -> Deadline<Self> + where Self: Sized, + { + Deadline::new(self, deadline) + } +} + +impl<T: ?Sized> FutureExt for T where T: Future {} + +#[cfg(test)] +mod test { + use super::*; + use prelude::future; + + #[test] + fn timeout_polls_at_least_once() { + let base_future = future::result::<(), ()>(Ok(())); + let timeouted_future = base_future.timeout(Duration::new(0, 0)); + assert!(timeouted_future.wait().is_ok()); + } +} diff --git a/third_party/rust/tokio-0.1.11/src/util/mod.rs b/third_party/rust/tokio-0.1.11/src/util/mod.rs new file mode 100644 index 0000000000..3ebd1fc708 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/util/mod.rs @@ -0,0 +1,14 @@ +//! Utilities for working with Tokio. +//! +//! This module contains utilities that are useful for working with Tokio. +//! Currently, this only includes [`FutureExt`] and [`StreamExt`], but this +//! may grow over time. +//! +//! [`FutureExt`]: trait.FutureExt.html +//! [`StreamExt`]: trait.StreamExt.html + +mod future; +mod stream; + +pub use self::future::FutureExt; +pub use self::stream::StreamExt; diff --git a/third_party/rust/tokio-0.1.11/src/util/stream.rs b/third_party/rust/tokio-0.1.11/src/util/stream.rs new file mode 100644 index 0000000000..ef268483c0 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/util/stream.rs @@ -0,0 +1,62 @@ +use tokio_timer::Timeout; + +use futures::Stream; + +use std::time::Duration; + + +/// An extension trait for `Stream` that provides a variety of convenient +/// combinator functions. +/// +/// Currently, there only is a [`timeout`] function, but this will increase +/// over time. +/// +/// Users are not expected to implement this trait. All types that implement +/// `Stream` already implement `StreamExt`. +/// +/// This trait can be imported directly or via the Tokio prelude: `use +/// tokio::prelude::*`. +/// +/// [`timeout`]: #method.timeout +pub trait StreamExt: Stream { + + /// Creates a new stream which allows `self` until `timeout`. + /// + /// This combinator creates a new stream which wraps the receiving stream + /// with a timeout. For each item, the returned stream is allowed to execute + /// until it completes or `timeout` has elapsed, whichever happens first. + /// + /// If an item completes before `timeout` then the stream will yield + /// with that item. Otherwise the stream will yield to an error. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// use tokio::prelude::*; + /// use std::time::Duration; + /// # use futures::future::{self, FutureResult}; + /// + /// # fn long_future() -> FutureResult<(), ()> { + /// # future::ok(()) + /// # } + /// # + /// # fn main() { + /// let stream = long_future() + /// .into_stream() + /// .timeout(Duration::from_secs(1)) + /// .for_each(|i| future::ok(println!("item = {:?}", i))) + /// .map_err(|e| println!("error = {:?}", e)); + /// + /// tokio::run(stream); + /// # } + /// ``` + fn timeout(self, timeout: Duration) -> Timeout<Self> + where Self: Sized, + { + Timeout::new(self, timeout) + } +} + +impl<T: ?Sized> StreamExt for T where T: Stream {} |