diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/tokio-0.1.22 | |
parent | Initial commit. (diff) | |
download | firefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-0.1.22')
61 files changed, 9156 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.22/.cargo-checksum.json b/third_party/rust/tokio-0.1.22/.cargo-checksum.json new file mode 100644 index 0000000000..0bae964ad4 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"CHANGELOG.md":"c0759c1ddaa0ba0d81c04f97956a246a23960ad08d9468e81663ffd00f13b565","Cargo.toml":"3402764a216ca410ef5dc163be1599286495c8e8766c6d338bc93a17dd68e2b7","LICENSE":"898b1ae9821e98daf8964c8d6c7f61641f5f5aa78ad500020771c0939ee0dea1","README.md":"fa241309c9b098fba35393aa9d8d99b0bf3403a2e561df9d629d62af5ad16424","examples/README.md":"aa83075c8a8e34e7bb606074add40c935a56526377381a6795167f72c2c80e43","examples/blocking.rs":"ed9dd91131ab315e5442bc80ab8b320554ec8528a1559c740b19aa1331b0c1c8","examples/chat-combinator-current-thread.rs":"e1425874bf0136003983e58e7299558e5e7a08d8e4ef283aef938b613d54ab91","examples/chat-combinator.rs":"2111184ab8afa7199bcd3cee12707e8cbe739f9d3c3112c28c25738ea6ff1737","examples/chat.rs":"2bf09cac3c51ffb4c4b3bd7de1e9933c421b0711736cfb393c8a83613d9d9e93","examples/connect.rs":"ffc7d9e94c132116c9dac7421473753ff1eb1a3684ec3b608d71e286a91e9ecf","examples/echo-udp.rs":"1c072b7579a8b563eba1fc04d2e3b6b013d4ed1d2ef8ecafa3ef10941e6529b5","examples/echo.rs":"e085e3f5df2598c19ee8fa19ada2f8f62c3d1716982136e60a9b9ae31b7ff338","examples/hello_world.rs":"8dc4648d4ebf9cd746a610e2ed723e20a246bb8bb436d29e1fd20966ef84b330","examples/manual-runtime.rs":"5e398889040a1c080fe5c9edaa0d5bf7e43407602bb4101b8dea45af3d4d239f","examples/print_each_packet.rs":"1245d84afaa35f5aec355765fa05bb895dd3c6506395adeba2138d67ad88df32","examples/proxy.rs":"aa07c08c01ff6f6aa39d715572142c96a828a46067c679cd9eaf82d82d4703b5","examples/tinydb.rs":"b1b355498e14073aa0a128c9094e2b5c8f724d32cdf30ae866fbc10ccc688d0f","examples/tinyhttp.rs":"2889c5c7d18b8005bec751a7d969a9c0e9b2b523b3c122a283728eaf13c3e02d","examples/udp-client.rs":"d207c6b8b557156dc3c24c9212e33b91738a52a50cb677f03bf298cb34128a1b","examples/udp-codec.rs":"00f1acbcebb0ced0e23aa5d8fb653380397713bf183bfdb6c7ee179aa64e36b1","src/async_await.rs":"eb6ef54b49bab149061db228c1ec5ed30e3481ce5eab49ef9c210635c79e50ec","src/clock.rs":"ad45adb859163b40a51e566e0e53abff8682c8bbb7813fd106aa70ea55efe5f3","src/codec/length_delimited.rs":"6ad60b83a2e532d272a608f8f49f580520edcd3a8780c426882d10103b57d326","src/codec/mod.rs":"a71d45e80e91105d0e61ed58c905b057bc627b01062f4854d3cf80658414ba1c","src/executor/current_thread/mod.rs":"c6f183735fda8f2081e88960eaf10e7e9435fa7f9e3db118626f55366e22094f","src/executor/mod.rs":"8e620dbd10a508d5e53bd68e097623692092f775c61e1d89f4f01ff981d373d3","src/fs.rs":"78a013bd78076ebfc93dc6a90a3148188aaacab260893fc1f32e00cc883b4d6b","src/io.rs":"3161d7ecbe28650e122a688e84168c60e6d40f1af72fa7c9bdf9e3bec294e738","src/lib.rs":"7083fd76fdaec70683a9a2f1020d33a1bf8ef5ff36e0847f4a7612767b54a56f","src/net.rs":"f16936e7cfa5f4b138776f88d7d9f5f86d76c4d81f06ac2c97e9dd019679730a","src/prelude.rs":"8f3f437ff3dec6ef757e3b63c6d7b7314b213ff093e4b7d83e13256b1100f595","src/reactor/mod.rs":"a834f534ea320184e36be31a0ff4064036164390baea3ab04b7c1b3b13953e0e","src/reactor/poll_evented.rs":"09b314cd27b4699edb92637bd4f71430495fc09b4faf4b49d79e7771285f6031","src/runtime/current_thread/async_await.rs":"f917a68790ac7a0d05cf808d74799d77e16495fc1774efbe20b019059bd73954","src/runtime/current_thread/builder.rs":"30f1dae794e0db2b4263eb1e96b71c12c91fc08d2845cbb7cb540cd31a3b1612","src/runtime/current_thread/mod.rs":"b1ee6495fe1ae94b50b07bef625559aec91d63bf364fd934cd35c0ba71ae1061","src/runtime/current_thread/runtime.rs":"07858eae98638e9d4632a3e38e5edea6e464619c7bc728de62273323304a9456","src/runtime/mod.rs":"e2af2db2e2c6cfd15728f8b0ef2bc1ad5b3b6c3c128b0ce7e3cbb275480bc0e2","src/runtime/threadpool/async_await.rs":"dcce5b1549eee922fc5d161d9532b205ea4fc33b9afe421f8a2a302c8a8c5711","src/runtime/threadpool/builder.rs":"4d08e8757553e2ee4c693a5104c553e6c74d60abf3e2618488aca3b9767d0ba4","src/runtime/threadpool/mod.rs":"39cb09c1f8d2271126e4650d31c9584d659ddca678491ac763cecaf44ab9fce6","src/runtime/threadpool/shutdown.rs":"51ddd3de76afeff7aad793387994ca81afc65483e8f75585df53404ebad7a5d2","src/runtime/threadpool/task_executor.rs":"93fa327a9e9091b0c65cbd9bd50eab529da4881506930c1959cc04d31870e03f","src/sync.rs":"6f1d7b15c8ba6e0eb0d738aa966c7f024afda6ca947fea7ff58227d58f6f2146","src/timer.rs":"dd9a391129a13159a1e5639d2427b5686cb29d68d731b9eaac203483746c3dfd","src/util/enumerate.rs":"b0a34db5c0cec62dded09047f194de8f3f3617174608bc2fd029bbe8d6ee7d66","src/util/future.rs":"0171e9368656a04ee9d8bbc615b8fcfeefb87c269afd7dbd5f95713ba2670da9","src/util/mod.rs":"06c975f35382b153109eeac9784a5598461e973f039a321ba2d7f4f34b6924ec","src/util/stream.rs":"fc4ede3baf5ae91628ee0ac7fe0e237dc6339f761495284eb14233d67e963d0d","tests/buffered.rs":"af62f4217c2209e1e838c70ec6acc9490a9d3148fac64af7f8901277d459634d","tests/clock.rs":"094ce72403f6d893aa1e226b02fe73449165b300b89d9d3bb39e4d592ed75d92","tests/drop-core.rs":"ba4dfc09cb02a83d6f7f26850182144b24d07bd20ab9c9387d75ad342ed970b5","tests/enumerate.rs":"90ab4cb39d77c122b392dfa8e5808f3abc8eb52120d9ee96c9d2fa2d0fcf017d","tests/global.rs":"2dfe61ba78a5b6000464a22bd1728147d6c18cb674a702b747113b198023638c","tests/length_delimited.rs":"657387514e896a3ad8407437586e771acf24bcbe0abeba2e3977133b82ea76f3","tests/line-frames.rs":"6c4f46cd8b546a5497504e18588277c80cea1aa7b41730e2be98b165b30f4850","tests/pipe-hup.rs":"459e10492a53e4416868003e44e6c6b78d2eac3e1945d714ed7216a2382707f1","tests/reactor.rs":"7082d976eb7191c6353e7481477009966bea528911aa01e70deff048604f41bd","tests/runtime.rs":"df85e78b6a6e267450123b39e8e953b44d79a8b33925abcebcc90efef1ca27f8","tests/timer.rs":"0b5c7a4e3ead8e66eec8afdcba36e09eed4f1cbaf5ce5fde8867d1da6be9a1db"},"package":"5a09c0b5bb588872ab2f09afa13ee6e9dac11e10a0ec9e8e3ba39a5a5d530af6"}
\ No newline at end of file diff --git a/third_party/rust/tokio-0.1.22/CHANGELOG.md b/third_party/rust/tokio-0.1.22/CHANGELOG.md new file mode 100644 index 0000000000..bf25ccdab7 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/CHANGELOG.md @@ -0,0 +1,148 @@ +This changelog only applies to the `tokio` crate proper. Each sub crate +maintains its own changelog tracking changes made in each respective sub crate. + +# 0.1.22 (June 2, 2019) + +### Changed +- Moved from `tokio-trace-core` to `tracing-core` (#1223). + +# 0.1.21 (May 30, 2019) + +### Changed +- Bump `tokio-trace-core` version to 0.2 (#1111). + +# 0.1.20 (May 14, 2019) + +### Added +- `tokio::runtime::Builder::panic_handler` allows configuring handling + panics on the runtime (#1055). + +# 0.1.19 (April 22, 2019) + +### Added +- Re-export `tokio::sync::Mutex` primitive (#964). + +# 0.1.18 (March 22, 2019) + +### Added +- `TypedExecutor` re-export and implementations (#993). + +# 0.1.17 (March 13, 2019) + +### Added +- Propagate trace subscriber in the runtime (#966). + +# 0.1.16 (March 1, 2019) + +### Fixed +- async-await: track latest nightly changes (#940). + +### Added +- `sync::Watch`, a single value broadcast channel (#922). +- Async equivalent of read / write file helpers being added to `std` (#896). + +# 0.1.15 (January 24, 2019) + +### Added +- Re-export tokio-sync APIs (#839). +- Stream enumerate combinator (#832). + +# 0.1.14 (January 6, 2019) + +* Use feature flags to break up the crate, allowing users to pick & choose + components (#808). +* Export `UnixDatagram` and `UnixDatagramFramed` (#772). + +# 0.1.13 (November 21, 2018) + +* Fix `Runtime::reactor()` when no tasks are spawned (#721). +* `runtime::Builder` no longer uses deprecated methods (#749). +* Provide `after_start` and `before_stop` configuration settings for + `Runtime` (#756). +* Implement throttle stream combinator (#736). + +# 0.1.12 (October 23, 2018) + +* runtime: expose `keep_alive` on runtime builder (#676). +* runtime: create a reactor per worker thread (#660). +* codec: fix panic in `LengthDelimitedCodec` (#682). +* io: re-export `tokio_io::io::read` function (#689). +* runtime: check for executor re-entry in more places (#708). + +# 0.1.11 (September 28, 2018) + +* Fix `tokio-async-await` dependency (#675). + +# 0.1.10 (September 27, 2018) + +* Fix minimal versions + +# 0.1.9 (September 27, 2018) + +* Experimental async/await improvements (#661). +* Re-export `TaskExecutor` from `tokio-current-thread` (#652). +* Improve `Runtime` builder API (#645). +* `tokio::run` panics when called from the context of an executor + (#646). +* Introduce `StreamExt` with a `timeout` helper (#573). +* Move `length_delimited` into `tokio` (#575). +* Re-organize `tokio::net` module (#548). +* Re-export `tokio-current-thread::spawn` in current_thread runtime + (#579). + +# 0.1.8 (August 23, 2018) + +* Extract tokio::executor::current_thread to a sub crate (#370) +* Add `Runtime::block_on` (#398) +* Add `runtime::current_thread::block_on_all` (#477) +* Misc documentation improvements (#450) +* Implement `std::error::Error` for error types (#501) + +# 0.1.7 (June 6, 2018) + +* Add `Runtime::block_on` for concurrent runtime (#391). +* Provide handle to `current_thread::Runtime` that allows spawning tasks from + other threads (#340). +* Provide `clock::now()`, a configurable source of time (#381). + +# 0.1.6 (May 2, 2018) + +* Add asynchronous filesystem APIs (#323). +* Add "current thread" runtime variant (#308). +* `CurrentThread`: Expose inner `Park` instance. +* Improve fairness of `CurrentThread` executor (#313). + +# 0.1.5 (March 30, 2018) + +* Provide timer API (#266) + +# 0.1.4 (March 22, 2018) + +* Fix build on FreeBSD (#218) +* Shutdown the Runtime when the handle is dropped (#214) +* Set Runtime thread name prefix for worker threads (#232) +* Add builder for Runtime (#234) +* Extract TCP and UDP types into separate crates (#224) +* Optionally support futures 0.2. + +# 0.1.3 (March 09, 2018) + +* Fix `CurrentThread::turn` to block on idle (#212). + +# 0.1.2 (March 09, 2018) + +* Introduce Tokio Runtime (#141) +* Provide `CurrentThread` for more flexible usage of current thread executor (#141). +* Add Lio for platforms that support it (#142). +* I/O resources now lazily bind to the reactor (#160). +* Extract Reactor to dedicated crate (#169) +* Add facade to sub crates and add prelude (#166). +* Switch TCP/UDP fns to poll_ -> Poll<...> style (#175) + +# 0.1.1 (February 09, 2018) + +* Doc fixes + +# 0.1.0 (February 07, 2018) + +* Initial crate released based on [RFC](https://github.com/tokio-rs/tokio-rfcs/pull/3). diff --git a/third_party/rust/tokio-0.1.22/Cargo.toml b/third_party/rust/tokio-0.1.22/Cargo.toml new file mode 100644 index 0000000000..e89f08a706 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/Cargo.toml @@ -0,0 +1,137 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies +# +# If you believe there's an error in this file please file an +# issue against the rust-lang/cargo repository. If you're +# editing this file be aware that the upstream Cargo.toml +# will likely look very different (and much more reasonable) + +[package] +name = "tokio" +version = "0.1.22" +authors = ["Carl Lerche <me@carllerche.com>"] +description = "An event-driven, non-blocking I/O platform for writing asynchronous I/O\nbacked applications.\n" +homepage = "https://tokio.rs" +documentation = "https://docs.rs/tokio/0.1.21/tokio/" +readme = "README.md" +keywords = ["io", "async", "non-blocking", "futures"] +categories = ["asynchronous", "network-programming"] +license = "MIT" +repository = "https://github.com/tokio-rs/tokio" +[dependencies.bytes] +version = "0.4" +optional = true + +[dependencies.futures] +version = "0.1.20" + +[dependencies.mio] +version = "0.6.14" +optional = true + +[dependencies.num_cpus] +version = "1.8.0" +optional = true + +[dependencies.tokio-codec] +version = "0.1.0" +optional = true + +[dependencies.tokio-current-thread] +version = "0.1.6" +optional = true + +[dependencies.tokio-executor] +version = "0.1.7" +optional = true + +[dependencies.tokio-fs] +version = "0.1.6" +optional = true + +[dependencies.tokio-io] +version = "0.1.6" +optional = true + +[dependencies.tokio-reactor] +version = "0.1.1" +optional = true + +[dependencies.tokio-sync] +version = "0.1.5" +optional = true + +[dependencies.tokio-tcp] +version = "0.1.0" +optional = true + +[dependencies.tokio-threadpool] +version = "0.1.14" +optional = true + +[dependencies.tokio-timer] +version = "0.2.8" +optional = true + +[dependencies.tokio-udp] +version = "0.1.0" +optional = true + +[dependencies.tracing-core] +version = "0.1" +optional = true +[dev-dependencies.env_logger] +version = "0.5" +default-features = false + +[dev-dependencies.flate2] +version = "1" +features = ["tokio"] + +[dev-dependencies.futures-cpupool] +version = "0.1" + +[dev-dependencies.http] +version = "0.1" + +[dev-dependencies.httparse] +version = "1.0" + +[dev-dependencies.libc] +version = "0.2" + +[dev-dependencies.num_cpus] +version = "1.0" + +[dev-dependencies.serde] +version = "1.0" + +[dev-dependencies.serde_derive] +version = "1.0" + +[dev-dependencies.serde_json] +version = "1.0" + +[dev-dependencies.time] +version = "0.1" + +[features] +codec = ["io", "tokio-codec"] +default = ["codec", "fs", "io", "reactor", "rt-full", "sync", "tcp", "timer", "udp", "uds"] +experimental-tracing = ["tracing-core"] +fs = ["tokio-fs"] +io = ["bytes", "tokio-io"] +reactor = ["io", "mio", "tokio-reactor"] +rt-full = ["num_cpus", "reactor", "timer", "tokio-current-thread", "tokio-executor", "tokio-threadpool"] +sync = ["tokio-sync"] +tcp = ["tokio-tcp"] +timer = ["tokio-timer"] +udp = ["tokio-udp"] +uds = ["tokio-uds"] +[target."cfg(unix)".dependencies.tokio-uds] +version = "0.2.1" +optional = true diff --git a/third_party/rust/tokio-0.1.22/LICENSE b/third_party/rust/tokio-0.1.22/LICENSE new file mode 100644 index 0000000000..cdb28b4b56 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2019 Tokio Contributors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/third_party/rust/tokio-0.1.22/README.md b/third_party/rust/tokio-0.1.22/README.md new file mode 100644 index 0000000000..73f871da2d --- /dev/null +++ b/third_party/rust/tokio-0.1.22/README.md @@ -0,0 +1,125 @@ +# Tokio + +A runtime for writing reliable, asynchronous, and slim applications with +the Rust programming language. It is: + +* **Fast**: Tokio's zero-cost abstractions give you bare-metal + performance. + +* **Reliable**: Tokio leverages Rust's ownership, type system, and + concurrency model to reduce bugs and ensure thread safety. + +* **Scalable**: Tokio has a minimal footprint, and handles backpressure + and cancellation naturally. + +[![Crates.io][crates-badge]][crates-url] +[![MIT licensed][mit-badge]][mit-url] +[![Build Status][azure-badge]][azure-url] +[![Gitter chat][gitter-badge]][gitter-url] + +[crates-badge]: https://img.shields.io/crates/v/tokio.svg +[crates-url]: https://crates.io/crates/tokio +[mit-badge]: https://img.shields.io/badge/license-MIT-blue.svg +[mit-url]: LICENSE-MIT +[azure-badge]: https://dev.azure.com/tokio-rs/Tokio/_apis/build/status/tokio-rs.tokio?branchName=master +[azure-url]: https://dev.azure.com/tokio-rs/Tokio/_build/latest?definitionId=1&branchName=master +[gitter-badge]: https://img.shields.io/gitter/room/tokio-rs/tokio.svg +[gitter-url]: https://gitter.im/tokio-rs/tokio + +[Website](https://tokio.rs) | +[Guides](https://tokio.rs/docs/getting-started/hello-world/) | +[API Docs](https://docs.rs/tokio/0.1.22/tokio) | +[Chat](https://gitter.im/tokio-rs/tokio) + +## Overview + +Tokio is an event-driven, non-blocking I/O platform for writing +asynchronous applications with the Rust programming language. At a high +level, it provides a few major components: + +* A multithreaded, work-stealing based task [scheduler]. +* A [reactor] backed by the operating system's event queue (epoll, kqueue, + IOCP, etc...). +* Asynchronous [TCP and UDP][net] sockets. + +These components provide the runtime components necessary for building +an asynchronous application. + +[net]: https://docs.rs/tokio/0.1.22/tokio/net/index.html +[reactor]: https://docs.rs/tokio/0.1.22/tokio/reactor/index.html +[scheduler]: https://docs.rs/tokio/0.1.22/tokio/runtime/index.html + +## Example + +A basic TCP echo server with Tokio: + +```rust +extern crate tokio; + +use tokio::prelude::*; +use tokio::io::copy; +use tokio::net::TcpListener; + +fn main() { + // Bind the server's socket. + let addr = "127.0.0.1:12345".parse().unwrap(); + let listener = TcpListener::bind(&addr) + .expect("unable to bind TCP listener"); + + // Pull out a stream of sockets for incoming connections + let server = listener.incoming() + .map_err(|e| eprintln!("accept failed = {:?}", e)) + .for_each(|sock| { + // Split up the reading and writing parts of the + // socket. + let (reader, writer) = sock.split(); + + // A future that echos the data and returns how + // many bytes were copied... + let bytes_copied = copy(reader, writer); + + // ... after which we'll print what happened. + let handle_conn = bytes_copied.map(|amt| { + println!("wrote {:?} bytes", amt) + }).map_err(|err| { + eprintln!("IO error {:?}", err) + }); + + // Spawn the future as a concurrent task. + tokio::spawn(handle_conn) + }); + + // Start the Tokio runtime + tokio::run(server); +} +``` + +More examples can be found [here](examples). + +## Getting Help + +First, see if the answer to your question can be found in the [Guides] or the +[API documentation]. If the answer is not there, there is an active community in +the [Tokio Gitter channel][chat]. We would be happy to try to answer your +question. Last, if that doesn't work, try opening an [issue] with the question. + +[chat]: https://gitter.im/tokio-rs/tokio +[issue]: https://github.com/tokio-rs/tokio/issues/new + +## Supported Rust Versions + +Tokio is built against the latest stable, nightly, and beta Rust releases. The +minimum version supported is the stable release from three months before the +current stable release version. For example, if the latest stable Rust is 1.29, +the minimum version supported is 1.26. The current Tokio version is not +guaranteed to build on Rust versions earlier than the minimum supported version. + +## License + +This project is licensed under the [MIT license](LICENSE). + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in Tokio by you, shall be licensed as MIT, without any additional +terms or conditions. diff --git a/third_party/rust/tokio-0.1.22/examples/README.md b/third_party/rust/tokio-0.1.22/examples/README.md new file mode 100644 index 0000000000..ac9e9b42ff --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/README.md @@ -0,0 +1,62 @@ +## Examples of how to use Tokio + +This directory contains a number of examples showcasing various capabilities of +the `tokio` crate. + +All examples can be executed with: + +``` +cargo run --example $name +``` + +A high level description of each example is: + +* [`hello_world`](hello_world.rs) - a tiny server that writes "hello world" to + all connected clients and then terminates the connection, should help see how + to create and initialize `tokio`. + +* [`echo`](echo.rs) - this is your standard TCP "echo server" which accepts + connections and then echos back any contents that are read from each connected + client. + +* [`print_each_packet`](print_each_packet.rs) - this server will create a TCP + listener, accept connections in a loop, and put down in the stdout everything + that's read off of each TCP connection. + +* [`echo-udp`](echo-udp.rs) - again your standard "echo server", except for UDP + instead of TCP. This will echo back any packets received to the original + sender. + +* [`connect`](connect.rs) - this is a `nc`-like clone which can be used to + interact with most other examples. The program creates a TCP connection or UDP + socket and sends all information read on stdin to the remote peer, displaying + any data received on stdout. Often quite useful when interacting with the + various other servers here! + +* [`chat`](chat.rs) - this spins up a local TCP server which will broadcast from + any connected client to all other connected clients. You can connect to this + in multiple terminals and use it to chat between the terminals. + +* [`chat-combinator`](chat-combinator.rs) - Similar to `chat`, but this uses a + much more functional programming approach using combinators. + +* [`proxy`](proxy.rs) - an example proxy server that will forward all connected + TCP clients to the remote address specified when starting the program. + +* [`tinyhttp`](tinyhttp.rs) - a tiny HTTP/1.1 server which doesn't support HTTP + request bodies showcasing running on multiple cores, working with futures and + spawning tasks, and finally framing a TCP connection to discrete + request/response objects. + +* [`tinydb`](tinydb.rs) - an in-memory database which shows sharing state + between all connected clients, notably the key/value store of this database. + +* [`udp-client`](udp-client.rs) - a simple `send_dgram`/`recv_dgram` example. + +* [`manual-runtime`](manual-runtime.rs) - manually composing a runtime. + +* [`blocking`](blocking.rs) - perform heavy computation in blocking environment. + +If you've got an example you'd like to see here, please feel free to open an +issue. Otherwise if you've got an example you'd like to add, please feel free +to make a PR! diff --git a/third_party/rust/tokio-0.1.22/examples/blocking.rs b/third_party/rust/tokio-0.1.22/examples/blocking.rs new file mode 100644 index 0000000000..e7d5da6c80 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/blocking.rs @@ -0,0 +1,87 @@ +//! An example of using blocking funcion annotation. +//! +//! This example will create 8 "heavy computation" blocking futures and 8 +//! non-blocking futures with 4 threads core threads in runtime. +//! Each non-blocking future will print it's id and return immideatly. +//! Each blocking future will print it's id on start, sleep for 1000 ms, print +//! it's id and return. +//! +//! Note how non-blocking threads are executed before blocking threads finish +//! their task. + +extern crate tokio; +extern crate tokio_threadpool; + +use std::thread; +use std::time::Duration; +use tokio::prelude::*; +use tokio::runtime::Builder; +use tokio_threadpool::blocking; + +/// This future blocks it's poll method for 1000 ms. +struct BlockingFuture { + value: i32, +} + +impl Future for BlockingFuture { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + println!("Blocking begin: {}!", self.value); + // Try replacing this part with commnted code + blocking(|| { + println!("Blocking part annotated: {}!", self.value); + thread::sleep(Duration::from_millis(1000)); + println!("Blocking done annotated: {}!", self.value); + }) + .map_err(|err| panic!("Error in blocing block: {:?}", err)) + // println!("Blocking part annotated: {}!", self.value); + // thread::sleep(Duration::from_millis(1000)); + // println!("Blocking done annotated: {}!", self.value); + // Ok(Async::Ready(())) + } +} + +/// This future returns immideatly. +struct NonBlockingFuture { + value: i32, +} + +impl Future for NonBlockingFuture { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + println!("Non-blocking done: {}!", self.value); + Ok(Async::Ready(())) + } +} + +/// This future spawns child futures. +struct SpawningFuture; + +impl Future for SpawningFuture { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + for i in 0..8 { + let blocking_future = BlockingFuture { value: i }; + + tokio::spawn(blocking_future); + } + for i in 0..8 { + let non_blocking_future = NonBlockingFuture { value: i }; + tokio::spawn(non_blocking_future); + } + Ok(Async::Ready(())) + } +} + +fn main() { + let spawning_future = SpawningFuture; + + let runtime = Builder::new().core_threads(4).build().unwrap(); + runtime.block_on_all(spawning_future).unwrap(); +} diff --git a/third_party/rust/tokio-0.1.22/examples/chat-combinator-current-thread.rs b/third_party/rust/tokio-0.1.22/examples/chat-combinator-current-thread.rs new file mode 100644 index 0000000000..ee147025d2 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/chat-combinator-current-thread.rs @@ -0,0 +1,172 @@ +//! A chat server that broadcasts a message to all connections. +//! +//! This is a line-based server which accepts connections, reads lines from +//! those connections, and broadcasts the lines to all other connected clients. +//! +//! This example is similar to chat.rs, but uses combinators and a much more +//! functional style. +//! +//! Because we are here running the reactor/executor on the same thread instead +//! of a threadpool, we can avoid full synchronization with Arc + Mutex and use +//! Rc + RefCell instead. The max performance is however limited to a CPU HW +//! thread. +//! +//! You can test this out by running: +//! +//! cargo run --example chat-combinator-current-thread +//! +//! And then in another window run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! You can run the second command in multiple windows and then chat between the +//! two, seeing the messages from the other client as they're received. For all +//! connected clients they'll all join the same room and see everyone else's +//! messages. + +#![deny(warnings)] + +extern crate futures; +extern crate tokio; + +use tokio::io; +use tokio::net::TcpListener; +use tokio::prelude::*; +use tokio::runtime::current_thread::{Runtime, TaskExecutor}; + +use std::cell::RefCell; +use std::collections::HashMap; +use std::env; +use std::io::BufReader; +use std::iter; +use std::rc::Rc; + +fn main() -> Result<(), Box<std::error::Error>> { + let mut runtime = Runtime::new().unwrap(); + + // Create the TCP listener we'll accept connections on. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse()?; + + let socket = TcpListener::bind(&addr)?; + println!("Listening on: {}", addr); + + // This is running on the Tokio current_thread runtime, so it will be single- + // threaded. The `Rc<RefCell<...>>` allows state to be shared across the tasks. + let connections = Rc::new(RefCell::new(HashMap::new())); + + // The server task asynchronously iterates over and processes each incoming + // connection. + let srv = socket + .incoming() + .map_err(|e| { + println!("failed to accept socket; error = {:?}", e); + e + }) + .for_each(move |stream| { + // The client's socket address + let addr = stream.peer_addr()?; + + println!("New Connection: {}", addr); + + // Split the TcpStream into two separate handles. One handle for reading + // and one handle for writing. This lets us use separate tasks for + // reading and writing. + let (reader, writer) = stream.split(); + + // Create a channel for our stream, which other sockets will use to + // send us messages. Then register our address with the stream to send + // data to us. + let (tx, rx) = futures::sync::mpsc::unbounded(); + let mut conns = connections.borrow_mut(); + conns.insert(addr, tx); + + // Define here what we do for the actual I/O. That is, read a bunch of + // lines from the socket and dispatch them while we also write any lines + // from other sockets. + let connections_inner = connections.clone(); + let reader = BufReader::new(reader); + + // Model the read portion of this socket by mapping an infinite + // iterator to each line off the socket. This "loop" is then + // terminated with an error once we hit EOF on the socket. + let iter = stream::iter_ok::<_, io::Error>(iter::repeat(())); + + let socket_reader = iter.fold(reader, move |reader, _| { + // Read a line off the socket, failing if we're at EOF + let line = io::read_until(reader, b'\n', Vec::new()); + let line = line.and_then(|(reader, vec)| { + if vec.len() == 0 { + Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe")) + } else { + Ok((reader, vec)) + } + }); + + // Convert the bytes we read into a string, and then send that + // string to all other connected clients. + let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec))); + + // Move the connection state into the closure below. + let connections = connections_inner.clone(); + + line.map(move |(reader, message)| { + println!("{}: {:?}", addr, message); + let mut conns = connections.borrow_mut(); + + if let Ok(msg) = message { + // For each open connection except the sender, send the + // string via the channel. + let iter = conns + .iter_mut() + .filter(|&(&k, _)| k != addr) + .map(|(_, v)| v); + for tx in iter { + tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap(); + } + } else { + let tx = conns.get_mut(&addr).unwrap(); + tx.unbounded_send("You didn't send valid UTF-8.".to_string()) + .unwrap(); + } + + reader + }) + }); + + // Whenever we receive a string on the Receiver, we write it to + // `WriteHalf<TcpStream>`. + let socket_writer = rx.fold(writer, |writer, msg| { + let amt = io::write_all(writer, msg.into_bytes()); + let amt = amt.map(|(writer, _)| writer); + amt.map_err(|_| ()) + }); + + // Now that we've got futures representing each half of the socket, we + // use the `select` combinator to wait for either half to be done to + // tear down the other. Then we spawn off the result. + let connections = connections.clone(); + let socket_reader = socket_reader.map_err(|_| ()); + let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ())); + + // Spawn locally a task to process the connection + TaskExecutor::current() + .spawn_local(Box::new(connection.then(move |_| { + let mut conns = connections.borrow_mut(); + conns.remove(&addr); + println!("Connection {} closed.", addr); + Ok(()) + }))) + .unwrap(); + + Ok(()) + }) + .map_err(|err| println!("error occurred: {:?}", err)); + + // Spawn srv itself + runtime.spawn(srv); + + // Execute server + runtime.run().unwrap(); + Ok(()) +} diff --git a/third_party/rust/tokio-0.1.22/examples/chat-combinator.rs b/third_party/rust/tokio-0.1.22/examples/chat-combinator.rs new file mode 100644 index 0000000000..b81e8f7c35 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/chat-combinator.rs @@ -0,0 +1,156 @@ +//! A chat server that broadcasts a message to all connections. +//! +//! This is a line-based server which accepts connections, reads lines from +//! those connections, and broadcasts the lines to all other connected clients. +//! +//! This example is similar to chat.rs, but uses combinators and a much more +//! functional style. +//! +//! You can test this out by running: +//! +//! cargo run --example chat +//! +//! And then in another window run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! You can run the second command in multiple windows and then chat between the +//! two, seeing the messages from the other client as they're received. For all +//! connected clients they'll all join the same room and see everyone else's +//! messages. + +#![deny(warnings)] + +extern crate futures; +extern crate tokio; + +use tokio::io; +use tokio::net::TcpListener; +use tokio::prelude::*; + +use std::collections::HashMap; +use std::env; +use std::io::BufReader; +use std::iter; +use std::sync::{Arc, Mutex}; + +fn main() -> Result<(), Box<std::error::Error>> { + // Create the TCP listener we'll accept connections on. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse()?; + + let socket = TcpListener::bind(&addr)?; + println!("Listening on: {}", addr); + + // This is running on the Tokio runtime, so it will be multi-threaded. The + // `Arc<Mutex<...>>` allows state to be shared across the threads. + let connections = Arc::new(Mutex::new(HashMap::new())); + + // The server task asynchronously iterates over and processes each incoming + // connection. + let srv = socket + .incoming() + .map_err(|e| { + println!("failed to accept socket; error = {:?}", e); + e + }) + .for_each(move |stream| { + // The client's socket address + let addr = stream.peer_addr()?; + + println!("New Connection: {}", addr); + + // Split the TcpStream into two separate handles. One handle for reading + // and one handle for writing. This lets us use separate tasks for + // reading and writing. + let (reader, writer) = stream.split(); + + // Create a channel for our stream, which other sockets will use to + // send us messages. Then register our address with the stream to send + // data to us. + let (tx, rx) = futures::sync::mpsc::unbounded(); + connections.lock().unwrap().insert(addr, tx); + + // Define here what we do for the actual I/O. That is, read a bunch of + // lines from the socket and dispatch them while we also write any lines + // from other sockets. + let connections_inner = connections.clone(); + let reader = BufReader::new(reader); + + // Model the read portion of this socket by mapping an infinite + // iterator to each line off the socket. This "loop" is then + // terminated with an error once we hit EOF on the socket. + let iter = stream::iter_ok::<_, io::Error>(iter::repeat(())); + + let socket_reader = iter.fold(reader, move |reader, _| { + // Read a line off the socket, failing if we're at EOF + let line = io::read_until(reader, b'\n', Vec::new()); + let line = line.and_then(|(reader, vec)| { + if vec.len() == 0 { + Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe")) + } else { + Ok((reader, vec)) + } + }); + + // Convert the bytes we read into a string, and then send that + // string to all other connected clients. + let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec))); + + // Move the connection state into the closure below. + let connections = connections_inner.clone(); + + line.map(move |(reader, message)| { + println!("{}: {:?}", addr, message); + let mut conns = connections.lock().unwrap(); + + if let Ok(msg) = message { + // For each open connection except the sender, send the + // string via the channel. + let iter = conns + .iter_mut() + .filter(|&(&k, _)| k != addr) + .map(|(_, v)| v); + for tx in iter { + tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap(); + } + } else { + let tx = conns.get_mut(&addr).unwrap(); + tx.unbounded_send("You didn't send valid UTF-8.".to_string()) + .unwrap(); + } + + reader + }) + }); + + // Whenever we receive a string on the Receiver, we write it to + // `WriteHalf<TcpStream>`. + let socket_writer = rx.fold(writer, |writer, msg| { + let amt = io::write_all(writer, msg.into_bytes()); + let amt = amt.map(|(writer, _)| writer); + amt.map_err(|_| ()) + }); + + // Now that we've got futures representing each half of the socket, we + // use the `select` combinator to wait for either half to be done to + // tear down the other. Then we spawn off the result. + let connections = connections.clone(); + let socket_reader = socket_reader.map_err(|_| ()); + let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ())); + + // Spawn a task to process the connection + tokio::spawn(connection.then(move |_| { + connections.lock().unwrap().remove(&addr); + println!("Connection {} closed.", addr); + Ok(()) + })); + + Ok(()) + }) + .map_err(|err| println!("error occurred: {:?}", err)); + + // execute server + tokio::run(srv); + Ok(()) +} diff --git a/third_party/rust/tokio-0.1.22/examples/chat.rs b/third_party/rust/tokio-0.1.22/examples/chat.rs new file mode 100644 index 0000000000..b21432afa2 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/chat.rs @@ -0,0 +1,473 @@ +//! A chat server that broadcasts a message to all connections. +//! +//! This example is explicitly more verbose than it has to be. This is to +//! illustrate more concepts. +//! +//! A chat server for telnet clients. After a telnet client connects, the first +//! line should contain the client's name. After that, all lines sent by a +//! client are broadcasted to all other connected clients. +//! +//! Because the client is telnet, lines are delimited by "\r\n". +//! +//! You can test this out by running: +//! +//! cargo run --example chat +//! +//! And then in another terminal run: +//! +//! telnet localhost 6142 +//! +//! You can run the `telnet` command in any number of additional windows. +//! +//! You can run the second command in multiple windows and then chat between the +//! two, seeing the messages from the other client as they're received. For all +//! connected clients they'll all join the same room and see everyone else's +//! messages. + +#![deny(warnings)] + +extern crate tokio; +#[macro_use] +extern crate futures; +extern crate bytes; + +use bytes::{BufMut, Bytes, BytesMut}; +use futures::future::{self, Either}; +use futures::sync::mpsc; +use tokio::io; +use tokio::net::{TcpListener, TcpStream}; +use tokio::prelude::*; + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; + +/// Shorthand for the transmit half of the message channel. +type Tx = mpsc::UnboundedSender<Bytes>; + +/// Shorthand for the receive half of the message channel. +type Rx = mpsc::UnboundedReceiver<Bytes>; + +/// Data that is shared between all peers in the chat server. +/// +/// This is the set of `Tx` handles for all connected clients. Whenever a +/// message is received from a client, it is broadcasted to all peers by +/// iterating over the `peers` entries and sending a copy of the message on each +/// `Tx`. +struct Shared { + peers: HashMap<SocketAddr, Tx>, +} + +/// The state for each connected client. +struct Peer { + /// Name of the peer. + /// + /// When a client connects, the first line sent is treated as the client's + /// name (like alice or bob). The name is used to preface all messages that + /// arrive from the client so that we can simulate a real chat server: + /// + /// ```text + /// alice: Hello everyone. + /// bob: Welcome to telnet chat! + /// ``` + name: BytesMut, + + /// The TCP socket wrapped with the `Lines` codec, defined below. + /// + /// This handles sending and receiving data on the socket. When using + /// `Lines`, we can work at the line level instead of having to manage the + /// raw byte operations. + lines: Lines, + + /// Handle to the shared chat state. + /// + /// This is used to broadcast messages read off the socket to all connected + /// peers. + state: Arc<Mutex<Shared>>, + + /// Receive half of the message channel. + /// + /// This is used to receive messages from peers. When a message is received + /// off of this `Rx`, it will be written to the socket. + rx: Rx, + + /// Client socket address. + /// + /// The socket address is used as the key in the `peers` HashMap. The + /// address is saved so that the `Peer` drop implementation can clean up its + /// entry. + addr: SocketAddr, +} + +/// Line based codec +/// +/// This decorates a socket and presents a line based read / write interface. +/// +/// As a user of `Lines`, we can focus on working at the line level. So, we send +/// and receive values that represent entire lines. The `Lines` codec will +/// handle the encoding and decoding as well as reading from and writing to the +/// socket. +#[derive(Debug)] +struct Lines { + /// The TCP socket. + socket: TcpStream, + + /// Buffer used when reading from the socket. Data is not returned from this + /// buffer until an entire line has been read. + rd: BytesMut, + + /// Buffer used to stage data before writing it to the socket. + wr: BytesMut, +} + +impl Shared { + /// Create a new, empty, instance of `Shared`. + fn new() -> Self { + Shared { + peers: HashMap::new(), + } + } +} + +impl Peer { + /// Create a new instance of `Peer`. + fn new(name: BytesMut, state: Arc<Mutex<Shared>>, lines: Lines) -> Peer { + // Get the client socket address + let addr = lines.socket.peer_addr().unwrap(); + + // Create a channel for this peer + let (tx, rx) = mpsc::unbounded(); + + // Add an entry for this `Peer` in the shared state map. + state.lock().unwrap().peers.insert(addr, tx); + + Peer { + name, + lines, + state, + rx, + addr, + } + } +} + +/// This is where a connected client is managed. +/// +/// A `Peer` is also a future representing completely processing the client. +/// +/// When a `Peer` is created, the first line (representing the client's name) +/// has already been read. When the socket closes, the `Peer` future completes. +/// +/// While processing, the peer future implementation will: +/// +/// 1) Receive messages on its message channel and write them to the socket. +/// 2) Receive messages from the socket and broadcast them to all peers. +/// +impl Future for Peer { + type Item = (); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(), io::Error> { + // Tokio (and futures) use cooperative scheduling without any + // preemption. If a task never yields execution back to the executor, + // then other tasks may be starved. + // + // To deal with this, robust applications should not have any unbounded + // loops. In this example, we will read at most `LINES_PER_TICK` lines + // from the client on each tick. + // + // If the limit is hit, the current task is notified, informing the + // executor to schedule the task again asap. + const LINES_PER_TICK: usize = 10; + + // Receive all messages from peers. + for i in 0..LINES_PER_TICK { + // Polling an `UnboundedReceiver` cannot fail, so `unwrap` here is + // safe. + match self.rx.poll().unwrap() { + Async::Ready(Some(v)) => { + // Buffer the line. Once all lines are buffered, they will + // be flushed to the socket (right below). + self.lines.buffer(&v); + + // If this is the last iteration, the loop will break even + // though there could still be lines to read. Because we did + // not reach `Async::NotReady`, we have to notify ourselves + // in order to tell the executor to schedule the task again. + if i + 1 == LINES_PER_TICK { + task::current().notify(); + } + } + _ => break, + } + } + + // Flush the write buffer to the socket + let _ = self.lines.poll_flush()?; + + // Read new lines from the socket + while let Async::Ready(line) = self.lines.poll()? { + println!("Received line ({:?}) : {:?}", self.name, line); + + if let Some(message) = line { + // Append the peer's name to the front of the line: + let mut line = self.name.clone(); + line.extend_from_slice(b": "); + line.extend_from_slice(&message); + line.extend_from_slice(b"\r\n"); + + // We're using `Bytes`, which allows zero-copy clones (by + // storing the data in an Arc internally). + // + // However, before cloning, we must freeze the data. This + // converts it from mutable -> immutable, allowing zero copy + // cloning. + let line = line.freeze(); + + // Now, send the line to all other peers + for (addr, tx) in &self.state.lock().unwrap().peers { + // Don't send the message to ourselves + if *addr != self.addr { + // The send only fails if the rx half has been dropped, + // however this is impossible as the `tx` half will be + // removed from the map before the `rx` is dropped. + tx.unbounded_send(line.clone()).unwrap(); + } + } + } else { + // EOF was reached. The remote client has disconnected. There is + // nothing more to do. + return Ok(Async::Ready(())); + } + } + + // As always, it is important to not just return `NotReady` without + // ensuring an inner future also returned `NotReady`. + // + // We know we got a `NotReady` from either `self.rx` or `self.lines`, so + // the contract is respected. + Ok(Async::NotReady) + } +} + +impl Drop for Peer { + fn drop(&mut self) { + self.state.lock().unwrap().peers.remove(&self.addr); + } +} + +impl Lines { + /// Create a new `Lines` codec backed by the socket + fn new(socket: TcpStream) -> Self { + Lines { + socket, + rd: BytesMut::new(), + wr: BytesMut::new(), + } + } + + /// Buffer a line. + /// + /// This writes the line to an internal buffer. Calls to `poll_flush` will + /// attempt to flush this buffer to the socket. + fn buffer(&mut self, line: &[u8]) { + // Ensure the buffer has capacity. Ideally this would not be unbounded, + // but to keep the example simple, we will not limit this. + self.wr.reserve(line.len()); + + // Push the line onto the end of the write buffer. + // + // The `put` function is from the `BufMut` trait. + self.wr.put(line); + } + + /// Flush the write buffer to the socket + fn poll_flush(&mut self) -> Poll<(), io::Error> { + // As long as there is buffered data to write, try to write it. + while !self.wr.is_empty() { + // Try to write some bytes to the socket + let n = try_ready!(self.socket.poll_write(&self.wr)); + + // As long as the wr is not empty, a successful write should + // never write 0 bytes. + assert!(n > 0); + + // This discards the first `n` bytes of the buffer. + let _ = self.wr.split_to(n); + } + + Ok(Async::Ready(())) + } + + /// Read data from the socket. + /// + /// This only returns `Ready` when the socket has closed. + fn fill_read_buf(&mut self) -> Poll<(), io::Error> { + loop { + // Ensure the read buffer has capacity. + // + // This might result in an internal allocation. + self.rd.reserve(1024); + + // Read data into the buffer. + let n = try_ready!(self.socket.read_buf(&mut self.rd)); + + if n == 0 { + return Ok(Async::Ready(())); + } + } + } +} + +impl Stream for Lines { + type Item = BytesMut; + type Error = io::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + // First, read any new data that might have been received off the socket + let sock_closed = self.fill_read_buf()?.is_ready(); + + // Now, try finding lines + let pos = self + .rd + .windows(2) + .enumerate() + .find(|&(_, bytes)| bytes == b"\r\n") + .map(|(i, _)| i); + + if let Some(pos) = pos { + // Remove the line from the read buffer and set it to `line`. + let mut line = self.rd.split_to(pos + 2); + + // Drop the trailing \r\n + line.split_off(pos); + + // Return the line + return Ok(Async::Ready(Some(line))); + } + + if sock_closed { + Ok(Async::Ready(None)) + } else { + Ok(Async::NotReady) + } + } +} + +/// Spawn a task to manage the socket. +/// +/// This will read the first line from the socket to identify the client, then +/// add the client to the set of connected peers in the chat service. +fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) { + // Wrap the socket with the `Lines` codec that we wrote above. + // + // By doing this, we can operate at the line level instead of doing raw byte + // manipulation. + let lines = Lines::new(socket); + + // The first line is treated as the client's name. The client is not added + // to the set of connected peers until this line is received. + // + // We use the `into_future` combinator to extract the first item from the + // lines stream. `into_future` takes a `Stream` and converts it to a future + // of `(first, rest)` where `rest` is the original stream instance. + let connection = lines + .into_future() + // `into_future` doesn't have the right error type, so map the error to + // make it work. + .map_err(|(e, _)| e) + // Process the first received line as the client's name. + .and_then(|(name, lines)| { + // If `name` is `None`, then the client disconnected without + // actually sending a line of data. + // + // Since the connection is closed, there is no further work that we + // need to do. So, we just terminate processing by returning + // `future::ok()`. + // + // The problem is that only a single future type can be returned + // from a combinator closure, but we want to return both + // `future::ok()` and `Peer` (below). + // + // This is a common problem, so the `futures` crate solves this by + // providing the `Either` helper enum that allows creating a single + // return type that covers two concrete future types. + let name = match name { + Some(name) => name, + None => { + // The remote client closed the connection without sending + // any data. + return Either::A(future::ok(())); + } + }; + + println!("`{:?}` is joining the chat", name); + + // Create the peer. + // + // This is also a future that processes the connection, only + // completing when the socket closes. + let peer = Peer::new(name, state, lines); + + // Wrap `peer` with `Either::B` to make the return type fit. + Either::B(peer) + }) + // Task futures have an error of type `()`, this ensures we handle the + // error. We do this by printing the error to STDOUT. + .map_err(|e| { + println!("connection error = {:?}", e); + }); + + // Spawn the task. Internally, this submits the task to a thread pool. + tokio::spawn(connection); +} + +pub fn main() -> Result<(), Box<std::error::Error>> { + // Create the shared state. This is how all the peers communicate. + // + // The server task will hold a handle to this. For every new client, the + // `state` handle is cloned and passed into the task that processes the + // client connection. + let state = Arc::new(Mutex::new(Shared::new())); + + let addr = "127.0.0.1:6142".parse()?; + + // Bind a TCP listener to the socket address. + // + // Note that this is the Tokio TcpListener, which is fully async. + let listener = TcpListener::bind(&addr)?; + + // The server task asynchronously iterates over and processes each + // incoming connection. + let server = listener + .incoming() + .for_each(move |socket| { + // Spawn a task to process the connection + process(socket, state.clone()); + Ok(()) + }) + .map_err(|err| { + // All tasks must have an `Error` type of `()`. This forces error + // handling and helps avoid silencing failures. + // + // In our example, we are only going to log the error to STDOUT. + println!("accept error = {:?}", err); + }); + + println!("server running on localhost:6142"); + + // Start the Tokio runtime. + // + // The Tokio is a pre-configured "out of the box" runtime for building + // asynchronous applications. It includes both a reactor and a task + // scheduler. This means applications are multithreaded by default. + // + // This function blocks until the runtime reaches an idle state. Idle is + // defined as all spawned tasks have completed and all I/O resources (TCP + // sockets in our case) have been dropped. + // + // In our example, we have not defined a shutdown strategy, so this will + // block until `ctrl-c` is pressed at the terminal. + tokio::run(server); + Ok(()) +} diff --git a/third_party/rust/tokio-0.1.22/examples/connect.rs b/third_party/rust/tokio-0.1.22/examples/connect.rs new file mode 100644 index 0000000000..4dc0ea31e2 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/connect.rs @@ -0,0 +1,257 @@ +//! An example of hooking up stdin/stdout to either a TCP or UDP stream. +//! +//! This example will connect to a socket address specified in the argument list +//! and then forward all data read on stdin to the server, printing out all data +//! received on stdout. An optional `--udp` argument can be passed to specify +//! that the connection should be made over UDP instead of TCP, translating each +//! line entered on stdin to a UDP packet to be sent to the remote address. +//! +//! Note that this is not currently optimized for performance, especially +//! around buffer management. Rather it's intended to show an example of +//! working with a client. +//! +//! This example can be quite useful when interacting with the other examples in +//! this repository! Many of them recommend running this as a simple "hook up +//! stdin/stdout to a server" to get up and running. + +#![deny(warnings)] + +extern crate bytes; +extern crate futures; +extern crate tokio; +extern crate tokio_io; + +use std::env; +use std::io::{self, Read, Write}; +use std::net::SocketAddr; +use std::thread; + +use futures::sync::mpsc; +use tokio::prelude::*; + +fn main() -> Result<(), Box<std::error::Error>> { + // Determine if we're going to run in TCP or UDP mode + let mut args = env::args().skip(1).collect::<Vec<_>>(); + let tcp = match args.iter().position(|a| a == "--udp") { + Some(i) => { + args.remove(i); + false + } + None => true, + }; + + // Parse what address we're going to connect to + let addr = match args.first() { + Some(addr) => addr, + None => Err("this program requires at least one argument")?, + }; + let addr = addr.parse::<SocketAddr>()?; + + // Right now Tokio doesn't support a handle to stdin running on the event + // loop, so we farm out that work to a separate thread. This thread will + // read data (with blocking I/O) from stdin and then send it to the event + // loop over a standard futures channel. + let (stdin_tx, stdin_rx) = mpsc::channel(0); + thread::spawn(|| read_stdin(stdin_tx)); + let stdin_rx = stdin_rx.map_err(|_| panic!("errors not possible on rx")); + + // Now that we've got our stdin read we either set up our TCP connection or + // our UDP connection to get a stream of bytes we're going to emit to + // stdout. + let stdout = if tcp { + tcp::connect(&addr, Box::new(stdin_rx))? + } else { + udp::connect(&addr, Box::new(stdin_rx))? + }; + + // And now with our stream of bytes to write to stdout, we execute that in + // the event loop! Note that this is doing blocking I/O to emit data to + // stdout, and in general it's a no-no to do that sort of work on the event + // loop. In this case, though, we know it's ok as the event loop isn't + // otherwise running anything useful. + let mut out = io::stdout(); + + tokio::run({ + stdout + .for_each(move |chunk| out.write_all(&chunk)) + .map_err(|e| println!("error reading stdout; error = {:?}", e)) + }); + Ok(()) +} + +mod codec { + use bytes::{BufMut, BytesMut}; + use std::io; + use tokio::codec::{Decoder, Encoder}; + + /// A simple `Codec` implementation that just ships bytes around. + /// + /// This type is used for "framing" a TCP/UDP stream of bytes but it's really + /// just a convenient method for us to work with streams/sinks for now. + /// This'll just take any data read and interpret it as a "frame" and + /// conversely just shove data into the output location without looking at + /// it. + pub struct Bytes; + + impl Decoder for Bytes { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> { + if buf.len() > 0 { + let len = buf.len(); + Ok(Some(buf.split_to(len))) + } else { + Ok(None) + } + } + } + + impl Encoder for Bytes { + type Item = Vec<u8>; + type Error = io::Error; + + fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> { + buf.put(&data[..]); + Ok(()) + } + } +} + +mod tcp { + use tokio; + use tokio::codec::Decoder; + use tokio::net::TcpStream; + use tokio::prelude::*; + + use bytes::BytesMut; + use codec::Bytes; + + use std::error::Error; + use std::io; + use std::net::SocketAddr; + + pub fn connect( + addr: &SocketAddr, + stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>, + ) -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>> { + let tcp = TcpStream::connect(addr); + + // After the TCP connection has been established, we set up our client + // to start forwarding data. + // + // First we use the `Io::framed` method with a simple implementation of + // a `Codec` (listed below) that just ships bytes around. We then split + // that in two to work with the stream and sink separately. + // + // Half of the work we're going to do is to take all data we receive on + // `stdin` and send that along the TCP stream (`sink`). The second half + // is to take all the data we receive (`stream`) and then write that to + // stdout. We'll be passing this handle back out from this method. + // + // You'll also note that we *spawn* the work to read stdin and write it + // to the TCP stream. This is done to ensure that happens concurrently + // with us reading data from the stream. + let stream = Box::new( + tcp.map(move |stream| { + let (sink, stream) = Bytes.framed(stream).split(); + + tokio::spawn(stdin.forward(sink).then(|result| { + if let Err(e) = result { + println!("failed to write to socket: {}", e) + } + Ok(()) + })); + + stream + }) + .flatten_stream(), + ); + Ok(stream) + } +} + +mod udp { + use std::error::Error; + use std::io; + use std::net::SocketAddr; + + use bytes::BytesMut; + use tokio; + use tokio::net::{UdpFramed, UdpSocket}; + use tokio::prelude::*; + + use codec::Bytes; + + pub fn connect( + &addr: &SocketAddr, + stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>, + ) -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>> { + // We'll bind our UDP socket to a local IP/port, but for now we + // basically let the OS pick both of those. + let addr_to_bind = if addr.ip().is_ipv4() { + "0.0.0.0:0".parse()? + } else { + "[::]:0".parse()? + }; + let udp = match UdpSocket::bind(&addr_to_bind) { + Ok(udp) => udp, + Err(_) => Err("failed to bind socket")?, + }; + + // Like above with TCP we use an instance of `Bytes` codec to transform + // this UDP socket into a framed sink/stream which operates over + // discrete values. In this case we're working with *pairs* of socket + // addresses and byte buffers. + let (sink, stream) = UdpFramed::new(udp, Bytes).split(); + + // All bytes from `stdin` will go to the `addr` specified in our + // argument list. Like with TCP this is spawned concurrently + let forward_stdin = stdin + .map(move |chunk| (chunk, addr)) + .forward(sink) + .then(|result| { + if let Err(e) = result { + println!("failed to write to socket: {}", e) + } + Ok(()) + }); + + // With UDP we could receive data from any source, so filter out + // anything coming from a different address + let receive = stream.filter_map(move |(chunk, src)| { + if src == addr { + Some(chunk.into()) + } else { + None + } + }); + + let stream = Box::new( + future::lazy(|| { + tokio::spawn(forward_stdin); + future::ok(receive) + }) + .flatten_stream(), + ); + Ok(stream) + } +} + +// Our helper method which will read data from stdin and send it along the +// sender provided. +fn read_stdin(mut tx: mpsc::Sender<Vec<u8>>) { + let mut stdin = io::stdin(); + loop { + let mut buf = vec![0; 1024]; + let n = match stdin.read(&mut buf) { + Err(_) | Ok(0) => break, + Ok(n) => n, + }; + buf.truncate(n); + tx = match tx.send(buf).wait() { + Ok(tx) => tx, + Err(_) => break, + }; + } +} diff --git a/third_party/rust/tokio-0.1.22/examples/echo-udp.rs b/third_party/rust/tokio-0.1.22/examples/echo-udp.rs new file mode 100644 index 0000000000..93ebca799d --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/echo-udp.rs @@ -0,0 +1,74 @@ +//! An UDP echo server that just sends back everything that it receives. +//! +//! If you're on Unix you can test this out by in one terminal executing: +//! +//! cargo run --example echo-udp +//! +//! and in another terminal you can run: +//! +//! cargo run --example connect -- --udp 127.0.0.1:8080 +//! +//! Each line you type in to the `nc` terminal should be echo'd back to you! + +#![deny(warnings)] + +#[macro_use] +extern crate futures; +extern crate tokio; + +use std::net::SocketAddr; +use std::{env, io}; + +use tokio::net::UdpSocket; +use tokio::prelude::*; + +struct Server { + socket: UdpSocket, + buf: Vec<u8>, + to_send: Option<(usize, SocketAddr)>, +} + +impl Future for Server { + type Item = (); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(), io::Error> { + loop { + // First we check to see if there's a message we need to echo back. + // If so then we try to send it back to the original source, waiting + // until it's writable and we're able to do so. + if let Some((size, peer)) = self.to_send { + let amt = try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer)); + println!("Echoed {}/{} bytes to {}", amt, size, peer); + self.to_send = None; + } + + // If we're here then `to_send` is `None`, so we take a look for the + // next message we're going to echo back. + self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf))); + } + } +} + +fn main() -> Result<(), Box<std::error::Error>> { + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse::<SocketAddr>()?; + + let socket = UdpSocket::bind(&addr)?; + println!("Listening on: {}", socket.local_addr()?); + + let server = Server { + socket: socket, + buf: vec![0; 1024], + to_send: None, + }; + + // This starts the server task. + // + // `map_err` handles the error by logging it and maps the future to a type + // that can be spawned. + // + // `tokio::run` spawns the task on the Tokio runtime and starts running. + tokio::run(server.map_err(|e| println!("server error = {:?}", e))); + Ok(()) +} diff --git a/third_party/rust/tokio-0.1.22/examples/echo.rs b/third_party/rust/tokio-0.1.22/examples/echo.rs new file mode 100644 index 0000000000..45f808f89d --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/echo.rs @@ -0,0 +1,115 @@ +//! A "hello world" echo server with Tokio +//! +//! This server will create a TCP listener, accept connections in a loop, and +//! write back everything that's read off of each TCP connection. +//! +//! Because the Tokio runtime uses a thread pool, each TCP connection is +//! processed concurrently with all other TCP connections across multiple +//! threads. +//! +//! To see this server in action, you can run this in one terminal: +//! +//! cargo run --example echo +//! +//! and in another terminal you can run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! Each line you type in to the `connect` terminal should be echo'd back to +//! you! If you open up multiple terminals running the `connect` example you +//! should be able to see them all make progress simultaneously. + +#![deny(warnings)] + +extern crate tokio; + +use tokio::io; +use tokio::net::TcpListener; +use tokio::prelude::*; + +use std::env; +use std::net::SocketAddr; + +fn main() -> Result<(), Box<std::error::Error>> { + // Allow passing an address to listen on as the first argument of this + // program, but otherwise we'll just set up our TCP listener on + // 127.0.0.1:8080 for connections. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse::<SocketAddr>()?; + + // Next up we create a TCP listener which will listen for incoming + // connections. This TCP listener is bound to the address we determined + // above and must be associated with an event loop, so we pass in a handle + // to our event loop. After the socket's created we inform that we're ready + // to go and start accepting connections. + let socket = TcpListener::bind(&addr)?; + println!("Listening on: {}", addr); + + // Here we convert the `TcpListener` to a stream of incoming connections + // with the `incoming` method. We then define how to process each element in + // the stream with the `for_each` method. + // + // This combinator, defined on the `Stream` trait, will allow us to define a + // computation to happen for all items on the stream (in this case TCP + // connections made to the server). The return value of the `for_each` + // method is itself a future representing processing the entire stream of + // connections, and ends up being our server. + let done = socket + .incoming() + .map_err(|e| println!("failed to accept socket; error = {:?}", e)) + .for_each(move |socket| { + // Once we're inside this closure this represents an accepted client + // from our server. The `socket` is the client connection (similar to + // how the standard library operates). + // + // We just want to copy all data read from the socket back onto the + // socket itself (e.g. "echo"). We can use the standard `io::copy` + // combinator in the `tokio-core` crate to do precisely this! + // + // The `copy` function takes two arguments, where to read from and where + // to write to. We only have one argument, though, with `socket`. + // Luckily there's a method, `Io::split`, which will split an Read/Write + // stream into its two halves. This operation allows us to work with + // each stream independently, such as pass them as two arguments to the + // `copy` function. + // + // The `copy` function then returns a future, and this future will be + // resolved when the copying operation is complete, resolving to the + // amount of data that was copied. + let (reader, writer) = socket.split(); + let amt = io::copy(reader, writer); + + // After our copy operation is complete we just print out some helpful + // information. + let msg = amt.then(move |result| { + match result { + Ok((amt, _, _)) => println!("wrote {} bytes", amt), + Err(e) => println!("error: {}", e), + } + + Ok(()) + }); + + // And this is where much of the magic of this server happens. We + // crucially want all clients to make progress concurrently, rather than + // blocking one on completion of another. To achieve this we use the + // `tokio::spawn` function to execute the work in the background. + // + // This function will transfer ownership of the future (`msg` in this + // case) to the Tokio runtime thread pool that. The thread pool will + // drive the future to completion. + // + // Essentially here we're executing a new task to run concurrently, + // which will allow all of our clients to be processed concurrently. + tokio::spawn(msg) + }); + + // And finally now that we've define what our server is, we run it! + // + // This starts the Tokio runtime, spawns the server task, and blocks the + // current thread until all tasks complete execution. Since the `done` task + // never completes (it just keeps accepting sockets), `tokio::run` blocks + // forever (until ctrl-c is pressed). + tokio::run(done); + Ok(()) +} diff --git a/third_party/rust/tokio-0.1.22/examples/hello_world.rs b/third_party/rust/tokio-0.1.22/examples/hello_world.rs new file mode 100644 index 0000000000..c82762691a --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/hello_world.rs @@ -0,0 +1,58 @@ +//! Hello world server. +//! +//! A simple client that opens a TCP stream, writes "hello world\n", and closes +//! the connection. +//! +//! You can test this out by running: +//! +//! ncat -l 6142 +//! +//! And then in another terminal run: +//! +//! cargo run --example hello_world + +#![deny(warnings)] + +extern crate tokio; + +use tokio::io; +use tokio::net::TcpStream; +use tokio::prelude::*; + +pub fn main() -> Result<(), Box<std::error::Error>> { + let addr = "127.0.0.1:6142".parse()?; + + // Open a TCP stream to the socket address. + // + // Note that this is the Tokio TcpStream, which is fully async. + let client = TcpStream::connect(&addr) + .and_then(|stream| { + println!("created stream"); + io::write_all(stream, "hello world\n").then(|result| { + println!("wrote to stream; success={:?}", result.is_ok()); + Ok(()) + }) + }) + .map_err(|err| { + // All tasks must have an `Error` type of `()`. This forces error + // handling and helps avoid silencing failures. + // + // In our example, we are only going to log the error to STDOUT. + println!("connection error = {:?}", err); + }); + + // Start the Tokio runtime. + // + // The Tokio is a pre-configured "out of the box" runtime for building + // asynchronous applications. It includes both a reactor and a task + // scheduler. This means applications are multithreaded by default. + // + // This function blocks until the runtime reaches an idle state. Idle is + // defined as all spawned tasks have completed and all I/O resources (TCP + // sockets in our case) have been dropped. + println!("About to create the stream and write to it..."); + tokio::run(client); + println!("Stream has been created and written to."); + + Ok(()) +} diff --git a/third_party/rust/tokio-0.1.22/examples/manual-runtime.rs b/third_party/rust/tokio-0.1.22/examples/manual-runtime.rs new file mode 100644 index 0000000000..8e3e129965 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/manual-runtime.rs @@ -0,0 +1,87 @@ +//! An example how to manually assemble a runtime and run some tasks on it. +//! +//! This is closer to the single-threaded runtime than the default tokio one, as it is simpler to +//! grasp. There are conceptually similar, but the multi-threaded one would be more code. If you +//! just want to *use* a single-threaded runtime, use the one provided by tokio directly +//! (`tokio::runtime::current_thread::Runtime::new()`. This is a demonstration only. +//! +//! Note that the error handling is a bit left out. Also, the `run` could be modified to return the +//! result of the provided future. + +extern crate futures; +extern crate tokio; +extern crate tokio_current_thread; +extern crate tokio_executor; +extern crate tokio_reactor; +extern crate tokio_timer; + +use std::io::Error as IoError; +use std::time::{Duration, Instant}; + +use futures::{future, Future}; +use tokio_current_thread::CurrentThread; +use tokio_reactor::Reactor; +use tokio_timer::timer::{self, Timer}; + +/// Creates a "runtime". +/// +/// This is similar to running `tokio::runtime::current_thread::Runtime::new()`. +fn run<F: Future<Item = (), Error = ()>>(f: F) -> Result<(), IoError> { + // We need a reactor to receive events about IO objects from kernel + let reactor = Reactor::new()?; + let reactor_handle = reactor.handle(); + // Place a timer wheel on top of the reactor. If there are no timeouts to fire, it'll let the + // reactor pick up some new external events. + let timer = Timer::new(reactor); + let timer_handle = timer.handle(); + // And now put a single-threaded executor on top of the timer. When there are no futures ready + // to do something, it'll let the timer or the reactor generate some new stimuli for the + // futures to continue in their life. + let mut executor = CurrentThread::new_with_park(timer); + // Binds an executor to this thread + let mut enter = tokio_executor::enter().expect("Multiple executors at once"); + // This will set the default handle and timer to use inside the closure and run the future. + tokio_reactor::with_default(&reactor_handle, &mut enter, |enter| { + timer::with_default(&timer_handle, enter, |enter| { + // The TaskExecutor is a fake executor that looks into the current single-threaded + // executor when used. This is a trick, because we need two mutable references to the + // executor (one to run the provided future, another to install as the default one). We + // use the fake one here as the default one. + let mut default_executor = tokio_current_thread::TaskExecutor::current(); + tokio_executor::with_default(&mut default_executor, enter, |enter| { + let mut executor = executor.enter(enter); + // Run the provided future + executor.block_on(f).unwrap(); + // Run all the other futures that are still left in the executor + executor.run().unwrap(); + }); + }); + }); + Ok(()) +} + +fn main() -> Result<(), Box<std::error::Error>> { + run(future::lazy(|| { + // Here comes the application logic. It can spawn further tasks by tokio_current_thread::spawn(). + // It also can use the default reactor and create timeouts. + + // Connect somewhere. And then do nothing with it. Yes, useless. + // + // This will use the default reactor which runs in the current thread. + let connect = tokio::net::TcpStream::connect(&"127.0.0.1:53".parse().unwrap()) + .map(|_| println!("Connected")) + .map_err(|e| println!("Failed to connect: {}", e)); + // We can spawn it without requiring Send. This would panic if we run it outside of the + // `run` (or outside of anything else) + tokio_current_thread::spawn(connect); + + // We can also create timeouts. + let deadline = tokio::timer::Delay::new(Instant::now() + Duration::from_secs(5)) + .map(|()| println!("5 seconds are over")) + .map_err(|e| println!("Failed to wait: {}", e)); + // We can spawn on the default executor, which is also the local one. + tokio::executor::spawn(deadline); + Ok(()) + }))?; + Ok(()) +} diff --git a/third_party/rust/tokio-0.1.22/examples/print_each_packet.rs b/third_party/rust/tokio-0.1.22/examples/print_each_packet.rs new file mode 100644 index 0000000000..94a606483c --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/print_each_packet.rs @@ -0,0 +1,150 @@ +//! A "print-each-packet" server with Tokio +//! +//! This server will create a TCP listener, accept connections in a loop, and +//! put down in the stdout everything that's read off of each TCP connection. +//! +//! Because the Tokio runtime uses a thread pool, each TCP connection is +//! processed concurrently with all other TCP connections across multiple +//! threads. +//! +//! To see this server in action, you can run this in one terminal: +//! +//! cargo run --example print\_each\_packet +//! +//! and in another terminal you can run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! Each line you type in to the `connect` terminal should be written to terminal! +//! +//! Minimal js example: +//! +//! ```js +//! var net = require("net"); +//! +//! var listenPort = 8080; +//! +//! var server = net.createServer(function (socket) { +//! socket.on("data", function (bytes) { +//! console.log("bytes", bytes); +//! }); +//! +//! socket.on("end", function() { +//! console.log("Socket received FIN packet and closed connection"); +//! }); +//! socket.on("error", function (error) { +//! console.log("Socket closed with error", error); +//! }); +//! +//! socket.on("close", function (with_error) { +//! if (with_error) { +//! console.log("Socket closed with result: Err(SomeError)"); +//! } else { +//! console.log("Socket closed with result: Ok(())"); +//! } +//! }); +//! +//! }); +//! +//! server.listen(listenPort); +//! +//! console.log("Listening on:", listenPort); +//! ``` +//! + +#![deny(warnings)] + +extern crate tokio; +extern crate tokio_codec; + +use tokio::codec::Decoder; +use tokio::net::TcpListener; +use tokio::prelude::*; +use tokio_codec::BytesCodec; + +use std::env; +use std::net::SocketAddr; + +fn main() -> Result<(), Box<std::error::Error>> { + // Allow passing an address to listen on as the first argument of this + // program, but otherwise we'll just set up our TCP listener on + // 127.0.0.1:8080 for connections. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse::<SocketAddr>()?; + + // Next up we create a TCP listener which will listen for incoming + // connections. This TCP listener is bound to the address we determined + // above and must be associated with an event loop, so we pass in a handle + // to our event loop. After the socket's created we inform that we're ready + // to go and start accepting connections. + let socket = TcpListener::bind(&addr)?; + println!("Listening on: {}", addr); + + // Here we convert the `TcpListener` to a stream of incoming connections + // with the `incoming` method. We then define how to process each element in + // the stream with the `for_each` method. + // + // This combinator, defined on the `Stream` trait, will allow us to define a + // computation to happen for all items on the stream (in this case TCP + // connections made to the server). The return value of the `for_each` + // method is itself a future representing processing the entire stream of + // connections, and ends up being our server. + let done = socket + .incoming() + .map_err(|e| println!("failed to accept socket; error = {:?}", e)) + .for_each(move |socket| { + // Once we're inside this closure this represents an accepted client + // from our server. The `socket` is the client connection (similar to + // how the standard library operates). + // + // We're parsing each socket with the `BytesCodec` included in `tokio_io`, + // and then we `split` each codec into the reader/writer halves. + // + // See https://docs.rs/tokio-codec/0.1/src/tokio_codec/bytes_codec.rs.html + let framed = BytesCodec::new().framed(socket); + let (_writer, reader) = framed.split(); + + let processor = reader + .for_each(|bytes| { + println!("bytes: {:?}", bytes); + Ok(()) + }) + // After our copy operation is complete we just print out some helpful + // information. + .and_then(|()| { + println!("Socket received FIN packet and closed connection"); + Ok(()) + }) + .or_else(|err| { + println!("Socket closed with error: {:?}", err); + // We have to return the error to catch it in the next ``.then` call + Err(err) + }) + .then(|result| { + println!("Socket closed with result: {:?}", result); + Ok(()) + }); + + // And this is where much of the magic of this server happens. We + // crucially want all clients to make progress concurrently, rather than + // blocking one on completion of another. To achieve this we use the + // `tokio::spawn` function to execute the work in the background. + // + // This function will transfer ownership of the future (`msg` in this + // case) to the Tokio runtime thread pool that. The thread pool will + // drive the future to completion. + // + // Essentially here we're executing a new task to run concurrently, + // which will allow all of our clients to be processed concurrently. + tokio::spawn(processor) + }); + + // And finally now that we've define what our server is, we run it! + // + // This starts the Tokio runtime, spawns the server task, and blocks the + // current thread until all tasks complete execution. Since the `done` task + // never completes (it just keeps accepting sockets), `tokio::run` blocks + // forever (until ctrl-c is pressed). + tokio::run(done); + Ok(()) +} diff --git a/third_party/rust/tokio-0.1.22/examples/proxy.rs b/third_party/rust/tokio-0.1.22/examples/proxy.rs new file mode 100644 index 0000000000..2cbcf119a2 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/proxy.rs @@ -0,0 +1,130 @@ +//! A proxy that forwards data to another server and forwards that server's +//! responses back to clients. +//! +//! Because the Tokio runtime uses a thread pool, each TCP connection is +//! processed concurrently with all other TCP connections across multiple +//! threads. +//! +//! You can showcase this by running this in one terminal: +//! +//! cargo run --example proxy +//! +//! This in another terminal +//! +//! cargo run --example echo +//! +//! And finally this in another terminal +//! +//! cargo run --example connect 127.0.0.1:8081 +//! +//! This final terminal will connect to our proxy, which will in turn connect to +//! the echo server, and you'll be able to see data flowing between them. + +#![deny(warnings)] + +extern crate tokio; + +use std::env; +use std::io::{self, Read, Write}; +use std::net::{Shutdown, SocketAddr}; +use std::sync::{Arc, Mutex}; + +use tokio::io::{copy, shutdown}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::prelude::*; + +fn main() -> Result<(), Box<std::error::Error>> { + let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string()); + let listen_addr = listen_addr.parse::<SocketAddr>()?; + + let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string()); + let server_addr = server_addr.parse::<SocketAddr>()?; + + // Create a TCP listener which will listen for incoming connections. + let socket = TcpListener::bind(&listen_addr)?; + println!("Listening on: {}", listen_addr); + println!("Proxying to: {}", server_addr); + + let done = socket + .incoming() + .map_err(|e| println!("error accepting socket; error = {:?}", e)) + .for_each(move |client| { + let server = TcpStream::connect(&server_addr); + let amounts = server.and_then(move |server| { + // Create separate read/write handles for the TCP clients that we're + // proxying data between. Note that typically you'd use + // `AsyncRead::split` for this operation, but we want our writer + // handles to have a custom implementation of `shutdown` which + // actually calls `TcpStream::shutdown` to ensure that EOF is + // transmitted properly across the proxied connection. + // + // As a result, we wrap up our client/server manually in arcs and + // use the impls below on our custom `MyTcpStream` type. + let client_reader = MyTcpStream(Arc::new(Mutex::new(client))); + let client_writer = client_reader.clone(); + let server_reader = MyTcpStream(Arc::new(Mutex::new(server))); + let server_writer = server_reader.clone(); + + // Copy the data (in parallel) between the client and the server. + // After the copy is done we indicate to the remote side that we've + // finished by shutting down the connection. + let client_to_server = copy(client_reader, server_writer) + .and_then(|(n, _, server_writer)| shutdown(server_writer).map(move |_| n)); + + let server_to_client = copy(server_reader, client_writer) + .and_then(|(n, _, client_writer)| shutdown(client_writer).map(move |_| n)); + + client_to_server.join(server_to_client) + }); + + let msg = amounts + .map(move |(from_client, from_server)| { + println!( + "client wrote {} bytes and received {} bytes", + from_client, from_server + ); + }) + .map_err(|e| { + // Don't panic. Maybe the client just disconnected too soon. + println!("error: {}", e); + }); + + tokio::spawn(msg); + + Ok(()) + }); + + tokio::run(done); + Ok(()) +} + +// This is a custom type used to have a custom implementation of the +// `AsyncWrite::shutdown` method which actually calls `TcpStream::shutdown` to +// notify the remote end that we're done writing. +#[derive(Clone)] +struct MyTcpStream(Arc<Mutex<TcpStream>>); + +impl Read for MyTcpStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.0.lock().unwrap().read(buf) + } +} + +impl Write for MyTcpStream { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.0.lock().unwrap().write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl AsyncRead for MyTcpStream {} + +impl AsyncWrite for MyTcpStream { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.0.lock().unwrap().shutdown(Shutdown::Write)?; + Ok(().into()) + } +} diff --git a/third_party/rust/tokio-0.1.22/examples/tinydb.rs b/third_party/rust/tokio-0.1.22/examples/tinydb.rs new file mode 100644 index 0000000000..11298ed133 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/tinydb.rs @@ -0,0 +1,227 @@ +//! A "tiny database" and accompanying protocol +//! +//! This example shows the usage of shared state amongst all connected clients, +//! namely a database of key/value pairs. Each connected client can send a +//! series of GET/SET commands to query the current value of a key or set the +//! value of a key. +//! +//! This example has a simple protocol you can use to interact with the server. +//! To run, first run this in one terminal window: +//! +//! cargo run --example tinydb +//! +//! and next in another windows run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! In the `connect` window you can type in commands where when you hit enter +//! you'll get a response from the server for that command. An example session +//! is: +//! +//! +//! $ cargo run --example connect 127.0.0.1:8080 +//! GET foo +//! foo = bar +//! GET FOOBAR +//! error: no key FOOBAR +//! SET FOOBAR my awesome string +//! set FOOBAR = `my awesome string`, previous: None +//! SET foo tokio +//! set foo = `tokio`, previous: Some("bar") +//! GET foo +//! foo = tokio +//! +//! Namely you can issue two forms of commands: +//! +//! * `GET $key` - this will fetch the value of `$key` from the database and +//! return it. The server's database is initially populated with the key `foo` +//! set to the value `bar` +//! * `SET $key $value` - this will set the value of `$key` to `$value`, +//! returning the previous value, if any. + +#![deny(warnings)] + +extern crate tokio; + +use std::collections::HashMap; +use std::env; +use std::io::BufReader; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; + +use tokio::io::{lines, write_all}; +use tokio::net::TcpListener; +use tokio::prelude::*; + +/// The in-memory database shared amongst all clients. +/// +/// This database will be shared via `Arc`, so to mutate the internal map we're +/// going to use a `Mutex` for interior mutability. +struct Database { + map: Mutex<HashMap<String, String>>, +} + +/// Possible requests our clients can send us +enum Request { + Get { key: String }, + Set { key: String, value: String }, +} + +/// Responses to the `Request` commands above +enum Response { + Value { + key: String, + value: String, + }, + Set { + key: String, + value: String, + previous: Option<String>, + }, + Error { + msg: String, + }, +} + +fn main() -> Result<(), Box<std::error::Error>> { + // Parse the address we're going to run this server on + // and set up our TCP listener to accept connections. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse::<SocketAddr>()?; + let listener = TcpListener::bind(&addr).map_err(|_| "failed to bind")?; + println!("Listening on: {}", addr); + + // Create the shared state of this server that will be shared amongst all + // clients. We populate the initial database and then create the `Database` + // structure. Note the usage of `Arc` here which will be used to ensure that + // each independently spawned client will have a reference to the in-memory + // database. + let mut initial_db = HashMap::new(); + initial_db.insert("foo".to_string(), "bar".to_string()); + let db = Arc::new(Database { + map: Mutex::new(initial_db), + }); + + let done = listener + .incoming() + .map_err(|e| println!("error accepting socket; error = {:?}", e)) + .for_each(move |socket| { + // As with many other small examples, the first thing we'll do is + // *split* this TCP stream into two separately owned halves. This'll + // allow us to work with the read and write halves independently. + let (reader, writer) = socket.split(); + + // Since our protocol is line-based we use `tokio_io`'s `lines` utility + // to convert our stream of bytes, `reader`, into a `Stream` of lines. + let lines = lines(BufReader::new(reader)); + + // Here's where the meat of the processing in this server happens. First + // we see a clone of the database being created, which is creating a + // new reference for this connected client to use. Also note the `move` + // keyword on the closure here which moves ownership of the reference + // into the closure, which we'll need for spawning the client below. + // + // The `map` function here means that we'll run some code for all + // requests (lines) we receive from the client. The actual handling here + // is pretty simple, first we parse the request and if it's valid we + // generate a response based on the values in the database. + let db = db.clone(); + let responses = lines.map(move |line| { + let request = match Request::parse(&line) { + Ok(req) => req, + Err(e) => return Response::Error { msg: e }, + }; + + let mut db = db.map.lock().unwrap(); + match request { + Request::Get { key } => match db.get(&key) { + Some(value) => Response::Value { + key, + value: value.clone(), + }, + None => Response::Error { + msg: format!("no key {}", key), + }, + }, + Request::Set { key, value } => { + let previous = db.insert(key.clone(), value.clone()); + Response::Set { + key, + value, + previous, + } + } + } + }); + + // At this point `responses` is a stream of `Response` types which we + // now want to write back out to the client. To do that we use + // `Stream::fold` to perform a loop here, serializing each response and + // then writing it out to the client. + let writes = responses.fold(writer, |writer, response| { + let mut response = response.serialize(); + response.push('\n'); + write_all(writer, response.into_bytes()).map(|(w, _)| w) + }); + + // Like with other small servers, we'll `spawn` this client to ensure it + // runs concurrently with all other clients, for now ignoring any errors + // that we see. + let msg = writes.then(move |_| Ok(())); + + tokio::spawn(msg) + }); + + tokio::run(done); + Ok(()) +} + +impl Request { + fn parse(input: &str) -> Result<Request, String> { + let mut parts = input.splitn(3, " "); + match parts.next() { + Some("GET") => { + let key = match parts.next() { + Some(key) => key, + None => return Err(format!("GET must be followed by a key")), + }; + if parts.next().is_some() { + return Err(format!("GET's key must not be followed by anything")); + } + Ok(Request::Get { + key: key.to_string(), + }) + } + Some("SET") => { + let key = match parts.next() { + Some(key) => key, + None => return Err(format!("SET must be followed by a key")), + }; + let value = match parts.next() { + Some(value) => value, + None => return Err(format!("SET needs a value")), + }; + Ok(Request::Set { + key: key.to_string(), + value: value.to_string(), + }) + } + Some(cmd) => Err(format!("unknown command: {}", cmd)), + None => Err(format!("empty input")), + } + } +} + +impl Response { + fn serialize(&self) -> String { + match *self { + Response::Value { ref key, ref value } => format!("{} = {}", key, value), + Response::Set { + ref key, + ref value, + ref previous, + } => format!("set {} = `{}`, previous: {:?}", key, value, previous), + Response::Error { ref msg } => format!("error: {}", msg), + } + } +} diff --git a/third_party/rust/tokio-0.1.22/examples/tinyhttp.rs b/third_party/rust/tokio-0.1.22/examples/tinyhttp.rs new file mode 100644 index 0000000000..cde1b79afb --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/tinyhttp.rs @@ -0,0 +1,325 @@ +//! A "tiny" example of HTTP request/response handling using transports. +//! +//! This example is intended for *learning purposes* to see how various pieces +//! hook up together and how HTTP can get up and running. Note that this example +//! is written with the restriction that it *can't* use any "big" library other +//! than Tokio, if you'd like a "real world" HTTP library you likely want a +//! crate like Hyper. +//! +//! Code here is based on the `echo-threads` example and implements two paths, +//! the `/plaintext` and `/json` routes to respond with some text and json, +//! respectively. By default this will run I/O on all the cores your system has +//! available, and it doesn't support HTTP request bodies. + +#![deny(warnings)] + +extern crate bytes; +extern crate http; +extern crate httparse; +#[macro_use] +extern crate serde_derive; +extern crate serde_json; +extern crate time; +extern crate tokio; +extern crate tokio_io; + +use std::net::SocketAddr; +use std::{env, fmt, io}; + +use tokio::codec::{Decoder, Encoder}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::prelude::*; + +use bytes::BytesMut; +use http::header::HeaderValue; +use http::{Request, Response, StatusCode}; + +fn main() -> Result<(), Box<std::error::Error>> { + // Parse the arguments, bind the TCP socket we'll be listening to, spin up + // our worker threads, and start shipping sockets to those worker threads. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse::<SocketAddr>()?; + + let listener = TcpListener::bind(&addr)?; + println!("Listening on: {}", addr); + + tokio::run({ + listener + .incoming() + .map_err(|e| println!("failed to accept socket; error = {:?}", e)) + .for_each(|socket| { + process(socket); + Ok(()) + }) + }); + Ok(()) +} + +fn process(socket: TcpStream) { + let (tx, rx) = + // Frame the socket using the `Http` protocol. This maps the TCP socket + // to a Stream + Sink of HTTP frames. + Http.framed(socket) + // This splits a single `Stream + Sink` value into two separate handles + // that can be used independently (even on different tasks or threads). + .split(); + + // Map all requests into responses and send them back to the client. + let task = tx.send_all(rx.and_then(respond)).then(|res| { + if let Err(e) = res { + println!("failed to process connection; error = {:?}", e); + } + + Ok(()) + }); + + // Spawn the task that handles the connection. + tokio::spawn(task); +} + +/// "Server logic" is implemented in this function. +/// +/// This function is a map from and HTTP request to a future of a response and +/// represents the various handling a server might do. Currently the contents +/// here are pretty uninteresting. +fn respond(req: Request<()>) -> Box<Future<Item = Response<String>, Error = io::Error> + Send> { + let f = future::lazy(move || { + let mut response = Response::builder(); + let body = match req.uri().path() { + "/plaintext" => { + response.header("Content-Type", "text/plain"); + "Hello, World!".to_string() + } + "/json" => { + response.header("Content-Type", "application/json"); + + #[derive(Serialize)] + struct Message { + message: &'static str, + } + serde_json::to_string(&Message { + message: "Hello, World!", + })? + } + _ => { + response.status(StatusCode::NOT_FOUND); + String::new() + } + }; + let response = response + .body(body) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + Ok(response) + }); + + Box::new(f) +} + +struct Http; + +/// Implementation of encoding an HTTP response into a `BytesMut`, basically +/// just writing out an HTTP/1.1 response. +impl Encoder for Http { + type Item = Response<String>; + type Error = io::Error; + + fn encode(&mut self, item: Response<String>, dst: &mut BytesMut) -> io::Result<()> { + use std::fmt::Write; + + write!( + BytesWrite(dst), + "\ + HTTP/1.1 {}\r\n\ + Server: Example\r\n\ + Content-Length: {}\r\n\ + Date: {}\r\n\ + ", + item.status(), + item.body().len(), + date::now() + ) + .unwrap(); + + for (k, v) in item.headers() { + dst.extend_from_slice(k.as_str().as_bytes()); + dst.extend_from_slice(b": "); + dst.extend_from_slice(v.as_bytes()); + dst.extend_from_slice(b"\r\n"); + } + + dst.extend_from_slice(b"\r\n"); + dst.extend_from_slice(item.body().as_bytes()); + + return Ok(()); + + // Right now `write!` on `Vec<u8>` goes through io::Write and is not + // super speedy, so inline a less-crufty implementation here which + // doesn't go through io::Error. + struct BytesWrite<'a>(&'a mut BytesMut); + + impl<'a> fmt::Write for BytesWrite<'a> { + fn write_str(&mut self, s: &str) -> fmt::Result { + self.0.extend_from_slice(s.as_bytes()); + Ok(()) + } + + fn write_fmt(&mut self, args: fmt::Arguments) -> fmt::Result { + fmt::write(self, args) + } + } + } +} + +/// Implementation of decoding an HTTP request from the bytes we've read so far. +/// This leverages the `httparse` crate to do the actual parsing and then we use +/// that information to construct an instance of a `http::Request` object, +/// trying to avoid allocations where possible. +impl Decoder for Http { + type Item = Request<()>; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Request<()>>> { + // TODO: we should grow this headers array if parsing fails and asks + // for more headers + let mut headers = [None; 16]; + let (method, path, version, amt) = { + let mut parsed_headers = [httparse::EMPTY_HEADER; 16]; + let mut r = httparse::Request::new(&mut parsed_headers); + let status = r.parse(src).map_err(|e| { + let msg = format!("failed to parse http request: {:?}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let amt = match status { + httparse::Status::Complete(amt) => amt, + httparse::Status::Partial => return Ok(None), + }; + + let toslice = |a: &[u8]| { + let start = a.as_ptr() as usize - src.as_ptr() as usize; + assert!(start < src.len()); + (start, start + a.len()) + }; + + for (i, header) in r.headers.iter().enumerate() { + let k = toslice(header.name.as_bytes()); + let v = toslice(header.value); + headers[i] = Some((k, v)); + } + + ( + toslice(r.method.unwrap().as_bytes()), + toslice(r.path.unwrap().as_bytes()), + r.version.unwrap(), + amt, + ) + }; + if version != 1 { + return Err(io::Error::new( + io::ErrorKind::Other, + "only HTTP/1.1 accepted", + )); + } + let data = src.split_to(amt).freeze(); + let mut ret = Request::builder(); + ret.method(&data[method.0..method.1]); + ret.uri(data.slice(path.0, path.1)); + ret.version(http::Version::HTTP_11); + for header in headers.iter() { + let (k, v) = match *header { + Some((ref k, ref v)) => (k, v), + None => break, + }; + let value = unsafe { HeaderValue::from_shared_unchecked(data.slice(v.0, v.1)) }; + ret.header(&data[k.0..k.1], value); + } + + let req = ret + .body(()) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + Ok(Some(req)) + } +} + +mod date { + use std::cell::RefCell; + use std::fmt::{self, Write}; + use std::str; + + use time::{self, Duration}; + + pub struct Now(()); + + /// Returns a struct, which when formatted, renders an appropriate `Date` + /// header value. + pub fn now() -> Now { + Now(()) + } + + // Gee Alex, doesn't this seem like premature optimization. Well you see + // there Billy, you're absolutely correct! If your server is *bottlenecked* + // on rendering the `Date` header, well then boy do I have news for you, you + // don't need this optimization. + // + // In all seriousness, though, a simple "hello world" benchmark which just + // sends back literally "hello world" with standard headers actually is + // bottlenecked on rendering a date into a byte buffer. Since it was at the + // top of a profile, and this was done for some competitive benchmarks, this + // module was written. + // + // Just to be clear, though, I was not intending on doing this because it + // really does seem kinda absurd, but it was done by someone else [1], so I + // blame them! :) + // + // [1]: https://github.com/rapidoid/rapidoid/blob/f1c55c0555007e986b5d069fe1086e6d09933f7b/rapidoid-commons/src/main/java/org/rapidoid/commons/Dates.java#L48-L66 + + struct LastRenderedNow { + bytes: [u8; 128], + amt: usize, + next_update: time::Timespec, + } + + thread_local!(static LAST: RefCell<LastRenderedNow> = RefCell::new(LastRenderedNow { + bytes: [0; 128], + amt: 0, + next_update: time::Timespec::new(0, 0), + })); + + impl fmt::Display for Now { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + LAST.with(|cache| { + let mut cache = cache.borrow_mut(); + let now = time::get_time(); + if now >= cache.next_update { + cache.update(now); + } + f.write_str(cache.buffer()) + }) + } + } + + impl LastRenderedNow { + fn buffer(&self) -> &str { + str::from_utf8(&self.bytes[..self.amt]).unwrap() + } + + fn update(&mut self, now: time::Timespec) { + self.amt = 0; + write!(LocalBuffer(self), "{}", time::at(now).rfc822()).unwrap(); + self.next_update = now + Duration::seconds(1); + self.next_update.nsec = 0; + } + } + + struct LocalBuffer<'a>(&'a mut LastRenderedNow); + + impl<'a> fmt::Write for LocalBuffer<'a> { + fn write_str(&mut self, s: &str) -> fmt::Result { + let start = self.0.amt; + let end = start + s.len(); + self.0.bytes[start..end].copy_from_slice(s.as_bytes()); + self.0.amt += s.len(); + Ok(()) + } + } +} diff --git a/third_party/rust/tokio-0.1.22/examples/udp-client.rs b/third_party/rust/tokio-0.1.22/examples/udp-client.rs new file mode 100644 index 0000000000..900d3616df --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/udp-client.rs @@ -0,0 +1,70 @@ +//! A UDP client that just sends everything it gets via `stdio` in a single datagram, and then +//! waits for a reply. +//! +//! For the reasons of simplicity data from `stdio` is read until `EOF` in a blocking manner. +//! +//! You can test this out by running an echo server: +//! +//! ``` +//! $ cargo run --example echo-udp -- 127.0.0.1:8080 +//! ``` +//! +//! and running the client in another terminal: +//! +//! ``` +//! $ cargo run --example udp-client +//! ``` +//! +//! You can optionally provide any custom endpoint address for the client: +//! +//! ``` +//! $ cargo run --example udp-client -- 127.0.0.1:8080 +//! ``` +//! +//! Don't forget to pass `EOF` to the standard input of the client! +//! +//! Please mind that since the UDP protocol doesn't have any capabilities to detect a broken +//! connection the server needs to be run first, otherwise the client will block forever. + +extern crate futures; +extern crate tokio; + +use std::env; +use std::io::stdin; +use std::net::SocketAddr; +use tokio::net::UdpSocket; +use tokio::prelude::*; + +fn get_stdin_data() -> Result<Vec<u8>, Box<std::error::Error>> { + let mut buf = Vec::new(); + stdin().read_to_end(&mut buf)?; + Ok(buf) +} + +fn main() -> Result<(), Box<std::error::Error>> { + let remote_addr: SocketAddr = env::args() + .nth(1) + .unwrap_or("127.0.0.1:8080".into()) + .parse()?; + // We use port 0 to let the operating system allocate an available port for us. + let local_addr: SocketAddr = if remote_addr.is_ipv4() { + "0.0.0.0:0" + } else { + "[::]:0" + } + .parse()?; + let socket = UdpSocket::bind(&local_addr)?; + const MAX_DATAGRAM_SIZE: usize = 65_507; + socket + .send_dgram(get_stdin_data()?, &remote_addr) + .and_then(|(socket, _)| socket.recv_dgram(vec![0u8; MAX_DATAGRAM_SIZE])) + .map(|(_, data, len, _)| { + println!( + "Received {} bytes:\n{}", + len, + String::from_utf8_lossy(&data[..len]) + ) + }) + .wait()?; + Ok(()) +} diff --git a/third_party/rust/tokio-0.1.22/examples/udp-codec.rs b/third_party/rust/tokio-0.1.22/examples/udp-codec.rs new file mode 100644 index 0000000000..3657d8cc17 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/udp-codec.rs @@ -0,0 +1,65 @@ +//! This example leverages `BytesCodec` to create a UDP client and server which +//! speak a custom protocol. +//! +//! Here we're using the codec from tokio-io to convert a UDP socket to a stream of +//! client messages. These messages are then processed and returned back as a +//! new message with a new destination. Overall, we then use this to construct a +//! "ping pong" pair where two sockets are sending messages back and forth. + +#![deny(warnings)] + +extern crate env_logger; +extern crate tokio; +extern crate tokio_codec; +extern crate tokio_io; + +use std::net::SocketAddr; + +use tokio::net::{UdpFramed, UdpSocket}; +use tokio::prelude::*; +use tokio_codec::BytesCodec; + +fn main() -> Result<(), Box<std::error::Error>> { + let _ = env_logger::init(); + + let addr: SocketAddr = "127.0.0.1:0".parse()?; + + // Bind both our sockets and then figure out what ports we got. + let a = UdpSocket::bind(&addr)?; + let b = UdpSocket::bind(&addr)?; + let b_addr = b.local_addr()?; + + // We're parsing each socket with the `BytesCodec` included in `tokio_io`, and then we + // `split` each codec into the sink/stream halves. + let (a_sink, a_stream) = UdpFramed::new(a, BytesCodec::new()).split(); + let (b_sink, b_stream) = UdpFramed::new(b, BytesCodec::new()).split(); + + // Start off by sending a ping from a to b, afterwards we just print out + // what they send us and continually send pings + // let pings = stream::iter((0..5).map(Ok)); + let a = a_sink.send(("PING".into(), b_addr)).and_then(|a_sink| { + let mut i = 0; + let a_stream = a_stream.take(4).map(move |(msg, addr)| { + i += 1; + println!("[a] recv: {}", String::from_utf8_lossy(&msg)); + (format!("PING {}", i).into(), addr) + }); + a_sink.send_all(a_stream) + }); + + // The second client we have will receive the pings from `a` and then send + // back pongs. + let b_stream = b_stream.map(|(msg, addr)| { + println!("[b] recv: {}", String::from_utf8_lossy(&msg)); + ("PONG".into(), addr) + }); + let b = b_sink.send_all(b_stream); + + // Spawn the sender of pongs and then wait for our pinger to finish. + tokio::run({ + b.join(a) + .map(|_| ()) + .map_err(|e| println!("error = {:?}", e)) + }); + Ok(()) +} diff --git a/third_party/rust/tokio-0.1.22/src/async_await.rs b/third_party/rust/tokio-0.1.22/src/async_await.rs new file mode 100644 index 0000000000..ed8b52d073 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/async_await.rs @@ -0,0 +1,17 @@ +use tokio_futures::compat; + +/// Like `tokio::run`, but takes an `async` block +pub fn run_async<F>(future: F) +where + F: std::future::Future<Output = ()> + Send + 'static, +{ + ::run(compat::infallible_into_01(future)); +} + +/// Like `tokio::spawn`, but takes an `async` block +pub fn spawn_async<F>(future: F) +where + F: std::future::Future<Output = ()> + Send + 'static, +{ + ::spawn(compat::infallible_into_01(future)); +} diff --git a/third_party/rust/tokio-0.1.22/src/clock.rs b/third_party/rust/tokio-0.1.22/src/clock.rs new file mode 100644 index 0000000000..7ddbbf37fe --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/clock.rs @@ -0,0 +1,15 @@ +//! A configurable source of time. +//! +//! This module provides the [`now`][n] function, which returns an `Instant` +//! representing "now". The source of time used by this function is configurable +//! (via the [`tokio-timer`] crate) and allows mocking out the source of time in +//! tests or performing caching operations to reduce the number of syscalls. +//! +//! Note that, because the source of time is configurable, it is possible to +//! observe non-monotonic behavior when calling [`now`][n] from different +//! executors. +//! +//! [n]: fn.now.html +//! [`tokio-timer`]: https://docs.rs/tokio-timer/0.2/tokio_timer/clock/index.html + +pub use tokio_timer::clock::now; diff --git a/third_party/rust/tokio-0.1.22/src/codec/length_delimited.rs b/third_party/rust/tokio-0.1.22/src/codec/length_delimited.rs new file mode 100644 index 0000000000..87393982b0 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/codec/length_delimited.rs @@ -0,0 +1,982 @@ +//! Frame a stream of bytes based on a length prefix +//! +//! Many protocols delimit their frames by prefacing frame data with a +//! frame head that specifies the length of the frame. The +//! `length_delimited` module provides utilities for handling the length +//! based framing. This allows the consumer to work with entire frames +//! without having to worry about buffering or other framing logic. +//! +//! # Getting started +//! +//! If implementing a protocol from scratch, using length delimited framing +//! is an easy way to get started. [`LengthDelimitedCodec::new()`] will +//! return a length delimited codec using default configuration values. +//! This can then be used to construct a framer to adapt a full-duplex +//! byte stream into a stream of frames. +//! +//! ``` +//! # extern crate tokio; +//! use tokio::io::{AsyncRead, AsyncWrite}; +//! use tokio::codec::*; +//! +//! fn bind_transport<T: AsyncRead + AsyncWrite>(io: T) +//! -> Framed<T, LengthDelimitedCodec> +//! { +//! Framed::new(io, LengthDelimitedCodec::new()) +//! } +//! # pub fn main() {} +//! ``` +//! +//! The returned transport implements `Sink + Stream` for `BytesMut`. It +//! encodes the frame with a big-endian `u32` header denoting the frame +//! payload length: +//! +//! ```text +//! +----------+--------------------------------+ +//! | len: u32 | frame payload | +//! +----------+--------------------------------+ +//! ``` +//! +//! Specifically, given the following: +//! +//! ``` +//! # extern crate tokio; +//! # extern crate bytes; +//! # extern crate futures; +//! # +//! use tokio::io::{AsyncRead, AsyncWrite}; +//! use tokio::codec::*; +//! use bytes::Bytes; +//! use futures::{Sink, Future}; +//! +//! fn write_frame<T: AsyncRead + AsyncWrite>(io: T) -> Result<(), Box<std::error::Error>> { +//! let mut transport = Framed::new(io, LengthDelimitedCodec::new()); +//! let frame = Bytes::from("hello world"); +//! +//! transport.send(frame).wait()?; +//! Ok(()) +//! } +//! # +//! # pub fn main() {} +//! ``` +//! +//! The encoded frame will look like this: +//! +//! ```text +//! +---- len: u32 ----+---- data ----+ +//! | \x00\x00\x00\x0b | hello world | +//! +------------------+--------------+ +//! ``` +//! +//! # Decoding +//! +//! [`FramedRead`] adapts an [`AsyncRead`] into a `Stream` of [`BytesMut`], +//! such that each yielded [`BytesMut`] value contains the contents of an +//! entire frame. There are many configuration parameters enabling +//! [`FramedRead`] to handle a wide range of protocols. Here are some +//! examples that will cover the various options at a high level. +//! +//! ## Example 1 +//! +//! The following will parse a `u16` length field at offset 0, including the +//! frame head in the yielded `BytesMut`. +//! +//! ``` +//! # extern crate tokio; +//! # use tokio::io::AsyncRead; +//! # use tokio::codec::length_delimited; +//! # fn bind_read<T: AsyncRead>(io: T) { +//! length_delimited::Builder::new() +//! .length_field_offset(0) // default value +//! .length_field_length(2) +//! .length_adjustment(0) // default value +//! .num_skip(0) // Do not strip frame header +//! .new_read(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! The following frame will be decoded as such: +//! +//! ```text +//! INPUT DECODED +//! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+ +//! | \x00\x0B | Hello world | --> | \x00\x0B | Hello world | +//! +----------+---------------+ +----------+---------------+ +//! ``` +//! +//! The value of the length field is 11 (`\x0B`) which represents the length +//! of the payload, `hello world`. By default, [`FramedRead`] assumes that +//! the length field represents the number of bytes that **follows** the +//! length field. Thus, the entire frame has a length of 13: 2 bytes for the +//! frame head + 11 bytes for the payload. +//! +//! ## Example 2 +//! +//! The following will parse a `u16` length field at offset 0, omitting the +//! frame head in the yielded `BytesMut`. +//! +//! ``` +//! # extern crate tokio; +//! # use tokio::io::AsyncRead; +//! # use tokio::codec::length_delimited; +//! # fn bind_read<T: AsyncRead>(io: T) { +//! length_delimited::Builder::new() +//! .length_field_offset(0) // default value +//! .length_field_length(2) +//! .length_adjustment(0) // default value +//! // `num_skip` is not needed, the default is to skip +//! .new_read(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! The following frame will be decoded as such: +//! +//! ```text +//! INPUT DECODED +//! +-- len ---+--- Payload ---+ +--- Payload ---+ +//! | \x00\x0B | Hello world | --> | Hello world | +//! +----------+---------------+ +---------------+ +//! ``` +//! +//! This is similar to the first example, the only difference is that the +//! frame head is **not** included in the yielded `BytesMut` value. +//! +//! ## Example 3 +//! +//! The following will parse a `u16` length field at offset 0, including the +//! frame head in the yielded `BytesMut`. In this case, the length field +//! **includes** the frame head length. +//! +//! ``` +//! # extern crate tokio; +//! # use tokio::io::AsyncRead; +//! # use tokio::codec::length_delimited; +//! # fn bind_read<T: AsyncRead>(io: T) { +//! length_delimited::Builder::new() +//! .length_field_offset(0) // default value +//! .length_field_length(2) +//! .length_adjustment(-2) // size of head +//! .num_skip(0) +//! .new_read(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! The following frame will be decoded as such: +//! +//! ```text +//! INPUT DECODED +//! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+ +//! | \x00\x0D | Hello world | --> | \x00\x0D | Hello world | +//! +----------+---------------+ +----------+---------------+ +//! ``` +//! +//! In most cases, the length field represents the length of the payload +//! only, as shown in the previous examples. However, in some protocols the +//! length field represents the length of the whole frame, including the +//! head. In such cases, we specify a negative `length_adjustment` to adjust +//! the value provided in the frame head to represent the payload length. +//! +//! ## Example 4 +//! +//! The following will parse a 3 byte length field at offset 0 in a 5 byte +//! frame head, including the frame head in the yielded `BytesMut`. +//! +//! ``` +//! # extern crate tokio; +//! # use tokio::io::AsyncRead; +//! # use tokio::codec::length_delimited; +//! # fn bind_read<T: AsyncRead>(io: T) { +//! length_delimited::Builder::new() +//! .length_field_offset(0) // default value +//! .length_field_length(3) +//! .length_adjustment(2) // remaining head +//! .num_skip(0) +//! .new_read(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! The following frame will be decoded as such: +//! +//! ```text +//! INPUT +//! +---- len -----+- head -+--- Payload ---+ +//! | \x00\x00\x0B | \xCAFE | Hello world | +//! +--------------+--------+---------------+ +//! +//! DECODED +//! +---- len -----+- head -+--- Payload ---+ +//! | \x00\x00\x0B | \xCAFE | Hello world | +//! +--------------+--------+---------------+ +//! ``` +//! +//! A more advanced example that shows a case where there is extra frame +//! head data between the length field and the payload. In such cases, it is +//! usually desirable to include the frame head as part of the yielded +//! `BytesMut`. This lets consumers of the length delimited framer to +//! process the frame head as needed. +//! +//! The positive `length_adjustment` value lets `FramedRead` factor in the +//! additional head into the frame length calculation. +//! +//! ## Example 5 +//! +//! The following will parse a `u16` length field at offset 1 of a 4 byte +//! frame head. The first byte and the length field will be omitted from the +//! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be +//! included. +//! +//! ``` +//! # extern crate tokio; +//! # use tokio::io::AsyncRead; +//! # use tokio::codec::length_delimited; +//! # fn bind_read<T: AsyncRead>(io: T) { +//! length_delimited::Builder::new() +//! .length_field_offset(1) // length of hdr1 +//! .length_field_length(2) +//! .length_adjustment(1) // length of hdr2 +//! .num_skip(3) // length of hdr1 + LEN +//! .new_read(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! The following frame will be decoded as such: +//! +//! ```text +//! INPUT +//! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+ +//! | \xCA | \x00\x0B | \xFE | Hello world | +//! +--------+----------+--------+---------------+ +//! +//! DECODED +//! +- hdr2 -+--- Payload ---+ +//! | \xFE | Hello world | +//! +--------+---------------+ +//! ``` +//! +//! The length field is situated in the middle of the frame head. In this +//! case, the first byte in the frame head could be a version or some other +//! identifier that is not needed for processing. On the other hand, the +//! second half of the head is needed. +//! +//! `length_field_offset` indicates how many bytes to skip before starting +//! to read the length field. `length_adjustment` is the number of bytes to +//! skip starting at the end of the length field. In this case, it is the +//! second half of the head. +//! +//! ## Example 6 +//! +//! The following will parse a `u16` length field at offset 1 of a 4 byte +//! frame head. The first byte and the length field will be omitted from the +//! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be +//! included. In this case, the length field **includes** the frame head +//! length. +//! +//! ``` +//! # extern crate tokio; +//! # use tokio::io::AsyncRead; +//! # use tokio::codec::length_delimited; +//! # fn bind_read<T: AsyncRead>(io: T) { +//! length_delimited::Builder::new() +//! .length_field_offset(1) // length of hdr1 +//! .length_field_length(2) +//! .length_adjustment(-3) // length of hdr1 + LEN, negative +//! .num_skip(3) +//! .new_read(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! The following frame will be decoded as such: +//! +//! ```text +//! INPUT +//! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+ +//! | \xCA | \x00\x0F | \xFE | Hello world | +//! +--------+----------+--------+---------------+ +//! +//! DECODED +//! +- hdr2 -+--- Payload ---+ +//! | \xFE | Hello world | +//! +--------+---------------+ +//! ``` +//! +//! Similar to the example above, the difference is that the length field +//! represents the length of the entire frame instead of just the payload. +//! The length of `hdr1` and `len` must be counted in `length_adjustment`. +//! Note that the length of `hdr2` does **not** need to be explicitly set +//! anywhere because it already is factored into the total frame length that +//! is read from the byte stream. +//! +//! # Encoding +//! +//! [`FramedWrite`] adapts an [`AsyncWrite`] into a `Sink` of [`BytesMut`], +//! such that each submitted [`BytesMut`] is prefaced by a length field. +//! There are fewer configuration options than [`FramedRead`]. Given +//! protocols that have more complex frame heads, an encoder should probably +//! be written by hand using [`Encoder`]. +//! +//! Here is a simple example, given a `FramedWrite` with the following +//! configuration: +//! +//! ``` +//! # extern crate tokio; +//! # extern crate bytes; +//! # use tokio::io::AsyncWrite; +//! # use tokio::codec::length_delimited; +//! # use bytes::BytesMut; +//! # fn write_frame<T: AsyncWrite>(io: T) { +//! # let _ = +//! length_delimited::Builder::new() +//! .length_field_length(2) +//! .new_write(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! A payload of `hello world` will be encoded as: +//! +//! ```text +//! +- len: u16 -+---- data ----+ +//! | \x00\x0b | hello world | +//! +------------+--------------+ +//! ``` +//! +//! [`LengthDelimitedCodec::new()`]: struct.LengthDelimitedCodec.html#method.new +//! [`FramedRead`]: struct.FramedRead.html +//! [`FramedWrite`]: struct.FramedWrite.html +//! [`AsyncRead`]: ../../trait.AsyncRead.html +//! [`AsyncWrite`]: ../../trait.AsyncWrite.html +//! [`Encoder`]: ../trait.Encoder.html +//! [`BytesMut`]: https://docs.rs/bytes/0.4/bytes/struct.BytesMut.html + +use { + codec::{Decoder, Encoder, Framed, FramedRead, FramedWrite}, + io::{AsyncRead, AsyncWrite}, +}; + +use bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf}; + +use std::error::Error as StdError; +use std::io::{self, Cursor}; +use std::{cmp, fmt}; + +/// Configure length delimited `LengthDelimitedCodec`s. +/// +/// `Builder` enables constructing configured length delimited codecs. Note +/// that not all configuration settings apply to both encoding and decoding. See +/// the documentation for specific methods for more detail. +#[derive(Debug, Clone, Copy)] +pub struct Builder { + // Maximum frame length + max_frame_len: usize, + + // Number of bytes representing the field length + length_field_len: usize, + + // Number of bytes in the header before the length field + length_field_offset: usize, + + // Adjust the length specified in the header field by this amount + length_adjustment: isize, + + // Total number of bytes to skip before reading the payload, if not set, + // `length_field_len + length_field_offset` + num_skip: Option<usize>, + + // Length field byte order (little or big endian) + length_field_is_big_endian: bool, +} + +/// An error when the number of bytes read is more than max frame length. +pub struct FrameTooBig { + _priv: (), +} + +/// A codec for frames delimited by a frame head specifying their lengths. +/// +/// This allows the consumer to work with entire frames without having to worry +/// about buffering or other framing logic. +/// +/// See [module level] documentation for more detail. +/// +/// [module level]: index.html +#[derive(Debug)] +pub struct LengthDelimitedCodec { + // Configuration values + builder: Builder, + + // Read state + state: DecodeState, +} + +#[derive(Debug, Clone, Copy)] +enum DecodeState { + Head, + Data(usize), +} + +// ===== impl LengthDelimitedCodec ====== + +impl LengthDelimitedCodec { + /// Creates a new `LengthDelimitedCodec` with the default configuration values. + pub fn new() -> Self { + Self { + builder: Builder::new(), + state: DecodeState::Head, + } + } + + /// Returns the current max frame setting + /// + /// This is the largest size this codec will accept from the wire. Larger + /// frames will be rejected. + pub fn max_frame_length(&self) -> usize { + self.builder.max_frame_len + } + + /// Updates the max frame setting. + /// + /// The change takes effect the next time a frame is decoded. In other + /// words, if a frame is currently in process of being decoded with a frame + /// size greater than `val` but less than the max frame length in effect + /// before calling this function, then the frame will be allowed. + pub fn set_max_frame_length(&mut self, val: usize) { + self.builder.max_frame_length(val); + } + + fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> { + let head_len = self.builder.num_head_bytes(); + let field_len = self.builder.length_field_len; + + if src.len() < head_len { + // Not enough data + return Ok(None); + } + + let n = { + let mut src = Cursor::new(&mut *src); + + // Skip the required bytes + src.advance(self.builder.length_field_offset); + + // match endianess + let n = if self.builder.length_field_is_big_endian { + src.get_uint_be(field_len) + } else { + src.get_uint_le(field_len) + }; + + if n > self.builder.max_frame_len as u64 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + FrameTooBig { _priv: () }, + )); + } + + // The check above ensures there is no overflow + let n = n as usize; + + // Adjust `n` with bounds checking + let n = if self.builder.length_adjustment < 0 { + n.checked_sub(-self.builder.length_adjustment as usize) + } else { + n.checked_add(self.builder.length_adjustment as usize) + }; + + // Error handling + match n { + Some(n) => n, + None => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "provided length would overflow after adjustment", + )); + } + } + }; + + let num_skip = self.builder.get_num_skip(); + + if num_skip > 0 { + let _ = src.split_to(num_skip); + } + + // Ensure that the buffer has enough space to read the incoming + // payload + src.reserve(n); + + return Ok(Some(n)); + } + + fn decode_data(&self, n: usize, src: &mut BytesMut) -> io::Result<Option<BytesMut>> { + // At this point, the buffer has already had the required capacity + // reserved. All there is to do is read. + if src.len() < n { + return Ok(None); + } + + Ok(Some(src.split_to(n))) + } +} + +impl Decoder for LengthDelimitedCodec { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<BytesMut>> { + let n = match self.state { + DecodeState::Head => match self.decode_head(src)? { + Some(n) => { + self.state = DecodeState::Data(n); + n + } + None => return Ok(None), + }, + DecodeState::Data(n) => n, + }; + + match self.decode_data(n, src)? { + Some(data) => { + // Update the decode state + self.state = DecodeState::Head; + + // Make sure the buffer has enough space to read the next head + src.reserve(self.builder.num_head_bytes()); + + Ok(Some(data)) + } + None => Ok(None), + } + } +} + +impl Encoder for LengthDelimitedCodec { + type Item = Bytes; + type Error = io::Error; + + fn encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error> { + let n = (&data).into_buf().remaining(); + + if n > self.builder.max_frame_len { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + FrameTooBig { _priv: () }, + )); + } + + // Adjust `n` with bounds checking + let n = if self.builder.length_adjustment < 0 { + n.checked_add(-self.builder.length_adjustment as usize) + } else { + n.checked_sub(self.builder.length_adjustment as usize) + }; + + let n = n.ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "provided length would overflow after adjustment", + ) + })?; + + // Reserve capacity in the destination buffer to fit the frame and + // length field (plus adjustment). + dst.reserve(self.builder.length_field_len + n); + + if self.builder.length_field_is_big_endian { + dst.put_uint_be(n as u64, self.builder.length_field_len); + } else { + dst.put_uint_le(n as u64, self.builder.length_field_len); + } + + // Write the frame to the buffer + dst.extend_from_slice(&data[..]); + + Ok(()) + } +} + +// ===== impl Builder ===== + +impl Builder { + /// Creates a new length delimited codec builder with default configuration + /// values. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_field_offset(0) + /// .length_field_length(2) + /// .length_adjustment(0) + /// .num_skip(0) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn new() -> Builder { + Builder { + // Default max frame length of 8MB + max_frame_len: 8 * 1_024 * 1_024, + + // Default byte length of 4 + length_field_len: 4, + + // Default to the header field being at the start of the header. + length_field_offset: 0, + + length_adjustment: 0, + + // Total number of bytes to skip before reading the payload, if not set, + // `length_field_len + length_field_offset` + num_skip: None, + + // Default to reading the length field in network (big) endian. + length_field_is_big_endian: true, + } + } + + /// Read the length field as a big endian integer + /// + /// This is the default setting. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .big_endian() + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn big_endian(&mut self) -> &mut Self { + self.length_field_is_big_endian = true; + self + } + + /// Read the length field as a little endian integer + /// + /// The default setting is big endian. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .little_endian() + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn little_endian(&mut self) -> &mut Self { + self.length_field_is_big_endian = false; + self + } + + /// Read the length field as a native endian integer + /// + /// The default setting is big endian. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .native_endian() + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn native_endian(&mut self) -> &mut Self { + if cfg!(target_endian = "big") { + self.big_endian() + } else { + self.little_endian() + } + } + + /// Sets the max frame length + /// + /// This configuration option applies to both encoding and decoding. The + /// default value is 8MB. + /// + /// When decoding, the length field read from the byte stream is checked + /// against this setting **before** any adjustments are applied. When + /// encoding, the length of the submitted payload is checked against this + /// setting. + /// + /// When frames exceed the max length, an `io::Error` with the custom value + /// of the `FrameTooBig` type will be returned. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .max_frame_length(8 * 1024) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn max_frame_length(&mut self, val: usize) -> &mut Self { + self.max_frame_len = val; + self + } + + /// Sets the number of bytes used to represent the length field + /// + /// The default value is `4`. The max value is `8`. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_field_length(4) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn length_field_length(&mut self, val: usize) -> &mut Self { + assert!(val > 0 && val <= 8, "invalid length field length"); + self.length_field_len = val; + self + } + + /// Sets the number of bytes in the header before the length field + /// + /// This configuration option only applies to decoding. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_field_offset(1) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn length_field_offset(&mut self, val: usize) -> &mut Self { + self.length_field_offset = val; + self + } + + /// Delta between the payload length specified in the header and the real + /// payload length + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_adjustment(-2) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn length_adjustment(&mut self, val: isize) -> &mut Self { + self.length_adjustment = val; + self + } + + /// Sets the number of bytes to skip before reading the payload + /// + /// Default value is `length_field_len + length_field_offset` + /// + /// This configuration option only applies to decoding + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .num_skip(4) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn num_skip(&mut self, val: usize) -> &mut Self { + self.num_skip = Some(val); + self + } + + /// Create a configured length delimited `LengthDelimitedCodec` + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// # pub fn main() { + /// Builder::new() + /// .length_field_offset(0) + /// .length_field_length(2) + /// .length_adjustment(0) + /// .num_skip(0) + /// .new_codec(); + /// # } + /// ``` + pub fn new_codec(&self) -> LengthDelimitedCodec { + LengthDelimitedCodec { + builder: *self, + state: DecodeState::Head, + } + } + + /// Create a configured length delimited `FramedRead` + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_field_offset(0) + /// .length_field_length(2) + /// .length_adjustment(0) + /// .num_skip(0) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn new_read<T>(&self, upstream: T) -> FramedRead<T, LengthDelimitedCodec> + where + T: AsyncRead, + { + FramedRead::new(upstream, self.new_codec()) + } + + /// Create a configured length delimited `FramedWrite` + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate bytes; + /// # use tokio::io::AsyncWrite; + /// # use tokio::codec::length_delimited; + /// # use bytes::BytesMut; + /// # fn write_frame<T: AsyncWrite>(io: T) { + /// length_delimited::Builder::new() + /// .length_field_length(2) + /// .new_write(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn new_write<T>(&self, inner: T) -> FramedWrite<T, LengthDelimitedCodec> + where + T: AsyncWrite, + { + FramedWrite::new(inner, self.new_codec()) + } + + /// Create a configured length delimited `Framed` + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate bytes; + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use tokio::codec::length_delimited; + /// # use bytes::BytesMut; + /// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) { + /// # let _ = + /// length_delimited::Builder::new() + /// .length_field_length(2) + /// .new_framed(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn new_framed<T>(&self, inner: T) -> Framed<T, LengthDelimitedCodec> + where + T: AsyncRead + AsyncWrite, + { + Framed::new(inner, self.new_codec()) + } + + fn num_head_bytes(&self) -> usize { + let num = self.length_field_offset + self.length_field_len; + cmp::max(num, self.num_skip.unwrap_or(0)) + } + + fn get_num_skip(&self) -> usize { + self.num_skip + .unwrap_or(self.length_field_offset + self.length_field_len) + } +} + +// ===== impl FrameTooBig ===== + +impl fmt::Debug for FrameTooBig { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FrameTooBig").finish() + } +} + +impl fmt::Display for FrameTooBig { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(self.description()) + } +} + +impl StdError for FrameTooBig { + fn description(&self) -> &str { + "frame size too big" + } +} diff --git a/third_party/rust/tokio-0.1.22/src/codec/mod.rs b/third_party/rust/tokio-0.1.22/src/codec/mod.rs new file mode 100644 index 0000000000..b6a3bbcb0d --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/codec/mod.rs @@ -0,0 +1,19 @@ +//! Utilities for encoding and decoding frames. +//! +//! Contains adapters to go from streams of bytes, [`AsyncRead`] and +//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`]. +//! Framed streams are also known as [transports]. +//! +//! [`AsyncRead`]: ../io/trait.AsyncRead.html +//! [`AsyncWrite`]: ../io/trait.AsyncWrite.html +//! [`Sink`]: https://docs.rs/futures/0.1/futures/sink/trait.Sink.html +//! [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html +//! [transports]: https://tokio.rs/docs/going-deeper/frames/ + +pub use tokio_codec::{ + BytesCodec, Decoder, Encoder, Framed, FramedParts, FramedRead, FramedWrite, LinesCodec, +}; + +pub mod length_delimited; + +pub use self::length_delimited::LengthDelimitedCodec; diff --git a/third_party/rust/tokio-0.1.22/src/executor/current_thread/mod.rs b/third_party/rust/tokio-0.1.22/src/executor/current_thread/mod.rs new file mode 100644 index 0000000000..6036aa997b --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/executor/current_thread/mod.rs @@ -0,0 +1,170 @@ +#![allow(deprecated)] + +//! Execute many tasks concurrently on the current thread. +//! +//! [`CurrentThread`] is an executor that keeps tasks on the same thread that +//! they were spawned from. This allows it to execute futures that are not +//! `Send`. +//! +//! A single [`CurrentThread`] instance is able to efficiently manage a large +//! number of tasks and will attempt to schedule all tasks fairly. +//! +//! All tasks that are being managed by a [`CurrentThread`] executor are able to +//! spawn additional tasks by calling [`spawn`]. This function only works from +//! within the context of a running [`CurrentThread`] instance. +//! +//! The easiest way to start a new [`CurrentThread`] executor is to call +//! [`block_on_all`] with an initial task to seed the executor. +//! +//! For example: +//! +//! ``` +//! # extern crate tokio; +//! # extern crate futures; +//! # use tokio::executor::current_thread; +//! use futures::future::lazy; +//! +//! // Calling execute here results in a panic +//! // current_thread::spawn(my_future); +//! +//! # pub fn main() { +//! current_thread::block_on_all(lazy(|| { +//! // The execution context is setup, futures may be executed. +//! current_thread::spawn(lazy(|| { +//! println!("called from the current thread executor"); +//! Ok(()) +//! })); +//! +//! Ok::<_, ()>(()) +//! })); +//! # } +//! ``` +//! +//! The `block_on_all` function will block the current thread until **all** +//! tasks that have been spawned onto the [`CurrentThread`] instance have +//! completed. +//! +//! More fine-grain control can be achieved by using [`CurrentThread`] directly. +//! +//! ``` +//! # extern crate tokio; +//! # extern crate futures; +//! # use tokio::executor::current_thread::CurrentThread; +//! use futures::future::{lazy, empty}; +//! use std::time::Duration; +//! +//! // Calling execute here results in a panic +//! // current_thread::spawn(my_future); +//! +//! # pub fn main() { +//! let mut current_thread = CurrentThread::new(); +//! +//! // Spawn a task, the task is not executed yet. +//! current_thread.spawn(lazy(|| { +//! println!("Spawning a task"); +//! Ok(()) +//! })); +//! +//! // Spawn a task that never completes +//! current_thread.spawn(empty()); +//! +//! // Run the executor, but only until the provided future completes. This +//! // provides the opportunity to start executing previously spawned tasks. +//! let res = current_thread.block_on(lazy(|| { +//! Ok::<_, ()>("Hello") +//! })).unwrap(); +//! +//! // Now, run the executor for *at most* 1 second. Since a task was spawned +//! // that never completes, this function will return with an error. +//! current_thread.run_timeout(Duration::from_secs(1)).unwrap_err(); +//! # } +//! ``` +//! +//! # Execution model +//! +//! Internally, [`CurrentThread`] maintains a queue. When one of its tasks is +//! notified, the task gets added to the queue. The executor will pop tasks from +//! the queue and call [`Future::poll`]. If the task gets notified while it is +//! being executed, it won't get re-executed until all other tasks currently in +//! the queue get polled. +//! +//! Before the task is polled, a thread-local variable referencing the current +//! [`CurrentThread`] instance is set. This enables [`spawn`] to spawn new tasks +//! onto the same executor without having to thread through a handle value. +//! +//! If the [`CurrentThread`] instance still has uncompleted tasks, but none of +//! these tasks are ready to be polled, the current thread is put to sleep. When +//! a task is notified, the thread is woken up and processing resumes. +//! +//! All tasks managed by [`CurrentThread`] remain on the current thread. When a +//! task completes, it is dropped. +//! +//! [`spawn`]: fn.spawn.html +//! [`block_on_all`]: fn.block_on_all.html +//! [`CurrentThread`]: struct.CurrentThread.html +//! [`Future::poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll + +pub use tokio_current_thread::{ + BlockError, + CurrentThread, + Entered, + Handle, + RunError, + RunTimeoutError, + TaskExecutor, + Turn, + TurnError, + block_on_all, + spawn, +}; + +use std::cell::Cell; +use std::marker::PhantomData; + +use futures::future::{self}; + +#[deprecated(since = "0.1.2", note = "use block_on_all instead")] +#[doc(hidden)] +#[derive(Debug)] +pub struct Context<'a> { + cancel: Cell<bool>, + _p: PhantomData<&'a ()>, +} + +impl<'a> Context<'a> { + /// Cancels *all* executing futures. + pub fn cancel_all_spawned(&self) { + self.cancel.set(true); + } +} + +#[deprecated(since = "0.1.2", note = "use block_on_all instead")] +#[doc(hidden)] +pub fn run<F, R>(f: F) -> R + where F: FnOnce(&mut Context) -> R +{ + let mut context = Context { + cancel: Cell::new(false), + _p: PhantomData, + }; + + let mut current_thread = CurrentThread::new(); + + let ret = current_thread + .block_on(future::lazy(|| Ok::<_, ()>(f(&mut context)))) + .unwrap(); + + if context.cancel.get() { + return ret; + } + + current_thread.run().unwrap(); + ret +} + +#[deprecated(since = "0.1.2", note = "use TaskExecutor::current instead")] +#[doc(hidden)] +pub fn task_executor() -> TaskExecutor { + TaskExecutor::current() +} + diff --git a/third_party/rust/tokio-0.1.22/src/executor/mod.rs b/third_party/rust/tokio-0.1.22/src/executor/mod.rs new file mode 100644 index 0000000000..8331e821f9 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/executor/mod.rs @@ -0,0 +1,145 @@ +//! Task execution utilities. +//! +//! In the Tokio execution model, futures are lazy. When a future is created, no +//! work is performed. In order for the work defined by the future to happen, +//! the future must be submitted to an executor. A future that is submitted to +//! an executor is called a "task". +//! +//! The executor is responsible for ensuring that [`Future::poll`] is +//! called whenever the task is [notified]. Notification happens when the +//! internal state of a task transitions from "not ready" to ready. For +//! example, a socket might have received data and a call to `read` will now be +//! able to succeed. +//! +//! The specific strategy used to manage the tasks is left up to the +//! executor. There are two main flavors of executors: single-threaded and +//! multi-threaded. Tokio provides implementation for both of these in the +//! [`runtime`] module. +//! +//! # `Executor` trait. +//! +//! This module provides the [`Executor`] trait (re-exported from +//! [`tokio-executor`]), which describes the API that all executors must +//! implement. +//! +//! A free [`spawn`] function is provided that allows spawning futures onto the +//! default executor (tracked via a thread-local variable) without referencing a +//! handle. It is expected that all executors will set a value for the default +//! executor. This value will often be set to the executor itself, but it is +//! possible that the default executor might be set to a different executor. +//! +//! For example, a single threaded executor might set the default executor to a +//! thread pool instead of itself, allowing futures to spawn new tasks onto the +//! thread pool when those tasks are `Send`. +//! +//! [`Future::poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll +//! [notified]: https://docs.rs/futures/0.1/futures/executor/trait.Notify.html#tymethod.notify +//! [`runtime`]: ../runtime/index.html +//! [`tokio-executor`]: https://docs.rs/tokio-executor/0.1 +//! [`Executor`]: trait.Executor.html +//! [`spawn`]: fn.spawn.html + +#[deprecated( + since = "0.1.8", + note = "use tokio-current-thread crate or functions in tokio::runtime::current_thread instead", +)] +#[doc(hidden)] +pub mod current_thread; + +#[deprecated(since = "0.1.8", note = "use tokio-threadpool crate instead")] +#[doc(hidden)] +/// Re-exports of [`tokio-threadpool`], deprecated in favor of the crate. +/// +/// [`tokio-threadpool`]: https://docs.rs/tokio-threadpool/0.1 +pub mod thread_pool { + pub use tokio_threadpool::{ + Builder, + Sender, + Shutdown, + ThreadPool, + }; +} + +pub use tokio_executor::{Executor, TypedExecutor, DefaultExecutor, SpawnError}; + +use futures::{Future, IntoFuture}; +use futures::future::{self, FutureResult}; + +/// Return value from the `spawn` function. +/// +/// Currently this value doesn't actually provide any functionality. However, it +/// provides a way to add functionality later without breaking backwards +/// compatibility. +/// +/// This also implements `IntoFuture` so that it can be used as the return value +/// in a `for_each` loop. +/// +/// See [`spawn`] for more details. +/// +/// [`spawn`]: fn.spawn.html +#[derive(Debug)] +pub struct Spawn(()); + +/// Spawns a future on the default executor. +/// +/// In order for a future to do work, it must be spawned on an executor. The +/// `spawn` function is the easiest way to do this. It spawns a future on the +/// [default executor] for the current execution context (tracked using a +/// thread-local variable). +/// +/// The default executor is **usually** a thread pool. +/// +/// # Examples +/// +/// In this example, a server is started and `spawn` is used to start a new task +/// that processes each received connection. +/// +/// ```rust +/// # extern crate tokio; +/// # extern crate futures; +/// # use futures::{Future, Stream}; +/// use tokio::net::TcpListener; +/// +/// # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> { +/// # unimplemented!(); +/// # } +/// # fn dox() { +/// # let addr = "127.0.0.1:8080".parse().unwrap(); +/// let listener = TcpListener::bind(&addr).unwrap(); +/// +/// let server = listener.incoming() +/// .map_err(|e| println!("error = {:?}", e)) +/// .for_each(|socket| { +/// tokio::spawn(process(socket)) +/// }); +/// +/// tokio::run(server); +/// # } +/// # pub fn main() {} +/// ``` +/// +/// [default executor]: struct.DefaultExecutor.html +/// +/// # Panics +/// +/// This function will panic if the default executor is not set or if spawning +/// onto the default executor returns an error. To avoid the panic, use +/// [`DefaultExecutor`]. +/// +/// [`DefaultExecutor`]: struct.DefaultExecutor.html +pub fn spawn<F>(f: F) -> Spawn +where F: Future<Item = (), Error = ()> + 'static + Send +{ + ::tokio_executor::spawn(f); + Spawn(()) +} + +impl IntoFuture for Spawn { + type Future = FutureResult<(), ()>; + type Item = (); + type Error = (); + + fn into_future(self) -> Self::Future { + future::ok(()) + } +} diff --git a/third_party/rust/tokio-0.1.22/src/fs.rs b/third_party/rust/tokio-0.1.22/src/fs.rs new file mode 100644 index 0000000000..5d185cd0f2 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/fs.rs @@ -0,0 +1,15 @@ +//! Asynchronous filesystem manipulation operations. +//! +//! This module contains basic methods and types for manipulating the contents +//! of the local filesystem from within the context of the Tokio runtime. +//! +//! Unlike *most* other Tokio APIs, the filesystem APIs **must** be used from +//! the context of the Tokio runtime as they require Tokio specific features to +//! function. + +pub use tokio_fs::OpenOptions; +pub use tokio_fs::{ + create_dir, create_dir_all, file, hard_link, metadata, os, read_dir, read_link, +}; +pub use tokio_fs::{read, write, ReadFile, WriteFile}; +pub use tokio_fs::{remove_dir, remove_file, rename, set_permissions, symlink_metadata, File}; diff --git a/third_party/rust/tokio-0.1.22/src/io.rs b/third_party/rust/tokio-0.1.22/src/io.rs new file mode 100644 index 0000000000..feb1f6b26f --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/io.rs @@ -0,0 +1,62 @@ +//! Asynchronous I/O. +//! +//! This module is the asynchronous version of `std::io`. Primarily, it +//! defines two traits, [`AsyncRead`] and [`AsyncWrite`], which extend the +//! `Read` and `Write` traits of the standard library. +//! +//! # AsyncRead and AsyncWrite +//! +//! [`AsyncRead`] and [`AsyncWrite`] must only be implemented for +//! non-blocking I/O types that integrate with the futures type system. In +//! other words, these types must never block the thread, and instead the +//! current task is notified when the I/O resource is ready. +//! +//! # Standard input and output +//! +//! Tokio provides asynchronous APIs to standard [input], [output], and [error]. +//! These APIs are very similar to the ones provided by `std`, but they also +//! implement [`AsyncRead`] and [`AsyncWrite`]. +//! +//! Unlike *most* other Tokio APIs, the standard input / output APIs +//! **must** be used from the context of the Tokio runtime as they require +//! Tokio specific features to function. +//! +//! [input]: fn.stdin.html +//! [output]: fn.stdout.html +//! [error]: fn.stderr.html +//! +//! # Utility functions +//! +//! Utilities functions are provided for working with [`AsyncRead`] / +//! [`AsyncWrite`] types. For example, [`copy`] asynchronously copies all +//! data from a source to a destination. +//! +//! # `std` re-exports +//! +//! Additionally, [`Read`], [`Write`], [`Error`], [`ErrorKind`], and +//! [`Result`] are re-exported from `std::io` for ease of use. +//! +//! [`AsyncRead`]: trait.AsyncRead.html +//! [`AsyncWrite`]: trait.AsyncWrite.html +//! [`copy`]: fn.copy.html +//! [`Read`]: trait.Read.html +//! [`Write`]: trait.Write.html +//! [`Error`]: struct.Error.html +//! [`ErrorKind`]: enum.ErrorKind.html +//! [`Result`]: type.Result.html + +pub use tokio_io::{AsyncRead, AsyncWrite}; + +// standard input, output, and error +#[cfg(feature = "fs")] +pub use tokio_fs::{stderr, stdin, stdout, Stderr, Stdin, Stdout}; + +// Utils +pub use tokio_io::io::{ + copy, flush, lines, read, read_exact, read_to_end, read_until, shutdown, write_all, Copy, + Flush, Lines, ReadExact, ReadHalf, ReadToEnd, ReadUntil, Shutdown, WriteAll, WriteHalf, +}; + +// Re-export io::Error so that users don't have to deal +// with conflicts when `use`ing `futures::io` and `std::io`. +pub use std::io::{Error, ErrorKind, Read, Result, Write}; diff --git a/third_party/rust/tokio-0.1.22/src/lib.rs b/third_party/rust/tokio-0.1.22/src/lib.rs new file mode 100644 index 0000000000..d4764516fe --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/lib.rs @@ -0,0 +1,138 @@ +#![doc(html_root_url = "https://docs.rs/tokio/0.1.22")] +#![deny(missing_docs, warnings, missing_debug_implementations)] + +//! A runtime for writing reliable, asynchronous, and slim applications. +//! +//! Tokio is an event-driven, non-blocking I/O platform for writing asynchronous +//! applications with the Rust programming language. At a high level, it +//! provides a few major components: +//! +//! * A multi threaded, work-stealing based task [scheduler][runtime]. +//! * A [reactor] backed by the operating system's event queue (epoll, kqueue, +//! IOCP, etc...). +//! * Asynchronous [TCP and UDP][net] sockets. +//! * Asynchronous [filesystem][fs] operations. +//! * [Timer][timer] API for scheduling work in the future. +//! +//! Tokio is built using [futures] as the abstraction for managing the +//! complexity of asynchronous programming. +//! +//! Guide level documentation is found on the [website]. +//! +//! [website]: https://tokio.rs/docs/getting-started/hello-world/ +//! [futures]: http://docs.rs/futures/0.1 +//! +//! # Examples +//! +//! A simple TCP echo server: +//! +//! ```no_run +//! extern crate tokio; +//! +//! use tokio::prelude::*; +//! use tokio::io::copy; +//! use tokio::net::TcpListener; +//! +//! fn main() { +//! // Bind the server's socket. +//! let addr = "127.0.0.1:12345".parse().unwrap(); +//! let listener = TcpListener::bind(&addr) +//! .expect("unable to bind TCP listener"); +//! +//! // Pull out a stream of sockets for incoming connections +//! let server = listener.incoming() +//! .map_err(|e| eprintln!("accept failed = {:?}", e)) +//! .for_each(|sock| { +//! // Split up the reading and writing parts of the +//! // socket. +//! let (reader, writer) = sock.split(); +//! +//! // A future that echos the data and returns how +//! // many bytes were copied... +//! let bytes_copied = copy(reader, writer); +//! +//! // ... after which we'll print what happened. +//! let handle_conn = bytes_copied.map(|amt| { +//! println!("wrote {:?} bytes", amt) +//! }).map_err(|err| { +//! eprintln!("IO error {:?}", err) +//! }); +//! +//! // Spawn the future as a concurrent task. +//! tokio::spawn(handle_conn) +//! }); +//! +//! // Start the Tokio runtime +//! tokio::run(server); +//! } +//! ``` + +macro_rules! if_runtime { + ($($i:item)*) => ($( + #[cfg(any(feature = "rt-full"))] + $i + )*) +} + +#[macro_use] +extern crate futures; + +#[cfg(feature = "io")] +extern crate bytes; +#[cfg(feature = "reactor")] +extern crate mio; +#[cfg(feature = "rt-full")] +extern crate num_cpus; +#[cfg(feature = "codec")] +extern crate tokio_codec; +#[cfg(feature = "rt-full")] +extern crate tokio_current_thread; +#[cfg(feature = "fs")] +extern crate tokio_fs; +#[cfg(feature = "io")] +extern crate tokio_io; +#[cfg(feature = "reactor")] +extern crate tokio_reactor; +#[cfg(feature = "sync")] +extern crate tokio_sync; +#[cfg(feature = "tcp")] +extern crate tokio_tcp; +#[cfg(feature = "rt-full")] +extern crate tokio_threadpool; +#[cfg(feature = "timer")] +extern crate tokio_timer; +#[cfg(feature = "udp")] +extern crate tokio_udp; +#[cfg(feature = "experimental-tracing")] +extern crate tracing_core; + +#[cfg(all(unix, feature = "uds"))] +extern crate tokio_uds; + +#[cfg(feature = "timer")] +pub mod clock; +#[cfg(feature = "codec")] +pub mod codec; +#[cfg(feature = "fs")] +pub mod fs; +#[cfg(feature = "io")] +pub mod io; +#[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))] +pub mod net; +pub mod prelude; +#[cfg(feature = "reactor")] +pub mod reactor; +#[cfg(feature = "sync")] +pub mod sync; +#[cfg(feature = "timer")] +pub mod timer; +pub mod util; + +if_runtime! { + extern crate tokio_executor; + pub mod executor; + pub mod runtime; + + pub use executor::spawn; + pub use runtime::run; +} diff --git a/third_party/rust/tokio-0.1.22/src/net.rs b/third_party/rust/tokio-0.1.22/src/net.rs new file mode 100644 index 0000000000..a6b425da69 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/net.rs @@ -0,0 +1,98 @@ +//! TCP/UDP/Unix bindings for `tokio`. +//! +//! This module contains the TCP/UDP/Unix networking types, similar to the standard +//! library, which can be used to implement networking protocols. +//! +//! # Organization +//! +//! * [`TcpListener`] and [`TcpStream`] provide functionality for communication over TCP +//! * [`UdpSocket`] and [`UdpFramed`] provide functionality for communication over UDP +//! * [`UnixListener`] and [`UnixStream`] provide functionality for communication over a +//! Unix Domain Stream Socket **(available on Unix only)** +//! * [`UnixDatagram`] and [`UnixDatagramFramed`] provide functionality for communication +//! over Unix Domain Datagram Socket **(available on Unix only)** + +//! +//! [`TcpListener`]: struct.TcpListener.html +//! [`TcpStream`]: struct.TcpStream.html +//! [`UdpSocket`]: struct.UdpSocket.html +//! [`UdpFramed`]: struct.UdpFramed.html +//! [`UnixListener`]: struct.UnixListener.html +//! [`UnixStream`]: struct.UnixStream.html +//! [`UnixDatagram`]: struct.UnixDatagram.html +//! [`UnixDatagramFramed`]: struct.UnixDatagramFramed.html + +#[cfg(feature = "tcp")] +pub mod tcp { + //! TCP bindings for `tokio`. + //! + //! Connecting to an address, via TCP, can be done using [`TcpStream`]'s + //! [`connect`] method, which returns [`ConnectFuture`]. `ConnectFuture` + //! implements a future which returns a `TcpStream`. + //! + //! To listen on an address [`TcpListener`] can be used. `TcpListener`'s + //! [`incoming`][incoming_method] method can be used to accept new connections. + //! It return the [`Incoming`] struct, which implements a stream which returns + //! `TcpStream`s. + //! + //! [`TcpStream`]: struct.TcpStream.html + //! [`connect`]: struct.TcpStream.html#method.connect + //! [`ConnectFuture`]: struct.ConnectFuture.html + //! [`TcpListener`]: struct.TcpListener.html + //! [incoming_method]: struct.TcpListener.html#method.incoming + //! [`Incoming`]: struct.Incoming.html + pub use tokio_tcp::{ConnectFuture, Incoming, TcpListener, TcpStream}; +} +#[cfg(feature = "tcp")] +pub use self::tcp::{TcpListener, TcpStream}; + +#[cfg(feature = "tcp")] +#[deprecated(note = "use `tokio::net::tcp::ConnectFuture` instead")] +#[doc(hidden)] +pub type ConnectFuture = self::tcp::ConnectFuture; +#[cfg(feature = "tcp")] +#[deprecated(note = "use `tokio::net::tcp::Incoming` instead")] +#[doc(hidden)] +pub type Incoming = self::tcp::Incoming; + +#[cfg(feature = "udp")] +pub mod udp { + //! UDP bindings for `tokio`. + //! + //! The main struct for UDP is the [`UdpSocket`], which represents a UDP socket. + //! Reading and writing to it can be done using futures, which return the + //! [`RecvDgram`] and [`SendDgram`] structs respectively. + //! + //! For convenience it's also possible to convert raw datagrams into higher-level + //! frames. + //! + //! [`UdpSocket`]: struct.UdpSocket.html + //! [`RecvDgram`]: struct.RecvDgram.html + //! [`SendDgram`]: struct.SendDgram.html + //! [`UdpFramed`]: struct.UdpFramed.html + //! [`framed`]: struct.UdpSocket.html#method.framed + pub use tokio_udp::{RecvDgram, SendDgram, UdpFramed, UdpSocket}; +} +#[cfg(feature = "udp")] +pub use self::udp::{UdpFramed, UdpSocket}; + +#[cfg(feature = "udp")] +#[deprecated(note = "use `tokio::net::udp::RecvDgram` instead")] +#[doc(hidden)] +pub type RecvDgram<T> = self::udp::RecvDgram<T>; +#[cfg(feature = "udp")] +#[deprecated(note = "use `tokio::net::udp::SendDgram` instead")] +#[doc(hidden)] +pub type SendDgram<T> = self::udp::SendDgram<T>; + +#[cfg(all(unix, feature = "uds"))] +pub mod unix { + //! Unix domain socket bindings for `tokio` (only available on unix systems). + + pub use tokio_uds::{ + ConnectFuture, Incoming, RecvDgram, SendDgram, UCred, UnixDatagram, UnixDatagramFramed, + UnixListener, UnixStream, + }; +} +#[cfg(all(unix, feature = "uds"))] +pub use self::unix::{UnixDatagram, UnixDatagramFramed, UnixListener, UnixStream}; diff --git a/third_party/rust/tokio-0.1.22/src/prelude.rs b/third_party/rust/tokio-0.1.22/src/prelude.rs new file mode 100644 index 0000000000..b1b5f85b5a --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/prelude.rs @@ -0,0 +1,28 @@ +//! A "prelude" for users of the `tokio` crate. +//! +//! This prelude is similar to the standard library's prelude in that you'll +//! almost always want to import its entire contents, but unlike the standard +//! library's prelude you'll have to do so manually: +//! +//! ``` +//! use tokio::prelude::*; +//! ``` +//! +//! The prelude may grow over time as additional items see ubiquitous use. + +#[cfg(feature = "io")] +pub use tokio_io::{AsyncRead, AsyncWrite}; + +pub use util::{FutureExt, StreamExt}; + +pub use std::io::{Read, Write}; + +pub use futures::{future, stream, task, Async, AsyncSink, Future, IntoFuture, Poll, Sink, Stream}; + +#[cfg(feature = "async-await-preview")] +#[doc(inline)] +pub use tokio_futures::{ + io::{AsyncReadExt, AsyncWriteExt}, + sink::SinkExt, + stream::StreamExt as StreamAsyncExt, +}; diff --git a/third_party/rust/tokio-0.1.22/src/reactor/mod.rs b/third_party/rust/tokio-0.1.22/src/reactor/mod.rs new file mode 100644 index 0000000000..0e3f4eaabb --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/reactor/mod.rs @@ -0,0 +1,144 @@ +//! Event loop that drives Tokio I/O resources. +//! +//! This module contains [`Reactor`], which is the event loop that drives all +//! Tokio I/O resources. It is the reactor's job to receive events from the +//! operating system ([epoll], [kqueue], [IOCP], etc...) and forward them to +//! waiting tasks. It is the bridge between operating system and the futures +//! model. +//! +//! # Overview +//! +//! When using Tokio, all operations are asynchronous and represented by +//! futures. These futures, representing the application logic, are scheduled by +//! an executor (see [runtime model] for more details). Executors wait for +//! notifications before scheduling the future for execution time, i.e., nothing +//! happens until an event is received indicating that the task can make +//! progress. +//! +//! The reactor receives events from the operating system and notifies the +//! executor. +//! +//! Let's start with a basic example, establishing a TCP connection. +//! +//! ```rust +//! # extern crate tokio; +//! # fn dox() { +//! use tokio::prelude::*; +//! use tokio::net::TcpStream; +//! +//! let addr = "93.184.216.34:9243".parse().unwrap(); +//! +//! let connect_future = TcpStream::connect(&addr); +//! +//! let task = connect_future +//! .and_then(|socket| { +//! println!("successfully connected"); +//! Ok(()) +//! }) +//! .map_err(|e| println!("failed to connect; err={:?}", e)); +//! +//! tokio::run(task); +//! # } +//! # fn main() {} +//! ``` +//! +//! Establishing a TCP connection usually cannot be completed immediately. +//! [`TcpStream::connect`] does not block the current thread. Instead, it +//! returns a [future][connect-future] that resolves once the TCP connection has +//! been established. The connect future itself has no way of knowing when the +//! TCP connection has been established. +//! +//! Before returning the future, [`TcpStream::connect`] registers the socket +//! with a reactor. This registration process, handled by [`Registration`], is +//! what links the [`TcpStream`] with the [`Reactor`] instance. At this point, +//! the reactor starts listening for connection events from the operating system +//! for that socket. +//! +//! Once the connect future is passed to [`tokio::run`], it is spawned onto a +//! thread pool. The thread pool waits until it is notified that the connection +//! has completed. +//! +//! When the TCP connection is established, the reactor receives an event from +//! the operating system. It then notifies the thread pool, telling it that the +//! connect future can complete. At this point, the thread pool will schedule +//! the task to run on one of its worker threads. This results in the `and_then` +//! closure to get executed. +//! +//! ## Lazy registration +//! +//! Notice how the snippet above does not explicitly reference a reactor. When +//! [`TcpStream::connect`] is called, it registers the socket with a reactor, +//! but no reactor is specified. This works because the registration process +//! mentioned above is actually lazy. It doesn't *actually* happen in the +//! [`connect`] function. Instead, the registration is established the first +//! time that the task is polled (again, see [runtime model]). +//! +//! A reactor instance is automatically made available when using the Tokio +//! [runtime], which is done using [`tokio::run`]. The Tokio runtime's executor +//! sets a thread-local variable referencing the associated [`Reactor`] instance +//! and [`Handle::current`] (used by [`Registration`]) returns the reference. +//! +//! ## Implementation +//! +//! The reactor implementation uses [`mio`] to interface with the operating +//! system's event queue. A call to [`Reactor::poll`] results in a single +//! call to [`Poll::poll`] which in turn results in a single call to the +//! operating system's selector. +//! +//! The reactor maintains state for each registered I/O resource. This tracks +//! the executor task to notify when events are provided by the operating +//! system's selector. This state is stored in a `Sync` data structure and +//! referenced by [`Registration`]. When the [`Registration`] instance is +//! dropped, this state is cleaned up. Because the state is stored in a `Sync` +//! data structure, the [`Registration`] instance is able to be moved to other +//! threads. +//! +//! By default, a runtime's default reactor runs on a background thread. This +//! ensures that application code cannot significantly impact the reactor's +//! responsiveness. +//! +//! ## Integrating with the reactor +//! +//! Tokio comes with a number of I/O resources, like TCP and UDP sockets, that +//! automatically integrate with the reactor. However, library authors or +//! applications may wish to implement their own resources that are also backed +//! by the reactor. +//! +//! There are a couple of ways to do this. +//! +//! If the custom I/O resource implements [`mio::Evented`] and implements +//! [`std::io::Read`] and / or [`std::io::Write`], then [`PollEvented`] is the +//! most suited. +//! +//! Otherwise, [`Registration`] can be used directly. This provides the lowest +//! level primitive needed for integrating with the reactor: a stream of +//! readiness events. +//! +//! [`Reactor`]: struct.Reactor.html +//! [`Registration`]: struct.Registration.html +//! [runtime model]: https://tokio.rs/docs/getting-started/runtime-model/ +//! [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html +//! [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2 +//! [IOCP]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx +//! [`TcpStream::connect`]: ../net/struct.TcpStream.html#method.connect +//! [`connect`]: ../net/struct.TcpStream.html#method.connect +//! [connect-future]: ../net/struct.ConnectFuture.html +//! [`tokio::run`]: ../runtime/fn.run.html +//! [`TcpStream`]: ../net/struct.TcpStream.html +//! [runtime]: ../runtime +//! [`Handle::current`]: struct.Handle.html#method.current +//! [`mio`]: https://github.com/carllerche/mio +//! [`Reactor::poll`]: struct.Reactor.html#method.poll +//! [`Poll::poll`]: https://docs.rs/mio/0.6/mio/struct.Poll.html#method.poll +//! [`mio::Evented`]: https://docs.rs/mio/0.6/mio/trait.Evented.html +//! [`PollEvented`]: struct.PollEvented.html +//! [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html +//! [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html + +pub use tokio_reactor::{ + Background, Handle, PollEvented as PollEvented2, Reactor, Registration, Turn, +}; + +mod poll_evented; +#[allow(deprecated)] +pub use self::poll_evented::PollEvented; diff --git a/third_party/rust/tokio-0.1.22/src/reactor/poll_evented.rs b/third_party/rust/tokio-0.1.22/src/reactor/poll_evented.rs new file mode 100644 index 0000000000..74e5d2ed88 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/reactor/poll_evented.rs @@ -0,0 +1,547 @@ +//! Readiness tracking streams, backing I/O objects. +//! +//! This module contains the core type which is used to back all I/O on object +//! in `tokio-core`. The `PollEvented` type is the implementation detail of +//! all I/O. Each `PollEvented` manages registration with a reactor, +//! acquisition of a token, and tracking of the readiness state on the +//! underlying I/O primitive. + +#![allow(deprecated, warnings)] + +use std::fmt; +use std::io::{self, Read, Write}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; +use std::sync::Mutex; + +use futures::{task, Async, Poll}; +use mio::event::Evented; +use mio::Ready; +use tokio_io::{AsyncRead, AsyncWrite}; + +use reactor::{Handle, Registration}; + +#[deprecated(since = "0.1.2", note = "PollEvented2 instead")] +#[doc(hidden)] +pub struct PollEvented<E> { + io: E, + inner: Inner, + handle: Handle, +} + +struct Inner { + registration: Mutex<Registration>, + + /// Currently visible read readiness + read_readiness: AtomicUsize, + + /// Currently visible write readiness + write_readiness: AtomicUsize, +} + +impl<E: fmt::Debug> fmt::Debug for PollEvented<E> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("PollEvented").field("io", &self.io).finish() + } +} + +impl<E> PollEvented<E> { + /// Creates a new readiness stream associated with the provided + /// `loop_handle` and for the given `source`. + pub fn new(io: E, handle: &Handle) -> io::Result<PollEvented<E>> + where + E: Evented, + { + let registration = Registration::new(); + registration.register(&io)?; + + Ok(PollEvented { + io: io, + inner: Inner { + registration: Mutex::new(registration), + read_readiness: AtomicUsize::new(0), + write_readiness: AtomicUsize::new(0), + }, + handle: handle.clone(), + }) + } + + /// Tests to see if this source is ready to be read from or not. + /// + /// If this stream is not ready for a read then `Async::NotReady` will be + /// returned and the current task will be scheduled to receive a + /// notification when the stream is readable again. In other words, this + /// method is only safe to call from within the context of a future's task, + /// typically done in a `Future::poll` method. + /// + /// This is mostly equivalent to `self.poll_ready(Ready::readable())`. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn poll_read(&mut self) -> Async<()> { + if self.poll_read2().is_ready() { + return ().into(); + } + + Async::NotReady + } + + fn poll_read2(&self) -> Async<Ready> { + let r = self.inner.registration.lock().unwrap(); + + // Load the cached readiness + match self.inner.read_readiness.load(Relaxed) { + 0 => {} + mut n => { + // Check what's new with the reactor. + if let Some(ready) = r.take_read_ready().unwrap() { + n |= ready2usize(ready); + self.inner.read_readiness.store(n, Relaxed); + } + + return usize2ready(n).into(); + } + } + + let ready = match r.poll_read_ready().unwrap() { + Async::Ready(r) => r, + _ => return Async::NotReady, + }; + + // Cache the value + self.inner.read_readiness.store(ready2usize(ready), Relaxed); + + ready.into() + } + + /// Tests to see if this source is ready to be written to or not. + /// + /// If this stream is not ready for a write then `Async::NotReady` will be returned + /// and the current task will be scheduled to receive a notification when + /// the stream is writable again. In other words, this method is only safe + /// to call from within the context of a future's task, typically done in a + /// `Future::poll` method. + /// + /// This is mostly equivalent to `self.poll_ready(Ready::writable())`. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn poll_write(&mut self) -> Async<()> { + let r = self.inner.registration.lock().unwrap(); + + match self.inner.write_readiness.load(Relaxed) { + 0 => {} + mut n => { + // Check what's new with the reactor. + if let Some(ready) = r.take_write_ready().unwrap() { + n |= ready2usize(ready); + self.inner.write_readiness.store(n, Relaxed); + } + + return ().into(); + } + } + + let ready = match r.poll_write_ready().unwrap() { + Async::Ready(r) => r, + _ => return Async::NotReady, + }; + + // Cache the value + self.inner + .write_readiness + .store(ready2usize(ready), Relaxed); + + ().into() + } + + /// Test to see whether this source fulfills any condition listed in `mask` + /// provided. + /// + /// The `mask` given here is a mio `Ready` set of possible events. This can + /// contain any events like read/write but also platform-specific events + /// such as hup and error. The `mask` indicates events that are interested + /// in being ready. + /// + /// If any event in `mask` is ready then it is returned through + /// `Async::Ready`. The `Ready` set returned is guaranteed to not be empty + /// and contains all events that are currently ready in the `mask` provided. + /// + /// If no events are ready in the `mask` provided then the current task is + /// scheduled to receive a notification when any of them become ready. If + /// the `writable` event is contained within `mask` then this + /// `PollEvented`'s `write` task will be blocked and otherwise the `read` + /// task will be blocked. This is generally only relevant if you're working + /// with this `PollEvented` object on multiple tasks. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn poll_ready(&mut self, mask: Ready) -> Async<Ready> { + let mut ret = Ready::empty(); + + if mask.is_empty() { + return ret.into(); + } + + if mask.is_writable() { + if self.poll_write().is_ready() { + ret = Ready::writable(); + } + } + + let mask = mask - Ready::writable(); + + if !mask.is_empty() { + if let Async::Ready(v) = self.poll_read2() { + ret |= v & mask; + } + } + + if ret.is_empty() { + if mask.is_writable() { + let _ = self.need_write(); + } + + if mask.is_readable() { + let _ = self.need_read(); + } + + Async::NotReady + } else { + ret.into() + } + } + + /// Indicates to this source of events that the corresponding I/O object is + /// no longer readable, but it needs to be. + /// + /// This function, like `poll_read`, is only safe to call from the context + /// of a future's task (typically in a `Future::poll` implementation). It + /// informs this readiness stream that the underlying object is no longer + /// readable, typically because a "would block" error was seen. + /// + /// *All* readiness bits associated with this stream except the writable bit + /// will be reset when this method is called. The current task is then + /// scheduled to receive a notification whenever anything changes other than + /// the writable bit. Note that this typically just means the readable bit + /// is used here, but if you're using a custom I/O object for events like + /// hup/error this may also be relevant. + /// + /// Note that it is also only valid to call this method if `poll_read` + /// previously indicated that the object is readable. That is, this function + /// must always be paired with calls to `poll_read` previously. + /// + /// # Errors + /// + /// This function will return an error if the `Reactor` that this `PollEvented` + /// is associated with has gone away (been destroyed). The error means that + /// the ambient futures task could not be scheduled to receive a + /// notification and typically means that the error should be propagated + /// outwards. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn need_read(&mut self) -> io::Result<()> { + self.inner.read_readiness.store(0, Relaxed); + + if self.poll_read().is_ready() { + // Notify the current task + task::current().notify(); + } + + Ok(()) + } + + /// Indicates to this source of events that the corresponding I/O object is + /// no longer writable, but it needs to be. + /// + /// This function, like `poll_write`, is only safe to call from the context + /// of a future's task (typically in a `Future::poll` implementation). It + /// informs this readiness stream that the underlying object is no longer + /// writable, typically because a "would block" error was seen. + /// + /// The flag indicating that this stream is writable is unset and the + /// current task is scheduled to receive a notification when the stream is + /// then again writable. + /// + /// Note that it is also only valid to call this method if `poll_write` + /// previously indicated that the object is writable. That is, this function + /// must always be paired with calls to `poll_write` previously. + /// + /// # Errors + /// + /// This function will return an error if the `Reactor` that this `PollEvented` + /// is associated with has gone away (been destroyed). The error means that + /// the ambient futures task could not be scheduled to receive a + /// notification and typically means that the error should be propagated + /// outwards. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn need_write(&mut self) -> io::Result<()> { + self.inner.write_readiness.store(0, Relaxed); + + if self.poll_write().is_ready() { + // Notify the current task + task::current().notify(); + } + + Ok(()) + } + + /// Returns a reference to the event loop handle that this readiness stream + /// is associated with. + pub fn handle(&self) -> &Handle { + &self.handle + } + + /// Returns a shared reference to the underlying I/O object this readiness + /// stream is wrapping. + pub fn get_ref(&self) -> &E { + &self.io + } + + /// Returns a mutable reference to the underlying I/O object this readiness + /// stream is wrapping. + pub fn get_mut(&mut self) -> &mut E { + &mut self.io + } + + /// Consumes the `PollEvented` and returns the underlying I/O object + pub fn into_inner(self) -> E { + self.io + } + + /// Deregisters this source of events from the reactor core specified. + /// + /// This method can optionally be called to unregister the underlying I/O + /// object with the event loop that the `handle` provided points to. + /// Typically this method is not required as this automatically happens when + /// `E` is dropped, but for some use cases the `E` object doesn't represent + /// an owned reference, so dropping it won't automatically unregister with + /// the event loop. + /// + /// This consumes `self` as it will no longer provide events after the + /// method is called, and will likely return an error if this `PollEvented` + /// was created on a separate event loop from the `handle` specified. + pub fn deregister(&self) -> io::Result<()> + where + E: Evented, + { + self.inner.registration.lock().unwrap().deregister(&self.io) + } +} + +impl<E: Read> Read for PollEvented<E> { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + if let Async::NotReady = self.poll_read() { + return Err(io::ErrorKind::WouldBlock.into()); + } + + let r = self.get_mut().read(buf); + + if is_wouldblock(&r) { + self.need_read()?; + } + + return r; + } +} + +impl<E: Write> Write for PollEvented<E> { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + if let Async::NotReady = self.poll_write() { + return Err(io::ErrorKind::WouldBlock.into()); + } + + let r = self.get_mut().write(buf); + + if is_wouldblock(&r) { + self.need_write()?; + } + + return r; + } + + fn flush(&mut self) -> io::Result<()> { + if let Async::NotReady = self.poll_write() { + return Err(io::ErrorKind::WouldBlock.into()); + } + + let r = self.get_mut().flush(); + + if is_wouldblock(&r) { + self.need_write()?; + } + + return r; + } +} + +impl<E: Read> AsyncRead for PollEvented<E> {} + +impl<E: Write> AsyncWrite for PollEvented<E> { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) + } +} + +fn is_wouldblock<T>(r: &io::Result<T>) -> bool { + match *r { + Ok(_) => false, + Err(ref e) => e.kind() == io::ErrorKind::WouldBlock, + } +} + +const READ: usize = 1 << 0; +const WRITE: usize = 1 << 1; + +fn ready2usize(ready: Ready) -> usize { + let mut bits = 0; + if ready.is_readable() { + bits |= READ; + } + if ready.is_writable() { + bits |= WRITE; + } + bits | platform::ready2usize(ready) +} + +fn usize2ready(bits: usize) -> Ready { + let mut ready = Ready::empty(); + if bits & READ != 0 { + ready.insert(Ready::readable()); + } + if bits & WRITE != 0 { + ready.insert(Ready::writable()); + } + ready | platform::usize2ready(bits) +} + +#[cfg(unix)] +mod platform { + use mio::unix::UnixReady; + use mio::Ready; + + const HUP: usize = 1 << 2; + const ERROR: usize = 1 << 3; + const AIO: usize = 1 << 4; + const LIO: usize = 1 << 5; + + #[cfg(any(target_os = "dragonfly", target_os = "freebsd"))] + fn is_aio(ready: &Ready) -> bool { + UnixReady::from(*ready).is_aio() + } + + #[cfg(not(any(target_os = "dragonfly", target_os = "freebsd")))] + fn is_aio(_ready: &Ready) -> bool { + false + } + + #[cfg(target_os = "freebsd")] + fn is_lio(ready: &Ready) -> bool { + UnixReady::from(*ready).is_lio() + } + + #[cfg(not(target_os = "freebsd"))] + fn is_lio(_ready: &Ready) -> bool { + false + } + + pub fn ready2usize(ready: Ready) -> usize { + let ready = UnixReady::from(ready); + let mut bits = 0; + if is_aio(&ready) { + bits |= AIO; + } + if is_lio(&ready) { + bits |= LIO; + } + if ready.is_error() { + bits |= ERROR; + } + if ready.is_hup() { + bits |= HUP; + } + bits + } + + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos" + ))] + fn usize2ready_aio(ready: &mut UnixReady) { + ready.insert(UnixReady::aio()); + } + + #[cfg(not(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos" + )))] + fn usize2ready_aio(_ready: &mut UnixReady) { + // aio not available here → empty + } + + #[cfg(target_os = "freebsd")] + fn usize2ready_lio(ready: &mut UnixReady) { + ready.insert(UnixReady::lio()); + } + + #[cfg(not(target_os = "freebsd"))] + fn usize2ready_lio(_ready: &mut UnixReady) { + // lio not available here → empty + } + + pub fn usize2ready(bits: usize) -> Ready { + let mut ready = UnixReady::from(Ready::empty()); + if bits & AIO != 0 { + usize2ready_aio(&mut ready); + } + if bits & LIO != 0 { + usize2ready_lio(&mut ready); + } + if bits & HUP != 0 { + ready.insert(UnixReady::hup()); + } + if bits & ERROR != 0 { + ready.insert(UnixReady::error()); + } + ready.into() + } +} + +#[cfg(windows)] +mod platform { + use mio::Ready; + + pub fn all() -> Ready { + // No platform-specific Readinesses for Windows + Ready::empty() + } + + pub fn hup() -> Ready { + Ready::empty() + } + + pub fn ready2usize(_r: Ready) -> usize { + 0 + } + + pub fn usize2ready(_r: usize) -> Ready { + Ready::empty() + } +} diff --git a/third_party/rust/tokio-0.1.22/src/runtime/current_thread/async_await.rs b/third_party/rust/tokio-0.1.22/src/runtime/current_thread/async_await.rs new file mode 100644 index 0000000000..7a25994147 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/runtime/current_thread/async_await.rs @@ -0,0 +1,17 @@ +use super::Runtime; +use std::future::Future; + +impl Runtime { + /// Like `block_on`, but takes an `async` block + pub fn block_on_async<F>(&mut self, future: F) -> F::Output + where + F: Future, + { + use tokio_futures::compat; + + match self.block_on(compat::infallible_into_01(future)) { + Ok(v) => v, + Err(_) => unreachable!(), + } + } +} diff --git a/third_party/rust/tokio-0.1.22/src/runtime/current_thread/builder.rs b/third_party/rust/tokio-0.1.22/src/runtime/current_thread/builder.rs new file mode 100644 index 0000000000..72960fadf2 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/runtime/current_thread/builder.rs @@ -0,0 +1,88 @@ +use executor::current_thread::CurrentThread; +use runtime::current_thread::Runtime; + +use tokio_reactor::Reactor; +use tokio_timer::clock::Clock; +use tokio_timer::timer::Timer; + +use std::io; + +/// Builds a Single-threaded runtime with custom configuration values. +/// +/// Methods can be chained in order to set the configuration values. The +/// Runtime is constructed by calling [`build`]. +/// +/// New instances of `Builder` are obtained via [`Builder::new`]. +/// +/// See function level documentation for details on the various configuration +/// settings. +/// +/// [`build`]: #method.build +/// [`Builder::new`]: #method.new +/// +/// # Examples +/// +/// ``` +/// extern crate tokio; +/// extern crate tokio_timer; +/// +/// use tokio::runtime::current_thread::Builder; +/// use tokio_timer::clock::Clock; +/// +/// # pub fn main() { +/// // build Runtime +/// let runtime = Builder::new() +/// .clock(Clock::new()) +/// .build(); +/// // ... call runtime.run(...) +/// # let _ = runtime; +/// # } +/// ``` +#[derive(Debug)] +pub struct Builder { + /// The clock to use + clock: Clock, +} + +impl Builder { + /// Returns a new runtime builder initialized with default configuration + /// values. + /// + /// Configuration methods can be chained on the return value. + pub fn new() -> Builder { + Builder { + clock: Clock::new(), + } + } + + /// Set the `Clock` instance that will be used by the runtime. + pub fn clock(&mut self, clock: Clock) -> &mut Self { + self.clock = clock; + self + } + + /// Create the configured `Runtime`. + pub fn build(&mut self) -> io::Result<Runtime> { + // We need a reactor to receive events about IO objects from kernel + let reactor = Reactor::new()?; + let reactor_handle = reactor.handle(); + + // Place a timer wheel on top of the reactor. If there are no timeouts to fire, it'll let the + // reactor pick up some new external events. + let timer = Timer::new_with_now(reactor, self.clock.clone()); + let timer_handle = timer.handle(); + + // And now put a single-threaded executor on top of the timer. When there are no futures ready + // to do something, it'll let the timer or the reactor to generate some new stimuli for the + // futures to continue in their life. + let executor = CurrentThread::new_with_park(timer); + + let runtime = Runtime::new2( + reactor_handle, + timer_handle, + self.clock.clone(), + executor); + + Ok(runtime) + } +} diff --git a/third_party/rust/tokio-0.1.22/src/runtime/current_thread/mod.rs b/third_party/rust/tokio-0.1.22/src/runtime/current_thread/mod.rs new file mode 100644 index 0000000000..eb0358df0c --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/runtime/current_thread/mod.rs @@ -0,0 +1,110 @@ +//! A runtime implementation that runs everything on the current thread. +//! +//! [`current_thread::Runtime`][rt] is similar to the primary +//! [`Runtime`][concurrent-rt] except that it runs all components on the current +//! thread instead of using a thread pool. This means that it is able to spawn +//! futures that do not implement `Send`. +//! +//! Same as the default [`Runtime`][concurrent-rt], the +//! [`current_thread::Runtime`][rt] includes: +//! +//! * A [reactor] to drive I/O resources. +//! * An [executor] to execute tasks that use these I/O resources. +//! * A [timer] for scheduling work to run after a set period of time. +//! +//! Note that [`current_thread::Runtime`][rt] does not implement `Send` itself +//! and cannot be safely moved to other threads. +//! +//! # Spawning from other threads +//! +//! While [`current_thread::Runtime`][rt] does not implement `Send` and cannot +//! safely be moved to other threads, it provides a `Handle` that can be sent +//! to other threads and allows to spawn new tasks from there. +//! +//! For example: +//! +//! ``` +//! # extern crate tokio; +//! # extern crate futures; +//! use tokio::runtime::current_thread::Runtime; +//! use tokio::prelude::*; +//! use std::thread; +//! +//! # fn main() { +//! let mut runtime = Runtime::new().unwrap(); +//! let handle = runtime.handle(); +//! +//! thread::spawn(move || { +//! handle.spawn(future::ok(())); +//! }).join().unwrap(); +//! +//! # /* +//! runtime.run().unwrap(); +//! # */ +//! # } +//! ``` +//! +//! # Examples +//! +//! Creating a new `Runtime` and running a future `f` until its completion and +//! returning its result. +//! +//! ``` +//! use tokio::runtime::current_thread::Runtime; +//! use tokio::prelude::*; +//! +//! let mut runtime = Runtime::new().unwrap(); +//! +//! // Use the runtime... +//! // runtime.block_on(f); // where f is a future +//! ``` +//! +//! [rt]: struct.Runtime.html +//! [concurrent-rt]: ../struct.Runtime.html +//! [chan]: https://docs.rs/futures/0.1/futures/sync/mpsc/fn.channel.html +//! [reactor]: ../../reactor/struct.Reactor.html +//! [executor]: https://tokio.rs/docs/getting-started/runtime-model/#executors +//! [timer]: ../../timer/index.html + +mod builder; +mod runtime; + +#[cfg(feature = "async-await-preview")] +mod async_await; + +pub use self::builder::Builder; +pub use self::runtime::{Runtime, Handle}; +pub use tokio_current_thread::spawn; +pub use tokio_current_thread::TaskExecutor; + +use futures::Future; + +/// Run the provided future to completion using a runtime running on the current thread. +/// +/// This first creates a new [`Runtime`], and calls [`Runtime::block_on`] with the provided future, +/// which blocks the current thread until the provided future completes. It then calls +/// [`Runtime::run`] to wait for any other spawned futures to resolve. +pub fn block_on_all<F>(future: F) -> Result<F::Item, F::Error> +where + F: Future, +{ + let mut r = Runtime::new().expect("failed to start runtime on current thread"); + let v = r.block_on(future)?; + r.run().expect("failed to resolve remaining futures"); + Ok(v) +} + +/// Start a current-thread runtime using the supplied future to bootstrap execution. +/// +/// # Panics +/// +/// This function panics if called from the context of an executor. +pub fn run<F>(future: F) +where + F: Future<Item = (), Error = ()> + 'static, +{ + + let mut r = Runtime::new().expect("failed to start runtime on current thread"); + r.spawn(future); + r.run().expect("failed to resolve remaining futures"); +} diff --git a/third_party/rust/tokio-0.1.22/src/runtime/current_thread/runtime.rs b/third_party/rust/tokio-0.1.22/src/runtime/current_thread/runtime.rs new file mode 100644 index 0000000000..e297e873dd --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/runtime/current_thread/runtime.rs @@ -0,0 +1,247 @@ +use tokio_current_thread::{self as current_thread, CurrentThread}; +use tokio_current_thread::Handle as ExecutorHandle; +use runtime::current_thread::Builder; + +use tokio_reactor::{self, Reactor}; +use tokio_timer::clock::{self, Clock}; +use tokio_timer::timer::{self, Timer}; +use tokio_executor; + +use futures::{future, Future}; + +use std::fmt; +use std::error::Error; +use std::io; + +/// Single-threaded runtime provides a way to start reactor +/// and executor on the current thread. +/// +/// See [module level][mod] documentation for more details. +/// +/// [mod]: index.html +#[derive(Debug)] +pub struct Runtime { + reactor_handle: tokio_reactor::Handle, + timer_handle: timer::Handle, + clock: Clock, + executor: CurrentThread<Timer<Reactor>>, +} + +/// Handle to spawn a future on the corresponding `CurrentThread` runtime instance +#[derive(Debug, Clone)] +pub struct Handle(ExecutorHandle); + +impl Handle { + /// Spawn a future onto the `CurrentThread` runtime instance corresponding to this handle + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the `CurrentThread` + /// instance of the `Handle` does not exist anymore. + pub fn spawn<F>(&self, future: F) -> Result<(), tokio_executor::SpawnError> + where F: Future<Item = (), Error = ()> + Send + 'static { + self.0.spawn(future) + } + + /// Provides a best effort **hint** to whether or not `spawn` will succeed. + /// + /// This function may return both false positives **and** false negatives. + /// If `status` returns `Ok`, then a call to `spawn` will *probably* + /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will + /// *probably* fail, but may succeed. + /// + /// This allows a caller to avoid creating the task if the call to `spawn` + /// has a high likelihood of failing. + pub fn status(&self) -> Result<(), tokio_executor::SpawnError> { + self.0.status() + } +} + +impl<T> future::Executor<T> for Handle +where T: Future<Item = (), Error = ()> + Send + 'static, +{ + fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> { + if let Err(e) = self.status() { + let kind = if e.is_at_capacity() { + future::ExecuteErrorKind::NoCapacity + } else { + future::ExecuteErrorKind::Shutdown + }; + + return Err(future::ExecuteError::new(kind, future)); + } + + let _ = self.spawn(future); + Ok(()) + } +} + +impl<T> ::executor::TypedExecutor<T> for Handle +where + T: Future<Item = (), Error = ()> + Send + 'static, +{ + fn spawn(&mut self, future: T) -> Result<(), ::executor::SpawnError> { + Handle::spawn(self, future) + } +} + +/// Error returned by the `run` function. +#[derive(Debug)] +pub struct RunError { + inner: current_thread::RunError, +} + +impl fmt::Display for RunError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.inner) + } +} + +impl Error for RunError { + fn description(&self) -> &str { + self.inner.description() + } + + // FIXME(taiki-e): When the minimum support version of tokio reaches Rust 1.30, + // replace this with Error::source. + #[allow(deprecated)] + fn cause(&self) -> Option<&Error> { + self.inner.cause() + } +} + +impl Runtime { + /// Returns a new runtime initialized with default configuration values. + pub fn new() -> io::Result<Runtime> { + Builder::new().build() + } + + pub(super) fn new2( + reactor_handle: tokio_reactor::Handle, + timer_handle: timer::Handle, + clock: Clock, + executor: CurrentThread<Timer<Reactor>>) -> Runtime + { + Runtime { + reactor_handle, + timer_handle, + clock, + executor, + } + } + + /// Get a new handle to spawn futures on the single-threaded Tokio runtime + /// + /// Different to the runtime itself, the handle can be sent to different + /// threads. + pub fn handle(&self) -> Handle { + Handle(self.executor.handle().clone()) + } + + /// Spawn a future onto the single-threaded Tokio runtime. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ```rust + /// # extern crate tokio; + /// # extern crate futures; + /// # use futures::{future, Future, Stream}; + /// use tokio::runtime::current_thread::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let mut rt = Runtime::new().unwrap(); + /// + /// // Spawn a future onto the runtime + /// rt.spawn(future::lazy(|| { + /// println!("running on the runtime"); + /// Ok(()) + /// })); + /// # } + /// # pub fn main() {} + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the executor + /// is currently at capacity and is unable to spawn a new future. + pub fn spawn<F>(&mut self, future: F) -> &mut Self + where F: Future<Item = (), Error = ()> + 'static, + { + self.executor.spawn(future); + self + } + + /// Runs the provided future, blocking the current thread until the future + /// completes. + /// + /// This function can be used to synchronously block the current thread + /// until the provided `future` has resolved either successfully or with an + /// error. The result of the future is then returned from this function + /// call. + /// + /// Note that this function will **also** execute any spawned futures on the + /// current thread, but will **not** block until these other spawned futures + /// have completed. Once the function returns, any uncompleted futures + /// remain pending in the `Runtime` instance. These futures will not run + /// until `block_on` or `run` is called again. + /// + /// The caller is responsible for ensuring that other spawned futures + /// complete execution by calling `block_on` or `run`. + pub fn block_on<F>(&mut self, f: F) -> Result<F::Item, F::Error> + where F: Future + { + self.enter(|executor| { + // Run the provided future + let ret = executor.block_on(f); + ret.map_err(|e| e.into_inner().expect("unexpected execution error")) + }) + } + + /// Run the executor to completion, blocking the thread until **all** + /// spawned futures have completed. + pub fn run(&mut self) -> Result<(), RunError> { + self.enter(|executor| executor.run()) + .map_err(|e| RunError { + inner: e, + }) + } + + fn enter<F, R>(&mut self, f: F) -> R + where F: FnOnce(&mut current_thread::Entered<Timer<Reactor>>) -> R + { + let Runtime { + ref reactor_handle, + ref timer_handle, + ref clock, + ref mut executor, + .. + } = *self; + + // Binds an executor to this thread + let mut enter = tokio_executor::enter().expect("Multiple executors at once"); + + // This will set the default handle and timer to use inside the closure + // and run the future. + tokio_reactor::with_default(&reactor_handle, &mut enter, |enter| { + clock::with_default(clock, enter, |enter| { + timer::with_default(&timer_handle, enter, |enter| { + // The TaskExecutor is a fake executor that looks into the + // current single-threaded executor when used. This is a trick, + // because we need two mutable references to the executor (one + // to run the provided future, another to install as the default + // one). We use the fake one here as the default one. + let mut default_executor = current_thread::TaskExecutor::current(); + tokio_executor::with_default(&mut default_executor, enter, |enter| { + let mut executor = executor.enter(enter); + f(&mut executor) + }) + }) + }) + }) + } +} diff --git a/third_party/rust/tokio-0.1.22/src/runtime/mod.rs b/third_party/rust/tokio-0.1.22/src/runtime/mod.rs new file mode 100644 index 0000000000..759616829e --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/runtime/mod.rs @@ -0,0 +1,125 @@ +//! A batteries included runtime for applications using Tokio. +//! +//! Applications using Tokio require some runtime support in order to work: +//! +//! * A [reactor] to drive I/O resources. +//! * An [executor] to execute tasks that use these I/O resources. +//! * A [timer] for scheduling work to run after a set period of time. +//! +//! While it is possible to setup each component manually, this involves a bunch +//! of boilerplate. +//! +//! [`Runtime`] bundles all of these various runtime components into a single +//! handle that can be started and shutdown together, eliminating the necessary +//! boilerplate to run a Tokio application. +//! +//! Most applications wont need to use [`Runtime`] directly. Instead, they will +//! use the [`run`] function, which uses [`Runtime`] under the hood. +//! +//! Creating a [`Runtime`] does the following: +//! +//! * Spawn a background thread running a [`Reactor`] instance. +//! * Start a [`ThreadPool`] for executing futures. +//! * Run an instance of [`Timer`] **per** thread pool worker thread. +//! +//! The thread pool uses a work-stealing strategy and is configured to start a +//! worker thread for each CPU core available on the system. This tends to be +//! the ideal setup for Tokio applications. +//! +//! A timer per thread pool worker thread is used to minimize the amount of +//! synchronization that is required for working with the timer. +//! +//! # Usage +//! +//! Most applications will use the [`run`] function. This takes a future to +//! "seed" the application, blocking the thread until the runtime becomes +//! [idle]. +//! +//! ```rust +//! # extern crate tokio; +//! # extern crate futures; +//! # use futures::{Future, Stream}; +//! use tokio::net::TcpListener; +//! +//! # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> { +//! # unimplemented!(); +//! # } +//! # fn dox() { +//! # let addr = "127.0.0.1:8080".parse().unwrap(); +//! let listener = TcpListener::bind(&addr).unwrap(); +//! +//! let server = listener.incoming() +//! .map_err(|e| println!("error = {:?}", e)) +//! .for_each(|socket| { +//! tokio::spawn(process(socket)) +//! }); +//! +//! tokio::run(server); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! In this function, the `run` function blocks until the runtime becomes idle. +//! See [`shutdown_on_idle`][idle] for more shutdown details. +//! +//! From within the context of the runtime, additional tasks are spawned using +//! the [`tokio::spawn`] function. Futures spawned using this function will be +//! executed on the same thread pool used by the [`Runtime`]. +//! +//! A [`Runtime`] instance can also be used directly. +//! +//! ```rust +//! # extern crate tokio; +//! # extern crate futures; +//! # use futures::{Future, Stream}; +//! use tokio::runtime::Runtime; +//! use tokio::net::TcpListener; +//! +//! # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> { +//! # unimplemented!(); +//! # } +//! # fn dox() { +//! # let addr = "127.0.0.1:8080".parse().unwrap(); +//! let listener = TcpListener::bind(&addr).unwrap(); +//! +//! let server = listener.incoming() +//! .map_err(|e| println!("error = {:?}", e)) +//! .for_each(|socket| { +//! tokio::spawn(process(socket)) +//! }); +//! +//! // Create the runtime +//! let mut rt = Runtime::new().unwrap(); +//! +//! // Spawn the server task +//! rt.spawn(server); +//! +//! // Wait until the runtime becomes idle and shut it down. +//! rt.shutdown_on_idle() +//! .wait().unwrap(); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! [reactor]: ../reactor/struct.Reactor.html +//! [executor]: https://tokio.rs/docs/getting-started/runtime-model/#executors +//! [timer]: ../timer/index.html +//! [`Runtime`]: struct.Runtime.html +//! [`Reactor`]: ../reactor/struct.Reactor.html +//! [`ThreadPool`]: https://docs.rs/tokio-threadpool/0.1/tokio_threadpool/struct.ThreadPool.html +//! [`run`]: fn.run.html +//! [idle]: struct.Runtime.html#method.shutdown_on_idle +//! [`tokio::spawn`]: ../executor/fn.spawn.html +//! [`Timer`]: https://docs.rs/tokio-timer/0.2/tokio_timer/timer/struct.Timer.html + +pub mod current_thread; +mod threadpool; + +pub use self::threadpool::{ + Builder, + Runtime, + Shutdown, + TaskExecutor, + run, +}; + diff --git a/third_party/rust/tokio-0.1.22/src/runtime/threadpool/async_await.rs b/third_party/rust/tokio-0.1.22/src/runtime/threadpool/async_await.rs new file mode 100644 index 0000000000..12bdb1a11a --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/runtime/threadpool/async_await.rs @@ -0,0 +1,18 @@ +use super::Runtime; +use std::future::Future; + +impl Runtime { + /// Like `block_on`, but takes an `async` block + pub fn block_on_async<F>(&mut self, future: F) -> F::Output + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + use tokio_futures::compat; + + match self.block_on(compat::infallible_into_01(future)) { + Ok(v) => v, + Err(_) => unreachable!(), + } + } +} diff --git a/third_party/rust/tokio-0.1.22/src/runtime/threadpool/builder.rs b/third_party/rust/tokio-0.1.22/src/runtime/threadpool/builder.rs new file mode 100644 index 0000000000..68626b09a7 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/runtime/threadpool/builder.rs @@ -0,0 +1,418 @@ +use super::{Inner, Runtime}; + +use reactor::Reactor; + +use std::io; +use std::sync::Mutex; +use std::time::Duration; +use std::any::Any; + +use num_cpus; +use tokio_reactor; +use tokio_threadpool::Builder as ThreadPoolBuilder; +use tokio_timer::clock::{self, Clock}; +use tokio_timer::timer::{self, Timer}; + +#[cfg(feature = "experimental-tracing")] +use tracing_core as trace; + +/// Builds Tokio Runtime with custom configuration values. +/// +/// Methods can be chained in order to set the configuration values. The +/// Runtime is constructed by calling [`build`]. +/// +/// New instances of `Builder` are obtained via [`Builder::new`]. +/// +/// See function level documentation for details on the various configuration +/// settings. +/// +/// [`build`]: #method.build +/// [`Builder::new`]: #method.new +/// +/// # Examples +/// +/// ``` +/// extern crate tokio; +/// extern crate tokio_timer; +/// +/// use std::time::Duration; +/// +/// use tokio::runtime::Builder; +/// use tokio_timer::clock::Clock; +/// +/// fn main() { +/// // build Runtime +/// let mut runtime = Builder::new() +/// .blocking_threads(4) +/// .clock(Clock::system()) +/// .core_threads(4) +/// .keep_alive(Some(Duration::from_secs(60))) +/// .name_prefix("my-custom-name-") +/// .stack_size(3 * 1024 * 1024) +/// .build() +/// .unwrap(); +/// +/// // use runtime ... +/// } +/// ``` +#[derive(Debug)] +pub struct Builder { + /// Thread pool specific builder + threadpool_builder: ThreadPoolBuilder, + + /// The number of worker threads + core_threads: usize, + + /// The clock to use + clock: Clock, +} + +impl Builder { + /// Returns a new runtime builder initialized with default configuration + /// values. + /// + /// Configuration methods can be chained on the return value. + pub fn new() -> Builder { + let core_threads = num_cpus::get().max(1); + + let mut threadpool_builder = ThreadPoolBuilder::new(); + threadpool_builder.name_prefix("tokio-runtime-worker-"); + threadpool_builder.pool_size(core_threads); + + Builder { + threadpool_builder, + core_threads, + clock: Clock::new(), + } + } + + /// Set the `Clock` instance that will be used by the runtime. + pub fn clock(&mut self, clock: Clock) -> &mut Self { + self.clock = clock; + self + } + + /// Set builder to set up the thread pool instance. + #[deprecated( + since = "0.1.9", + note = "use the `core_threads`, `blocking_threads`, `name_prefix`, \ + `keep_alive`, and `stack_size` functions on `runtime::Builder`, \ + instead")] + #[doc(hidden)] + pub fn threadpool_builder(&mut self, val: ThreadPoolBuilder) -> &mut Self { + self.threadpool_builder = val; + self + } + + /// Sets a callback to handle panics in futures. + /// + /// The callback is triggered when a panic during a future bubbles up to + /// Tokio. By default Tokio catches these panics, and they will be ignored. + /// The parameter passed to this callback is the same error value returned + /// from `std::panic::catch_unwind()`. To abort the process on panics, use + /// `std::panic::resume_unwind()` in this callback as shown below. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let mut rt = runtime::Builder::new() + /// .panic_handler(|err| std::panic::resume_unwind(err)) + /// .build() + /// .unwrap(); + /// # } + /// ``` + pub fn panic_handler<F>(&mut self, f: F) -> &mut Self + where + F: Fn(Box<Any + Send>) + Send + Sync + 'static, + { + self.threadpool_builder.panic_handler(f); + self + } + + + /// Set the maximum number of worker threads for the `Runtime`'s thread pool. + /// + /// This must be a number between 1 and 32,768 though it is advised to keep + /// this value on the smaller side. + /// + /// The default value is the number of cores available to the system. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let mut rt = runtime::Builder::new() + /// .core_threads(4) + /// .build() + /// .unwrap(); + /// # } + /// ``` + pub fn core_threads(&mut self, val: usize) -> &mut Self { + self.core_threads = val; + self.threadpool_builder.pool_size(val); + self + } + + /// Set the maximum number of concurrent blocking sections in the `Runtime`'s + /// thread pool. + /// + /// When the maximum concurrent `blocking` calls is reached, any further + /// calls to `blocking` will return `NotReady` and the task is notified once + /// previously in-flight calls to `blocking` return. + /// + /// This must be a number between 1 and 32,768 though it is advised to keep + /// this value on the smaller side. + /// + /// The default value is 100. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let mut rt = runtime::Builder::new() + /// .blocking_threads(200) + /// .build(); + /// # } + /// ``` + pub fn blocking_threads(&mut self, val: usize) -> &mut Self { + self.threadpool_builder.max_blocking(val); + self + } + + /// Set the worker thread keep alive duration for threads in the `Runtime`'s + /// thread pool. + /// + /// If set, a worker thread will wait for up to the specified duration for + /// work, at which point the thread will shutdown. When work becomes + /// available, a new thread will eventually be spawned to replace the one + /// that shut down. + /// + /// When the value is `None`, the thread will wait for work forever. + /// + /// The default value is `None`. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// # use tokio::runtime; + /// use std::time::Duration; + /// + /// # pub fn main() { + /// let mut rt = runtime::Builder::new() + /// .keep_alive(Some(Duration::from_secs(30))) + /// .build(); + /// # } + /// ``` + pub fn keep_alive(&mut self, val: Option<Duration>) -> &mut Self { + self.threadpool_builder.keep_alive(val); + self + } + + /// Set name prefix of threads spawned by the `Runtime`'s thread pool. + /// + /// Thread name prefix is used for generating thread names. For example, if + /// prefix is `my-pool-`, then threads in the pool will get names like + /// `my-pool-1` etc. + /// + /// The default prefix is "tokio-runtime-worker-". + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let mut rt = runtime::Builder::new() + /// .name_prefix("my-pool-") + /// .build(); + /// # } + /// ``` + pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self { + self.threadpool_builder.name_prefix(val); + self + } + + /// Set the stack size (in bytes) for worker threads. + /// + /// The actual stack size may be greater than this value if the platform + /// specifies minimal stack size. + /// + /// The default stack size for spawned threads is 2 MiB, though this + /// particular stack size is subject to change in the future. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let mut rt = runtime::Builder::new() + /// .stack_size(32 * 1024) + /// .build(); + /// # } + /// ``` + pub fn stack_size(&mut self, val: usize) -> &mut Self { + self.threadpool_builder.stack_size(val); + self + } + + /// Execute function `f` after each thread is started but before it starts + /// doing work. + /// + /// This is intended for bookkeeping and monitoring use cases. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let thread_pool = runtime::Builder::new() + /// .after_start(|| { + /// println!("thread started"); + /// }) + /// .build(); + /// # } + /// ``` + pub fn after_start<F>(&mut self, f: F) -> &mut Self + where F: Fn() + Send + Sync + 'static + { + self.threadpool_builder.after_start(f); + self + } + + /// Execute function `f` before each thread stops. + /// + /// This is intended for bookkeeping and monitoring use cases. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let thread_pool = runtime::Builder::new() + /// .before_stop(|| { + /// println!("thread stopping"); + /// }) + /// .build(); + /// # } + /// ``` + pub fn before_stop<F>(&mut self, f: F) -> &mut Self + where F: Fn() + Send + Sync + 'static + { + self.threadpool_builder.before_stop(f); + self + } + + /// Create the configured `Runtime`. + /// + /// The returned `ThreadPool` instance is ready to spawn tasks. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::runtime::Builder; + /// # pub fn main() { + /// let runtime = Builder::new().build().unwrap(); + /// // ... call runtime.run(...) + /// # let _ = runtime; + /// # } + /// ``` + pub fn build(&mut self) -> io::Result<Runtime> { + // TODO(stjepang): Once we remove the `threadpool_builder` method, remove this line too. + self.threadpool_builder.pool_size(self.core_threads); + + let mut reactor_handles = Vec::new(); + let mut timer_handles = Vec::new(); + let mut timers = Vec::new(); + + for _ in 0..self.core_threads { + // Create a new reactor. + let reactor = Reactor::new()?; + reactor_handles.push(reactor.handle()); + + // Create a new timer. + let timer = Timer::new_with_now(reactor, self.clock.clone()); + timer_handles.push(timer.handle()); + timers.push(Mutex::new(Some(timer))); + } + + // Get a handle to the clock for the runtime. + let clock = self.clock.clone(); + + // Get the current trace dispatcher. + // TODO(eliza): when `tracing-core` is stable enough to take a + // public API dependency, we should allow users to set a custom + // subscriber for the runtime. + #[cfg(feature = "experimental-tracing")] + let dispatch = trace::dispatcher::get_default(trace::Dispatch::clone); + + let pool = self + .threadpool_builder + .around_worker(move |w, enter| { + let index = w.id().to_usize(); + + tokio_reactor::with_default(&reactor_handles[index], enter, |enter| { + clock::with_default(&clock, enter, |enter| { + timer::with_default(&timer_handles[index], enter, |_| { + + #[cfg(feature = "experimental-tracing")] + trace::dispatcher::with_default(&dispatch, || { + w.run(); + }); + + #[cfg(not(feature = "experimental-tracing"))] + w.run(); + }); + }) + }); + }) + .custom_park(move |worker_id| { + let index = worker_id.to_usize(); + + timers[index] + .lock() + .unwrap() + .take() + .unwrap() + }) + .build(); + + // To support deprecated `reactor()` function + let reactor = Reactor::new()?; + let reactor_handle = reactor.handle(); + + Ok(Runtime { + inner: Some(Inner { + reactor_handle, + reactor: Mutex::new(Some(reactor)), + pool, + }), + }) + } +} diff --git a/third_party/rust/tokio-0.1.22/src/runtime/threadpool/mod.rs b/third_party/rust/tokio-0.1.22/src/runtime/threadpool/mod.rs new file mode 100644 index 0000000000..b688a46745 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/runtime/threadpool/mod.rs @@ -0,0 +1,398 @@ +mod builder; +mod shutdown; +mod task_executor; + +#[cfg(feature = "async-await-preview")] +mod async_await; + +pub use self::builder::Builder; +pub use self::shutdown::Shutdown; +pub use self::task_executor::TaskExecutor; + +use reactor::{Handle, Reactor}; + +use std::io; +use std::sync::Mutex; + +use tokio_executor::enter; +use tokio_threadpool as threadpool; + +use futures; +use futures::future::Future; + +/// Handle to the Tokio runtime. +/// +/// The Tokio runtime includes a reactor as well as an executor for running +/// tasks. +/// +/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However, +/// most users will use [`tokio::run`], which uses a `Runtime` internally. +/// +/// See [module level][mod] documentation for more details. +/// +/// [mod]: index.html +/// [`new`]: #method.new +/// [`Builder`]: struct.Builder.html +/// [`tokio::run`]: fn.run.html +#[derive(Debug)] +pub struct Runtime { + inner: Option<Inner>, +} + +#[derive(Debug)] +struct Inner { + /// A handle to the reactor in the background thread. + reactor_handle: Handle, + + // TODO: This should go away in 0.2 + reactor: Mutex<Option<Reactor>>, + + /// Task execution pool. + pool: threadpool::ThreadPool, +} + +// ===== impl Runtime ===== + +/// Start the Tokio runtime using the supplied future to bootstrap execution. +/// +/// This function is used to bootstrap the execution of a Tokio application. It +/// does the following: +/// +/// * Start the Tokio runtime using a default configuration. +/// * Spawn the given future onto the thread pool. +/// * Block the current thread until the runtime shuts down. +/// +/// Note that the function will not return immediately once `future` has +/// completed. Instead it waits for the entire runtime to become idle. +/// +/// See the [module level][mod] documentation for more details. +/// +/// # Examples +/// +/// ```rust +/// # extern crate tokio; +/// # extern crate futures; +/// # use futures::{Future, Stream}; +/// use tokio::net::TcpListener; +/// +/// # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> { +/// # unimplemented!(); +/// # } +/// # fn dox() { +/// # let addr = "127.0.0.1:8080".parse().unwrap(); +/// let listener = TcpListener::bind(&addr).unwrap(); +/// +/// let server = listener.incoming() +/// .map_err(|e| println!("error = {:?}", e)) +/// .for_each(|socket| { +/// tokio::spawn(process(socket)) +/// }); +/// +/// tokio::run(server); +/// # } +/// # pub fn main() {} +/// ``` +/// +/// # Panics +/// +/// This function panics if called from the context of an executor. +/// +/// [mod]: ../index.html +pub fn run<F>(future: F) +where F: Future<Item = (), Error = ()> + Send + 'static, +{ + // Check enter before creating a new Runtime... + let mut entered = enter().expect("nested tokio::run"); + let mut runtime = Runtime::new().expect("failed to start new Runtime"); + runtime.spawn(future); + entered + .block_on(runtime.shutdown_on_idle()) + .expect("shutdown cannot error") +} + +impl Runtime { + /// Create a new runtime instance with default configuration values. + /// + /// This results in a reactor, thread pool, and timer being initialized. The + /// thread pool will not spawn any worker threads until it needs to, i.e. + /// tasks are scheduled to run. + /// + /// Most users will not need to call this function directly, instead they + /// will use [`tokio::run`](fn.run.html). + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// Creating a new `Runtime` with default configuration values. + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::prelude::*; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// + /// // Shutdown the runtime + /// rt.shutdown_now() + /// .wait().unwrap(); + /// ``` + /// + /// [mod]: index.html + pub fn new() -> io::Result<Self> { + Builder::new().build() + } + + #[deprecated(since = "0.1.5", note = "use `reactor` instead")] + #[doc(hidden)] + pub fn handle(&self) -> &Handle { + #[allow(deprecated)] + self.reactor() + } + + /// Return a reference to the reactor handle for this runtime instance. + /// + /// The returned handle reference can be cloned in order to get an owned + /// value of the handle. This handle can be used to initialize I/O resources + /// (like TCP or UDP sockets) that will not be used on the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// let reactor_handle = rt.reactor().clone(); + /// + /// // use `reactor_handle` + /// ``` + #[deprecated(since = "0.1.11", note = "there is now a reactor per worker thread")] + pub fn reactor(&self) -> &Handle { + let mut reactor = self.inner().reactor.lock().unwrap(); + if let Some(reactor) = reactor.take() { + if let Ok(background) = reactor.background() { + background.forget(); + } + } + + &self.inner().reactor_handle + } + + /// Return a handle to the runtime's executor. + /// + /// The returned handle can be used to spawn tasks that run on this runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// let executor_handle = rt.executor(); + /// + /// // use `executor_handle` + /// ``` + pub fn executor(&self) -> TaskExecutor { + let inner = self.inner().pool.sender().clone(); + TaskExecutor { inner } + } + + /// Spawn a future onto the Tokio runtime. + /// + /// This spawns the given future onto the runtime's executor, usually a + /// thread pool. The thread pool is then responsible for polling the future + /// until it completes. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ```rust + /// # extern crate tokio; + /// # extern crate futures; + /// # use futures::{future, Future, Stream}; + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let mut rt = Runtime::new().unwrap(); + /// + /// // Spawn a future onto the runtime + /// rt.spawn(future::lazy(|| { + /// println!("now running on a worker thread"); + /// Ok(()) + /// })); + /// # } + /// # pub fn main() {} + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the executor + /// is currently at capacity and is unable to spawn a new future. + pub fn spawn<F>(&mut self, future: F) -> &mut Self + where F: Future<Item = (), Error = ()> + Send + 'static, + { + self.inner_mut().pool.sender().spawn(future).unwrap(); + self + } + + /// Run a future to completion on the Tokio runtime. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers which + /// the future spawns internally will be executed on the runtime. + /// + /// This method should not be called from an asynchronous context. + /// + /// # Panics + /// + /// This function panics if the executor is at capacity, if the provided + /// future panics, or if called within an asynchronous execution context. + pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E> + where + F: Send + 'static + Future<Item = R, Error = E>, + R: Send + 'static, + E: Send + 'static, + { + let mut entered = enter().expect("nested block_on"); + let (tx, rx) = futures::sync::oneshot::channel(); + self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!()))); + entered.block_on(rx).unwrap() + } + + /// Run a future to completion on the Tokio runtime, then wait for all + /// background futures to complete too. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, waiting for background futures to complete, and yielding + /// its resolved result. Any tasks or timers which the future spawns + /// internally will be executed on the runtime and waited for completion. + /// + /// This method should not be called from an asynchronous context. + /// + /// # Panics + /// + /// This function panics if the executor is at capacity, if the provided + /// future panics, or if called within an asynchronous execution context. + pub fn block_on_all<F, R, E>(mut self, future: F) -> Result<R, E> + where + F: Send + 'static + Future<Item = R, Error = E>, + R: Send + 'static, + E: Send + 'static, + { + let mut entered = enter().expect("nested block_on_all"); + let (tx, rx) = futures::sync::oneshot::channel(); + self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!()))); + let block = rx + .map_err(|_| unreachable!()) + .and_then(move |r| { + self.shutdown_on_idle() + .map(move |()| r) + }); + entered.block_on(block).unwrap() + } + + /// Signals the runtime to shutdown once it becomes idle. + /// + /// Returns a future that completes once the shutdown operation has + /// completed. + /// + /// This function can be used to perform a graceful shutdown of the runtime. + /// + /// The runtime enters an idle state once **all** of the following occur. + /// + /// * The thread pool has no tasks to execute, i.e., all tasks that were + /// spawned have completed. + /// * The reactor is not managing any I/O resources. + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::prelude::*; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// + /// // Shutdown the runtime + /// rt.shutdown_on_idle() + /// .wait().unwrap(); + /// ``` + /// + /// [mod]: index.html + pub fn shutdown_on_idle(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + let inner = inner.pool.shutdown_on_idle(); + Shutdown { inner } + } + + /// Signals the runtime to shutdown immediately. + /// + /// Returns a future that completes once the shutdown operation has + /// completed. + /// + /// This function will forcibly shutdown the runtime, causing any + /// in-progress work to become canceled. The shutdown steps are: + /// + /// * Drain any scheduled work queues. + /// * Drop any futures that have not yet completed. + /// * Drop the reactor. + /// + /// Once the reactor has dropped, any outstanding I/O resources bound to + /// that reactor will no longer function. Calling any method on them will + /// result in an error. + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::prelude::*; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// + /// // Shutdown the runtime + /// rt.shutdown_now() + /// .wait().unwrap(); + /// ``` + /// + /// [mod]: index.html + pub fn shutdown_now(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + Shutdown::shutdown_now(inner) + } + + fn inner(&self) -> &Inner { + self.inner.as_ref().unwrap() + } + + fn inner_mut(&mut self) -> &mut Inner { + self.inner.as_mut().unwrap() + } +} + +impl Drop for Runtime { + fn drop(&mut self) { + if let Some(inner) = self.inner.take() { + let shutdown = Shutdown::shutdown_now(inner); + let _ = shutdown.wait(); + } + } +} diff --git a/third_party/rust/tokio-0.1.22/src/runtime/threadpool/shutdown.rs b/third_party/rust/tokio-0.1.22/src/runtime/threadpool/shutdown.rs new file mode 100644 index 0000000000..66f8146080 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/runtime/threadpool/shutdown.rs @@ -0,0 +1,36 @@ +use super::Inner; +use tokio_threadpool as threadpool; + +use std::fmt; + +use futures::{Future, Poll}; + +/// A future that resolves when the Tokio `Runtime` is shut down. +pub struct Shutdown { + pub(super) inner: threadpool::Shutdown, +} + +impl Shutdown { + pub(super) fn shutdown_now(inner: Inner) -> Self { + let inner = inner.pool.shutdown_now(); + Shutdown { inner } + } +} + +impl Future for Shutdown { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + try_ready!(self.inner.poll()); + Ok(().into()) + } +} + +impl fmt::Debug for Shutdown { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Shutdown") + .field("inner", &"Box<Future<Item = (), Error = ()>>") + .finish() + } +} diff --git a/third_party/rust/tokio-0.1.22/src/runtime/threadpool/task_executor.rs b/third_party/rust/tokio-0.1.22/src/runtime/threadpool/task_executor.rs new file mode 100644 index 0000000000..91c665820d --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/runtime/threadpool/task_executor.rs @@ -0,0 +1,84 @@ + +use tokio_threadpool::Sender; + +use futures::future::{self, Future}; + +/// Executes futures on the runtime +/// +/// All futures spawned using this executor will be submitted to the associated +/// Runtime's executor. This executor is usually a thread pool. +/// +/// For more details, see the [module level](index.html) documentation. +#[derive(Debug, Clone)] +pub struct TaskExecutor { + pub(super) inner: Sender, +} + +impl TaskExecutor { + /// Spawn a future onto the Tokio runtime. + /// + /// This spawns the given future onto the runtime's executor, usually a + /// thread pool. The thread pool is then responsible for polling the future + /// until it completes. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ```rust + /// # extern crate tokio; + /// # extern crate futures; + /// # use futures::{future, Future, Stream}; + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let mut rt = Runtime::new().unwrap(); + /// let executor = rt.executor(); + /// + /// // Spawn a future onto the runtime + /// executor.spawn(future::lazy(|| { + /// println!("now running on a worker thread"); + /// Ok(()) + /// })); + /// # } + /// # pub fn main() {} + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the executor + /// is currently at capacity and is unable to spawn a new future. + pub fn spawn<F>(&self, future: F) + where F: Future<Item = (), Error = ()> + Send + 'static, + { + self.inner.spawn(future).unwrap(); + } +} + +impl<T> future::Executor<T> for TaskExecutor +where T: Future<Item = (), Error = ()> + Send + 'static, +{ + fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> { + self.inner.execute(future) + } +} + +impl ::executor::Executor for TaskExecutor { + fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>) + -> Result<(), ::executor::SpawnError> + { + self.inner.spawn(future) + } +} + +impl<T> ::executor::TypedExecutor<T> for TaskExecutor +where + T: Future<Item = (), Error = ()> + Send + 'static, +{ + fn spawn(&mut self, future: T) -> Result<(), ::executor::SpawnError> { + ::executor::Executor::spawn(self, Box::new(future)) + } +} diff --git a/third_party/rust/tokio-0.1.22/src/sync.rs b/third_party/rust/tokio-0.1.22/src/sync.rs new file mode 100644 index 0000000000..c8fb752413 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/sync.rs @@ -0,0 +1,16 @@ +//! Future-aware synchronization +//! +//! This module is enabled with the **`sync`** feature flag. +//! +//! Tasks sometimes need to communicate with each other. This module contains +//! two basic abstractions for doing so: +//! +//! - [oneshot](oneshot/index.html), a way of sending a single value +//! from one task to another. +//! - [mpsc](mpsc/index.html), a multi-producer, single-consumer channel for +//! sending values between tasks. +//! - [lock](lock/index.html), an asynchronous `Mutex`-like type. +//! - [watch](watch/index.html), a single-producer, multi-consumer channel that +//! only stores the **most recently** sent value. + +pub use tokio_sync::{lock, mpsc, oneshot, watch}; diff --git a/third_party/rust/tokio-0.1.22/src/timer.rs b/third_party/rust/tokio-0.1.22/src/timer.rs new file mode 100644 index 0000000000..888e7a9db1 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/timer.rs @@ -0,0 +1,94 @@ +//! Utilities for tracking time. +//! +//! This module provides a number of types for executing code after a set period +//! of time. +//! +//! * [`Delay`][Delay] is a future that does no work and completes at a specific `Instant` +//! in time. +//! +//! * [`Interval`][Interval] is a stream yielding a value at a fixed period. It +//! is initialized with a `Duration` and repeatedly yields each time the +//! duration elapses. +//! +//! * [`Timeout`][Timeout]: Wraps a future or stream, setting an upper bound to the +//! amount of time it is allowed to execute. If the future or stream does not +//! complete in time, then it is canceled and an error is returned. +//! +//! * [`DelayQueue`]: A queue where items are returned once the requested delay +//! has expired. +//! +//! These types are sufficient for handling a large number of scenarios +//! involving time. +//! +//! These types must be used from within the context of the +//! [`Runtime`][runtime] or a timer context must be setup explicitly. See the +//! [`tokio-timer`][tokio-timer] crate for more details on how to setup a timer +//! context. +//! +//! # Examples +//! +//! Wait 100ms and print "Hello World!" +//! +//! ``` +//! use tokio::prelude::*; +//! use tokio::timer::Delay; +//! +//! use std::time::{Duration, Instant}; +//! +//! let when = Instant::now() + Duration::from_millis(100); +//! +//! tokio::run({ +//! Delay::new(when) +//! .map_err(|e| panic!("timer failed; err={:?}", e)) +//! .and_then(|_| { +//! println!("Hello world!"); +//! Ok(()) +//! }) +//! }) +//! ``` +//! +//! Require that an operation takes no more than 300ms. Note that this uses the +//! [`timeout`][ext] function on the [`FutureExt`][ext] trait. This trait is +//! included in the prelude. +//! +//! ``` +//! # extern crate futures; +//! # extern crate tokio; +//! use tokio::prelude::*; +//! +//! use std::time::{Duration, Instant}; +//! +//! fn long_op() -> Box<Future<Item = (), Error = ()> + Send> { +//! // ... +//! # Box::new(futures::future::ok(())) +//! } +//! +//! # fn main() { +//! tokio::run({ +//! long_op() +//! .timeout(Duration::from_millis(300)) +//! .map_err(|e| { +//! println!("operation timed out"); +//! }) +//! }) +//! # } +//! ``` +//! +//! [runtime]: ../runtime/struct.Runtime.html +//! [tokio-timer]: https://docs.rs/tokio-timer +//! [ext]: ../util/trait.FutureExt.html#method.timeout +//! [Timeout]: struct.Timeout.html +//! [Delay]: struct.Delay.html +//! [Interval]: struct.Interval.html +//! [`DelayQueue`]: struct.DelayQueue.html + +pub use tokio_timer::{delay_queue, timeout, Delay, DelayQueue, Error, Interval, Timeout}; + +#[deprecated(since = "0.1.8", note = "use Timeout instead")] +#[allow(deprecated)] +#[doc(hidden)] +pub type Deadline<T> = ::tokio_timer::Deadline<T>; +#[deprecated(since = "0.1.8", note = "use Timeout instead")] +#[allow(deprecated)] +#[doc(hidden)] +pub type DeadlineError<T> = ::tokio_timer::DeadlineError<T>; diff --git a/third_party/rust/tokio-0.1.22/src/util/enumerate.rs b/third_party/rust/tokio-0.1.22/src/util/enumerate.rs new file mode 100644 index 0000000000..8f6926fa4f --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/util/enumerate.rs @@ -0,0 +1,84 @@ +use futures::{Async, Poll, Sink, StartSend, Stream}; + +/// A stream combinator which combines the yields the current item +/// plus its count starting from 0. +/// +/// This structure is produced by the `Stream::enumerate` method. +#[derive(Debug)] +#[must_use = "Does nothing unless polled"] +pub struct Enumerate<T> { + inner: T, + count: usize, +} + +impl<T> Enumerate<T> { + pub(crate) fn new(stream: T) -> Self { + Self { + inner: stream, + count: 0, + } + } + + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &T { + &self.inner + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> T { + self.inner + } +} + +impl<T> Stream for Enumerate<T> +where + T: Stream, +{ + type Item = (usize, T::Item); + type Error = T::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, T::Error> { + match try_ready!(self.inner.poll()) { + Some(item) => { + let ret = Some((self.count, item)); + self.count += 1; + Ok(Async::Ready(ret)) + } + None => return Ok(Async::Ready(None)), + } + } +} + +// Forwarding impl of Sink from the underlying stream +impl<T> Sink for Enumerate<T> +where + T: Sink, +{ + type SinkItem = T::SinkItem; + type SinkError = T::SinkError; + + fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), T::SinkError> { + self.inner.poll_complete() + } + + fn close(&mut self) -> Poll<(), T::SinkError> { + self.inner.close() + } +} diff --git a/third_party/rust/tokio-0.1.22/src/util/future.rs b/third_party/rust/tokio-0.1.22/src/util/future.rs new file mode 100644 index 0000000000..5a3818101c --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/util/future.rs @@ -0,0 +1,93 @@ +#[cfg(feature = "timer")] +#[allow(deprecated)] +use tokio_timer::Deadline; +#[cfg(feature = "timer")] +use tokio_timer::Timeout; + +use futures::Future; + +#[cfg(feature = "timer")] +use std::time::{Duration, Instant}; + +/// An extension trait for `Future` that provides a variety of convenient +/// combinator functions. +/// +/// Currently, there only is a [`timeout`] function, but this will increase +/// over time. +/// +/// Users are not expected to implement this trait. All types that implement +/// `Future` already implement `FutureExt`. +/// +/// This trait can be imported directly or via the Tokio prelude: `use +/// tokio::prelude::*`. +/// +/// [`timeout`]: #method.timeout +pub trait FutureExt: Future { + /// Creates a new future which allows `self` until `timeout`. + /// + /// This combinator creates a new future which wraps the receiving future + /// with a timeout. The returned future is allowed to execute until it + /// completes or `timeout` has elapsed, whichever happens first. + /// + /// If the future completes before `timeout` then the future will resolve + /// with that item. Otherwise the future will resolve to an error. + /// + /// The future is guaranteed to be polled at least once, even if `timeout` + /// is set to zero. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// use tokio::prelude::*; + /// use std::time::Duration; + /// # use futures::future::{self, FutureResult}; + /// + /// # fn long_future() -> FutureResult<(), ()> { + /// # future::ok(()) + /// # } + /// # + /// # fn main() { + /// let future = long_future() + /// .timeout(Duration::from_secs(1)) + /// .map_err(|e| println!("error = {:?}", e)); + /// + /// tokio::run(future); + /// # } + /// ``` + #[cfg(feature = "timer")] + fn timeout(self, timeout: Duration) -> Timeout<Self> + where + Self: Sized, + { + Timeout::new(self, timeout) + } + + #[cfg(feature = "timer")] + #[deprecated(since = "0.1.8", note = "use `timeout` instead")] + #[allow(deprecated)] + #[doc(hidden)] + fn deadline(self, deadline: Instant) -> Deadline<Self> + where + Self: Sized, + { + Deadline::new(self, deadline) + } +} + +impl<T: ?Sized> FutureExt for T where T: Future {} + +#[cfg(test)] +mod test { + use super::*; + use prelude::future; + + #[cfg(feature = "timer")] + #[test] + fn timeout_polls_at_least_once() { + let base_future = future::result::<(), ()>(Ok(())); + let timeouted_future = base_future.timeout(Duration::new(0, 0)); + assert!(timeouted_future.wait().is_ok()); + } +} diff --git a/third_party/rust/tokio-0.1.22/src/util/mod.rs b/third_party/rust/tokio-0.1.22/src/util/mod.rs new file mode 100644 index 0000000000..58fd3d0b05 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/util/mod.rs @@ -0,0 +1,15 @@ +//! Utilities for working with Tokio. +//! +//! This module contains utilities that are useful for working with Tokio. +//! Currently, this only includes [`FutureExt`] and [`StreamExt`], but this +//! may grow over time. +//! +//! [`FutureExt`]: trait.FutureExt.html +//! [`StreamExt`]: trait.StreamExt.html + +mod enumerate; +mod future; +mod stream; + +pub use self::future::FutureExt; +pub use self::stream::StreamExt; diff --git a/third_party/rust/tokio-0.1.22/src/util/stream.rs b/third_party/rust/tokio-0.1.22/src/util/stream.rs new file mode 100644 index 0000000000..3b7aa2686c --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/util/stream.rs @@ -0,0 +1,95 @@ +#[cfg(feature = "timer")] +use tokio_timer::{throttle::Throttle, Timeout}; + +use futures::Stream; + +#[cfg(feature = "timer")] +use std::time::Duration; +pub use util::enumerate::Enumerate; + +/// An extension trait for `Stream` that provides a variety of convenient +/// combinator functions. +/// +/// Currently, there are only [`timeout`] and [`throttle`] functions, but +/// this will increase over time. +/// +/// Users are not expected to implement this trait. All types that implement +/// `Stream` already implement `StreamExt`. +/// +/// This trait can be imported directly or via the Tokio prelude: `use +/// tokio::prelude::*`. +/// +/// [`timeout`]: #method.timeout +pub trait StreamExt: Stream { + /// Throttle down the stream by enforcing a fixed delay between items. + /// + /// Errors are also delayed. + #[cfg(feature = "timer")] + fn throttle(self, duration: Duration) -> Throttle<Self> + where + Self: Sized, + { + Throttle::new(self, duration) + } + + /// Creates a new stream which gives the current iteration count as well + /// as the next value. + /// + /// The stream returned yields pairs `(i, val)`, where `i` is the + /// current index of iteration and `val` is the value returned by the + /// iterator. + /// + /// # Overflow Behavior + /// + /// The method does no guarding against overflows, so counting elements of + /// an iterator with more than [`std::usize::MAX`] elements either produces the + /// wrong result or panics. + fn enumerate(self) -> Enumerate<Self> + where + Self: Sized, + { + Enumerate::new(self) + } + + /// Creates a new stream which allows `self` until `timeout`. + /// + /// This combinator creates a new stream which wraps the receiving stream + /// with a timeout. For each item, the returned stream is allowed to execute + /// until it completes or `timeout` has elapsed, whichever happens first. + /// + /// If an item completes before `timeout` then the stream will yield + /// with that item. Otherwise the stream will yield to an error. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// use tokio::prelude::*; + /// use std::time::Duration; + /// # use futures::future::{self, FutureResult}; + /// + /// # fn long_future() -> FutureResult<(), ()> { + /// # future::ok(()) + /// # } + /// # + /// # fn main() { + /// let stream = long_future() + /// .into_stream() + /// .timeout(Duration::from_secs(1)) + /// .for_each(|i| future::ok(println!("item = {:?}", i))) + /// .map_err(|e| println!("error = {:?}", e)); + /// + /// tokio::run(stream); + /// # } + /// ``` + #[cfg(feature = "timer")] + fn timeout(self, timeout: Duration) -> Timeout<Self> + where + Self: Sized, + { + Timeout::new(self, timeout) + } +} + +impl<T: ?Sized> StreamExt for T where T: Stream {} diff --git a/third_party/rust/tokio-0.1.22/tests/buffered.rs b/third_party/rust/tokio-0.1.22/tests/buffered.rs new file mode 100644 index 0000000000..45560ad203 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/buffered.rs @@ -0,0 +1,65 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio; +extern crate tokio_io; + +use std::io::{BufReader, BufWriter, Read, Write}; +use std::net::TcpStream; +use std::thread; + +use futures::stream::Stream; +use futures::Future; +use tokio::net::TcpListener; +use tokio_io::io::copy; + +macro_rules! t { + ($e:expr) => { + match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + } + }; +} + +#[test] +fn echo_server() { + const N: usize = 1024; + drop(env_logger::try_init()); + + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); + let addr = t!(srv.local_addr()); + + let msg = "foo bar baz"; + let t = thread::spawn(move || { + let mut s = t!(TcpStream::connect(&addr)); + + let t2 = thread::spawn(move || { + let mut s = t!(TcpStream::connect(&addr)); + let mut b = vec![0; msg.len() * N]; + t!(s.read_exact(&mut b)); + b + }); + + let mut expected = Vec::<u8>::new(); + for _i in 0..N { + expected.extend(msg.as_bytes()); + assert_eq!(t!(s.write(msg.as_bytes())), msg.len()); + } + (expected, t2) + }); + + let clients = srv.incoming().take(2).collect(); + let copied = clients.and_then(|clients| { + let mut clients = clients.into_iter(); + let a = BufReader::new(clients.next().unwrap()); + let b = BufWriter::new(clients.next().unwrap()); + copy(a, b) + }); + + let (amt, _, _) = t!(copied.wait()); + let (expected, t2) = t.join().unwrap(); + let actual = t2.join().unwrap(); + + assert!(expected == actual); + assert_eq!(amt, msg.len() as u64 * 1024); +} diff --git a/third_party/rust/tokio-0.1.22/tests/clock.rs b/third_party/rust/tokio-0.1.22/tests/clock.rs new file mode 100644 index 0000000000..184705aede --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/clock.rs @@ -0,0 +1,64 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio; +extern crate tokio_timer; + +use tokio::prelude::*; +use tokio::runtime::{self, current_thread}; +use tokio::timer::*; +use tokio_timer::clock::Clock; + +use std::sync::mpsc; +use std::time::{Duration, Instant}; + +struct MockNow(Instant); + +impl tokio_timer::clock::Now for MockNow { + fn now(&self) -> Instant { + self.0 + } +} + +#[test] +fn clock_and_timer_concurrent() { + let _ = env_logger::try_init(); + + let when = Instant::now() + Duration::from_millis(5_000); + let clock = Clock::new_with_now(MockNow(when)); + + let mut rt = runtime::Builder::new().clock(clock).build().unwrap(); + + let (tx, rx) = mpsc::channel(); + + rt.spawn({ + Delay::new(when) + .map_err(|e| panic!("unexpected error; err={:?}", e)) + .and_then(move |_| { + assert!(Instant::now() < when); + tx.send(()).unwrap(); + Ok(()) + }) + }); + + rx.recv().unwrap(); +} + +#[test] +fn clock_and_timer_single_threaded() { + let _ = env_logger::try_init(); + + let when = Instant::now() + Duration::from_millis(5_000); + let clock = Clock::new_with_now(MockNow(when)); + + let mut rt = current_thread::Builder::new().clock(clock).build().unwrap(); + + rt.block_on({ + Delay::new(when) + .map_err(|e| panic!("unexpected error; err={:?}", e)) + .and_then(move |_| { + assert!(Instant::now() < when); + Ok(()) + }) + }) + .unwrap(); +} diff --git a/third_party/rust/tokio-0.1.22/tests/drop-core.rs b/third_party/rust/tokio-0.1.22/tests/drop-core.rs new file mode 100644 index 0000000000..8be0d711d7 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/drop-core.rs @@ -0,0 +1,42 @@ +extern crate futures; +extern crate tokio; + +use std::net; +use std::thread; + +use futures::future; +use futures::prelude::*; +use futures::sync::oneshot; +use tokio::net::TcpListener; +use tokio::reactor::Reactor; + +#[test] +fn tcp_doesnt_block() { + let core = Reactor::new().unwrap(); + let handle = core.handle(); + let listener = net::TcpListener::bind("127.0.0.1:0").unwrap(); + let listener = TcpListener::from_std(listener, &handle).unwrap(); + drop(core); + assert!(listener.incoming().wait().next().unwrap().is_err()); +} + +#[test] +fn drop_wakes() { + let core = Reactor::new().unwrap(); + let handle = core.handle(); + let listener = net::TcpListener::bind("127.0.0.1:0").unwrap(); + let listener = TcpListener::from_std(listener, &handle).unwrap(); + let (tx, rx) = oneshot::channel::<()>(); + let t = thread::spawn(move || { + let incoming = listener.incoming(); + let new_socket = incoming.into_future().map_err(|_| ()); + let drop_tx = future::lazy(|| { + drop(tx); + future::ok(()) + }); + assert!(new_socket.join(drop_tx).wait().is_err()); + }); + drop(rx.wait()); + drop(core); + t.join().unwrap(); +} diff --git a/third_party/rust/tokio-0.1.22/tests/enumerate.rs b/third_party/rust/tokio-0.1.22/tests/enumerate.rs new file mode 100644 index 0000000000..c71b7a24c9 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/enumerate.rs @@ -0,0 +1,26 @@ +extern crate futures; +extern crate tokio; +extern crate tokio_executor; +extern crate tokio_timer; + +use futures::sync::mpsc; +use tokio::util::StreamExt; + +#[test] +fn enumerate() { + use futures::*; + + let (mut tx, rx) = mpsc::channel(1); + + std::thread::spawn(|| { + for i in 0..5 { + tx = tx.send(i * 2).wait().unwrap(); + } + }); + + let result = rx.enumerate().collect(); + assert_eq!( + result.wait(), + Ok(vec![(0, 0), (1, 2), (2, 4), (3, 6), (4, 8)]) + ); +} diff --git a/third_party/rust/tokio-0.1.22/tests/global.rs b/third_party/rust/tokio-0.1.22/tests/global.rs new file mode 100644 index 0000000000..1bf45a66f6 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/global.rs @@ -0,0 +1,141 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio; +extern crate tokio_io; + +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; +use std::sync::Arc; +use std::{io, thread}; + +use futures::prelude::*; +use tokio::net::{TcpListener, TcpStream}; +use tokio::runtime::Runtime; + +macro_rules! t { + ($e:expr) => { + match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + } + }; +} + +#[test] +fn hammer_old() { + let _ = env_logger::try_init(); + + let threads = (0..10) + .map(|_| { + thread::spawn(|| { + let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap())); + let addr = t!(srv.local_addr()); + let mine = TcpStream::connect(&addr); + let theirs = srv + .incoming() + .into_future() + .map(|(s, _)| s.unwrap()) + .map_err(|(s, _)| s); + let (mine, theirs) = t!(mine.join(theirs).wait()); + + assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr())); + assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr())); + }) + }) + .collect::<Vec<_>>(); + for thread in threads { + thread.join().unwrap(); + } +} + +struct Rd(Arc<TcpStream>); +struct Wr(Arc<TcpStream>); + +impl io::Read for Rd { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + <&TcpStream>::read(&mut &*self.0, dst) + } +} + +impl tokio_io::AsyncRead for Rd {} + +impl io::Write for Wr { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + <&TcpStream>::write(&mut &*self.0, src) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl tokio_io::AsyncWrite for Wr { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) + } +} + +#[test] +fn hammer_split() { + use tokio_io::io; + + const N: usize = 100; + const ITER: usize = 10; + + let _ = env_logger::try_init(); + + for _ in 0..ITER { + let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap())); + let addr = t!(srv.local_addr()); + + let cnt = Arc::new(AtomicUsize::new(0)); + + let mut rt = Runtime::new().unwrap(); + + fn split(socket: TcpStream, cnt: Arc<AtomicUsize>) { + let socket = Arc::new(socket); + let rd = Rd(socket.clone()); + let wr = Wr(socket); + + let cnt2 = cnt.clone(); + + let rd = io::read(rd, vec![0; 1]) + .map(move |_| { + cnt2.fetch_add(1, Relaxed); + }) + .map_err(|e| panic!("read error = {:?}", e)); + + let wr = io::write_all(wr, b"1") + .map(move |_| { + cnt.fetch_add(1, Relaxed); + }) + .map_err(move |e| panic!("write error = {:?}", e)); + + tokio::spawn(rd); + tokio::spawn(wr); + } + + rt.spawn({ + let cnt = cnt.clone(); + srv.incoming() + .map_err(|e| panic!("accept error = {:?}", e)) + .take(N as u64) + .for_each(move |socket| { + split(socket, cnt.clone()); + Ok(()) + }) + }); + + for _ in 0..N { + rt.spawn({ + let cnt = cnt.clone(); + TcpStream::connect(&addr) + .map_err(move |e| panic!("connect error = {:?}", e)) + .map(move |socket| split(socket, cnt)) + }); + } + + rt.shutdown_on_idle().wait().unwrap(); + assert_eq!(N * 4, cnt.load(Relaxed)); + } +} diff --git a/third_party/rust/tokio-0.1.22/tests/length_delimited.rs b/third_party/rust/tokio-0.1.22/tests/length_delimited.rs new file mode 100644 index 0000000000..f87cfa936d --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/length_delimited.rs @@ -0,0 +1,627 @@ +extern crate bytes; +extern crate futures; +extern crate tokio; + +use tokio::codec::*; +use tokio::io::{AsyncRead, AsyncWrite}; + +use bytes::{BufMut, Bytes, BytesMut}; +use futures::Async::*; +use futures::{Poll, Sink, Stream}; + +use std::collections::VecDeque; +use std::io; + +macro_rules! mock { + ($($x:expr,)*) => {{ + let mut v = VecDeque::new(); + v.extend(vec![$($x),*]); + Mock { calls: v } + }}; +} + +#[test] +fn read_empty_io_yields_nothing() { + let mut io = FramedRead::new(mock!(), LengthDelimitedCodec::new()); + + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_one_packet() { + let mut io = FramedRead::new( + mock! { + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + }, + LengthDelimitedCodec::new(), + ); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_one_packet_little_endian() { + let mut io = length_delimited::Builder::new() + .little_endian() + .new_read(mock! { + Ok(b"\x09\x00\x00\x00abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_one_packet_native_endian() { + let data = if cfg!(target_endian = "big") { + b"\x00\x00\x00\x09abcdefghi" + } else { + b"\x09\x00\x00\x00abcdefghi" + }; + let mut io = length_delimited::Builder::new() + .native_endian() + .new_read(mock! { + Ok(data[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_multi_frame_one_packet() { + let mut data: Vec<u8> = vec![]; + data.extend_from_slice(b"\x00\x00\x00\x09abcdefghi"); + data.extend_from_slice(b"\x00\x00\x00\x03123"); + data.extend_from_slice(b"\x00\x00\x00\x0bhello world"); + + let mut io = FramedRead::new( + mock! { + Ok(data.into()), + }, + LengthDelimitedCodec::new(), + ); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_multi_packet() { + let mut io = FramedRead::new( + mock! { + Ok(b"\x00\x00"[..].into()), + Ok(b"\x00\x09abc"[..].into()), + Ok(b"defghi"[..].into()), + }, + LengthDelimitedCodec::new(), + ); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_multi_frame_multi_packet() { + let mut io = FramedRead::new( + mock! { + Ok(b"\x00\x00"[..].into()), + Ok(b"\x00\x09abc"[..].into()), + Ok(b"defghi"[..].into()), + Ok(b"\x00\x00\x00\x0312"[..].into()), + Ok(b"3\x00\x00\x00\x0bhello world"[..].into()), + }, + LengthDelimitedCodec::new(), + ); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_multi_packet_wait() { + let mut io = FramedRead::new( + mock! { + Ok(b"\x00\x00"[..].into()), + Err(would_block()), + Ok(b"\x00\x09abc"[..].into()), + Err(would_block()), + Ok(b"defghi"[..].into()), + Err(would_block()), + }, + LengthDelimitedCodec::new(), + ); + + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_multi_frame_multi_packet_wait() { + let mut io = FramedRead::new( + mock! { + Ok(b"\x00\x00"[..].into()), + Err(would_block()), + Ok(b"\x00\x09abc"[..].into()), + Err(would_block()), + Ok(b"defghi"[..].into()), + Err(would_block()), + Ok(b"\x00\x00\x00\x0312"[..].into()), + Err(would_block()), + Ok(b"3\x00\x00\x00\x0bhello world"[..].into()), + Err(would_block()), + }, + LengthDelimitedCodec::new(), + ); + + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into()))); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_incomplete_head() { + let mut io = FramedRead::new( + mock! { + Ok(b"\x00\x00"[..].into()), + }, + LengthDelimitedCodec::new(), + ); + + assert!(io.poll().is_err()); +} + +#[test] +fn read_incomplete_head_multi() { + let mut io = FramedRead::new( + mock! { + Err(would_block()), + Ok(b"\x00"[..].into()), + Err(would_block()), + }, + LengthDelimitedCodec::new(), + ); + + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert!(io.poll().is_err()); +} + +#[test] +fn read_incomplete_payload() { + let mut io = FramedRead::new( + mock! { + Ok(b"\x00\x00\x00\x09ab"[..].into()), + Err(would_block()), + Ok(b"cd"[..].into()), + Err(would_block()), + }, + LengthDelimitedCodec::new(), + ); + + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert!(io.poll().is_err()); +} + +#[test] +fn read_max_frame_len() { + let mut io = length_delimited::Builder::new() + .max_frame_length(5) + .new_read(mock! { + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData); +} + +#[test] +fn read_update_max_frame_len_at_rest() { + let mut io = length_delimited::Builder::new().new_read(mock! { + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + io.decoder_mut().set_max_frame_length(5); + assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData); +} + +#[test] +fn read_update_max_frame_len_in_flight() { + let mut io = length_delimited::Builder::new().new_read(mock! { + Ok(b"\x00\x00\x00\x09abcd"[..].into()), + Err(would_block()), + Ok(b"efghi"[..].into()), + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), NotReady); + io.decoder_mut().set_max_frame_length(5); + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData); +} + +#[test] +fn read_one_byte_length_field() { + let mut io = length_delimited::Builder::new() + .length_field_length(1) + .new_read(mock! { + Ok(b"\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_header_offset() { + let mut io = length_delimited::Builder::new() + .length_field_length(2) + .length_field_offset(4) + .new_read(mock! { + Ok(b"zzzz\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_multi_frame_one_packet_skip_none_adjusted() { + let mut data: Vec<u8> = vec![]; + data.extend_from_slice(b"xx\x00\x09abcdefghi"); + data.extend_from_slice(b"yy\x00\x03123"); + data.extend_from_slice(b"zz\x00\x0bhello world"); + + let mut io = length_delimited::Builder::new() + .length_field_length(2) + .length_field_offset(2) + .num_skip(0) + .length_adjustment(4) + .new_read(mock! { + Ok(data.into()), + }); + + assert_eq!( + io.poll().unwrap(), + Ready(Some(b"xx\x00\x09abcdefghi"[..].into())) + ); + assert_eq!(io.poll().unwrap(), Ready(Some(b"yy\x00\x03123"[..].into()))); + assert_eq!( + io.poll().unwrap(), + Ready(Some(b"zz\x00\x0bhello world"[..].into())) + ); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_multi_frame_one_packet_length_includes_head() { + let mut data: Vec<u8> = vec![]; + data.extend_from_slice(b"\x00\x0babcdefghi"); + data.extend_from_slice(b"\x00\x05123"); + data.extend_from_slice(b"\x00\x0dhello world"); + + let mut io = length_delimited::Builder::new() + .length_field_length(2) + .length_adjustment(-2) + .new_read(mock! { + Ok(data.into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn write_single_frame_length_adjusted() { + let mut io = length_delimited::Builder::new() + .length_adjustment(-2) + .new_write(mock! { + Ok(b"\x00\x00\x00\x0b"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }); + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_nothing_yields_nothing() { + let mut io = FramedWrite::new(mock!(), LengthDelimitedCodec::new()); + assert!(io.poll_complete().unwrap().is_ready()); +} + +#[test] +fn write_single_frame_one_packet() { + let mut io = FramedWrite::new( + mock! { + Ok(b"\x00\x00\x00\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }, + LengthDelimitedCodec::new(), + ); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_multi_frame_one_packet() { + let mut io = FramedWrite::new( + mock! { + Ok(b"\x00\x00\x00\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(b"\x00\x00\x00\x03"[..].into()), + Ok(b"123"[..].into()), + Ok(b"\x00\x00\x00\x0b"[..].into()), + Ok(b"hello world"[..].into()), + Ok(Flush), + }, + LengthDelimitedCodec::new(), + ); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.start_send(Bytes::from("123")).unwrap().is_ready()); + assert!(io + .start_send(Bytes::from("hello world")) + .unwrap() + .is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_multi_frame_multi_packet() { + let mut io = FramedWrite::new( + mock! { + Ok(b"\x00\x00\x00\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + Ok(b"\x00\x00\x00\x03"[..].into()), + Ok(b"123"[..].into()), + Ok(Flush), + Ok(b"\x00\x00\x00\x0b"[..].into()), + Ok(b"hello world"[..].into()), + Ok(Flush), + }, + LengthDelimitedCodec::new(), + ); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.start_send(Bytes::from("123")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io + .start_send(Bytes::from("hello world")) + .unwrap() + .is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_frame_would_block() { + let mut io = FramedWrite::new( + mock! { + Err(would_block()), + Ok(b"\x00\x00"[..].into()), + Err(would_block()), + Ok(b"\x00\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }, + LengthDelimitedCodec::new(), + ); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(!io.poll_complete().unwrap().is_ready()); + assert!(!io.poll_complete().unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_frame_little_endian() { + let mut io = length_delimited::Builder::new() + .little_endian() + .new_write(mock! { + Ok(b"\x09\x00\x00\x00"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_frame_with_short_length_field() { + let mut io = length_delimited::Builder::new() + .length_field_length(1) + .new_write(mock! { + Ok(b"\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_max_frame_len() { + let mut io = length_delimited::Builder::new() + .max_frame_length(5) + .new_write(mock! {}); + + assert_eq!( + io.start_send(Bytes::from("abcdef")).unwrap_err().kind(), + io::ErrorKind::InvalidInput + ); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_update_max_frame_len_at_rest() { + let mut io = length_delimited::Builder::new().new_write(mock! { + Ok(b"\x00\x00\x00\x06"[..].into()), + Ok(b"abcdef"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + io.encoder_mut().set_max_frame_length(5); + assert_eq!( + io.start_send(Bytes::from("abcdef")).unwrap_err().kind(), + io::ErrorKind::InvalidInput + ); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_update_max_frame_len_in_flight() { + let mut io = length_delimited::Builder::new().new_write(mock! { + Ok(b"\x00\x00\x00\x06"[..].into()), + Ok(b"ab"[..].into()), + Err(would_block()), + Ok(b"cdef"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready()); + assert!(!io.poll_complete().unwrap().is_ready()); + io.encoder_mut().set_max_frame_length(5); + assert!(io.poll_complete().unwrap().is_ready()); + assert_eq!( + io.start_send(Bytes::from("abcdef")).unwrap_err().kind(), + io::ErrorKind::InvalidInput + ); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_zero() { + let mut io = length_delimited::Builder::new().new_write(mock! {}); + + assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready()); + assert_eq!( + io.poll_complete().unwrap_err().kind(), + io::ErrorKind::WriteZero + ); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn encode_overflow() { + // Test reproducing tokio-rs/tokio#681. + let mut codec = length_delimited::Builder::new().new_codec(); + let mut buf = BytesMut::with_capacity(1024); + + // Put some data into the buffer without resizing it to hold more. + let some_as = std::iter::repeat(b'a').take(1024).collect::<Vec<_>>(); + buf.put_slice(&some_as[..]); + + // Trying to encode the length header should resize the buffer if it won't fit. + codec.encode(Bytes::from("hello"), &mut buf).unwrap(); +} + +// ===== Test utils ===== + +fn would_block() -> io::Error { + io::Error::new(io::ErrorKind::WouldBlock, "would block") +} + +struct Mock { + calls: VecDeque<io::Result<Op>>, +} + +enum Op { + Data(Vec<u8>), + Flush, +} + +use self::Op::*; + +impl io::Read for Mock { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + match self.calls.pop_front() { + Some(Ok(Op::Data(data))) => { + debug_assert!(dst.len() >= data.len()); + dst[..data.len()].copy_from_slice(&data[..]); + Ok(data.len()) + } + Some(Ok(_)) => panic!(), + Some(Err(e)) => Err(e), + None => Ok(0), + } + } +} + +impl AsyncRead for Mock {} + +impl io::Write for Mock { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + match self.calls.pop_front() { + Some(Ok(Op::Data(data))) => { + let len = data.len(); + assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src); + assert_eq!(&data[..], &src[..len]); + Ok(len) + } + Some(Ok(_)) => panic!(), + Some(Err(e)) => Err(e), + None => Ok(0), + } + } + + fn flush(&mut self) -> io::Result<()> { + match self.calls.pop_front() { + Some(Ok(Op::Flush)) => Ok(()), + Some(Ok(_)) => panic!(), + Some(Err(e)) => Err(e), + None => Ok(()), + } + } +} + +impl AsyncWrite for Mock { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(Ready(())) + } +} + +impl<'a> From<&'a [u8]> for Op { + fn from(src: &'a [u8]) -> Op { + Op::Data(src.into()) + } +} + +impl From<Vec<u8>> for Op { + fn from(src: Vec<u8>) -> Op { + Op::Data(src) + } +} diff --git a/third_party/rust/tokio-0.1.22/tests/line-frames.rs b/third_party/rust/tokio-0.1.22/tests/line-frames.rs new file mode 100644 index 0000000000..84b860a2a3 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/line-frames.rs @@ -0,0 +1,90 @@ +extern crate bytes; +extern crate env_logger; +extern crate futures; +extern crate tokio; +extern crate tokio_codec; +extern crate tokio_io; +extern crate tokio_threadpool; + +use std::io; +use std::net::Shutdown; + +use bytes::{BufMut, BytesMut}; +use futures::{Future, Sink, Stream}; +use tokio::net::{TcpListener, TcpStream}; +use tokio_codec::{Decoder, Encoder}; +use tokio_io::io::{read, write_all}; +use tokio_threadpool::Builder; + +pub struct LineCodec; + +impl Decoder for LineCodec { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> { + match buf.iter().position(|&b| b == b'\n') { + Some(i) => Ok(Some(buf.split_to(i + 1).into())), + None => Ok(None), + } + } + + fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> { + if buf.len() == 0 { + Ok(None) + } else { + let amt = buf.len(); + Ok(Some(buf.split_to(amt))) + } + } +} + +impl Encoder for LineCodec { + type Item = BytesMut; + type Error = io::Error; + + fn encode(&mut self, item: BytesMut, into: &mut BytesMut) -> io::Result<()> { + into.put(&item[..]); + Ok(()) + } +} + +#[test] +fn echo() { + drop(env_logger::try_init()); + + let pool = Builder::new().pool_size(1).build(); + + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let addr = listener.local_addr().unwrap(); + let sender = pool.sender().clone(); + let srv = listener.incoming().for_each(move |socket| { + let (sink, stream) = LineCodec.framed(socket).split(); + sender + .spawn(sink.send_all(stream).map(|_| ()).map_err(|_| ())) + .unwrap(); + Ok(()) + }); + + pool.sender() + .spawn(srv.map_err(|e| panic!("srv error: {}", e))) + .unwrap(); + + let client = TcpStream::connect(&addr); + let client = client.wait().unwrap(); + let (client, _) = write_all(client, b"a\n").wait().unwrap(); + let (client, buf, amt) = read(client, vec![0; 1024]).wait().unwrap(); + assert_eq!(amt, 2); + assert_eq!(&buf[..2], b"a\n"); + + let (client, _) = write_all(client, b"\n").wait().unwrap(); + let (client, buf, amt) = read(client, buf).wait().unwrap(); + assert_eq!(amt, 1); + assert_eq!(&buf[..1], b"\n"); + + let (client, _) = write_all(client, b"b").wait().unwrap(); + client.shutdown(Shutdown::Write).unwrap(); + let (_client, buf, amt) = read(client, buf).wait().unwrap(); + assert_eq!(amt, 1); + assert_eq!(&buf[..1], b"b"); +} diff --git a/third_party/rust/tokio-0.1.22/tests/pipe-hup.rs b/third_party/rust/tokio-0.1.22/tests/pipe-hup.rs new file mode 100644 index 0000000000..eabdec4c80 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/pipe-hup.rs @@ -0,0 +1,103 @@ +#![cfg(unix)] + +extern crate env_logger; +extern crate futures; +extern crate libc; +extern crate mio; +extern crate tokio; +extern crate tokio_io; + +use std::fs::File; +use std::io::{self, Write}; +use std::os::unix::io::{AsRawFd, FromRawFd}; +use std::thread; +use std::time::Duration; + +use futures::Future; +use mio::event::Evented; +use mio::unix::{EventedFd, UnixReady}; +use mio::{PollOpt, Ready, Token}; +use tokio::reactor::{Handle, PollEvented2}; +use tokio_io::io::read_to_end; + +macro_rules! t { + ($e:expr) => { + match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + } + }; +} + +struct MyFile(File); + +impl MyFile { + fn new(file: File) -> MyFile { + unsafe { + let r = libc::fcntl(file.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK); + assert!(r != -1, "fcntl error: {}", io::Error::last_os_error()); + } + MyFile(file) + } +} + +impl io::Read for MyFile { + fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> { + self.0.read(bytes) + } +} + +impl Evented for MyFile { + fn register( + &self, + poll: &mio::Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + let hup: Ready = UnixReady::hup().into(); + EventedFd(&self.0.as_raw_fd()).register(poll, token, interest | hup, opts) + } + fn reregister( + &self, + poll: &mio::Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + let hup: Ready = UnixReady::hup().into(); + EventedFd(&self.0.as_raw_fd()).reregister(poll, token, interest | hup, opts) + } + fn deregister(&self, poll: &mio::Poll) -> io::Result<()> { + EventedFd(&self.0.as_raw_fd()).deregister(poll) + } +} + +#[test] +fn hup() { + drop(env_logger::try_init()); + + let handle = Handle::default(); + unsafe { + let mut pipes = [0; 2]; + assert!( + libc::pipe(pipes.as_mut_ptr()) != -1, + "pipe error: {}", + io::Error::last_os_error() + ); + let read = File::from_raw_fd(pipes[0]); + let mut write = File::from_raw_fd(pipes[1]); + let t = thread::spawn(move || { + write.write_all(b"Hello!\n").unwrap(); + write.write_all(b"Good bye!\n").unwrap(); + thread::sleep(Duration::from_millis(100)); + }); + + let source = PollEvented2::new_with_handle(MyFile::new(read), &handle).unwrap(); + + let reader = read_to_end(source, Vec::new()); + let (_, content) = t!(reader.wait()); + assert_eq!(&b"Hello!\nGood bye!\n"[..], &content[..]); + t.join().unwrap(); + } +} diff --git a/third_party/rust/tokio-0.1.22/tests/reactor.rs b/third_party/rust/tokio-0.1.22/tests/reactor.rs new file mode 100644 index 0000000000..fd3a8eea69 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/reactor.rs @@ -0,0 +1,91 @@ +extern crate futures; +extern crate tokio_executor; +extern crate tokio_reactor; +extern crate tokio_tcp; + +use tokio_reactor::Reactor; +use tokio_tcp::TcpListener; + +use futures::executor::{spawn, Notify, Spawn}; +use futures::{Future, Stream}; + +use std::mem; +use std::net::TcpStream; +use std::sync::{Arc, Mutex}; + +#[test] +fn test_drop_on_notify() { + // When the reactor receives a kernel notification, it notifies the + // task that holds the associated socket. If this notification results in + // the task being dropped, the socket will also be dropped. + // + // Previously, there was a deadlock scenario where the reactor, while + // notifying, held a lock and the task being dropped attempted to acquire + // that same lock in order to clean up state. + // + // To simulate this case, we create a fake executor that does nothing when + // the task is notified. This simulates an executor in the process of + // shutting down. Then, when the task handle is dropped, the task itself is + // dropped. + + struct MyNotify; + + type Task = Mutex<Spawn<Box<Future<Item = (), Error = ()>>>>; + + impl Notify for MyNotify { + fn notify(&self, _: usize) { + // Do nothing + } + + fn clone_id(&self, id: usize) -> usize { + let ptr = id as *const Task; + let task = unsafe { Arc::from_raw(ptr) }; + + mem::forget(task.clone()); + mem::forget(task); + + id + } + + fn drop_id(&self, id: usize) { + let ptr = id as *const Task; + let _ = unsafe { Arc::from_raw(ptr) }; + } + } + + let addr = "127.0.0.1:0".parse().unwrap(); + let mut reactor = Reactor::new().unwrap(); + + // Create a listener + let listener = TcpListener::bind(&addr).unwrap(); + let addr = listener.local_addr().unwrap(); + + // Define a task that just drains the listener + let task = Box::new({ + listener + .incoming() + .for_each(|_| Ok(())) + .map_err(|_| panic!()) + }) as Box<Future<Item = (), Error = ()>>; + + let task = Arc::new(Mutex::new(spawn(task))); + let notify = Arc::new(MyNotify); + + let mut enter = tokio_executor::enter().unwrap(); + + tokio_reactor::with_default(&reactor.handle(), &mut enter, |_| { + let id = &*task as *const Task as usize; + + task.lock() + .unwrap() + .poll_future_notify(¬ify, id) + .unwrap(); + }); + + drop(task); + + // Establish a connection to the acceptor + let _s = TcpStream::connect(&addr).unwrap(); + + reactor.turn(None).unwrap(); +} diff --git a/third_party/rust/tokio-0.1.22/tests/runtime.rs b/third_party/rust/tokio-0.1.22/tests/runtime.rs new file mode 100644 index 0000000000..f84c66738b --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/runtime.rs @@ -0,0 +1,532 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio; + +use futures::sync::oneshot; +use std::sync::{atomic, Arc, Mutex}; +use std::thread; +use tokio::io; +use tokio::net::{TcpListener, TcpStream}; +use tokio::prelude::future::lazy; +use tokio::prelude::*; +use tokio::runtime::Runtime; + +// this import is used in all child modules that have it in scope +// from importing super::*, but the compiler doesn't realise that +// and warns about it. +pub use futures::future::Executor; + +macro_rules! t { + ($e:expr) => { + match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + } + }; +} + +fn create_client_server_future() -> Box<Future<Item = (), Error = ()> + Send> { + let server = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap())); + let addr = t!(server.local_addr()); + let client = TcpStream::connect(&addr); + + let server = server + .incoming() + .take(1) + .map_err(|e| panic!("accept err = {:?}", e)) + .for_each(|socket| { + tokio::spawn({ + io::write_all(socket, b"hello") + .map(|_| ()) + .map_err(|e| panic!("write err = {:?}", e)) + }) + }) + .map(|_| ()); + + let client = client + .map_err(|e| panic!("connect err = {:?}", e)) + .and_then(|client| { + // Read all + io::read_to_end(client, vec![]) + .map(|_| ()) + .map_err(|e| panic!("read err = {:?}", e)) + }); + + let future = server.join(client).map(|_| ()); + Box::new(future) +} + +#[test] +fn runtime_tokio_run() { + let _ = env_logger::try_init(); + + tokio::run(create_client_server_future()); +} + +#[test] +fn runtime_single_threaded() { + let _ = env_logger::try_init(); + + let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap(); + runtime.block_on(create_client_server_future()).unwrap(); + runtime.run().unwrap(); +} + +#[test] +fn runtime_single_threaded_block_on() { + let _ = env_logger::try_init(); + + tokio::runtime::current_thread::block_on_all(create_client_server_future()).unwrap(); +} + +mod runtime_single_threaded_block_on_all { + use super::*; + + fn test<F>(spawn: F) + where + F: Fn(Box<Future<Item = (), Error = ()> + Send>), + { + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let msg = tokio::runtime::current_thread::block_on_all(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + + // Spawn! + spawn(Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); + + Ok::<_, ()>("hello") + })) + .unwrap(); + + assert_eq!(2, *cnt.lock().unwrap()); + assert_eq!(msg, "hello"); + } + + #[test] + fn spawn() { + test(|f| { + tokio::spawn(f); + }) + } + + #[test] + fn execute() { + test(|f| { + tokio::executor::DefaultExecutor::current() + .execute(f) + .unwrap(); + }) + } +} + +mod runtime_single_threaded_racy { + use super::*; + fn test<F>(spawn: F) + where + F: Fn(tokio::runtime::current_thread::Handle, Box<Future<Item = (), Error = ()> + Send>), + { + let (trigger, exit) = futures::sync::oneshot::channel(); + let (handle_tx, handle_rx) = ::std::sync::mpsc::channel(); + let jh = ::std::thread::spawn(move || { + let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap(); + handle_tx.send(rt.handle()).unwrap(); + + // don't exit until we are told to + rt.block_on(exit.map_err(|_| ())).unwrap(); + + // run until all spawned futures (incl. the "exit" signal future) have completed. + rt.run().unwrap(); + }); + + let (tx, rx) = futures::sync::oneshot::channel(); + + let handle = handle_rx.recv().unwrap(); + spawn( + handle, + Box::new(futures::future::lazy(move || { + tx.send(()).unwrap(); + Ok(()) + })), + ); + + // signal runtime thread to exit + trigger.send(()).unwrap(); + + // wait for runtime thread to exit + jh.join().unwrap(); + + assert_eq!(rx.wait().unwrap(), ()); + } + + #[test] + fn spawn() { + test(|handle, f| { + handle.spawn(f).unwrap(); + }) + } + + #[test] + fn execute() { + test(|handle, f| { + handle.execute(f).unwrap(); + }) + } +} + +mod runtime_multi_threaded { + use super::*; + fn test<F>(spawn: F) + where + F: Fn(&mut Runtime) + Send + 'static, + { + let _ = env_logger::try_init(); + + let mut runtime = tokio::runtime::Builder::new().build().unwrap(); + spawn(&mut runtime); + runtime.shutdown_on_idle().wait().unwrap(); + } + + #[test] + fn spawn() { + test(|rt| { + rt.spawn(create_client_server_future()); + }); + } + + #[test] + fn execute() { + test(|rt| { + rt.executor() + .execute(create_client_server_future()) + .unwrap(); + }); + } +} + +#[test] +fn block_on_timer() { + use std::time::{Duration, Instant}; + use tokio::timer::{Delay, Error}; + + fn after_1s<T>(x: T) -> Box<Future<Item = T, Error = Error> + Send> + where + T: Send + 'static, + { + Box::new(Delay::new(Instant::now() + Duration::from_millis(100)).map(move |_| x)) + } + + let mut runtime = Runtime::new().unwrap(); + assert_eq!(runtime.block_on(after_1s(42)).unwrap(), 42); + runtime.shutdown_on_idle().wait().unwrap(); +} + +mod from_block_on { + use super::*; + + fn test<F>(spawn: F) + where + F: Fn(Box<Future<Item = (), Error = ()> + Send>) + Send + 'static, + { + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let mut runtime = Runtime::new().unwrap(); + let msg = runtime + .block_on(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + + // Spawn! + spawn(Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); + + Ok::<_, ()>("hello") + })) + .unwrap(); + + runtime.shutdown_on_idle().wait().unwrap(); + assert_eq!(2, *cnt.lock().unwrap()); + assert_eq!(msg, "hello"); + } + + #[test] + fn execute() { + test(|f| { + tokio::executor::DefaultExecutor::current() + .execute(f) + .unwrap(); + }) + } + + #[test] + fn spawn() { + test(|f| { + tokio::spawn(f); + }) + } +} + +#[test] +fn block_waits() { + let (tx, rx) = oneshot::channel(); + + thread::spawn(|| { + use std::time::Duration; + thread::sleep(Duration::from_millis(1000)); + tx.send(()).unwrap(); + }); + + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let mut runtime = Runtime::new().unwrap(); + runtime + .block_on(rx.then(move |_| { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<_, ()>(()) + })) + .unwrap(); + + assert_eq!(1, *cnt.lock().unwrap()); + runtime.shutdown_on_idle().wait().unwrap(); +} + +mod many { + use super::*; + + const ITER: usize = 200; + fn test<F>(spawn: F) + where + F: Fn(&mut Runtime, Box<Future<Item = (), Error = ()> + Send>), + { + let cnt = Arc::new(Mutex::new(0)); + let mut runtime = Runtime::new().unwrap(); + + for _ in 0..ITER { + let c = cnt.clone(); + spawn( + &mut runtime, + Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + })), + ); + } + + runtime.shutdown_on_idle().wait().unwrap(); + assert_eq!(ITER, *cnt.lock().unwrap()); + } + + #[test] + fn spawn() { + test(|rt, f| { + rt.spawn(f); + }) + } + + #[test] + fn execute() { + test(|rt, f| { + rt.executor().execute(f).unwrap(); + }) + } +} + +mod from_block_on_all { + use super::*; + + fn test<F>(spawn: F) + where + F: Fn(Box<Future<Item = (), Error = ()> + Send>) + Send + 'static, + { + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let runtime = Runtime::new().unwrap(); + let msg = runtime + .block_on_all(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + + // Spawn! + spawn(Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); + + Ok::<_, ()>("hello") + })) + .unwrap(); + + assert_eq!(2, *cnt.lock().unwrap()); + assert_eq!(msg, "hello"); + } + + #[test] + fn execute() { + test(|f| { + tokio::executor::DefaultExecutor::current() + .execute(f) + .unwrap(); + }) + } + + #[test] + fn spawn() { + test(|f| { + tokio::spawn(f); + }) + } +} + +mod nested_enter { + use super::*; + use std::panic; + use tokio::runtime::current_thread; + + fn test<F1, F2>(first: F1, nested: F2) + where + F1: Fn(Box<Future<Item = (), Error = ()> + Send>) + Send + 'static, + F2: Fn(Box<Future<Item = (), Error = ()> + Send>) + panic::UnwindSafe + Send + 'static, + { + let panicked = Arc::new(Mutex::new(false)); + let panicked2 = panicked.clone(); + + // Since this is testing panics in other threads, printing about panics + // is noisy and can give the impression that the test is ignoring panics. + // + // It *is* ignoring them, but on purpose. + let prev_hook = panic::take_hook(); + panic::set_hook(Box::new(|info| { + let s = info.to_string(); + if s.starts_with("panicked at 'nested ") + || s.starts_with("panicked at 'Multiple executors at once") + { + // expected, noop + } else { + println!("{}", s); + } + })); + + first(Box::new(lazy(move || { + panic::catch_unwind(move || nested(Box::new(lazy(|| Ok::<(), ()>(()))))) + .expect_err("nested should panic"); + *panicked2.lock().unwrap() = true; + Ok::<(), ()>(()) + }))); + + panic::set_hook(prev_hook); + + assert!( + *panicked.lock().unwrap(), + "nested call should have panicked" + ); + } + + fn threadpool_new() -> Runtime { + Runtime::new().expect("rt new") + } + + #[test] + fn run_in_run() { + test(tokio::run, tokio::run); + } + + #[test] + fn threadpool_block_on_in_run() { + test(tokio::run, |fut| { + let mut rt = threadpool_new(); + rt.block_on(fut).unwrap(); + }); + } + + #[test] + fn threadpool_block_on_all_in_run() { + test(tokio::run, |fut| { + let rt = threadpool_new(); + rt.block_on_all(fut).unwrap(); + }); + } + + #[test] + fn current_thread_block_on_all_in_run() { + test(tokio::run, |fut| { + current_thread::block_on_all(fut).unwrap(); + }); + } +} + +#[test] +fn runtime_reactor_handle() { + #![allow(deprecated)] + + use futures::Stream; + use std::net::{TcpListener as StdListener, TcpStream as StdStream}; + + let rt = Runtime::new().unwrap(); + + let std_listener = StdListener::bind("127.0.0.1:0").unwrap(); + let tk_listener = TcpListener::from_std(std_listener, rt.handle()).unwrap(); + + let addr = tk_listener.local_addr().unwrap(); + + // Spawn a thread since we are avoiding the runtime + let th = thread::spawn(|| for _ in tk_listener.incoming().take(1).wait() {}); + + let _ = StdStream::connect(&addr).unwrap(); + + th.join().unwrap(); +} + +#[test] +fn after_start_and_before_stop_is_called() { + let _ = env_logger::try_init(); + + let after_start = Arc::new(atomic::AtomicUsize::new(0)); + let before_stop = Arc::new(atomic::AtomicUsize::new(0)); + + let after_inner = after_start.clone(); + let before_inner = before_stop.clone(); + let runtime = tokio::runtime::Builder::new() + .after_start(move || { + after_inner.clone().fetch_add(1, atomic::Ordering::Relaxed); + }) + .before_stop(move || { + before_inner.clone().fetch_add(1, atomic::Ordering::Relaxed); + }) + .build() + .unwrap(); + + runtime.block_on_all(create_client_server_future()).unwrap(); + + assert!(after_start.load(atomic::Ordering::Relaxed) > 0); + assert!(before_stop.load(atomic::Ordering::Relaxed) > 0); +} diff --git a/third_party/rust/tokio-0.1.22/tests/timer.rs b/third_party/rust/tokio-0.1.22/tests/timer.rs new file mode 100644 index 0000000000..54c3b9d31d --- /dev/null +++ b/third_party/rust/tokio-0.1.22/tests/timer.rs @@ -0,0 +1,113 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio; +extern crate tokio_io; + +use tokio::prelude::*; +use tokio::timer::*; + +use std::sync::mpsc; +use std::time::{Duration, Instant}; + +#[test] +fn timer_with_runtime() { + let _ = env_logger::try_init(); + + let when = Instant::now() + Duration::from_millis(100); + let (tx, rx) = mpsc::channel(); + + tokio::run({ + Delay::new(when) + .map_err(|e| panic!("unexpected error; err={:?}", e)) + .and_then(move |_| { + assert!(Instant::now() >= when); + tx.send(()).unwrap(); + Ok(()) + }) + }); + + rx.recv().unwrap(); +} + +#[test] +fn starving() { + use futures::{task, Async, Poll}; + + let _ = env_logger::try_init(); + + struct Starve(Delay, u64); + + impl Future for Starve { + type Item = u64; + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, ()> { + if self.0.poll().unwrap().is_ready() { + return Ok(self.1.into()); + } + + self.1 += 1; + + task::current().notify(); + + Ok(Async::NotReady) + } + } + + let when = Instant::now() + Duration::from_millis(20); + let starve = Starve(Delay::new(when), 0); + + let (tx, rx) = mpsc::channel(); + + tokio::run({ + starve.and_then(move |_ticks| { + assert!(Instant::now() >= when); + tx.send(()).unwrap(); + Ok(()) + }) + }); + + rx.recv().unwrap(); +} + +#[test] +fn deadline() { + use futures::future; + + let _ = env_logger::try_init(); + + let when = Instant::now() + Duration::from_millis(20); + let (tx, rx) = mpsc::channel(); + + #[allow(deprecated)] + tokio::run({ + future::empty::<(), ()>().deadline(when).then(move |res| { + assert!(res.is_err()); + tx.send(()).unwrap(); + Ok(()) + }) + }); + + rx.recv().unwrap(); +} + +#[test] +fn timeout() { + use futures::future; + + let _ = env_logger::try_init(); + + let (tx, rx) = mpsc::channel(); + + tokio::run({ + future::empty::<(), ()>() + .timeout(Duration::from_millis(20)) + .then(move |res| { + assert!(res.is_err()); + tx.send(()).unwrap(); + Ok(()) + }) + }); + + rx.recv().unwrap(); +} |