diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
commit | 2aa4a82499d4becd2284cdb482213d541b8804dd (patch) | |
tree | b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/tokio-0.1.11 | |
parent | Initial commit. (diff) | |
download | firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip |
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-0.1.11')
58 files changed, 9071 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.11/.cargo-checksum.json b/third_party/rust/tokio-0.1.11/.cargo-checksum.json new file mode 100644 index 0000000000..0090f5eebd --- /dev/null +++ b/third_party/rust/tokio-0.1.11/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"CHANGELOG.md":"a1e6cce4e7ec3d2d83b05a4ae2ed84d547ac35cede41fa4da773e29585eaf332","CONTRIBUTING.md":"dc3eeb8c4f2bbf9ac909a6e797070c9ea26c8eeb0b7f430926951c8b94e3e2c0","Cargo.toml":"ba61efd66540563b475d5875b3f62aa86cfe81dd2452be9819db25ef9e46251a","LICENSE":"4899c290472c872cf8a1904a60e73ec58a1bc1db2e20bc143aa3d1498be49c96","README.md":"8d46731ef115749268a3f0cd6781d75e41f442fb7ad385b7722f91429e1fb7b7","benches/latency.rs":"b8f62578cf5784efa201549535e2a0d3bda78d78cf24d2be662851c35a1339df","benches/mio-ops.rs":"0df1a47f9bb3c8ed4291244da8c39d956b7a23cad9be7b2a5009fa58d39f8330","benches/tcp.rs":"851a4af3e8ae552605f16be158aa8778db2fa1e13bcf7b9347560b735a971e67","ci/tsan":"1c1773c80bd9bff2f2a5f60ca83e13952e523228650fc22e774c48f81f90c32a","examples/README.md":"c976ccc5c8b44caf31ef3a1c5ac13605898e087bc9b5f7e5a7ac9e8c44157a99","examples/chat-combinator.rs":"a0a621dfbc0ec63fe78c5ef49403787fcf44f9ecc83e0cae36dd2e6f0b3480aa","examples/chat.rs":"c66333084964d9a8b2fb038c60fca6edc395ae910a61f0f3d8f3f5b9f60d0122","examples/connect.rs":"b9b3527bb0a9dcf2cd875c6136339b23ba7151789ca49a1446cda3fce63f5865","examples/echo-udp.rs":"3200231d92c47f516d760f89c88b6faadc85bfa2d7686d696e648cee9745b7b1","examples/echo.rs":"0819864913f0e390b48c620bb67a2b88d6c5d6e11229ec1f5eeb935092353dce","examples/hello_world.rs":"766db4454412ff30dad9655c80a5fcd43e15327ea4703c20771a4e7ac4112dac","examples/manual-runtime.rs":"45c1ec5c3b3cad2ee6c9aeed2283477705f9c28c61b45711bc769c675c1ef8e4","examples/print_each_packet.rs":"fa37cd7dcb426f3847ef0829959f8655c1873a69c2cd8edd0db05c5a0c7865df","examples/proxy.rs":"3f28f1a0bac9ae57c3bf6770679717d55220460fcfab05fb62e356f5f07586ee","examples/tinydb.rs":"b56ee3458dc18a284653fb099a9f72eb5612710d314085e72a6c8ed3edbfcce9","examples/tinyhttp.rs":"d402343998b3a2175d46b3801983999e0e1ffae59b8d415d46f8a71f818240d3","examples/udp-client.rs":"2df32c59e5dda17d7c18c6c7faf28a3f9ce2e07c97a908c5ee4ca97c3d775874","examples/udp-codec.rs":"ff1cfd19a7c0788de52a27857488a4ce117fe72b72ba340d836cb5d184e53696","src/async_await.rs":"a23eea1f6d1ac20248f4cd38bb1782a576775c81e0493d8fe8423478f0fa64e0","src/clock.rs":"ad45adb859163b40a51e566e0e53abff8682c8bbb7813fd106aa70ea55efe5f3","src/codec/length_delimited.rs":"da5a3b5c1139396019d0dbe86628e32baa36cf8ecf7fbaf488be39d0921ab057","src/codec/mod.rs":"249941c50f38b56aaac0688db0100b07b04bc038e759aae87dee9f3e26c986d8","src/executor/current_thread/mod.rs":"c6f183735fda8f2081e88960eaf10e7e9435fa7f9e3db118626f55366e22094f","src/executor/mod.rs":"418807abde295d8f728c027a8426d2bd702a6a93a73242a64c2adeb92c7cef12","src/fs.rs":"1a4dd1c0eacabfb22661174b2f1441bdce6941cd61ccd24b61d02cf3cbf339a3","src/io.rs":"dfd320da22ed8d6a66868fc2282fac61e8e042d64e83c4eef4a52ae47e6fa8ac","src/lib.rs":"24a388cf4eed4b75b0be3ba76875dc28dddf6ae7072a2a648c720b34d178bb98","src/net.rs":"e478a0ca41afc2fa39529045983b99104541526c486fa672b8d2d590da65a699","src/prelude.rs":"11624e3b508192bee12731044e36e5a98cf8a83b7b36f3efa7a89743c6059edf","src/reactor/mod.rs":"9b68b4d1aabe99b2e851e2f98135c35a891825a7017dc835e4d3de3c5f162bd5","src/reactor/poll_evented.rs":"6d1233e82ebdefe42c63c3bbca839d99aab5873aeca2bb11787660ecd39ad2f9","src/runtime/builder.rs":"1bfb071b81f7651425edebb7886b203cefd9606b14058abe81f0307f64910294","src/runtime/current_thread/builder.rs":"30f1dae794e0db2b4263eb1e96b71c12c91fc08d2845cbb7cb540cd31a3b1612","src/runtime/current_thread/mod.rs":"3b23d84b9db6ca594abccbe0dcc7e03611aba43f21240a5c086453c6a1693726","src/runtime/current_thread/runtime.rs":"e95b0e162e9aec999b4d749f4137e258fdedf761542da67712a6fbe2331d2833","src/runtime/mod.rs":"1cd29284700470a39118f41ae85df7de99ed5c72316074d7b716867011d29eac","src/runtime/shutdown.rs":"a3f23cbff014a0ac6dc8a75839f636bc71fce4d5e2e2d34fa76f4ba51ca3ff95","src/runtime/task_executor.rs":"8c6b122116a18377d6bda4c8778b37e4f8b91fc2a73b9511a1ff63d34264c293","src/timer.rs":"4ffd3244dad044bd0025c2bb1cf8710ae00a5136787a266c3f04a3010daed37f","src/util/future.rs":"915181c9d26f74ace2e151906864e8e7d5e7419d4c59a0036db7b4a214666fd7","src/util/mod.rs":"da2469ecae7a3bce199f12be54ae8757b6724c871ae3532efd73416c20d2af4b","src/util/stream.rs":"8bba870c5969ec5abbd02ff35d2441781b731adade2a7037f7487359b2d5eba6","tests/buffered.rs":"8c1444a7d3de1e897448805fa925964174fa282d6c91dacf89398a3fb6a5d844","tests/clock.rs":"890d40deabf559199f4c555937efe676839ad1c780e0d9b03333c42a1267c383","tests/drop-core.rs":"9a074dd521840d28e5c740a767aaae4c957614901a7f4c6b330cc1560ef50fd9","tests/global.rs":"3e3e3793a7aa2923014b5e5684cf00c877e2217806a8526902c8e2dd94b1ba10","tests/length_delimited.rs":"d5e7759a69288b985a60583db030ab61e7fb3429eb806ed23ca31049e343edc9","tests/line-frames.rs":"858849c9d260349a83289d6ae8ad7e1bc4c438bb0a89f2030b869d99e826c6f1","tests/pipe-hup.rs":"4b73016172ee8ee9c81cb9f92c786a3efb28e6649fcbc897a5c15b2d36ac9d1b","tests/reactor.rs":"179556b37cb99a25579ec438a5d1233f259501dc2687939a65a062dc5b22c9a9","tests/runtime.rs":"70205a34b1ed2f192efcb59e5b897267888c2f8e68b09d0a6c9c514314979ee9","tests/timer.rs":"10a5b654afd7cebf3927aab4cf29891f0e40cf01d08eed01082c14c5f2ac8ac4"},"package":"6e93c78d23cc61aa245a8acd2c4a79c4d7fa7fb5c3ca90d5737029f043a84895"}
\ No newline at end of file diff --git a/third_party/rust/tokio-0.1.11/CHANGELOG.md b/third_party/rust/tokio-0.1.11/CHANGELOG.md new file mode 100644 index 0000000000..33e6e3febd --- /dev/null +++ b/third_party/rust/tokio-0.1.11/CHANGELOG.md @@ -0,0 +1,80 @@ +This changelog only applies to the `tokio` crate proper. Each sub crate +maintains its own changelog tracking changes made in each respective sub crate. + +# 0.1.11 (September 28, 2018) + +* Fix `tokio-async-await` dependency (#675). + +# 0.1.10 (September 27, 2018) + +* Fix minimal versions + +# 0.1.9 (September 27, 2018) + +* Experimental async/await improvements (#661). +* Re-export `TaskExecutor` from `tokio-current-thread` (#652). +* Improve `Runtime` builder API (#645). +* `tokio::run` panics when called from the context of an executor + (#646). +* Introduce `StreamExt` with a `timeout` helper (#573). +* Move `length_delimited` into `tokio` (#575). +* Re-organize `tokio::net` module (#548). +* Re-export `tokio-current-thread::spawn` in current_thread runtime + (#579). + +# 0.1.8 (August 23, 2018) + +* Extract tokio::executor::current_thread to a sub crate (#370) +* Add `Runtime::block_on` (#398) +* Add `runtime::current_thread::block_on_all` (#477) +* Misc documentation improvements (#450) +* Implement `std::error::Error` for error types (#501) + +# 0.1.7 (June 6, 2018) + +* Add `Runtime::block_on` for concurrent runtime (#391). +* Provide handle to `current_thread::Runtime` that allows spawning tasks from + other threads (#340). +* Provide `clock::now()`, a configurable source of time (#381). + +# 0.1.6 (May 2, 2018) + +* Add asynchronous filesystem APIs (#323). +* Add "current thread" runtime variant (#308). +* `CurrentThread`: Expose inner `Park` instance. +* Improve fairness of `CurrentThread` executor (#313). + +# 0.1.5 (March 30, 2018) + +* Provide timer API (#266) + +# 0.1.4 (March 22, 2018) + +* Fix build on FreeBSD (#218) +* Shutdown the Runtime when the handle is dropped (#214) +* Set Runtime thread name prefix for worker threads (#232) +* Add builder for Runtime (#234) +* Extract TCP and UDP types into separate crates (#224) +* Optionally support futures 0.2. + +# 0.1.3 (March 09, 2018) + +* Fix `CurrentThread::turn` to block on idle (#212). + +# 0.1.2 (March 09, 2018) + +* Introduce Tokio Runtime (#141) +* Provide `CurrentThread` for more flexible usage of current thread executor (#141). +* Add Lio for platforms that support it (#142). +* I/O resources now lazily bind to the reactor (#160). +* Extract Reactor to dedicated crate (#169) +* Add facade to sub crates and add prelude (#166). +* Switch TCP/UDP fns to poll_ -> Poll<...> style (#175) + +# 0.1.1 (February 09, 2018) + +* Doc fixes + +# 0.1.0 (February 07, 2018) + +* Initial crate released based on [RFC](https://github.com/tokio-rs/tokio-rfcs/pull/3). diff --git a/third_party/rust/tokio-0.1.11/CONTRIBUTING.md b/third_party/rust/tokio-0.1.11/CONTRIBUTING.md new file mode 100644 index 0000000000..212fa3d42b --- /dev/null +++ b/third_party/rust/tokio-0.1.11/CONTRIBUTING.md @@ -0,0 +1,387 @@ +# Contributing to Tokio + +:balloon: Thanks for your help improving the project! We are so happy to have +you! + +There are opportunities to contribute to Tokio at any level. It doesn't matter if +you are just getting started with Rust or are the most weathered expert, we can +use your help. + +**No contribution is too small and all contributions are valued.** + +This guide will help you get started. **Do not let this guide intimidate you**. +It should be considered a map to help you navigate the process. + +You may also get help with contributing in the [dev channel][dev], please join +us! + +[dev]: https://gitter.im/tokio-rs/dev + +## Conduct + +The Tokio project adheres to the [Rust Code of Conduct][coc]. This describes +the _minimum_ behavior expected from all contributors. + +[coc]: https://github.com/rust-lang/rust/blob/master/CODE_OF_CONDUCT.md + +## Contributing in Issues + +For any issue, there are fundamentally three ways an individual can contribute: + +1. By opening the issue for discussion: For instance, if you believe that you + have uncovered a bug in Tokio, creating a new issue in the tokio-rs/tokio + issue tracker is the way to report it. + +2. By helping to triage the issue: This can be done by providing + supporting details (a test case that demonstrates a bug), providing + suggestions on how to address the issue, or ensuring that the issue is tagged + correctly. + +3. By helping to resolve the issue: Typically this is done either in the form of + demonstrating that the issue reported is not a problem after all, or more + often, by opening a Pull Request that changes some bit of something in + Tokio in a concrete and reviewable manner. + +**Anybody can participate in any stage of contribution**. We urge you to +participate in the discussion around bugs and participate in reviewing PRs. + +### Asking for General Help + +If you have reviewed existing documentation and still have questions or are +having problems, you can open an issue asking for help. + +In exchange for receiving help, we ask that you contribute back a documentation +PR that helps others avoid the problems that you encountered. + +### Submitting a Bug Report + +When opening a new issue in the Tokio issue tracker, users will be presented +with a [basic template][template] that should be filled in. If you believe that you have +uncovered a bug, please fill out this form, following the template to the best +of your ability. Do not worry if you cannot answer every detail, just fill in +what you can. + +The two most important pieces of information we need in order to properly +evaluate the report is a description of the behavior you are seeing and a simple +test case we can use to recreate the problem on our own. If we cannot recreate +the issue, it becomes impossible for us to fix. + +In order to rule out the possibility of bugs introduced by userland code, test +cases should be limited, as much as possible, to using only Tokio APIs. + +See [How to create a Minimal, Complete, and Verifiable example][mcve]. + +[mcve]: https://stackoverflow.com/help/mcve +[template]: .github/PULL_REQUEST_TEMPLATE.md + +### Triaging a Bug Report + +Once an issue has been opened, it is not uncommon for there to be discussion +around it. Some contributors may have differing opinions about the issue, +including whether the behavior being seen is a bug or a feature. This discussion +is part of the process and should be kept focused, helpful, and professional. + +Short, clipped responses—that provide neither additional context nor supporting +detail—are not helpful or professional. To many, such responses are simply +annoying and unfriendly. + +Contributors are encouraged to help one another make forward progress as much as +possible, empowering one another to solve issues collaboratively. If you choose +to comment on an issue that you feel either is not a problem that needs to be +fixed, or if you encounter information in an issue that you feel is incorrect, +explain why you feel that way with additional supporting context, and be willing +to be convinced that you may be wrong. By doing so, we can often reach the +correct outcome much faster. + +### Resolving a Bug Report + +In the majority of cases, issues are resolved by opening a Pull Request. The +process for opening and reviewing a Pull Request is similar to that of opening +and triaging issues, but carries with it a necessary review and approval +workflow that ensures that the proposed changes meet the minimal quality and +functional guidelines of the Tokio project. + +## Pull Requests + +Pull Requests are the way concrete changes are made to the code, documentation, +and dependencies in the Tokio repository. + +Even tiny pull requests (e.g., one character pull request fixing a typo in API +documentation) are greatly appreciated. Before making a large change, it is +usually a good idea to first open an issue describing the change to solicit +feedback and guidance. This will increasethe likelihood of the PR getting +merged. + +### Tests + +If the change being proposed alters code (as opposed to only documentation for +example), it is either adding new functionality to Tokio or it is fixing +existing, broken functionality. In both of these cases, the pull request should +include one or more tests to ensure that Tokio does not regress in the future. +There are two ways to write tests: integration tests and documentation tests +(Tokio avoids unit tests as much as possible). + +#### Integration tests + +Integration tests go in the same crate as the code they are testing. Each sub +crate should have a `dev-dependency` on `tokio` itself. This makes all Tokio +utilities available to use in tests, no matter the crate being tested. + +The best strategy for writing a new integration test is to look at existing +integration tests in the crate and follow the style. + +#### Documentation tests + +Ideally, every API has at least one [documentation test] that demonstrates how to +use the API. Documentation tests are run with `cargo test --doc`. This ensures +that the example is correct and provides additional test coverage. + +The trick to documentation tests is striking a balance between being succinct +for a reader to understand and actually testing the API. + +Same as with integration tests, when writing a documentation test, the full +`tokio` crate is available. This is especially useful for getting access to the +runtime to run the example. + +The documentation tests will be visible from both the crate specific +documentation **and** the `tokio` facade documentation via the re-export. The +example should be written from the point of view of a user that is using the +`tokio` crate. As such, the example should use the API via the facade and not by +directly referencing the crate. + +The type level example for `tokio_timer::Timeout` provides a good example of a +documentation test: + +``` +/// # extern crate futures; +/// # extern crate tokio; +/// // import the `timeout` function, usually this is done +/// // with `use tokio::prelude::*` +/// use tokio::prelude::FutureExt; +/// use futures::Stream; +/// use futures::sync::mpsc; +/// use std::time::Duration; +/// +/// # fn main() { +/// let (tx, rx) = mpsc::unbounded(); +/// # tx.unbounded_send(()).unwrap(); +/// # drop(tx); +/// +/// let process = rx.for_each(|item| { +/// // do something with `item` +/// # drop(item); +/// # Ok(()) +/// }); +/// +/// # tokio::runtime::current_thread::block_on_all( +/// // Wrap the future with a `Timeout` set to expire in 10 milliseconds. +/// process.timeout(Duration::from_millis(10)) +/// # ).unwrap(); +/// # } +``` + +Given that this is a *type* level documentation test and the primary way users +of `tokio` will create an instance of `Timeout` is by using +`FutureExt::timeout`, this is how the documentation test is structured. + +Lines that start with `/// #` are removed when the documentation is generated. +They are only there to get the test to run. The `block_on_all` function is the +easiest way to execute a future from a test. + +If this were a documentation test for the `Timeout::new` function, then the +example would explicitly use `Timeout::new`. For example: + +``` +/// # extern crate futures; +/// # extern crate tokio; +/// use tokio::timer::Timeout; +/// use futures::Future; +/// use futures::sync::oneshot; +/// use std::time::Duration; +/// +/// # fn main() { +/// let (tx, rx) = oneshot::channel(); +/// # tx.send(()).unwrap(); +/// +/// # tokio::runtime::current_thread::block_on_all( +/// // Wrap the future with a `Timeout` set to expire in 10 milliseconds. +/// Timeout::new(rx, Duration::from_millis(10)) +/// # ).unwrap(); +/// # } +``` + +### Commits + +It is a recommended best practice to keep your changes as logically grouped as +possible within individual commits. There is no limit to the number of commits +any single Pull Request may have, and many contributors find it easier to review +changes that are split across multiple commits. + +That said, if you have a number of commits that are "checkpoints" and don't +represent a single logical change, please squash those together. + +Note that multiple commits often get squashed when they are landed (see the +notes about [commit squashing]). + +#### Commit message guidelines + +A good commit message should describe what changed and why. + +1. The first line should: + + * contain a short description of the change (preferably 50 characters or less, + and no more than 72 characters) + * be entirely in lowercase with the exception of proper nouns, acronyms, and + the words that refer to code, like function/variable names + * be prefixed with the name of the sub crate being changed (without the `tokio-` + prefix) and start with an imperative verb. If modifying `tokio` proper, + omit the crate prefix. + + Examples: + + * timer: introduce `Timeout` and deprecate `Deadline` + * export `Encoder`, `Decoder`, `Framed*` from tokio_codec + +2. Keep the second line blank. +3. Wrap all other lines at 72 columns (except for long URLs). +4. If your patch fixes an open issue, you can add a reference to it at the end + of the log. Use the `Fixes: #` prefix and the issue number. For other + references use `Refs: #`. `Refs` may include multiple issues, separated by a + comma. + + Examples: + + - `Fixes: #1337` + - `Refs: #1234` + +Sample complete commit message: + +```txt +subcrate: explain the commit in one line + +Body of commit message is a few lines of text, explaining things +in more detail, possibly giving some background about the issue +being fixed, etc. + +The body of the commit message can be several paragraphs, and +please do proper word-wrap and keep columns shorter than about +72 characters or so. That way, `git log` will show things +nicely even when it is indented. + +Fixes: #1337 +Refs: #453, #154 +``` + +### Opening the Pull Request + +From within GitHub, opening a new Pull Request will present you with a +[template] that should be filled out. Please try to do your best at filling out +the details, but feel free to skip parts if you're not sure what to put. + +[template]: .github/PULL_REQUEST_TEMPLATE.md + +### Discuss and update + +You will probably get feedback or requests for changes to your Pull Request. +This is a big part of the submission process so don't be discouraged! Some +contributors may sign off on the Pull Request right away, others may have +more detailed comments or feedback. This is a necessary part of the process +in order to evaluate whether the changes are correct and necessary. + +**Any community member can review a PR and you might get conflicting feedback**. +Keep an eye out for comments from code owners to provide guidance on conflicting +feedback. + +**Once the PR is open, do not rebase the commits**. See [Commit Squashing] for +more details. + +### Commit Squashing + +In most cases, **do not squash commits that you add to your Pull Request during +the review process**. When the commits in your Pull Request land, they may be +squashed into one commit per logical change. Metadata will be added to the +commit message (including links to the Pull Request, links to relevant issues, +and the names of the reviewers). The commit history of your Pull Request, +however, will stay intact on the Pull Request page. + +## Reviewing Pull Requests + +**Any Tokio community member is welcome to review any pull request**. + +All Tokio contributors who choose to review and provide feedback on Pull +Requests have a responsibility to both the project and the individual making the +contribution. Reviews and feedback must be helpful, insightful, and geared +towards improving the contribution as opposed to simply blocking it. If there +are reasons why you feel the PR should not land, explain what those are. Do not +expect to be able to block a Pull Request from advancing simply because you say +"No" without giving an explanation. Be open to having your mind changed. Be open +to working with the contributor to make the Pull Request better. + +Reviews that are dismissive or disrespectful of the contributor or any other +reviewers are strictly counter to the Code of Conduct. + +When reviewing a Pull Request, the primary goals are for the codebase to improve +and for the person submitting the request to succeed. **Even if a Pull Request +does not land, the submitters should come away from the experience feeling like +their effort was not wasted or unappreciated**. Every Pull Request from a new +contributor is an opportunity to grow the community. + +### Review a bit at a time. + +Do not overwhelm new contributors. + +It is tempting to micro-optimize and make everything about relative performance, +perfect grammar, or exact style matches. Do not succumb to that temptation. + +Focus first on the most significant aspects of the change: + +1. Does this change make sense for Tokio? +2. Does this change make Tokio better, even if only incrementally? +3. Are there clear bugs or larger scale issues that need attending to? +4. Is the commit message readable and correct? If it contains a breaking change + is it clear enough? + +Note that only **incremental** improvement is needed to land a PR. This means +that the PR does not need to be perfect, only better than the status quo. Follow +up PRs may be opened to continue iterating. + +When changes are necessary, *request* them, do not *demand* them, and **do not +assume that the submitter already knows how to add a test or run a benchmark**. + +Specific performance optimization techniques, coding styles and conventions +change over time. The first impression you give to a new contributor never does. + +Nits (requests for small changes that are not essential) are fine, but try to +avoid stalling the Pull Request. Most nits can typically be fixed by the Tokio +Collaborator landing the Pull Request but they can also be an opportunity for +the contributor to learn a bit more about the project. + +It is always good to clearly indicate nits when you comment: e.g. +`Nit: change foo() to bar(). But this is not blocking.` + +If your comments were addressed but were not folded automatically after new +commits or if they proved to be mistaken, please, [hide them][hiding-a-comment] +with the appropriate reason to keep the conversation flow concise and relevant. + +### Be aware of the person behind the code + +Be aware that *how* you communicate requests and reviews in your feedback can +have a significant impact on the success of the Pull Request. Yes, we may land +a particular change that makes Tokio better, but the individual might just not +want to have anything to do with Tokio ever again. The goal is not just having +good code. + +### Abandoned or Stalled Pull Requests + +If a Pull Request appears to be abandoned or stalled, it is polite to first +check with the contributor to see if they intend to continue the work before +checking if they would mind if you took it over (especially if it just has nits +left). When doing so, it is courteous to give the original contributor credit +for the work they started (either by preserving their name and email address in +the commit log, or by using an `Author: ` meta-data tag in the commit. + +_Adapted from the [Node.js contributing guide][node]_ + +[node]: https://github.com/nodejs/node/blob/master/CONTRIBUTING.md. +[hiding-a-comment]: https://help.github.com/articles/managing-disruptive-comments/#hiding-a-comment +[documentation test]: https://doc.rust-lang.org/rustdoc/documentation-tests.html diff --git a/third_party/rust/tokio-0.1.11/Cargo.toml b/third_party/rust/tokio-0.1.11/Cargo.toml new file mode 100644 index 0000000000..5cad3149b7 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/Cargo.toml @@ -0,0 +1,111 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g. crates.io) dependencies +# +# If you believe there's an error in this file please file an +# issue against the rust-lang/cargo repository. If you're +# editing this file be aware that the upstream Cargo.toml +# will likely look very different (and much more reasonable) + +[package] +name = "tokio" +version = "0.1.11" +authors = ["Carl Lerche <me@carllerche.com>"] +description = "An event-driven, non-blocking I/O platform for writing asynchronous I/O\nbacked applications.\n" +homepage = "https://tokio.rs" +documentation = "https://docs.rs/tokio/0.1.11/tokio/" +readme = "README.md" +keywords = ["io", "async", "non-blocking", "futures"] +categories = ["asynchronous", "network-programming"] +license = "MIT" +repository = "https://github.com/tokio-rs/tokio" +[dependencies.bytes] +version = "0.4" + +[dependencies.futures] +version = "0.1.20" + +[dependencies.mio] +version = "0.6.14" + +[dependencies.tokio-async-await] +version = "0.1.0" +optional = true + +[dependencies.tokio-codec] +version = "0.1.0" + +[dependencies.tokio-current-thread] +version = "0.1.3" + +[dependencies.tokio-executor] +version = "0.1.5" + +[dependencies.tokio-fs] +version = "0.1.3" + +[dependencies.tokio-io] +version = "0.1.6" + +[dependencies.tokio-reactor] +version = "0.1.1" + +[dependencies.tokio-tcp] +version = "0.1.0" + +[dependencies.tokio-threadpool] +version = "0.1.4" + +[dependencies.tokio-timer] +version = "0.2.6" + +[dependencies.tokio-udp] +version = "0.1.0" +[dev-dependencies.env_logger] +version = "0.5" +default-features = false + +[dev-dependencies.flate2] +version = "1" +features = ["tokio"] + +[dev-dependencies.futures-cpupool] +version = "0.1" + +[dev-dependencies.http] +version = "0.1" + +[dev-dependencies.httparse] +version = "1.0" + +[dev-dependencies.libc] +version = "0.2" + +[dev-dependencies.num_cpus] +version = "1.0" + +[dev-dependencies.serde] +version = "1.0" + +[dev-dependencies.serde_derive] +version = "1.0" + +[dev-dependencies.serde_json] +version = "1.0" + +[dev-dependencies.time] +version = "0.1" + +[features] +async-await-preview = ["tokio-async-await/async-await-preview"] +[target."cfg(unix)".dependencies.tokio-uds] +version = "0.2.1" +[badges.appveyor] +id = "s83yxhy9qeb58va7" +repository = "carllerche/tokio" + +[badges.travis-ci] +repository = "tokio-rs/tokio" diff --git a/third_party/rust/tokio-0.1.11/LICENSE b/third_party/rust/tokio-0.1.11/LICENSE new file mode 100644 index 0000000000..38c1e27b8e --- /dev/null +++ b/third_party/rust/tokio-0.1.11/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2018 Tokio Contributors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/third_party/rust/tokio-0.1.11/README.md b/third_party/rust/tokio-0.1.11/README.md new file mode 100644 index 0000000000..32ec4968e5 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/README.md @@ -0,0 +1,189 @@ +# Tokio + +A runtime for writing reliable, asynchronous, and slim applications with +the Rust programming language. It is: + +* **Fast**: Tokio's zero-cost abstractions give you bare-metal + performance. + +* **Reliable**: Tokio leverages Rust's ownership, type system, and + concurrency model to reduce bugs and ensure thread safety. + +* **Scalable**: Tokio has a minimal footprint, and handles backpressure + and cancellation naturally. + +[![Crates.io][crates-badge]][crates-url] +[![MIT licensed][mit-badge]][mit-url] +[![Travis Build Status][travis-badge]][travis-url] +[![Appveyor Build Status][appveyor-badge]][appveyor-url] +[![Gitter chat][gitter-badge]][gitter-url] + +[crates-badge]: https://img.shields.io/crates/v/tokio.svg +[crates-url]: https://crates.io/crates/tokio +[mit-badge]: https://img.shields.io/badge/license-MIT-blue.svg +[mit-url]: LICENSE-MIT +[travis-badge]: https://travis-ci.org/tokio-rs/tokio.svg?branch=master +[travis-url]: https://travis-ci.org/tokio-rs/tokio +[appveyor-badge]: https://ci.appveyor.com/api/projects/status/s83yxhy9qeb58va7/branch/master?svg=true +[appveyor-url]: https://ci.appveyor.com/project/carllerche/tokio/branch/master +[gitter-badge]: https://img.shields.io/gitter/room/tokio-rs/tokio.svg +[gitter-url]: https://gitter.im/tokio-rs/tokio + +[Website](https://tokio.rs) | +[Guides](https://tokio.rs/docs/getting-started/hello-world/) | +[API Docs](https://docs.rs/tokio) | +[Chat](https://gitter.im/tokio-rs/tokio) + +The API docs for the master branch are published [here][master-dox]. + +[master-dox]: https://tokio-rs.github.io/tokio/tokio/ + +## Overview + +Tokio is an event-driven, non-blocking I/O platform for writing +asynchronous applications with the Rust programming language. At a high +level, it provides a few major components: + +* A multithreaded, work-stealing based task [scheduler]. +* A [reactor] backed by the operating system's event queue (epoll, kqueue, + IOCP, etc...). +* Asynchronous [TCP and UDP][net] sockets. + +These components provide the runtime components necessary for building +an asynchronous application. + +[net]: https://docs.rs/tokio/0.1/tokio/net/index.html +[reactor]: https://docs.rs/tokio/0.1/tokio/reactor/index.html +[scheduler]: https://tokio-rs.github.io/tokio/tokio/runtime/index.html + +## Example + +A basic TCP echo server with Tokio: + +```rust +extern crate tokio; + +use tokio::prelude::*; +use tokio::io::copy; +use tokio::net::TcpListener; + +fn main() { + // Bind the server's socket. + let addr = "127.0.0.1:12345".parse().unwrap(); + let listener = TcpListener::bind(&addr) + .expect("unable to bind TCP listener"); + + // Pull out a stream of sockets for incoming connections + let server = listener.incoming() + .map_err(|e| eprintln!("accept failed = {:?}", e)) + .for_each(|sock| { + // Split up the reading and writing parts of the + // socket. + let (reader, writer) = sock.split(); + + // A future that echos the data and returns how + // many bytes were copied... + let bytes_copied = copy(reader, writer); + + // ... after which we'll print what happened. + let handle_conn = bytes_copied.map(|amt| { + println!("wrote {:?} bytes", amt) + }).map_err(|err| { + eprintln!("IO error {:?}", err) + }); + + // Spawn the future as a concurrent task. + tokio::spawn(handle_conn) + }); + + // Start the Tokio runtime + tokio::run(server); +} +``` + +More examples can be found [here](examples). + +## Getting Help + +First, see if the answer to your question can be found in the [Guides] or the +[API documentation]. If the answer is not there, there is an active community in +the [Tokio Gitter channel][chat]. We would be happy to try to answer your +question. Last, if that doesn't work, try opening an [issue] with the question. + +[chat]: https://gitter.im/tokio-rs/tokio +[issue]: https://github.com/tokio-rs/tokio/issues/new + +## Contributing + +:balloon: Thanks for your help improving the project! We are so happy to have +you! We have a [contributing guide][guide] to help you get involved in the Tokio +project. + +[guide]: CONTRIBUTING.md + +## Project layout + +The `tokio` crate, found at the root, is primarily intended for use by +application developers. Library authors should depend on the sub crates, which +have greater guarantees of stability. + +The crates included as part of Tokio are: + +* [`tokio-async-await`]: Experimental `async` / `await` support. + +* [`tokio-codec`]: Utilities for encoding and decoding protocol frames. + +* [`tokio-current-thread`]: Schedule the execution of futures on the current + thread. + +* [`tokio-executor`]: Task execution related traits and utilities. + +* [`tokio-fs`]: Filesystem (and standard in / out) APIs. + +* [`tokio-io`]: Asynchronous I/O related traits and utilities. + +* [`tokio-reactor`]: Event loop that drives I/O resources (like TCP and UDP + sockets). + +* [`tokio-tcp`]: TCP bindings for use with `tokio-io` and `tokio-reactor`. + +* [`tokio-threadpool`]: Schedules the execution of futures across a pool of + threads. + +* [ `tokio-timer`]: Time related APIs. + +* [`tokio-udp`]: UDP bindings for use with `tokio-io` and `tokio-reactor`. + +* [`tokio-uds`]: Unix Domain Socket bindings for use with `tokio-io` and + `tokio-reactor`. + +[`tokio-async-await`]: tokio-async-await +[`tokio-codec`]: tokio-codec +[`tokio-current-thread`]: tokio-current-thread +[`tokio-executor`]: tokio-executor +[`tokio-fs`]: tokio-fs +[`tokio-io`]: tokio-io +[`tokio-reactor`]: tokio-reactor +[`tokio-tcp`]: tokio-tcp +[`tokio-threadpool`]: tokio-threadpool +[`tokio-timer`]: tokio-timer +[`tokio-udp`]: tokio-udp +[`tokio-uds`]: tokio-uds + +## Supported Rust Versions + +Tokio is built against the latest stable, nightly, and beta Rust releases. The +minimum version supported is the stable release from three months before the +current stable release version. For example, if the latest stable Rust is 1.29, +the minimum version supported is 1.26. The current Tokio version is not +guaranteed to build on Rust versions earlier than the minimum supported version. + +## License + +This project is licensed under the [MIT license](LICENSE). + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in Tokio by you, shall be licensed as MIT, without any additional +terms or conditions. diff --git a/third_party/rust/tokio-0.1.11/benches/latency.rs b/third_party/rust/tokio-0.1.11/benches/latency.rs new file mode 100644 index 0000000000..c2619b7115 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/benches/latency.rs @@ -0,0 +1,117 @@ +#![feature(test)] +#![deny(warnings)] + +extern crate test; +#[macro_use] +extern crate futures; +extern crate tokio; + +use std::io; +use std::net::SocketAddr; +use std::thread; + +use futures::sync::oneshot; +use futures::sync::mpsc; +use futures::{Future, Poll, Sink, Stream}; +use test::Bencher; +use tokio::net::UdpSocket; + +/// UDP echo server +struct EchoServer { + socket: UdpSocket, + buf: Vec<u8>, + to_send: Option<(usize, SocketAddr)>, +} + +impl EchoServer { + fn new(s: UdpSocket) -> Self { + EchoServer { + socket: s, + to_send: None, + buf: vec![0u8; 1600], + } + } +} + +impl Future for EchoServer { + type Item = (); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(), io::Error> { + loop { + if let Some(&(size, peer)) = self.to_send.as_ref() { + try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer)); + self.to_send = None; + } + self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf))); + } + } +} + +#[bench] +fn udp_echo_latency(b: &mut Bencher) { + let any_addr = "127.0.0.1:0".to_string(); + let any_addr = any_addr.parse::<SocketAddr>().unwrap(); + + let (stop_c, stop_p) = oneshot::channel::<()>(); + let (tx, rx) = oneshot::channel(); + + let child = thread::spawn(move || { + + let socket = tokio::net::UdpSocket::bind(&any_addr).unwrap(); + tx.send(socket.local_addr().unwrap()).unwrap(); + + let server = EchoServer::new(socket); + let server = server.select(stop_p.map_err(|_| panic!())); + let server = server.map_err(|_| ()); + server.wait().unwrap(); + }); + + + let client = std::net::UdpSocket::bind(&any_addr).unwrap(); + + let server_addr = rx.wait().unwrap(); + let mut buf = [0u8; 1000]; + + // warmup phase; for some reason initial couple of + // runs are much slower + // + // TODO: Describe the exact reasons; caching? branch predictor? lazy closures? + for _ in 0..8 { + client.send_to(&buf, &server_addr).unwrap(); + let _ = client.recv_from(&mut buf).unwrap(); + } + + b.iter(|| { + client.send_to(&buf, &server_addr).unwrap(); + let _ = client.recv_from(&mut buf).unwrap(); + }); + + stop_c.send(()).unwrap(); + child.join().unwrap(); +} + +#[bench] +fn futures_channel_latency(b: &mut Bencher) { + let (mut in_tx, in_rx) = mpsc::channel(32); + let (out_tx, out_rx) = mpsc::channel::<_>(32); + + let child = thread::spawn(|| out_tx.send_all(in_rx.then(|r| r.unwrap())).wait()); + let mut rx_iter = out_rx.wait(); + + // warmup phase; for some reason initial couple of runs are much slower + // + // TODO: Describe the exact reasons; caching? branch predictor? lazy closures? + for _ in 0..8 { + in_tx.start_send(Ok(1usize)).unwrap(); + let _ = rx_iter.next(); + } + + b.iter(|| { + in_tx.start_send(Ok(1usize)).unwrap(); + let _ = rx_iter.next(); + }); + + drop(in_tx); + child.join().unwrap().unwrap(); +} diff --git a/third_party/rust/tokio-0.1.11/benches/mio-ops.rs b/third_party/rust/tokio-0.1.11/benches/mio-ops.rs new file mode 100644 index 0000000000..6a71bebfe0 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/benches/mio-ops.rs @@ -0,0 +1,58 @@ +// Measure cost of different operations +// to get a sense of performance tradeoffs +#![feature(test)] +#![deny(warnings)] + +extern crate test; +extern crate mio; + +use test::Bencher; + +use mio::tcp::TcpListener; +use mio::{Token, Ready, PollOpt}; + + +#[bench] +fn mio_register_deregister(b: &mut Bencher) { + let addr = "127.0.0.1:0".parse().unwrap(); + // Setup the server socket + let sock = TcpListener::bind(&addr).unwrap(); + let poll = mio::Poll::new().unwrap(); + + const CLIENT: Token = Token(1); + + b.iter(|| { + poll.register(&sock, CLIENT, Ready::readable(), + PollOpt::edge()).unwrap(); + poll.deregister(&sock).unwrap(); + }); +} + +#[bench] +fn mio_reregister(b: &mut Bencher) { + let addr = "127.0.0.1:0".parse().unwrap(); + // Setup the server socket + let sock = TcpListener::bind(&addr).unwrap(); + let poll = mio::Poll::new().unwrap(); + + const CLIENT: Token = Token(1); + poll.register(&sock, CLIENT, Ready::readable(), + PollOpt::edge()).unwrap(); + + b.iter(|| { + poll.reregister(&sock, CLIENT, Ready::readable(), + PollOpt::edge()).unwrap(); + }); + poll.deregister(&sock).unwrap(); +} + +#[bench] +fn mio_poll(b: &mut Bencher) { + let poll = mio::Poll::new().unwrap(); + let timeout = std::time::Duration::new(0, 0); + let mut events = mio::Events::with_capacity(1024); + + b.iter(|| { + poll.poll(&mut events, Some(timeout)).unwrap(); + }); +} diff --git a/third_party/rust/tokio-0.1.11/benches/tcp.rs b/third_party/rust/tokio-0.1.11/benches/tcp.rs new file mode 100644 index 0000000000..fde72ce092 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/benches/tcp.rs @@ -0,0 +1,248 @@ +#![feature(test)] +#![deny(warnings)] + +extern crate futures; +extern crate tokio; + +#[macro_use] +extern crate tokio_io; + +pub extern crate test; + +mod prelude { + pub use futures::*; + pub use tokio::reactor::Reactor; + pub use tokio::net::{TcpListener, TcpStream}; + pub use tokio_io::io::read_to_end; + + pub use test::{self, Bencher}; + pub use std::thread; + pub use std::time::Duration; + pub use std::io::{self, Read, Write}; +} + +mod connect_churn { + use ::prelude::*; + + const NUM: usize = 300; + const CONCURRENT: usize = 8; + + #[bench] + fn one_thread(b: &mut Bencher) { + let addr = "127.0.0.1:0".parse().unwrap(); + + b.iter(move || { + let listener = TcpListener::bind(&addr).unwrap(); + let addr = listener.local_addr().unwrap(); + + // Spawn a single future that accepts & drops connections + let serve_incomings = listener.incoming() + .map_err(|e| panic!("server err: {:?}", e)) + .for_each(|_| Ok(())); + + let connects = stream::iter_result((0..NUM).map(|_| { + Ok(TcpStream::connect(&addr) + .and_then(|sock| { + sock.set_linger(Some(Duration::from_secs(0))).unwrap(); + read_to_end(sock, vec![]) + })) + })); + + let connects_concurrent = connects.buffer_unordered(CONCURRENT) + .map_err(|e| panic!("client err: {:?}", e)) + .for_each(|_| Ok(())); + + serve_incomings.select(connects_concurrent) + .map(|_| ()).map_err(|_| ()) + .wait().unwrap(); + }); + } + + fn n_workers(n: usize, b: &mut Bencher) { + let (shutdown_tx, shutdown_rx) = sync::oneshot::channel(); + let (addr_tx, addr_rx) = sync::oneshot::channel(); + + // Spawn reactor thread + let server_thread = thread::spawn(move || { + // Bind the TCP listener + let listener = TcpListener::bind( + &"127.0.0.1:0".parse().unwrap()).unwrap(); + + // Get the address being listened on. + let addr = listener.local_addr().unwrap(); + + // Send the remote & address back to the main thread + addr_tx.send(addr).unwrap(); + + // Spawn a single future that accepts & drops connections + let serve_incomings = listener.incoming() + .map_err(|e| panic!("server err: {:?}", e)) + .for_each(|_| Ok(())); + + // Run server + serve_incomings.select(shutdown_rx) + .map(|_| ()).map_err(|_| ()) + .wait().unwrap(); + }); + + // Get the bind addr of the server + let addr = addr_rx.wait().unwrap(); + + b.iter(move || { + use std::sync::{Barrier, Arc}; + + // Create a barrier to coordinate threads + let barrier = Arc::new(Barrier::new(n + 1)); + + // Spawn worker threads + let threads: Vec<_> = (0..n).map(|_| { + let barrier = barrier.clone(); + let addr = addr.clone(); + + thread::spawn(move || { + let connects = stream::iter_result((0..(NUM / n)).map(|_| { + Ok(TcpStream::connect(&addr) + .map_err(|e| panic!("connect err: {:?}", e)) + .and_then(|sock| { + sock.set_linger(Some(Duration::from_secs(0))).unwrap(); + read_to_end(sock, vec![]) + })) + })); + + barrier.wait(); + + connects.buffer_unordered(CONCURRENT) + .map_err(|e| panic!("client err: {:?}", e)) + .for_each(|_| Ok(())).wait().unwrap(); + }) + }).collect(); + + barrier.wait(); + + for th in threads { + th.join().unwrap(); + } + }); + + // Shutdown the server + shutdown_tx.send(()).unwrap(); + server_thread.join().unwrap(); + } + + #[bench] + fn two_threads(b: &mut Bencher) { + n_workers(1, b); + } + + #[bench] + fn multi_threads(b: &mut Bencher) { + n_workers(4, b); + } +} + +mod transfer { + use ::prelude::*; + use std::{cmp, mem}; + + const MB: usize = 3 * 1024 * 1024; + + struct Drain { + sock: TcpStream, + chunk: usize, + } + + impl Future for Drain { + type Item = (); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(), io::Error> { + let mut buf: [u8; 1024] = unsafe { mem::uninitialized() }; + + loop { + match try_nb!(self.sock.read(&mut buf[..self.chunk])) { + 0 => return Ok(Async::Ready(())), + _ => {} + } + } + } + } + + struct Transfer { + sock: TcpStream, + rem: usize, + chunk: usize, + } + + impl Future for Transfer { + type Item = (); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(), io::Error> { + while self.rem > 0 { + let len = cmp::min(self.rem, self.chunk); + let buf = &DATA[..len]; + + let n = try_nb!(self.sock.write(&buf)); + self.rem -= n; + } + + Ok(Async::Ready(())) + } + } + + static DATA: [u8; 1024] = [0; 1024]; + + fn one_thread(b: &mut Bencher, read_size: usize, write_size: usize) { + let addr = "127.0.0.1:0".parse().unwrap(); + + b.iter(move || { + let listener = TcpListener::bind(&addr).unwrap(); + let addr = listener.local_addr().unwrap(); + + // Spawn a single future that accepts 1 connection, Drain it and drops + let server = listener.incoming() + .into_future() // take the first connection + .map_err(|(e, _other_incomings)| e) + .map(|(connection, _other_incomings)| connection.unwrap()) + .and_then(|sock| { + sock.set_linger(Some(Duration::from_secs(0))).unwrap(); + let drain = Drain { + sock: sock, + chunk: read_size, + }; + drain.map(|_| ()).map_err(|e| panic!("server error: {:?}", e)) + }) + .map_err(|e| panic!("server err: {:?}", e)); + + let client = TcpStream::connect(&addr) + .and_then(move |sock| { + Transfer { + sock: sock, + rem: MB, + chunk: write_size, + } + }) + .map_err(|e| panic!("client err: {:?}", e)); + + server.join(client).wait().unwrap(); + }); + } + + mod small_chunks { + use ::prelude::*; + + #[bench] + fn one_thread(b: &mut Bencher) { + super::one_thread(b, 32, 32); + } + } + + mod big_chunks { + use ::prelude::*; + + #[bench] + fn one_thread(b: &mut Bencher) { + super::one_thread(b, 1_024, 1_024); + } + } +} diff --git a/third_party/rust/tokio-0.1.11/ci/tsan b/third_party/rust/tokio-0.1.11/ci/tsan new file mode 100644 index 0000000000..65d65eff24 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/ci/tsan @@ -0,0 +1,33 @@ +# TSAN suppressions file for Tokio + +# TSAN does not understand fences and `Arc::drop` is implemented using a fence. +# This causes many false positives. +race:Arc*drop +race:Weak*drop + +# `std` mpsc is not used in any Tokio code base. This race is triggered by some +# rust runtime logic. +race:std*mpsc_queue + +# Probably more fences in std. +race:__call_tls_dtors + +# The epoch-based GC uses fences. +race:crossbeam_epoch + +# Push and steal operations in crossbeam-deque may cause data races, but such +# data races are safe. If a data race happens, the value read by `steal` is +# forgotten and the steal operation is then retried. +race:crossbeam_deque*push +race:crossbeam_deque*steal + +# This filters out expected data race in the Treiber stack implementations. +# Treiber stacks are inherently racy. The pop operation will attempt to access +# the "next" pointer on the node it is attempting to pop. However, at this +# point it has not gained ownership of the node and another thread might beat +# it and take ownership of the node first (touching the next pointer). The +# original pop operation will fail due to the ABA guard, but tsan still picks +# up the access on the next pointer. +race:Backup::next_sleeper +race:Backup::set_next_sleeper +race:WorkerEntry::set_next_sleeper diff --git a/third_party/rust/tokio-0.1.11/examples/README.md b/third_party/rust/tokio-0.1.11/examples/README.md new file mode 100644 index 0000000000..63634c82b6 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/examples/README.md @@ -0,0 +1,60 @@ +## Examples of how to use Tokio + +This directory contains a number of examples showcasing various capabilities of +the `tokio` crate. + +All examples can be executed with: + +``` +cargo run --example $name +``` + +A high level description of each example is: + +* [`hello_world`](hello_world.rs) - a tiny server that writes "hello world" to + all connected clients and then terminates the connection, should help see how + to create and initialize `tokio`. + +* [`echo`](echo.rs) - this is your standard TCP "echo server" which accepts + connections and then echos back any contents that are read from each connected + client. + +* [`print_each_packet`](print_each_packet.rs) - this server will create a TCP + listener, accept connections in a loop, and put down in the stdout everything + that's read off of each TCP connection. + +* [`echo-udp`](echo-udp.rs) - again your standard "echo server", except for UDP + instead of TCP. This will echo back any packets received to the original + sender. + +* [`connect`](connect.rs) - this is a `nc`-like clone which can be used to + interact with most other examples. The program creates a TCP connection or UDP + socket to sends all information read on stdin to the remote peer, displaying + any data received on stdout. Often quite useful when interacting with the + various other servers here! + +* [`chat`](chat.rs) - this spins up a local TCP server which will broadcast from + any connected client to all other connected clients. You can connect to this + in multiple terminals and use it to chat between the terminals. + +* [`chat-combinator`](chat-combinator.rs) - Similar to `chat`, but this uses a + much more functional programming approach using combinators. + +* [`proxy`](proxy.rs) - an example proxy server that will forward all connected + TCP clients to the remote address specified when starting the program. + +* [`tinyhttp`](tinyhttp.rs) - a tiny HTTP/1.1 server which doesn't support HTTP + request bodies showcasing running on multiple cores, working with futures and + spawning tasks, and finally framing a TCP connection to discrete + request/response objects. + +* [`tinydb`](tinydb.rs) - an in-memory database which shows sharing state + between all connected clients, notably the key/value store of this database. + +* [`udp-client`](udp-client.rs) - a simple `send_dgram`/`recv_dgram` example. + +* [`manual-runtime`](manual-runtime.rs) - manually composing a runtime. + +If you've got an example you'd like to see here, please feel free to open an +issue. Otherwise if you've got an example you'd like to add, please feel free +to make a PR! diff --git a/third_party/rust/tokio-0.1.11/examples/chat-combinator.rs b/third_party/rust/tokio-0.1.11/examples/chat-combinator.rs new file mode 100644 index 0000000000..1175418370 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/examples/chat-combinator.rs @@ -0,0 +1,150 @@ +//! A chat server that broadcasts a message to all connections. +//! +//! This is a line-based server which accepts connections, reads lines from +//! those connections, and broadcasts the lines to all other connected clients. +//! +//! This example is similar to chat.rs, but uses combinators and a much more +//! functional style. +//! +//! You can test this out by running: +//! +//! cargo run --example chat +//! +//! And then in another window run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! You can run the second command in multiple windows and then chat between the +//! two, seeing the messages from the other client as they're received. For all +//! connected clients they'll all join the same room and see everyone else's +//! messages. + +#![deny(warnings)] + +extern crate tokio; +extern crate futures; + +use tokio::io; +use tokio::net::TcpListener; +use tokio::prelude::*; + +use std::collections::HashMap; +use std::iter; +use std::env; +use std::io::{BufReader}; +use std::sync::{Arc, Mutex}; + +fn main() { + // Create the TCP listener we'll accept connections on. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse().unwrap(); + + let socket = TcpListener::bind(&addr).unwrap(); + println!("Listening on: {}", addr); + + // This is running on the Tokio runtime, so it will be multi-threaded. The + // `Arc<Mutex<...>>` allows state to be shared across the threads. + let connections = Arc::new(Mutex::new(HashMap::new())); + + // The server task asynchronously iterates over and processes each incoming + // connection. + let srv = socket.incoming() + .map_err(|e| println!("failed to accept socket; error = {:?}", e)) + .for_each(move |stream| { + // The client's socket address + let addr = stream.peer_addr().unwrap(); + + println!("New Connection: {}", addr); + + // Split the TcpStream into two separate handles. One handle for reading + // and one handle for writing. This lets us use separate tasks for + // reading and writing. + let (reader, writer) = stream.split(); + + // Create a channel for our stream, which other sockets will use to + // send us messages. Then register our address with the stream to send + // data to us. + let (tx, rx) = futures::sync::mpsc::unbounded(); + connections.lock().unwrap().insert(addr, tx); + + // Define here what we do for the actual I/O. That is, read a bunch of + // lines from the socket and dispatch them while we also write any lines + // from other sockets. + let connections_inner = connections.clone(); + let reader = BufReader::new(reader); + + // Model the read portion of this socket by mapping an infinite + // iterator to each line off the socket. This "loop" is then + // terminated with an error once we hit EOF on the socket. + let iter = stream::iter_ok::<_, io::Error>(iter::repeat(())); + + let socket_reader = iter.fold(reader, move |reader, _| { + // Read a line off the socket, failing if we're at EOF + let line = io::read_until(reader, b'\n', Vec::new()); + let line = line.and_then(|(reader, vec)| { + if vec.len() == 0 { + Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe")) + } else { + Ok((reader, vec)) + } + }); + + // Convert the bytes we read into a string, and then send that + // string to all other connected clients. + let line = line.map(|(reader, vec)| { + (reader, String::from_utf8(vec)) + }); + + // Move the connection state into the closure below. + let connections = connections_inner.clone(); + + line.map(move |(reader, message)| { + println!("{}: {:?}", addr, message); + let mut conns = connections.lock().unwrap(); + + if let Ok(msg) = message { + // For each open connection except the sender, send the + // string via the channel. + let iter = conns.iter_mut() + .filter(|&(&k, _)| k != addr) + .map(|(_, v)| v); + for tx in iter { + tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap(); + } + } else { + let tx = conns.get_mut(&addr).unwrap(); + tx.unbounded_send("You didn't send valid UTF-8.".to_string()).unwrap(); + } + + reader + }) + }); + + // Whenever we receive a string on the Receiver, we write it to + // `WriteHalf<TcpStream>`. + let socket_writer = rx.fold(writer, |writer, msg| { + let amt = io::write_all(writer, msg.into_bytes()); + let amt = amt.map(|(writer, _)| writer); + amt.map_err(|_| ()) + }); + + // Now that we've got futures representing each half of the socket, we + // use the `select` combinator to wait for either half to be done to + // tear down the other. Then we spawn off the result. + let connections = connections.clone(); + let socket_reader = socket_reader.map_err(|_| ()); + let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ())); + + // Spawn a task to process the connection + tokio::spawn(connection.then(move |_| { + connections.lock().unwrap().remove(&addr); + println!("Connection {} closed.", addr); + Ok(()) + })); + + Ok(()) + }); + + // execute server + tokio::run(srv); +} diff --git a/third_party/rust/tokio-0.1.11/examples/chat.rs b/third_party/rust/tokio-0.1.11/examples/chat.rs new file mode 100644 index 0000000000..bdc742c9d1 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/examples/chat.rs @@ -0,0 +1,474 @@ +//! A chat server that broadcasts a message to all connections. +//! +//! This example is explicitly more verbose than it has to be. This is to +//! illustrate more concepts. +//! +//! A chat server for telnet clients. After a telnet client connects, the first +//! line should contain the client's name. After that, all lines sent by a +//! client are broadcasted to all other connected clients. +//! +//! Because the client is telnet, lines are delimited by "\r\n". +//! +//! You can test this out by running: +//! +//! cargo run --example chat +//! +//! And then in another terminal run: +//! +//! telnet localhost 6142 +//! +//! You can run the `telnet` command in any number of additional windows. +//! +//! You can run the second command in multiple windows and then chat between the +//! two, seeing the messages from the other client as they're received. For all +//! connected clients they'll all join the same room and see everyone else's +//! messages. + +#![deny(warnings)] + +extern crate tokio; +#[macro_use] +extern crate futures; +extern crate bytes; + +use tokio::io; +use tokio::net::{TcpListener, TcpStream}; +use tokio::prelude::*; +use futures::sync::mpsc; +use futures::future::{self, Either}; +use bytes::{BytesMut, Bytes, BufMut}; + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; + +/// Shorthand for the transmit half of the message channel. +type Tx = mpsc::UnboundedSender<Bytes>; + +/// Shorthand for the receive half of the message channel. +type Rx = mpsc::UnboundedReceiver<Bytes>; + +/// Data that is shared between all peers in the chat server. +/// +/// This is the set of `Tx` handles for all connected clients. Whenever a +/// message is received from a client, it is broadcasted to all peers by +/// iterating over the `peers` entries and sending a copy of the message on each +/// `Tx`. +struct Shared { + peers: HashMap<SocketAddr, Tx>, +} + +/// The state for each connected client. +struct Peer { + /// Name of the peer. + /// + /// When a client connects, the first line sent is treated as the client's + /// name (like alice or bob). The name is used to preface all messages that + /// arrive from the client so that we can simulate a real chat server: + /// + /// ```text + /// alice: Hello everyone. + /// bob: Welcome to telnet chat! + /// ``` + name: BytesMut, + + /// The TCP socket wrapped with the `Lines` codec, defined below. + /// + /// This handles sending and receiving data on the socket. When using + /// `Lines`, we can work at the line level instead of having to manage the + /// raw byte operations. + lines: Lines, + + /// Handle to the shared chat state. + /// + /// This is used to broadcast messages read off the socket to all connected + /// peers. + state: Arc<Mutex<Shared>>, + + /// Receive half of the message channel. + /// + /// This is used to receive messages from peers. When a message is received + /// off of this `Rx`, it will be written to the socket. + rx: Rx, + + /// Client socket address. + /// + /// The socket address is used as the key in the `peers` HashMap. The + /// address is saved so that the `Peer` drop implementation can clean up its + /// entry. + addr: SocketAddr, +} + +/// Line based codec +/// +/// This decorates a socket and presents a line based read / write interface. +/// +/// As a user of `Lines`, we can focus on working at the line level. So, we send +/// and receive values that represent entire lines. The `Lines` codec will +/// handle the encoding and decoding as well as reading from and writing to the +/// socket. +#[derive(Debug)] +struct Lines { + /// The TCP socket. + socket: TcpStream, + + /// Buffer used when reading from the socket. Data is not returned from this + /// buffer until an entire line has been read. + rd: BytesMut, + + /// Buffer used to stage data before writing it to the socket. + wr: BytesMut, +} + +impl Shared { + /// Create a new, empty, instance of `Shared`. + fn new() -> Self { + Shared { + peers: HashMap::new(), + } + } +} + +impl Peer { + /// Create a new instance of `Peer`. + fn new(name: BytesMut, + state: Arc<Mutex<Shared>>, + lines: Lines) -> Peer + { + // Get the client socket address + let addr = lines.socket.peer_addr().unwrap(); + + // Create a channel for this peer + let (tx, rx) = mpsc::unbounded(); + + // Add an entry for this `Peer` in the shared state map. + state.lock().unwrap() + .peers.insert(addr, tx); + + Peer { + name, + lines, + state, + rx, + addr, + } + } +} + +/// This is where a connected client is managed. +/// +/// A `Peer` is also a future representing completely processing the client. +/// +/// When a `Peer` is created, the first line (representing the client's name) +/// has already been read. When the socket closes, the `Peer` future completes. +/// +/// While processing, the peer future implementation will: +/// +/// 1) Receive messages on its message channel and write them to the socket. +/// 2) Receive messages from the socket and broadcast them to all peers. +/// +impl Future for Peer { + type Item = (); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(), io::Error> { + // Tokio (and futures) use cooperative scheduling without any + // preemption. If a task never yields execution back to the executor, + // then other tasks may be starved. + // + // To deal with this, robust applications should not have any unbounded + // loops. In this example, we will read at most `LINES_PER_TICK` lines + // from the client on each tick. + // + // If the limit is hit, the current task is notified, informing the + // executor to schedule the task again asap. + const LINES_PER_TICK: usize = 10; + + // Receive all messages from peers. + for i in 0..LINES_PER_TICK { + // Polling an `UnboundedReceiver` cannot fail, so `unwrap` here is + // safe. + match self.rx.poll().unwrap() { + Async::Ready(Some(v)) => { + // Buffer the line. Once all lines are buffered, they will + // be flushed to the socket (right below). + self.lines.buffer(&v); + + // If this is the last iteration, the loop will break even + // though there could still be lines to read. Because we did + // not reach `Async::NotReady`, we have to notify ourselves + // in order to tell the executor to schedule the task again. + if i+1 == LINES_PER_TICK { + task::current().notify(); + } + } + _ => break, + } + } + + // Flush the write buffer to the socket + let _ = self.lines.poll_flush()?; + + // Read new lines from the socket + while let Async::Ready(line) = self.lines.poll()? { + println!("Received line ({:?}) : {:?}", self.name, line); + + if let Some(message) = line { + // Append the peer's name to the front of the line: + let mut line = self.name.clone(); + line.extend_from_slice(b": "); + line.extend_from_slice(&message); + line.extend_from_slice(b"\r\n"); + + // We're using `Bytes`, which allows zero-copy clones (by + // storing the data in an Arc internally). + // + // However, before cloning, we must freeze the data. This + // converts it from mutable -> immutable, allowing zero copy + // cloning. + let line = line.freeze(); + + // Now, send the line to all other peers + for (addr, tx) in &self.state.lock().unwrap().peers { + // Don't send the message to ourselves + if *addr != self.addr { + // The send only fails if the rx half has been dropped, + // however this is impossible as the `tx` half will be + // removed from the map before the `rx` is dropped. + tx.unbounded_send(line.clone()).unwrap(); + } + } + } else { + // EOF was reached. The remote client has disconnected. There is + // nothing more to do. + return Ok(Async::Ready(())); + } + } + + // As always, it is important to not just return `NotReady` without + // ensuring an inner future also returned `NotReady`. + // + // We know we got a `NotReady` from either `self.rx` or `self.lines`, so + // the contract is respected. + Ok(Async::NotReady) + } +} + +impl Drop for Peer { + fn drop(&mut self) { + self.state.lock().unwrap().peers + .remove(&self.addr); + } +} + +impl Lines { + /// Create a new `Lines` codec backed by the socket + fn new(socket: TcpStream) -> Self { + Lines { + socket, + rd: BytesMut::new(), + wr: BytesMut::new(), + } + } + + /// Buffer a line. + /// + /// This writes the line to an internal buffer. Calls to `poll_flush` will + /// attempt to flush this buffer to the socket. + fn buffer(&mut self, line: &[u8]) { + // Ensure the buffer has capacity. Ideally this would not be unbounded, + // but to keep the example simple, we will not limit this. + self.wr.reserve(line.len()); + + // Push the line onto the end of the write buffer. + // + // The `put` function is from the `BufMut` trait. + self.wr.put(line); + } + + /// Flush the write buffer to the socket + fn poll_flush(&mut self) -> Poll<(), io::Error> { + // As long as there is buffered data to write, try to write it. + while !self.wr.is_empty() { + // Try to write some bytes to the socket + let n = try_ready!(self.socket.poll_write(&self.wr)); + + // As long as the wr is not empty, a successful write should + // never write 0 bytes. + assert!(n > 0); + + // This discards the first `n` bytes of the buffer. + let _ = self.wr.split_to(n); + } + + Ok(Async::Ready(())) + } + + /// Read data from the socket. + /// + /// This only returns `Ready` when the socket has closed. + fn fill_read_buf(&mut self) -> Poll<(), io::Error> { + loop { + // Ensure the read buffer has capacity. + // + // This might result in an internal allocation. + self.rd.reserve(1024); + + // Read data into the buffer. + let n = try_ready!(self.socket.read_buf(&mut self.rd)); + + if n == 0 { + return Ok(Async::Ready(())); + } + } + } +} + +impl Stream for Lines { + type Item = BytesMut; + type Error = io::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + // First, read any new data that might have been received off the socket + let sock_closed = self.fill_read_buf()?.is_ready(); + + // Now, try finding lines + let pos = self.rd.windows(2).enumerate() + .find(|&(_, bytes)| bytes == b"\r\n") + .map(|(i, _)| i); + + if let Some(pos) = pos { + // Remove the line from the read buffer and set it to `line`. + let mut line = self.rd.split_to(pos + 2); + + // Drop the trailing \r\n + line.split_off(pos); + + // Return the line + return Ok(Async::Ready(Some(line))); + } + + if sock_closed { + Ok(Async::Ready(None)) + } else { + Ok(Async::NotReady) + } + } +} + +/// Spawn a task to manage the socket. +/// +/// This will read the first line from the socket to identify the client, then +/// add the client to the set of connected peers in the chat service. +fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) { + // Wrap the socket with the `Lines` codec that we wrote above. + // + // By doing this, we can operate at the line level instead of doing raw byte + // manipulation. + let lines = Lines::new(socket); + + // The first line is treated as the client's name. The client is not added + // to the set of connected peers until this line is received. + // + // We use the `into_future` combinator to extract the first item from the + // lines stream. `into_future` takes a `Stream` and converts it to a future + // of `(first, rest)` where `rest` is the original stream instance. + let connection = lines.into_future() + // `into_future` doesn't have the right error type, so map the error to + // make it work. + .map_err(|(e, _)| e) + // Process the first received line as the client's name. + .and_then(|(name, lines)| { + // If `name` is `None`, then the client disconnected without + // actually sending a line of data. + // + // Since the connection is closed, there is no further work that we + // need to do. So, we just terminate processing by returning + // `future::ok()`. + // + // The problem is that only a single future type can be returned + // from a combinator closure, but we want to return both + // `future::ok()` and `Peer` (below). + // + // This is a common problem, so the `futures` crate solves this by + // providing the `Either` helper enum that allows creating a single + // return type that covers two concrete future types. + let name = match name { + Some(name) => name, + None => { + // The remote client closed the connection without sending + // any data. + return Either::A(future::ok(())); + } + }; + + println!("`{:?}` is joining the chat", name); + + // Create the peer. + // + // This is also a future that processes the connection, only + // completing when the socket closes. + let peer = Peer::new( + name, + state, + lines); + + // Wrap `peer` with `Either::B` to make the return type fit. + Either::B(peer) + }) + // Task futures have an error of type `()`, this ensures we handle the + // error. We do this by printing the error to STDOUT. + .map_err(|e| { + println!("connection error = {:?}", e); + }); + + // Spawn the task. Internally, this submits the task to a thread pool. + tokio::spawn(connection); +} + +pub fn main() { + // Create the shared state. This is how all the peers communicate. + // + // The server task will hold a handle to this. For every new client, the + // `state` handle is cloned and passed into the task that processes the + // client connection. + let state = Arc::new(Mutex::new(Shared::new())); + + let addr = "127.0.0.1:6142".parse().unwrap(); + + // Bind a TCP listener to the socket address. + // + // Note that this is the Tokio TcpListener, which is fully async. + let listener = TcpListener::bind(&addr).unwrap(); + + // The server task asynchronously iterates over and processes each + // incoming connection. + let server = listener.incoming().for_each(move |socket| { + // Spawn a task to process the connection + process(socket, state.clone()); + Ok(()) + }) + .map_err(|err| { + // All tasks must have an `Error` type of `()`. This forces error + // handling and helps avoid silencing failures. + // + // In our example, we are only going to log the error to STDOUT. + println!("accept error = {:?}", err); + }); + + println!("server running on localhost:6142"); + + // Start the Tokio runtime. + // + // The Tokio is a pre-configured "out of the box" runtime for building + // asynchronous applications. It includes both a reactor and a task + // scheduler. This means applications are multithreaded by default. + // + // This function blocks until the runtime reaches an idle state. Idle is + // defined as all spawned tasks have completed and all I/O resources (TCP + // sockets in our case) have been dropped. + // + // In our example, we have not defined a shutdown strategy, so this will + // block until `ctrl-c` is pressed at the terminal. + tokio::run(server); +} diff --git a/third_party/rust/tokio-0.1.11/examples/connect.rs b/third_party/rust/tokio-0.1.11/examples/connect.rs new file mode 100644 index 0000000000..fa3824c4a1 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/examples/connect.rs @@ -0,0 +1,245 @@ +//! An example of hooking up stdin/stdout to either a TCP or UDP stream. +//! +//! This example will connect to a socket address specified in the argument list +//! and then forward all data read on stdin to the server, printing out all data +//! received on stdout. An optional `--udp` argument can be passed to specify +//! that the connection should be made over UDP instead of TCP, translating each +//! line entered on stdin to a UDP packet to be sent to the remote address. +//! +//! Note that this is not currently optimized for performance, especially +//! around buffer management. Rather it's intended to show an example of +//! working with a client. +//! +//! This example can be quite useful when interacting with the other examples in +//! this repository! Many of them recommend running this as a simple "hook up +//! stdin/stdout to a server" to get up and running. + +#![deny(warnings)] + +extern crate tokio; +extern crate tokio_io; +extern crate futures; +extern crate bytes; + +use std::env; +use std::io::{self, Read, Write}; +use std::net::SocketAddr; +use std::thread; + +use tokio::prelude::*; +use futures::sync::mpsc; + +fn main() { + // Determine if we're going to run in TCP or UDP mode + let mut args = env::args().skip(1).collect::<Vec<_>>(); + let tcp = match args.iter().position(|a| a == "--udp") { + Some(i) => { + args.remove(i); + false + } + None => true, + }; + + // Parse what address we're going to connect to + let addr = args.first().unwrap_or_else(|| { + panic!("this program requires at least one argument") + }); + let addr = addr.parse::<SocketAddr>().unwrap(); + + // Right now Tokio doesn't support a handle to stdin running on the event + // loop, so we farm out that work to a separate thread. This thread will + // read data (with blocking I/O) from stdin and then send it to the event + // loop over a standard futures channel. + let (stdin_tx, stdin_rx) = mpsc::channel(0); + thread::spawn(|| read_stdin(stdin_tx)); + let stdin_rx = stdin_rx.map_err(|_| panic!()); // errors not possible on rx + + // Now that we've got our stdin read we either set up our TCP connection or + // our UDP connection to get a stream of bytes we're going to emit to + // stdout. + let stdout = if tcp { + tcp::connect(&addr, Box::new(stdin_rx)) + } else { + udp::connect(&addr, Box::new(stdin_rx)) + }; + + // And now with our stream of bytes to write to stdout, we execute that in + // the event loop! Note that this is doing blocking I/O to emit data to + // stdout, and in general it's a no-no to do that sort of work on the event + // loop. In this case, though, we know it's ok as the event loop isn't + // otherwise running anything useful. + let mut out = io::stdout(); + + tokio::run({ + stdout + .for_each(move |chunk| { + out.write_all(&chunk) + }) + .map_err(|e| println!("error reading stdout; error = {:?}", e)) + }); +} + +mod codec { + use std::io; + use bytes::{BufMut, BytesMut}; + use tokio::codec::{Encoder, Decoder}; + + /// A simple `Codec` implementation that just ships bytes around. + /// + /// This type is used for "framing" a TCP/UDP stream of bytes but it's really + /// just a convenient method for us to work with streams/sinks for now. + /// This'll just take any data read and interpret it as a "frame" and + /// conversely just shove data into the output location without looking at + /// it. + pub struct Bytes; + + impl Decoder for Bytes { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> { + if buf.len() > 0 { + let len = buf.len(); + Ok(Some(buf.split_to(len))) + } else { + Ok(None) + } + } + } + + impl Encoder for Bytes { + type Item = Vec<u8>; + type Error = io::Error; + + fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> { + buf.put(&data[..]); + Ok(()) + } + } +} + +mod tcp { + use tokio; + use tokio::net::TcpStream; + use tokio::prelude::*; + use tokio::codec::Decoder; + + use bytes::BytesMut; + use codec::Bytes; + + use std::io; + use std::net::SocketAddr; + + pub fn connect(addr: &SocketAddr, + stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>) + -> Box<Stream<Item = BytesMut, Error = io::Error> + Send> + { + let tcp = TcpStream::connect(addr); + + // After the TCP connection has been established, we set up our client + // to start forwarding data. + // + // First we use the `Io::framed` method with a simple implementation of + // a `Codec` (listed below) that just ships bytes around. We then split + // that in two to work with the stream and sink separately. + // + // Half of the work we're going to do is to take all data we receive on + // `stdin` and send that along the TCP stream (`sink`). The second half + // is to take all the data we receive (`stream`) and then write that to + // stdout. We'll be passing this handle back out from this method. + // + // You'll also note that we *spawn* the work to read stdin and write it + // to the TCP stream. This is done to ensure that happens concurrently + // with us reading data from the stream. + Box::new(tcp.map(move |stream| { + let (sink, stream) = Bytes.framed(stream).split(); + + tokio::spawn(stdin.forward(sink).then(|result| { + if let Err(e) = result { + panic!("failed to write to socket: {}", e) + } + Ok(()) + })); + + stream + }).flatten_stream()) + } +} + +mod udp { + use std::io; + use std::net::SocketAddr; + + use tokio; + use tokio::net::{UdpSocket, UdpFramed}; + use tokio::prelude::*; + use bytes::BytesMut; + + use codec::Bytes; + + pub fn connect(&addr: &SocketAddr, + stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>) + -> Box<Stream<Item = BytesMut, Error = io::Error> + Send> + { + // We'll bind our UDP socket to a local IP/port, but for now we + // basically let the OS pick both of those. + let addr_to_bind = if addr.ip().is_ipv4() { + "0.0.0.0:0".parse().unwrap() + } else { + "[::]:0".parse().unwrap() + }; + let udp = UdpSocket::bind(&addr_to_bind) + .expect("failed to bind socket"); + + // Like above with TCP we use an instance of `Bytes` codec to transform + // this UDP socket into a framed sink/stream which operates over + // discrete values. In this case we're working with *pairs* of socket + // addresses and byte buffers. + let (sink, stream) = UdpFramed::new(udp, Bytes).split(); + + // All bytes from `stdin` will go to the `addr` specified in our + // argument list. Like with TCP this is spawned concurrently + let forward_stdin = stdin.map(move |chunk| { + (chunk, addr) + }).forward(sink).then(|result| { + if let Err(e) = result { + panic!("failed to write to socket: {}", e) + } + Ok(()) + }); + + // With UDP we could receive data from any source, so filter out + // anything coming from a different address + let receive = stream.filter_map(move |(chunk, src)| { + if src == addr { + Some(chunk.into()) + } else { + None + } + }); + + Box::new(future::lazy(|| { + tokio::spawn(forward_stdin); + future::ok(receive) + }).flatten_stream()) + } +} + +// Our helper method which will read data from stdin and send it along the +// sender provided. +fn read_stdin(mut tx: mpsc::Sender<Vec<u8>>) { + let mut stdin = io::stdin(); + loop { + let mut buf = vec![0; 1024]; + let n = match stdin.read(&mut buf) { + Err(_) | + Ok(0) => break, + Ok(n) => n, + }; + buf.truncate(n); + tx = match tx.send(buf).wait() { + Ok(tx) => tx, + Err(_) => break, + }; + } +} diff --git a/third_party/rust/tokio-0.1.11/examples/echo-udp.rs b/third_party/rust/tokio-0.1.11/examples/echo-udp.rs new file mode 100644 index 0000000000..89cc3d16e0 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/examples/echo-udp.rs @@ -0,0 +1,73 @@ +//! An UDP echo server that just sends back everything that it receives. +//! +//! If you're on Unix you can test this out by in one terminal executing: +//! +//! cargo run --example echo-udp +//! +//! and in another terminal you can run: +//! +//! cargo run --example connect -- --udp 127.0.0.1:8080 +//! +//! Each line you type in to the `nc` terminal should be echo'd back to you! + +#![deny(warnings)] + +#[macro_use] +extern crate futures; +extern crate tokio; + +use std::{env, io}; +use std::net::SocketAddr; + +use tokio::prelude::*; +use tokio::net::UdpSocket; + +struct Server { + socket: UdpSocket, + buf: Vec<u8>, + to_send: Option<(usize, SocketAddr)>, +} + +impl Future for Server { + type Item = (); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(), io::Error> { + loop { + // First we check to see if there's a message we need to echo back. + // If so then we try to send it back to the original source, waiting + // until it's writable and we're able to do so. + if let Some((size, peer)) = self.to_send { + let amt = try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer)); + println!("Echoed {}/{} bytes to {}", amt, size, peer); + self.to_send = None; + } + + // If we're here then `to_send` is `None`, so we take a look for the + // next message we're going to echo back. + self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf))); + } + } +} + +fn main() { + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse::<SocketAddr>().unwrap(); + + let socket = UdpSocket::bind(&addr).unwrap(); + println!("Listening on: {}", socket.local_addr().unwrap()); + + let server = Server { + socket: socket, + buf: vec![0; 1024], + to_send: None, + }; + + // This starts the server task. + // + // `map_err` handles the error by logging it and maps the future to a type + // that can be spawned. + // + // `tokio::run` spawns the task on the Tokio runtime and starts running. + tokio::run(server.map_err(|e| println!("server error = {:?}", e))); +} diff --git a/third_party/rust/tokio-0.1.11/examples/echo.rs b/third_party/rust/tokio-0.1.11/examples/echo.rs new file mode 100644 index 0000000000..92d65a90ff --- /dev/null +++ b/third_party/rust/tokio-0.1.11/examples/echo.rs @@ -0,0 +1,114 @@ +//! A "hello world" echo server with Tokio +//! +//! This server will create a TCP listener, accept connections in a loop, and +//! write back everything that's read off of each TCP connection. +//! +//! Because the Tokio runtime uses a thread pool, each TCP connection is +//! processed concurrently with all other TCP connections across multiple +//! threads. +//! +//! To see this server in action, you can run this in one terminal: +//! +//! cargo run --example echo +//! +//! and in another terminal you can run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! Each line you type in to the `connect` terminal should be echo'd back to +//! you! If you open up multiple terminals running the `connect` example you +//! should be able to see them all make progress simultaneously. + +#![deny(warnings)] + +extern crate tokio; + +use tokio::io; +use tokio::net::TcpListener; +use tokio::prelude::*; + +use std::env; +use std::net::SocketAddr; + +fn main() { + // Allow passing an address to listen on as the first argument of this + // program, but otherwise we'll just set up our TCP listener on + // 127.0.0.1:8080 for connections. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse::<SocketAddr>().unwrap(); + + // Next up we create a TCP listener which will listen for incoming + // connections. This TCP listener is bound to the address we determined + // above and must be associated with an event loop, so we pass in a handle + // to our event loop. After the socket's created we inform that we're ready + // to go and start accepting connections. + let socket = TcpListener::bind(&addr).unwrap(); + println!("Listening on: {}", addr); + + // Here we convert the `TcpListener` to a stream of incoming connections + // with the `incoming` method. We then define how to process each element in + // the stream with the `for_each` method. + // + // This combinator, defined on the `Stream` trait, will allow us to define a + // computation to happen for all items on the stream (in this case TCP + // connections made to the server). The return value of the `for_each` + // method is itself a future representing processing the entire stream of + // connections, and ends up being our server. + let done = socket.incoming() + .map_err(|e| println!("failed to accept socket; error = {:?}", e)) + .for_each(move |socket| { + // Once we're inside this closure this represents an accepted client + // from our server. The `socket` is the client connection (similar to + // how the standard library operates). + // + // We just want to copy all data read from the socket back onto the + // socket itself (e.g. "echo"). We can use the standard `io::copy` + // combinator in the `tokio-core` crate to do precisely this! + // + // The `copy` function takes two arguments, where to read from and where + // to write to. We only have one argument, though, with `socket`. + // Luckily there's a method, `Io::split`, which will split an Read/Write + // stream into its two halves. This operation allows us to work with + // each stream independently, such as pass them as two arguments to the + // `copy` function. + // + // The `copy` function then returns a future, and this future will be + // resolved when the copying operation is complete, resolving to the + // amount of data that was copied. + let (reader, writer) = socket.split(); + let amt = io::copy(reader, writer); + + // After our copy operation is complete we just print out some helpful + // information. + let msg = amt.then(move |result| { + match result { + Ok((amt, _, _)) => println!("wrote {} bytes", amt), + Err(e) => println!("error: {}", e), + } + + Ok(()) + }); + + + // And this is where much of the magic of this server happens. We + // crucially want all clients to make progress concurrently, rather than + // blocking one on completion of another. To achieve this we use the + // `tokio::spawn` function to execute the work in the background. + // + // This function will transfer ownership of the future (`msg` in this + // case) to the Tokio runtime thread pool that. The thread pool will + // drive the future to completion. + // + // Essentially here we're executing a new task to run concurrently, + // which will allow all of our clients to be processed concurrently. + tokio::spawn(msg) + }); + + // And finally now that we've define what our server is, we run it! + // + // This starts the Tokio runtime, spawns the server task, and blocks the + // current thread until all tasks complete execution. Since the `done` task + // never completes (it just keeps accepting sockets), `tokio::run` blocks + // forever (until ctrl-c is pressed). + tokio::run(done); +} diff --git a/third_party/rust/tokio-0.1.11/examples/hello_world.rs b/third_party/rust/tokio-0.1.11/examples/hello_world.rs new file mode 100644 index 0000000000..398ec11aac --- /dev/null +++ b/third_party/rust/tokio-0.1.11/examples/hello_world.rs @@ -0,0 +1,70 @@ +//! Hello world server. +//! +//! A simple server that accepts connections, writes "hello world\n", and closes +//! the connection. +//! +//! You can test this out by running: +//! +//! cargo run --example hello_world +//! +//! And then in another terminal run: +//! +//! telnet localhost 6142 +//! + +#![deny(warnings)] + +extern crate tokio; + +use tokio::io; +use tokio::net::TcpListener; +use tokio::prelude::*; + +pub fn main() { + let addr = "127.0.0.1:6142".parse().unwrap(); + + // Bind a TCP listener to the socket address. + // + // Note that this is the Tokio TcpListener, which is fully async. + let listener = TcpListener::bind(&addr).unwrap(); + + // The server task asynchronously iterates over and processes each + // incoming connection. + let server = listener.incoming().for_each(|socket| { + println!("accepted socket; addr={:?}", socket.peer_addr().unwrap()); + + let connection = io::write_all(socket, "hello world\n") + .then(|res| { + println!("wrote message; success={:?}", res.is_ok()); + Ok(()) + }); + + // Spawn a new task that processes the socket: + tokio::spawn(connection); + + Ok(()) + }) + .map_err(|err| { + // All tasks must have an `Error` type of `()`. This forces error + // handling and helps avoid silencing failures. + // + // In our example, we are only going to log the error to STDOUT. + println!("accept error = {:?}", err); + }); + + println!("server running on localhost:6142"); + + // Start the Tokio runtime. + // + // The Tokio is a pre-configured "out of the box" runtime for building + // asynchronous applications. It includes both a reactor and a task + // scheduler. This means applications are multithreaded by default. + // + // This function blocks until the runtime reaches an idle state. Idle is + // defined as all spawned tasks have completed and all I/O resources (TCP + // sockets in our case) have been dropped. + // + // In our example, we have not defined a shutdown strategy, so this will + // block until `ctrl-c` is pressed at the terminal. + tokio::run(server); +} diff --git a/third_party/rust/tokio-0.1.11/examples/manual-runtime.rs b/third_party/rust/tokio-0.1.11/examples/manual-runtime.rs new file mode 100644 index 0000000000..6cbb8cd45c --- /dev/null +++ b/third_party/rust/tokio-0.1.11/examples/manual-runtime.rs @@ -0,0 +1,86 @@ +//! An example how to manually assemble a runtime and run some tasks on it. +//! +//! This is closer to the single-threaded runtime than the default tokio one, as it is simpler to +//! grasp. There are conceptually similar, but the multi-threaded one would be more code. If you +//! just want to *use* a single-threaded runtime, use the one provided by tokio directly +//! (`tokio::runtime::current_thread::Runtime::new()`. This is a demonstration only. +//! +//! Note that the error handling is a bit left out. Also, the `run` could be modified to return the +//! result of the provided future. + +extern crate futures; +extern crate tokio; +extern crate tokio_current_thread; +extern crate tokio_executor; +extern crate tokio_reactor; +extern crate tokio_timer; + +use std::io::Error as IoError; +use std::time::{Duration, Instant}; + +use futures::{future, Future}; +use tokio_current_thread::CurrentThread; +use tokio_reactor::Reactor; +use tokio_timer::timer::{self, Timer}; + +/// Creates a "runtime". +/// +/// This is similar to running `tokio::runtime::current_thread::Runtime::new()`. +fn run<F: Future<Item = (), Error = ()>>(f: F) -> Result<(), IoError> { + // We need a reactor to receive events about IO objects from kernel + let reactor = Reactor::new()?; + let reactor_handle = reactor.handle(); + // Place a timer wheel on top of the reactor. If there are no timeouts to fire, it'll let the + // reactor pick up some new external events. + let timer = Timer::new(reactor); + let timer_handle = timer.handle(); + // And now put a single-threaded executor on top of the timer. When there are no futures ready + // to do something, it'll let the timer or the reactor generate some new stimuli for the + // futures to continue in their life. + let mut executor = CurrentThread::new_with_park(timer); + // Binds an executor to this thread + let mut enter = tokio_executor::enter().expect("Multiple executors at once"); + // This will set the default handle and timer to use inside the closure and run the future. + tokio_reactor::with_default(&reactor_handle, &mut enter, |enter| { + timer::with_default(&timer_handle, enter, |enter| { + // The TaskExecutor is a fake executor that looks into the current single-threaded + // executor when used. This is a trick, because we need two mutable references to the + // executor (one to run the provided future, another to install as the default one). We + // use the fake one here as the default one. + let mut default_executor = tokio_current_thread::TaskExecutor::current(); + tokio_executor::with_default(&mut default_executor, enter, |enter| { + let mut executor = executor.enter(enter); + // Run the provided future + executor.block_on(f).unwrap(); + // Run all the other futures that are still left in the executor + executor.run().unwrap(); + }); + }); + }); + Ok(()) +} + +fn main() { + run(future::lazy(|| { + // Here comes the application logic. It can spawn further tasks by tokio_current_thread::spawn(). + // It also can use the default reactor and create timeouts. + + // Connect somewhere. And then do nothing with it. Yes, useless. + // + // This will use the default reactor which runs in the current thread. + let connect = tokio::net::TcpStream::connect(&"127.0.0.1:53".parse().unwrap()) + .map(|_| println!("Connected")) + .map_err(|e| println!("Failed to connect: {}", e)); + // We can spawn it without requiring Send. This would panic if we run it outside of the + // `run` (or outside of anything else) + tokio_current_thread::spawn(connect); + + // We can also create timeouts. + let deadline = tokio::timer::Delay::new(Instant::now() + Duration::from_secs(5)) + .map(|()| println!("5 seconds are over")) + .map_err(|e| println!("Failed to wait: {}", e)); + // We can spawn on the default executor, which is also the local one. + tokio::executor::spawn(deadline); + Ok(()) + })).unwrap(); +} diff --git a/third_party/rust/tokio-0.1.11/examples/print_each_packet.rs b/third_party/rust/tokio-0.1.11/examples/print_each_packet.rs new file mode 100644 index 0000000000..644d144cf8 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/examples/print_each_packet.rs @@ -0,0 +1,149 @@ +//! A "print-each-packet" server with Tokio +//! +//! This server will create a TCP listener, accept connections in a loop, and +//! put down in the stdout everything that's read off of each TCP connection. +//! +//! Because the Tokio runtime uses a thread pool, each TCP connection is +//! processed concurrently with all other TCP connections across multiple +//! threads. +//! +//! To see this server in action, you can run this in one terminal: +//! +//! cargo run --example print\_each\_packet +//! +//! and in another terminal you can run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! Each line you type in to the `connect` terminal should be written to terminal! +//! +//! Minimal js example: +//! +//! ```js +//! var net = require("net"); +//! +//! var listenPort = 8080; +//! +//! var server = net.createServer(function (socket) { +//! socket.on("data", function (bytes) { +//! console.log("bytes", bytes); +//! }); +//! +//! socket.on("end", function() { +//! console.log("Socket received FIN packet and closed connection"); +//! }); +//! socket.on("error", function (error) { +//! console.log("Socket closed with error", error); +//! }); +//! +//! socket.on("close", function (with_error) { +//! if (with_error) { +//! console.log("Socket closed with result: Err(SomeError)"); +//! } else { +//! console.log("Socket closed with result: Ok(())"); +//! } +//! }); +//! +//! }); +//! +//! server.listen(listenPort); +//! +//! console.log("Listening on:", listenPort); +//! ``` +//! + +#![deny(warnings)] + +extern crate tokio; +extern crate tokio_codec; + +use tokio_codec::BytesCodec; +use tokio::net::TcpListener; +use tokio::prelude::*; +use tokio::codec::Decoder; + +use std::env; +use std::net::SocketAddr; + +fn main() { + // Allow passing an address to listen on as the first argument of this + // program, but otherwise we'll just set up our TCP listener on + // 127.0.0.1:8080 for connections. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse::<SocketAddr>().unwrap(); + + // Next up we create a TCP listener which will listen for incoming + // connections. This TCP listener is bound to the address we determined + // above and must be associated with an event loop, so we pass in a handle + // to our event loop. After the socket's created we inform that we're ready + // to go and start accepting connections. + let socket = TcpListener::bind(&addr).unwrap(); + println!("Listening on: {}", addr); + + // Here we convert the `TcpListener` to a stream of incoming connections + // with the `incoming` method. We then define how to process each element in + // the stream with the `for_each` method. + // + // This combinator, defined on the `Stream` trait, will allow us to define a + // computation to happen for all items on the stream (in this case TCP + // connections made to the server). The return value of the `for_each` + // method is itself a future representing processing the entire stream of + // connections, and ends up being our server. + let done = socket + .incoming() + .map_err(|e| println!("failed to accept socket; error = {:?}", e)) + .for_each(move |socket| { + // Once we're inside this closure this represents an accepted client + // from our server. The `socket` is the client connection (similar to + // how the standard library operates). + // + // We're parsing each socket with the `BytesCodec` included in `tokio_io`, + // and then we `split` each codec into the reader/writer halves. + // + // See https://docs.rs/tokio-codec/0.1/src/tokio_codec/bytes_codec.rs.html + let framed = BytesCodec::new().framed(socket); + let (_writer, reader) = framed.split(); + + let processor = reader + .for_each(|bytes| { + println!("bytes: {:?}", bytes); + Ok(()) + }) + // After our copy operation is complete we just print out some helpful + // information. + .and_then(|()| { + println!("Socket received FIN packet and closed connection"); + Ok(()) + }) + .or_else(|err| { + println!("Socket closed with error: {:?}", err); + // We have to return the error to catch it in the next ``.then` call + Err(err) + }) + .then(|result| { + println!("Socket closed with result: {:?}", result); + Ok(()) + }); + + // And this is where much of the magic of this server happens. We + // crucially want all clients to make progress concurrently, rather than + // blocking one on completion of another. To achieve this we use the + // `tokio::spawn` function to execute the work in the background. + // + // This function will transfer ownership of the future (`msg` in this + // case) to the Tokio runtime thread pool that. The thread pool will + // drive the future to completion. + // + // Essentially here we're executing a new task to run concurrently, + // which will allow all of our clients to be processed concurrently. + tokio::spawn(processor) + }); + + // And finally now that we've define what our server is, we run it! + // + // This starts the Tokio runtime, spawns the server task, and blocks the + // current thread until all tasks complete execution. Since the `done` task + // never completes (it just keeps accepting sockets), `tokio::run` blocks + // forever (until ctrl-c is pressed). + tokio::run(done); +} diff --git a/third_party/rust/tokio-0.1.11/examples/proxy.rs b/third_party/rust/tokio-0.1.11/examples/proxy.rs new file mode 100644 index 0000000000..bed8314a31 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/examples/proxy.rs @@ -0,0 +1,128 @@ +//! A proxy that forwards data to another server and forwards that server's +//! responses back to clients. +//! +//! Because the Tokio runtime uses a thread pool, each TCP connection is +//! processed concurrently with all other TCP connections across multiple +//! threads. +//! +//! You can showcase this by running this in one terminal: +//! +//! cargo run --example proxy +//! +//! This in another terminal +//! +//! cargo run --example echo +//! +//! And finally this in another terminal +//! +//! cargo run --example connect 127.0.0.1:8081 +//! +//! This final terminal will connect to our proxy, which will in turn connect to +//! the echo server, and you'll be able to see data flowing between them. + +#![deny(warnings)] + +extern crate tokio; + +use std::sync::{Arc, Mutex}; +use std::env; +use std::net::{Shutdown, SocketAddr}; +use std::io::{self, Read, Write}; + +use tokio::io::{copy, shutdown}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::prelude::*; + +fn main() { + let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string()); + let listen_addr = listen_addr.parse::<SocketAddr>().unwrap(); + + let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string()); + let server_addr = server_addr.parse::<SocketAddr>().unwrap(); + + // Create a TCP listener which will listen for incoming connections. + let socket = TcpListener::bind(&listen_addr).unwrap(); + println!("Listening on: {}", listen_addr); + println!("Proxying to: {}", server_addr); + + let done = socket.incoming() + .map_err(|e| println!("error accepting socket; error = {:?}", e)) + .for_each(move |client| { + let server = TcpStream::connect(&server_addr); + let amounts = server.and_then(move |server| { + // Create separate read/write handles for the TCP clients that we're + // proxying data between. Note that typically you'd use + // `AsyncRead::split` for this operation, but we want our writer + // handles to have a custom implementation of `shutdown` which + // actually calls `TcpStream::shutdown` to ensure that EOF is + // transmitted properly across the proxied connection. + // + // As a result, we wrap up our client/server manually in arcs and + // use the impls below on our custom `MyTcpStream` type. + let client_reader = MyTcpStream(Arc::new(Mutex::new(client))); + let client_writer = client_reader.clone(); + let server_reader = MyTcpStream(Arc::new(Mutex::new(server))); + let server_writer = server_reader.clone(); + + // Copy the data (in parallel) between the client and the server. + // After the copy is done we indicate to the remote side that we've + // finished by shutting down the connection. + let client_to_server = copy(client_reader, server_writer) + .and_then(|(n, _, server_writer)| { + shutdown(server_writer).map(move |_| n) + }); + + let server_to_client = copy(server_reader, client_writer) + .and_then(|(n, _, client_writer)| { + shutdown(client_writer).map(move |_| n) + }); + + client_to_server.join(server_to_client) + }); + + let msg = amounts.map(move |(from_client, from_server)| { + println!("client wrote {} bytes and received {} bytes", + from_client, from_server); + }).map_err(|e| { + // Don't panic. Maybe the client just disconnected too soon. + println!("error: {}", e); + }); + + tokio::spawn(msg); + + Ok(()) + }); + + tokio::run(done); +} + +// This is a custom type used to have a custom implementation of the +// `AsyncWrite::shutdown` method which actually calls `TcpStream::shutdown` to +// notify the remote end that we're done writing. +#[derive(Clone)] +struct MyTcpStream(Arc<Mutex<TcpStream>>); + +impl Read for MyTcpStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.0.lock().unwrap().read(buf) + } +} + +impl Write for MyTcpStream { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.0.lock().unwrap().write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl AsyncRead for MyTcpStream {} + +impl AsyncWrite for MyTcpStream { + fn shutdown(&mut self) -> Poll<(), io::Error> { + try!(self.0.lock().unwrap().shutdown(Shutdown::Write)); + Ok(().into()) + } +} diff --git a/third_party/rust/tokio-0.1.11/examples/tinydb.rs b/third_party/rust/tokio-0.1.11/examples/tinydb.rs new file mode 100644 index 0000000000..134d01b15a --- /dev/null +++ b/third_party/rust/tokio-0.1.11/examples/tinydb.rs @@ -0,0 +1,206 @@ +//! A "tiny database" and accompanying protocol +//! +//! This example shows the usage of shared state amongst all connected clients, +//! namely a database of key/value pairs. Each connected client can send a +//! series of GET/SET commands to query the current value of a key or set the +//! value of a key. +//! +//! This example has a simple protocol you can use to interact with the server. +//! To run, first run this in one terminal window: +//! +//! cargo run --example tinydb +//! +//! and next in another windows run: +//! +//! cargo run --example connect 127.0.0.1:8080 +//! +//! In the `connect` window you can type in commands where when you hit enter +//! you'll get a response from the server for that command. An example session +//! is: +//! +//! +//! $ cargo run --example connect 127.0.0.1:8080 +//! GET foo +//! foo = bar +//! GET FOOBAR +//! error: no key FOOBAR +//! SET FOOBAR my awesome string +//! set FOOBAR = `my awesome string`, previous: None +//! SET foo tokio +//! set foo = `tokio`, previous: Some("bar") +//! GET foo +//! foo = tokio +//! +//! Namely you can issue two forms of commands: +//! +//! * `GET $key` - this will fetch the value of `$key` from the database and +//! return it. The server's database is initially populated with the key `foo` +//! set to the value `bar` +//! * `SET $key $value` - this will set the value of `$key` to `$value`, +//! returning the previous value, if any. + +#![deny(warnings)] + +extern crate tokio; + +use std::collections::HashMap; +use std::io::BufReader; +use std::env; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; + +use tokio::io::{lines, write_all}; +use tokio::net::TcpListener; +use tokio::prelude::*; + +/// The in-memory database shared amongst all clients. +/// +/// This database will be shared via `Arc`, so to mutate the internal map we're +/// going to use a `Mutex` for interior mutability. +struct Database { + map: Mutex<HashMap<String, String>>, +} + +/// Possible requests our clients can send us +enum Request { + Get { key: String }, + Set { key: String, value: String }, +} + +/// Responses to the `Request` commands above +enum Response { + Value { key: String, value: String }, + Set { key: String, value: String, previous: Option<String> }, + Error { msg: String }, +} + +fn main() { + // Parse the address we're going to run this server on + // and set up our TCP listener to accept connections. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse::<SocketAddr>().unwrap(); + let listener = TcpListener::bind(&addr).expect("failed to bind"); + println!("Listening on: {}", addr); + + // Create the shared state of this server that will be shared amongst all + // clients. We populate the initial database and then create the `Database` + // structure. Note the usage of `Arc` here which will be used to ensure that + // each independently spawned client will have a reference to the in-memory + // database. + let mut initial_db = HashMap::new(); + initial_db.insert("foo".to_string(), "bar".to_string()); + let db = Arc::new(Database { + map: Mutex::new(initial_db), + }); + + let done = listener.incoming() + .map_err(|e| println!("error accepting socket; error = {:?}", e)) + .for_each(move |socket| { + // As with many other small examples, the first thing we'll do is + // *split* this TCP stream into two separately owned halves. This'll + // allow us to work with the read and write halves independently. + let (reader, writer) = socket.split(); + + // Since our protocol is line-based we use `tokio_io`'s `lines` utility + // to convert our stream of bytes, `reader`, into a `Stream` of lines. + let lines = lines(BufReader::new(reader)); + + // Here's where the meat of the processing in this server happens. First + // we see a clone of the database being created, which is creating a + // new reference for this connected client to use. Also note the `move` + // keyword on the closure here which moves ownership of the reference + // into the closure, which we'll need for spawning the client below. + // + // The `map` function here means that we'll run some code for all + // requests (lines) we receive from the client. The actual handling here + // is pretty simple, first we parse the request and if it's valid we + // generate a response based on the values in the database. + let db = db.clone(); + let responses = lines.map(move |line| { + let request = match Request::parse(&line) { + Ok(req) => req, + Err(e) => return Response::Error { msg: e }, + }; + + let mut db = db.map.lock().unwrap(); + match request { + Request::Get { key } => { + match db.get(&key) { + Some(value) => Response::Value { key, value: value.clone() }, + None => Response::Error { msg: format!("no key {}", key) }, + } + } + Request::Set { key, value } => { + let previous = db.insert(key.clone(), value.clone()); + Response::Set { key, value, previous } + } + } + }); + + // At this point `responses` is a stream of `Response` types which we + // now want to write back out to the client. To do that we use + // `Stream::fold` to perform a loop here, serializing each response and + // then writing it out to the client. + let writes = responses.fold(writer, |writer, response| { + let mut response = response.serialize(); + response.push('\n'); + write_all(writer, response.into_bytes()).map(|(w, _)| w) + }); + + // Like with other small servers, we'll `spawn` this client to ensure it + // runs concurrently with all other clients, for now ignoring any errors + // that we see. + let msg = writes.then(move |_| Ok(())); + + tokio::spawn(msg) + }); + + tokio::run(done); +} + +impl Request { + fn parse(input: &str) -> Result<Request, String> { + let mut parts = input.splitn(3, " "); + match parts.next() { + Some("GET") => { + let key = match parts.next() { + Some(key) => key, + None => return Err(format!("GET must be followed by a key")), + }; + if parts.next().is_some() { + return Err(format!("GET's key must not be followed by anything")) + } + Ok(Request::Get { key: key.to_string() }) + } + Some("SET") => { + let key = match parts.next() { + Some(key) => key, + None => return Err(format!("SET must be followed by a key")), + }; + let value = match parts.next() { + Some(value) => value, + None => return Err(format!("SET needs a value")), + }; + Ok(Request::Set { key: key.to_string(), value: value.to_string() }) + } + Some(cmd) => Err(format!("unknown command: {}", cmd)), + None => Err(format!("empty input")), + } + } +} + +impl Response { + fn serialize(&self) -> String { + match *self { + Response::Value { ref key, ref value } => { + format!("{} = {}", key, value) + } + Response::Set { ref key, ref value, ref previous } => { + format!("set {} = `{}`, previous: {:?}", key, value, previous) + } + Response::Error { ref msg } => { + format!("error: {}", msg) + } + } + } +} diff --git a/third_party/rust/tokio-0.1.11/examples/tinyhttp.rs b/third_party/rust/tokio-0.1.11/examples/tinyhttp.rs new file mode 100644 index 0000000000..1e4f22bde6 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/examples/tinyhttp.rs @@ -0,0 +1,308 @@ +//! A "tiny" example of HTTP request/response handling using just tokio-core +//! +//! This example is intended for *learning purposes* to see how various pieces +//! hook up together and how HTTP can get up and running. Note that this example +//! is written with the restriction that it *can't* use any "big" library other +//! than tokio-core, if you'd like a "real world" HTTP library you likely want a +//! crate like Hyper. +//! +//! Code here is based on the `echo-threads` example and implements two paths, +//! the `/plaintext` and `/json` routes to respond with some text and json, +//! respectively. By default this will run I/O on all the cores your system has +//! available, and it doesn't support HTTP request bodies. + +#![deny(warnings)] + +extern crate bytes; +extern crate http; +extern crate httparse; +#[macro_use] +extern crate serde_derive; +extern crate serde_json; +extern crate time; +extern crate tokio; +extern crate tokio_io; + +use std::{env, fmt, io}; +use std::net::SocketAddr; + +use tokio::net::{TcpStream, TcpListener}; +use tokio::prelude::*; +use tokio::codec::{Encoder, Decoder}; + +use bytes::BytesMut; +use http::header::HeaderValue; +use http::{Request, Response, StatusCode}; + +fn main() { + // Parse the arguments, bind the TCP socket we'll be listening to, spin up + // our worker threads, and start shipping sockets to those worker threads. + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr.parse::<SocketAddr>().unwrap(); + + let listener = TcpListener::bind(&addr).expect("failed to bind"); + println!("Listening on: {}", addr); + + tokio::run({ + listener.incoming() + .map_err(|e| println!("failed to accept socket; error = {:?}", e)) + .for_each(|socket| { + process(socket); + Ok(()) + }) + }); +} + +fn process(socket: TcpStream) { + let (tx, rx) = + // Frame the socket using the `Http` protocol. This maps the TCP socket + // to a Stream + Sink of HTTP frames. + Http.framed(socket) + // This splits a single `Stream + Sink` value into two separate handles + // that can be used independently (even on different tasks or threads). + .split(); + + // Map all requests into responses and send them back to the client. + let task = tx.send_all(rx.and_then(respond)) + .then(|res| { + if let Err(e) = res { + println!("failed to process connection; error = {:?}", e); + } + + Ok(()) + }); + + // Spawn the task that handles the connection. + tokio::spawn(task); +} + +/// "Server logic" is implemented in this function. +/// +/// This function is a map from and HTTP request to a future of a response and +/// represents the various handling a server might do. Currently the contents +/// here are pretty uninteresting. +fn respond(req: Request<()>) + -> Box<Future<Item = Response<String>, Error = io::Error> + Send> +{ + let mut ret = Response::builder(); + let body = match req.uri().path() { + "/plaintext" => { + ret.header("Content-Type", "text/plain"); + "Hello, World!".to_string() + } + "/json" => { + ret.header("Content-Type", "application/json"); + + #[derive(Serialize)] + struct Message { + message: &'static str, + } + serde_json::to_string(&Message { message: "Hello, World!" }) + .unwrap() + } + _ => { + ret.status(StatusCode::NOT_FOUND); + String::new() + } + }; + Box::new(future::ok(ret.body(body).unwrap())) +} + +struct Http; + +/// Implementation of encoding an HTTP response into a `BytesMut`, basically +/// just writing out an HTTP/1.1 response. +impl Encoder for Http { + type Item = Response<String>; + type Error = io::Error; + + fn encode(&mut self, item: Response<String>, dst: &mut BytesMut) -> io::Result<()> { + use std::fmt::Write; + + write!(BytesWrite(dst), "\ + HTTP/1.1 {}\r\n\ + Server: Example\r\n\ + Content-Length: {}\r\n\ + Date: {}\r\n\ + ", item.status(), item.body().len(), date::now()).unwrap(); + + for (k, v) in item.headers() { + dst.extend_from_slice(k.as_str().as_bytes()); + dst.extend_from_slice(b": "); + dst.extend_from_slice(v.as_bytes()); + dst.extend_from_slice(b"\r\n"); + } + + dst.extend_from_slice(b"\r\n"); + dst.extend_from_slice(item.body().as_bytes()); + + return Ok(()); + + // Right now `write!` on `Vec<u8>` goes through io::Write and is not + // super speedy, so inline a less-crufty implementation here which + // doesn't go through io::Error. + struct BytesWrite<'a>(&'a mut BytesMut); + + impl<'a> fmt::Write for BytesWrite<'a> { + fn write_str(&mut self, s: &str) -> fmt::Result { + self.0.extend_from_slice(s.as_bytes()); + Ok(()) + } + + fn write_fmt(&mut self, args: fmt::Arguments) -> fmt::Result { + fmt::write(self, args) + } + } + } +} + +/// Implementation of decoding an HTTP request from the bytes we've read so far. +/// This leverages the `httparse` crate to do the actual parsing and then we use +/// that information to construct an instance of a `http::Request` object, +/// trying to avoid allocations where possible. +impl Decoder for Http { + type Item = Request<()>; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Request<()>>> { + // TODO: we should grow this headers array if parsing fails and asks + // for more headers + let mut headers = [None; 16]; + let (method, path, version, amt) = { + let mut parsed_headers = [httparse::EMPTY_HEADER; 16]; + let mut r = httparse::Request::new(&mut parsed_headers); + let status = r.parse(src).map_err(|e| { + let msg = format!("failed to parse http request: {:?}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let amt = match status { + httparse::Status::Complete(amt) => amt, + httparse::Status::Partial => return Ok(None), + }; + + let toslice = |a: &[u8]| { + let start = a.as_ptr() as usize - src.as_ptr() as usize; + assert!(start < src.len()); + (start, start + a.len()) + }; + + for (i, header) in r.headers.iter().enumerate() { + let k = toslice(header.name.as_bytes()); + let v = toslice(header.value); + headers[i] = Some((k, v)); + } + + (toslice(r.method.unwrap().as_bytes()), + toslice(r.path.unwrap().as_bytes()), + r.version.unwrap(), + amt) + }; + if version != 1 { + return Err(io::Error::new(io::ErrorKind::Other, "only HTTP/1.1 accepted")) + } + let data = src.split_to(amt).freeze(); + let mut ret = Request::builder(); + ret.method(&data[method.0..method.1]); + ret.uri(data.slice(path.0, path.1)); + ret.version(http::Version::HTTP_11); + for header in headers.iter() { + let (k, v) = match *header { + Some((ref k, ref v)) => (k, v), + None => break, + }; + let value = unsafe { + HeaderValue::from_shared_unchecked(data.slice(v.0, v.1)) + }; + ret.header(&data[k.0..k.1], value); + } + + let req = ret.body(()).map_err(|e| { + io::Error::new(io::ErrorKind::Other, e) + })?; + Ok(Some(req)) + } +} + +mod date { + use std::cell::RefCell; + use std::fmt::{self, Write}; + use std::str; + + use time::{self, Duration}; + + pub struct Now(()); + + /// Returns a struct, which when formatted, renders an appropriate `Date` + /// header value. + pub fn now() -> Now { + Now(()) + } + + // Gee Alex, doesn't this seem like premature optimization. Well you see + // there Billy, you're absolutely correct! If your server is *bottlenecked* + // on rendering the `Date` header, well then boy do I have news for you, you + // don't need this optimization. + // + // In all seriousness, though, a simple "hello world" benchmark which just + // sends back literally "hello world" with standard headers actually is + // bottlenecked on rendering a date into a byte buffer. Since it was at the + // top of a profile, and this was done for some competitive benchmarks, this + // module was written. + // + // Just to be clear, though, I was not intending on doing this because it + // really does seem kinda absurd, but it was done by someone else [1], so I + // blame them! :) + // + // [1]: https://github.com/rapidoid/rapidoid/blob/f1c55c0555007e986b5d069fe1086e6d09933f7b/rapidoid-commons/src/main/java/org/rapidoid/commons/Dates.java#L48-L66 + + struct LastRenderedNow { + bytes: [u8; 128], + amt: usize, + next_update: time::Timespec, + } + + thread_local!(static LAST: RefCell<LastRenderedNow> = RefCell::new(LastRenderedNow { + bytes: [0; 128], + amt: 0, + next_update: time::Timespec::new(0, 0), + })); + + impl fmt::Display for Now { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + LAST.with(|cache| { + let mut cache = cache.borrow_mut(); + let now = time::get_time(); + if now >= cache.next_update { + cache.update(now); + } + f.write_str(cache.buffer()) + }) + } + } + + impl LastRenderedNow { + fn buffer(&self) -> &str { + str::from_utf8(&self.bytes[..self.amt]).unwrap() + } + + fn update(&mut self, now: time::Timespec) { + self.amt = 0; + write!(LocalBuffer(self), "{}", time::at(now).rfc822()).unwrap(); + self.next_update = now + Duration::seconds(1); + self.next_update.nsec = 0; + } + } + + struct LocalBuffer<'a>(&'a mut LastRenderedNow); + + impl<'a> fmt::Write for LocalBuffer<'a> { + fn write_str(&mut self, s: &str) -> fmt::Result { + let start = self.0.amt; + let end = start + s.len(); + self.0.bytes[start..end].copy_from_slice(s.as_bytes()); + self.0.amt += s.len(); + Ok(()) + } + } +} diff --git a/third_party/rust/tokio-0.1.11/examples/udp-client.rs b/third_party/rust/tokio-0.1.11/examples/udp-client.rs new file mode 100644 index 0000000000..3af7c3beaa --- /dev/null +++ b/third_party/rust/tokio-0.1.11/examples/udp-client.rs @@ -0,0 +1,74 @@ +//! A UDP client that just sends everything it gets via `stdio` in a single datagram, and then +//! waits for a reply. +//! +//! For the reasons of simplicity data from `stdio` is read until `EOF` in a blocking manner. +//! +//! You can test this out by running an echo server: +//! +//! ``` +//! $ cargo run --example echo-udp -- 127.0.0.1:8080 +//! ``` +//! +//! and running the client in another terminal: +//! +//! ``` +//! $ cargo run --example udp-client +//! ``` +//! +//! You can optionally provide any custom endpoint address for the client: +//! +//! ``` +//! $ cargo run --example udp-client -- 127.0.0.1:8080 +//! ``` +//! +//! Don't forget to pass `EOF` to the standard input of the client! +//! +//! Please mind that since the UDP protocol doesn't have any capabilities to detect a broken +//! connection the server needs to be run first, otherwise the client will block forever. + +extern crate futures; +extern crate tokio; + +use std::env; +use std::io::stdin; +use std::net::SocketAddr; +use tokio::net::UdpSocket; +use tokio::prelude::*; + +fn get_stdin_data() -> Vec<u8> { + let mut buf = Vec::new(); + stdin().read_to_end(&mut buf).unwrap(); + buf +} + +fn main() { + let remote_addr: SocketAddr = env::args() + .nth(1) + .unwrap_or("127.0.0.1:8080".into()) + .parse() + .unwrap(); + // We use port 0 to let the operating system allocate an available port for us. + let local_addr: SocketAddr = if remote_addr.is_ipv4() { + "0.0.0.0:0" + } else { + "[::]:0" + }.parse() + .unwrap(); + let socket = UdpSocket::bind(&local_addr).unwrap(); + const MAX_DATAGRAM_SIZE: usize = 65_507; + let processing = socket + .send_dgram(get_stdin_data(), &remote_addr) + .and_then(|(socket, _)| socket.recv_dgram(vec![0u8; MAX_DATAGRAM_SIZE])) + .map(|(_, data, len, _)| { + println!( + "Received {} bytes:\n{}", + len, + String::from_utf8_lossy(&data[..len]) + ) + }) + .wait(); + match processing { + Ok(_) => {} + Err(e) => eprintln!("Encountered an error: {}", e), + } +} diff --git a/third_party/rust/tokio-0.1.11/examples/udp-codec.rs b/third_party/rust/tokio-0.1.11/examples/udp-codec.rs new file mode 100644 index 0000000000..b273a36061 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/examples/udp-codec.rs @@ -0,0 +1,64 @@ +//! This example leverages `BytesCodec` to create a UDP client and server which +//! speak a custom protocol. +//! +//! Here we're using the codec from tokio-io to convert a UDP socket to a stream of +//! client messages. These messages are then processed and returned back as a +//! new message with a new destination. Overall, we then use this to construct a +//! "ping pong" pair where two sockets are sending messages back and forth. + +#![deny(warnings)] + +extern crate tokio; +extern crate tokio_codec; +extern crate tokio_io; +extern crate env_logger; + +use std::net::SocketAddr; + +use tokio::prelude::*; +use tokio::net::{UdpSocket, UdpFramed}; +use tokio_codec::BytesCodec; + +fn main() { + let _ = env_logger::init(); + + let addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); + + // Bind both our sockets and then figure out what ports we got. + let a = UdpSocket::bind(&addr).unwrap(); + let b = UdpSocket::bind(&addr).unwrap(); + let b_addr = b.local_addr().unwrap(); + + // We're parsing each socket with the `BytesCodec` included in `tokio_io`, and then we + // `split` each codec into the sink/stream halves. + let (a_sink, a_stream) = UdpFramed::new(a, BytesCodec::new()).split(); + let (b_sink, b_stream) = UdpFramed::new(b, BytesCodec::new()).split(); + + // Start off by sending a ping from a to b, afterwards we just print out + // what they send us and continually send pings + // let pings = stream::iter((0..5).map(Ok)); + let a = a_sink.send(("PING".into(), b_addr)).and_then(|a_sink| { + let mut i = 0; + let a_stream = a_stream.take(4).map(move |(msg, addr)| { + i += 1; + println!("[a] recv: {}", String::from_utf8_lossy(&msg)); + (format!("PING {}", i).into(), addr) + }); + a_sink.send_all(a_stream) + }); + + // The second client we have will receive the pings from `a` and then send + // back pongs. + let b_stream = b_stream.map(|(msg, addr)| { + println!("[b] recv: {}", String::from_utf8_lossy(&msg)); + ("PONG".into(), addr) + }); + let b = b_sink.send_all(b_stream); + + // Spawn the sender of pongs and then wait for our pinger to finish. + tokio::run({ + b.join(a) + .map(|_| ()) + .map_err(|e| println!("error = {:?}", e)) + }); +} diff --git a/third_party/rust/tokio-0.1.11/src/async_await.rs b/third_party/rust/tokio-0.1.11/src/async_await.rs new file mode 100644 index 0000000000..88903643ff --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/async_await.rs @@ -0,0 +1,26 @@ +use std::future::{Future as StdFuture}; + +async fn map_ok<T: StdFuture>(future: T) -> Result<(), ()> { + let _ = await!(future); + Ok(()) +} + +/// Like `tokio::run`, but takes an `async` block +pub fn run_async<F>(future: F) +where F: StdFuture<Output = ()> + Send + 'static, +{ + use tokio_async_await::compat::backward; + let future = backward::Compat::new(map_ok(future)); + + ::run(future); +} + +/// Like `tokio::spawn`, but takes an `async` block +pub fn spawn_async<F>(future: F) +where F: StdFuture<Output = ()> + Send + 'static, +{ + use tokio_async_await::compat::backward; + let future = backward::Compat::new(map_ok(future)); + + ::spawn(future); +} diff --git a/third_party/rust/tokio-0.1.11/src/clock.rs b/third_party/rust/tokio-0.1.11/src/clock.rs new file mode 100644 index 0000000000..7ddbbf37fe --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/clock.rs @@ -0,0 +1,15 @@ +//! A configurable source of time. +//! +//! This module provides the [`now`][n] function, which returns an `Instant` +//! representing "now". The source of time used by this function is configurable +//! (via the [`tokio-timer`] crate) and allows mocking out the source of time in +//! tests or performing caching operations to reduce the number of syscalls. +//! +//! Note that, because the source of time is configurable, it is possible to +//! observe non-monotonic behavior when calling [`now`][n] from different +//! executors. +//! +//! [n]: fn.now.html +//! [`tokio-timer`]: https://docs.rs/tokio-timer/0.2/tokio_timer/clock/index.html + +pub use tokio_timer::clock::now; diff --git a/third_party/rust/tokio-0.1.11/src/codec/length_delimited.rs b/third_party/rust/tokio-0.1.11/src/codec/length_delimited.rs new file mode 100644 index 0000000000..54ec202bb1 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/codec/length_delimited.rs @@ -0,0 +1,971 @@ +//! Frame a stream of bytes based on a length prefix +//! +//! Many protocols delimit their frames by prefacing frame data with a +//! frame head that specifies the length of the frame. The +//! `length_delimited` module provides utilities for handling the length +//! based framing. This allows the consumer to work with entire frames +//! without having to worry about buffering or other framing logic. +//! +//! # Getting started +//! +//! If implementing a protocol from scratch, using length delimited framing +//! is an easy way to get started. [`Codec::new()`] will return a length +//! delimited codec using default configuration values. This can then be +//! used to construct a framer to adapt a full-duplex byte stream into a +//! stream of frames. +//! +//! ``` +//! # extern crate tokio; +//! use tokio::io::{AsyncRead, AsyncWrite}; +//! use tokio::codec::*; +//! +//! fn bind_transport<T: AsyncRead + AsyncWrite>(io: T) +//! -> Framed<T, LengthDelimitedCodec> +//! { +//! Framed::new(io, LengthDelimitedCodec::new()) +//! } +//! # pub fn main() {} +//! ``` +//! +//! The returned transport implements `Sink + Stream` for `BytesMut`. It +//! encodes the frame with a big-endian `u32` header denoting the frame +//! payload length: +//! +//! ```text +//! +----------+--------------------------------+ +//! | len: u32 | frame payload | +//! +----------+--------------------------------+ +//! ``` +//! +//! Specifically, given the following: +//! +//! ``` +//! # extern crate tokio; +//! # extern crate bytes; +//! # extern crate futures; +//! # +//! use tokio::io::{AsyncRead, AsyncWrite}; +//! use tokio::codec::*; +//! use bytes::Bytes; +//! use futures::{Sink, Future}; +//! +//! fn write_frame<T: AsyncRead + AsyncWrite>(io: T) { +//! let mut transport = Framed::new(io, LengthDelimitedCodec::new()); +//! let frame = Bytes::from("hello world"); +//! +//! transport.send(frame).wait().unwrap(); +//! } +//! # +//! # pub fn main() {} +//! ``` +//! +//! The encoded frame will look like this: +//! +//! ```text +//! +---- len: u32 ----+---- data ----+ +//! | \x00\x00\x00\x0b | hello world | +//! +------------------+--------------+ +//! ``` +//! +//! # Decoding +//! +//! [`FramedRead`] adapts an [`AsyncRead`] into a `Stream` of [`BytesMut`], +//! such that each yielded [`BytesMut`] value contains the contents of an +//! entire frame. There are many configuration parameters enabling +//! [`FramedRead`] to handle a wide range of protocols. Here are some +//! examples that will cover the various options at a high level. +//! +//! ## Example 1 +//! +//! The following will parse a `u16` length field at offset 0, including the +//! frame head in the yielded `BytesMut`. +//! +//! ``` +//! # extern crate tokio; +//! # use tokio::io::AsyncRead; +//! # use tokio::codec::length_delimited; +//! # fn bind_read<T: AsyncRead>(io: T) { +//! length_delimited::Builder::new() +//! .length_field_offset(0) // default value +//! .length_field_length(2) +//! .length_adjustment(0) // default value +//! .num_skip(0) // Do not strip frame header +//! .new_read(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! The following frame will be decoded as such: +//! +//! ```text +//! INPUT DECODED +//! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+ +//! | \x00\x0B | Hello world | --> | \x00\x0B | Hello world | +//! +----------+---------------+ +----------+---------------+ +//! ``` +//! +//! The value of the length field is 11 (`\x0B`) which represents the length +//! of the payload, `hello world`. By default, [`FramedRead`] assumes that +//! the length field represents the number of bytes that **follows** the +//! length field. Thus, the entire frame has a length of 13: 2 bytes for the +//! frame head + 11 bytes for the payload. +//! +//! ## Example 2 +//! +//! The following will parse a `u16` length field at offset 0, omitting the +//! frame head in the yielded `BytesMut`. +//! +//! ``` +//! # extern crate tokio; +//! # use tokio::io::AsyncRead; +//! # use tokio::codec::length_delimited; +//! # fn bind_read<T: AsyncRead>(io: T) { +//! length_delimited::Builder::new() +//! .length_field_offset(0) // default value +//! .length_field_length(2) +//! .length_adjustment(0) // default value +//! // `num_skip` is not needed, the default is to skip +//! .new_read(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! The following frame will be decoded as such: +//! +//! ```text +//! INPUT DECODED +//! +-- len ---+--- Payload ---+ +--- Payload ---+ +//! | \x00\x0B | Hello world | --> | Hello world | +//! +----------+---------------+ +---------------+ +//! ``` +//! +//! This is similar to the first example, the only difference is that the +//! frame head is **not** included in the yielded `BytesMut` value. +//! +//! ## Example 3 +//! +//! The following will parse a `u16` length field at offset 0, including the +//! frame head in the yielded `BytesMut`. In this case, the length field +//! **includes** the frame head length. +//! +//! ``` +//! # extern crate tokio; +//! # use tokio::io::AsyncRead; +//! # use tokio::codec::length_delimited; +//! # fn bind_read<T: AsyncRead>(io: T) { +//! length_delimited::Builder::new() +//! .length_field_offset(0) // default value +//! .length_field_length(2) +//! .length_adjustment(-2) // size of head +//! .num_skip(0) +//! .new_read(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! The following frame will be decoded as such: +//! +//! ```text +//! INPUT DECODED +//! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+ +//! | \x00\x0D | Hello world | --> | \x00\x0D | Hello world | +//! +----------+---------------+ +----------+---------------+ +//! ``` +//! +//! In most cases, the length field represents the length of the payload +//! only, as shown in the previous examples. However, in some protocols the +//! length field represents the length of the whole frame, including the +//! head. In such cases, we specify a negative `length_adjustment` to adjust +//! the value provided in the frame head to represent the payload length. +//! +//! ## Example 4 +//! +//! The following will parse a 3 byte length field at offset 0 in a 5 byte +//! frame head, including the frame head in the yielded `BytesMut`. +//! +//! ``` +//! # extern crate tokio; +//! # use tokio::io::AsyncRead; +//! # use tokio::codec::length_delimited; +//! # fn bind_read<T: AsyncRead>(io: T) { +//! length_delimited::Builder::new() +//! .length_field_offset(0) // default value +//! .length_field_length(3) +//! .length_adjustment(2) // remaining head +//! .num_skip(0) +//! .new_read(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! The following frame will be decoded as such: +//! +//! ```text +//! INPUT +//! +---- len -----+- head -+--- Payload ---+ +//! | \x00\x00\x0B | \xCAFE | Hello world | +//! +--------------+--------+---------------+ +//! +//! DECODED +//! +---- len -----+- head -+--- Payload ---+ +//! | \x00\x00\x0B | \xCAFE | Hello world | +//! +--------------+--------+---------------+ +//! ``` +//! +//! A more advanced example that shows a case where there is extra frame +//! head data between the length field and the payload. In such cases, it is +//! usually desirable to include the frame head as part of the yielded +//! `BytesMut`. This lets consumers of the length delimited framer to +//! process the frame head as needed. +//! +//! The positive `length_adjustment` value lets `FramedRead` factor in the +//! additional head into the frame length calculation. +//! +//! ## Example 5 +//! +//! The following will parse a `u16` length field at offset 1 of a 4 byte +//! frame head. The first byte and the length field will be omitted from the +//! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be +//! included. +//! +//! ``` +//! # extern crate tokio; +//! # use tokio::io::AsyncRead; +//! # use tokio::codec::length_delimited; +//! # fn bind_read<T: AsyncRead>(io: T) { +//! length_delimited::Builder::new() +//! .length_field_offset(1) // length of hdr1 +//! .length_field_length(2) +//! .length_adjustment(1) // length of hdr2 +//! .num_skip(3) // length of hdr1 + LEN +//! .new_read(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! The following frame will be decoded as such: +//! +//! ```text +//! INPUT +//! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+ +//! | \xCA | \x00\x0B | \xFE | Hello world | +//! +--------+----------+--------+---------------+ +//! +//! DECODED +//! +- hdr2 -+--- Payload ---+ +//! | \xFE | Hello world | +//! +--------+---------------+ +//! ``` +//! +//! The length field is situated in the middle of the frame head. In this +//! case, the first byte in the frame head could be a version or some other +//! identifier that is not needed for processing. On the other hand, the +//! second half of the head is needed. +//! +//! `length_field_offset` indicates how many bytes to skip before starting +//! to read the length field. `length_adjustment` is the number of bytes to +//! skip starting at the end of the length field. In this case, it is the +//! second half of the head. +//! +//! ## Example 6 +//! +//! The following will parse a `u16` length field at offset 1 of a 4 byte +//! frame head. The first byte and the length field will be omitted from the +//! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be +//! included. In this case, the length field **includes** the frame head +//! length. +//! +//! ``` +//! # extern crate tokio; +//! # use tokio::io::AsyncRead; +//! # use tokio::codec::length_delimited; +//! # fn bind_read<T: AsyncRead>(io: T) { +//! length_delimited::Builder::new() +//! .length_field_offset(1) // length of hdr1 +//! .length_field_length(2) +//! .length_adjustment(-3) // length of hdr1 + LEN, negative +//! .num_skip(3) +//! .new_read(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! The following frame will be decoded as such: +//! +//! ```text +//! INPUT +//! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+ +//! | \xCA | \x00\x0F | \xFE | Hello world | +//! +--------+----------+--------+---------------+ +//! +//! DECODED +//! +- hdr2 -+--- Payload ---+ +//! | \xFE | Hello world | +//! +--------+---------------+ +//! ``` +//! +//! Similar to the example above, the difference is that the length field +//! represents the length of the entire frame instead of just the payload. +//! The length of `hdr1` and `len` must be counted in `length_adjustment`. +//! Note that the length of `hdr2` does **not** need to be explicitly set +//! anywhere because it already is factored into the total frame length that +//! is read from the byte stream. +//! +//! # Encoding +//! +//! [`FramedWrite`] adapts an [`AsyncWrite`] into a `Sink` of [`BytesMut`], +//! such that each submitted [`BytesMut`] is prefaced by a length field. +//! There are fewer configuration options than [`FramedRead`]. Given +//! protocols that have more complex frame heads, an encoder should probably +//! be written by hand using [`Encoder`]. +//! +//! Here is a simple example, given a `FramedWrite` with the following +//! configuration: +//! +//! ``` +//! # extern crate tokio; +//! # extern crate bytes; +//! # use tokio::io::AsyncWrite; +//! # use tokio::codec::length_delimited; +//! # use bytes::BytesMut; +//! # fn write_frame<T: AsyncWrite>(io: T) { +//! # let _ = +//! length_delimited::Builder::new() +//! .length_field_length(2) +//! .new_write(io); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! A payload of `hello world` will be encoded as: +//! +//! ```text +//! +- len: u16 -+---- data ----+ +//! | \x00\x0b | hello world | +//! +------------+--------------+ +//! ``` +//! +//! [`FramedRead`]: struct.FramedRead.html +//! [`FramedWrite`]: struct.FramedWrite.html +//! [`AsyncRead`]: ../../trait.AsyncRead.html +//! [`AsyncWrite`]: ../../trait.AsyncWrite.html +//! [`Encoder`]: ../trait.Encoder.html +//! [`BytesMut`]: https://docs.rs/bytes/0.4/bytes/struct.BytesMut.html + +use { + codec::{ + Decoder, Encoder, FramedRead, FramedWrite, Framed + }, + io::{ + AsyncRead, AsyncWrite + }, +}; + +use bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf}; + +use std::{cmp, fmt}; +use std::error::Error as StdError; +use std::io::{self, Cursor}; + +/// Configure length delimited `LengthDelimitedCodec`s. +/// +/// `Builder` enables constructing configured length delimited codecs. Note +/// that not all configuration settings apply to both encoding and decoding. See +/// the documentation for specific methods for more detail. +#[derive(Debug, Clone, Copy)] +pub struct Builder { + // Maximum frame length + max_frame_len: usize, + + // Number of bytes representing the field length + length_field_len: usize, + + // Number of bytes in the header before the length field + length_field_offset: usize, + + // Adjust the length specified in the header field by this amount + length_adjustment: isize, + + // Total number of bytes to skip before reading the payload, if not set, + // `length_field_len + length_field_offset` + num_skip: Option<usize>, + + // Length field byte order (little or big endian) + length_field_is_big_endian: bool, +} + +/// An error when the number of bytes read is more than max frame length. +pub struct FrameTooBig { + _priv: (), +} + +/// A codec for frames delimited by a frame head specifying their lengths. +/// +/// This allows the consumer to work with entire frames without having to worry +/// about buffering or other framing logic. +/// +/// See [module level] documentation for more detail. +/// +/// [module level]: index.html +#[derive(Debug)] +pub struct LengthDelimitedCodec { + // Configuration values + builder: Builder, + + // Read state + state: DecodeState, +} + +#[derive(Debug, Clone, Copy)] +enum DecodeState { + Head, + Data(usize), +} + +// ===== impl LengthDelimitedCodec ====== + +impl LengthDelimitedCodec { + /// Creates a new `LengthDelimitedCodec` with the default configuration values. + pub fn new() -> Self { + Self { + builder: Builder::new(), + state: DecodeState::Head, + } + } + + /// Returns the current max frame setting + /// + /// This is the largest size this codec will accept from the wire. Larger + /// frames will be rejected. + pub fn max_frame_length(&self) -> usize { + self.builder.max_frame_len + } + + /// Updates the max frame setting. + /// + /// The change takes effect the next time a frame is decoded. In other + /// words, if a frame is currently in process of being decoded with a frame + /// size greater than `val` but less than the max frame length in effect + /// before calling this function, then the frame will be allowed. + pub fn set_max_frame_length(&mut self, val: usize) { + self.builder.max_frame_length(val); + } + + fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> { + let head_len = self.builder.num_head_bytes(); + let field_len = self.builder.length_field_len; + + if src.len() < head_len { + // Not enough data + return Ok(None); + } + + let n = { + let mut src = Cursor::new(&mut *src); + + // Skip the required bytes + src.advance(self.builder.length_field_offset); + + // match endianess + let n = if self.builder.length_field_is_big_endian { + src.get_uint_be(field_len) + } else { + src.get_uint_le(field_len) + }; + + if n > self.builder.max_frame_len as u64 { + return Err(io::Error::new(io::ErrorKind::InvalidData, FrameTooBig { + _priv: (), + })); + } + + // The check above ensures there is no overflow + let n = n as usize; + + // Adjust `n` with bounds checking + let n = if self.builder.length_adjustment < 0 { + n.checked_sub(-self.builder.length_adjustment as usize) + } else { + n.checked_add(self.builder.length_adjustment as usize) + }; + + // Error handling + match n { + Some(n) => n, + None => return Err(io::Error::new(io::ErrorKind::InvalidInput, "provided length would overflow after adjustment")), + } + }; + + let num_skip = self.builder.get_num_skip(); + + if num_skip > 0 { + let _ = src.split_to(num_skip); + } + + // Ensure that the buffer has enough space to read the incoming + // payload + src.reserve(n); + + return Ok(Some(n)); + } + + fn decode_data(&self, n: usize, src: &mut BytesMut) -> io::Result<Option<BytesMut>> { + // At this point, the buffer has already had the required capacity + // reserved. All there is to do is read. + if src.len() < n { + return Ok(None); + } + + Ok(Some(src.split_to(n))) + } +} + +impl Decoder for LengthDelimitedCodec { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<BytesMut>> { + let n = match self.state { + DecodeState::Head => { + match try!(self.decode_head(src)) { + Some(n) => { + self.state = DecodeState::Data(n); + n + } + None => return Ok(None), + } + } + DecodeState::Data(n) => n, + }; + + match try!(self.decode_data(n, src)) { + Some(data) => { + // Update the decode state + self.state = DecodeState::Head; + + // Make sure the buffer has enough space to read the next head + src.reserve(self.builder.num_head_bytes()); + + Ok(Some(data)) + } + None => Ok(None), + } + } +} + +impl Encoder for LengthDelimitedCodec { + type Item = Bytes; + type Error = io::Error; + + fn encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error> { + let n = (&data).into_buf().remaining(); + + if n > self.builder.max_frame_len { + return Err(io::Error::new(io::ErrorKind::InvalidInput, FrameTooBig { + _priv: (), + })); + } + + // Adjust `n` with bounds checking + let n = if self.builder.length_adjustment < 0 { + n.checked_add(-self.builder.length_adjustment as usize) + } else { + n.checked_sub(self.builder.length_adjustment as usize) + }; + + let n = n.ok_or_else(|| io::Error::new( + io::ErrorKind::InvalidInput, + "provided length would overflow after adjustment", + ))?; + + if self.builder.length_field_is_big_endian { + dst.put_uint_be(n as u64, self.builder.length_field_len); + } else { + dst.put_uint_le(n as u64, self.builder.length_field_len); + } + + // Write the frame to the buffer + dst.extend_from_slice(&data[..]); + + Ok(()) + } +} + +// ===== impl Builder ===== + +impl Builder { + /// Creates a new length delimited codec builder with default configuration + /// values. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_field_offset(0) + /// .length_field_length(2) + /// .length_adjustment(0) + /// .num_skip(0) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn new() -> Builder { + Builder { + // Default max frame length of 8MB + max_frame_len: 8 * 1_024 * 1_024, + + // Default byte length of 4 + length_field_len: 4, + + // Default to the header field being at the start of the header. + length_field_offset: 0, + + length_adjustment: 0, + + // Total number of bytes to skip before reading the payload, if not set, + // `length_field_len + length_field_offset` + num_skip: None, + + // Default to reading the length field in network (big) endian. + length_field_is_big_endian: true, + } + } + + /// Read the length field as a big endian integer + /// + /// This is the default setting. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .big_endian() + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn big_endian(&mut self) -> &mut Self { + self.length_field_is_big_endian = true; + self + } + + /// Read the length field as a little endian integer + /// + /// The default setting is big endian. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .little_endian() + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn little_endian(&mut self) -> &mut Self { + self.length_field_is_big_endian = false; + self + } + + /// Read the length field as a native endian integer + /// + /// The default setting is big endian. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .native_endian() + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn native_endian(&mut self) -> &mut Self { + if cfg!(target_endian = "big") { + self.big_endian() + } else { + self.little_endian() + } + } + + /// Sets the max frame length + /// + /// This configuration option applies to both encoding and decoding. The + /// default value is 8MB. + /// + /// When decoding, the length field read from the byte stream is checked + /// against this setting **before** any adjustments are applied. When + /// encoding, the length of the submitted payload is checked against this + /// setting. + /// + /// When frames exceed the max length, an `io::Error` with the custom value + /// of the `FrameTooBig` type will be returned. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .max_frame_length(8 * 1024) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn max_frame_length(&mut self, val: usize) -> &mut Self { + self.max_frame_len = val; + self + } + + /// Sets the number of bytes used to represent the length field + /// + /// The default value is `4`. The max value is `8`. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_field_length(4) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn length_field_length(&mut self, val: usize) -> &mut Self { + assert!(val > 0 && val <= 8, "invalid length field length"); + self.length_field_len = val; + self + } + + /// Sets the number of bytes in the header before the length field + /// + /// This configuration option only applies to decoding. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_field_offset(1) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn length_field_offset(&mut self, val: usize) -> &mut Self { + self.length_field_offset = val; + self + } + + /// Delta between the payload length specified in the header and the real + /// payload length + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_adjustment(-2) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn length_adjustment(&mut self, val: isize) -> &mut Self { + self.length_adjustment = val; + self + } + + /// Sets the number of bytes to skip before reading the payload + /// + /// Default value is `length_field_len + length_field_offset` + /// + /// This configuration option only applies to decoding + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .num_skip(4) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn num_skip(&mut self, val: usize) -> &mut Self { + self.num_skip = Some(val); + self + } + + /// Create a configured length delimited `LengthDelimitedCodec` + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// # pub fn main() { + /// Builder::new() + /// .length_field_offset(0) + /// .length_field_length(2) + /// .length_adjustment(0) + /// .num_skip(0) + /// .new_codec(); + /// # } + /// ``` + pub fn new_codec(&self) -> LengthDelimitedCodec { + LengthDelimitedCodec { + builder: *self, + state: DecodeState::Head, + } + } + + /// Create a configured length delimited `FramedRead` + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::io::AsyncRead; + /// use tokio::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_field_offset(0) + /// .length_field_length(2) + /// .length_adjustment(0) + /// .num_skip(0) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn new_read<T>(&self, upstream: T) -> FramedRead<T, LengthDelimitedCodec> + where T: AsyncRead, + { + FramedRead::new(upstream, self.new_codec()) + } + + /// Create a configured length delimited `FramedWrite` + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate bytes; + /// # use tokio::io::AsyncWrite; + /// # use tokio::codec::length_delimited; + /// # use bytes::BytesMut; + /// # fn write_frame<T: AsyncWrite>(io: T) { + /// length_delimited::Builder::new() + /// .length_field_length(2) + /// .new_write(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn new_write<T>(&self, inner: T) -> FramedWrite<T, LengthDelimitedCodec> + where T: AsyncWrite, + { + FramedWrite::new(inner, self.new_codec()) + } + + /// Create a configured length delimited `Framed` + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate bytes; + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use tokio::codec::length_delimited; + /// # use bytes::BytesMut; + /// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) { + /// # let _ = + /// length_delimited::Builder::new() + /// .length_field_length(2) + /// .new_framed(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn new_framed<T>(&self, inner: T) -> Framed<T, LengthDelimitedCodec> + where T: AsyncRead + AsyncWrite, + { + Framed::new(inner, self.new_codec()) + } + + fn num_head_bytes(&self) -> usize { + let num = self.length_field_offset + self.length_field_len; + cmp::max(num, self.num_skip.unwrap_or(0)) + } + + fn get_num_skip(&self) -> usize { + self.num_skip.unwrap_or(self.length_field_offset + self.length_field_len) + } +} + + +// ===== impl FrameTooBig ===== + +impl fmt::Debug for FrameTooBig { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FrameTooBig") + .finish() + } +} + +impl fmt::Display for FrameTooBig { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(self.description()) + } +} + +impl StdError for FrameTooBig { + fn description(&self) -> &str { + "frame size too big" + } +} diff --git a/third_party/rust/tokio-0.1.11/src/codec/mod.rs b/third_party/rust/tokio-0.1.11/src/codec/mod.rs new file mode 100644 index 0000000000..cb0fc922de --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/codec/mod.rs @@ -0,0 +1,26 @@ +//! Utilities for encoding and decoding frames. +//! +//! Contains adapters to go from streams of bytes, [`AsyncRead`] and +//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`]. +//! Framed streams are also known as [transports]. +//! +//! [`AsyncRead`]: ../io/trait.AsyncRead.html +//! [`AsyncWrite`]: ../io/trait.AsyncWrite.html +//! [`Sink`]: https://docs.rs/futures/0.1/futures/sink/trait.Sink.html +//! [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html +//! [transports]: https://tokio.rs/docs/going-deeper/frames/ + +pub use tokio_codec::{ + Decoder, + Encoder, + Framed, + FramedParts, + FramedRead, + FramedWrite, + BytesCodec, + LinesCodec, +}; + +pub mod length_delimited; + +pub use self::length_delimited::LengthDelimitedCodec; diff --git a/third_party/rust/tokio-0.1.11/src/executor/current_thread/mod.rs b/third_party/rust/tokio-0.1.11/src/executor/current_thread/mod.rs new file mode 100644 index 0000000000..6036aa997b --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/executor/current_thread/mod.rs @@ -0,0 +1,170 @@ +#![allow(deprecated)] + +//! Execute many tasks concurrently on the current thread. +//! +//! [`CurrentThread`] is an executor that keeps tasks on the same thread that +//! they were spawned from. This allows it to execute futures that are not +//! `Send`. +//! +//! A single [`CurrentThread`] instance is able to efficiently manage a large +//! number of tasks and will attempt to schedule all tasks fairly. +//! +//! All tasks that are being managed by a [`CurrentThread`] executor are able to +//! spawn additional tasks by calling [`spawn`]. This function only works from +//! within the context of a running [`CurrentThread`] instance. +//! +//! The easiest way to start a new [`CurrentThread`] executor is to call +//! [`block_on_all`] with an initial task to seed the executor. +//! +//! For example: +//! +//! ``` +//! # extern crate tokio; +//! # extern crate futures; +//! # use tokio::executor::current_thread; +//! use futures::future::lazy; +//! +//! // Calling execute here results in a panic +//! // current_thread::spawn(my_future); +//! +//! # pub fn main() { +//! current_thread::block_on_all(lazy(|| { +//! // The execution context is setup, futures may be executed. +//! current_thread::spawn(lazy(|| { +//! println!("called from the current thread executor"); +//! Ok(()) +//! })); +//! +//! Ok::<_, ()>(()) +//! })); +//! # } +//! ``` +//! +//! The `block_on_all` function will block the current thread until **all** +//! tasks that have been spawned onto the [`CurrentThread`] instance have +//! completed. +//! +//! More fine-grain control can be achieved by using [`CurrentThread`] directly. +//! +//! ``` +//! # extern crate tokio; +//! # extern crate futures; +//! # use tokio::executor::current_thread::CurrentThread; +//! use futures::future::{lazy, empty}; +//! use std::time::Duration; +//! +//! // Calling execute here results in a panic +//! // current_thread::spawn(my_future); +//! +//! # pub fn main() { +//! let mut current_thread = CurrentThread::new(); +//! +//! // Spawn a task, the task is not executed yet. +//! current_thread.spawn(lazy(|| { +//! println!("Spawning a task"); +//! Ok(()) +//! })); +//! +//! // Spawn a task that never completes +//! current_thread.spawn(empty()); +//! +//! // Run the executor, but only until the provided future completes. This +//! // provides the opportunity to start executing previously spawned tasks. +//! let res = current_thread.block_on(lazy(|| { +//! Ok::<_, ()>("Hello") +//! })).unwrap(); +//! +//! // Now, run the executor for *at most* 1 second. Since a task was spawned +//! // that never completes, this function will return with an error. +//! current_thread.run_timeout(Duration::from_secs(1)).unwrap_err(); +//! # } +//! ``` +//! +//! # Execution model +//! +//! Internally, [`CurrentThread`] maintains a queue. When one of its tasks is +//! notified, the task gets added to the queue. The executor will pop tasks from +//! the queue and call [`Future::poll`]. If the task gets notified while it is +//! being executed, it won't get re-executed until all other tasks currently in +//! the queue get polled. +//! +//! Before the task is polled, a thread-local variable referencing the current +//! [`CurrentThread`] instance is set. This enables [`spawn`] to spawn new tasks +//! onto the same executor without having to thread through a handle value. +//! +//! If the [`CurrentThread`] instance still has uncompleted tasks, but none of +//! these tasks are ready to be polled, the current thread is put to sleep. When +//! a task is notified, the thread is woken up and processing resumes. +//! +//! All tasks managed by [`CurrentThread`] remain on the current thread. When a +//! task completes, it is dropped. +//! +//! [`spawn`]: fn.spawn.html +//! [`block_on_all`]: fn.block_on_all.html +//! [`CurrentThread`]: struct.CurrentThread.html +//! [`Future::poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll + +pub use tokio_current_thread::{ + BlockError, + CurrentThread, + Entered, + Handle, + RunError, + RunTimeoutError, + TaskExecutor, + Turn, + TurnError, + block_on_all, + spawn, +}; + +use std::cell::Cell; +use std::marker::PhantomData; + +use futures::future::{self}; + +#[deprecated(since = "0.1.2", note = "use block_on_all instead")] +#[doc(hidden)] +#[derive(Debug)] +pub struct Context<'a> { + cancel: Cell<bool>, + _p: PhantomData<&'a ()>, +} + +impl<'a> Context<'a> { + /// Cancels *all* executing futures. + pub fn cancel_all_spawned(&self) { + self.cancel.set(true); + } +} + +#[deprecated(since = "0.1.2", note = "use block_on_all instead")] +#[doc(hidden)] +pub fn run<F, R>(f: F) -> R + where F: FnOnce(&mut Context) -> R +{ + let mut context = Context { + cancel: Cell::new(false), + _p: PhantomData, + }; + + let mut current_thread = CurrentThread::new(); + + let ret = current_thread + .block_on(future::lazy(|| Ok::<_, ()>(f(&mut context)))) + .unwrap(); + + if context.cancel.get() { + return ret; + } + + current_thread.run().unwrap(); + ret +} + +#[deprecated(since = "0.1.2", note = "use TaskExecutor::current instead")] +#[doc(hidden)] +pub fn task_executor() -> TaskExecutor { + TaskExecutor::current() +} + diff --git a/third_party/rust/tokio-0.1.11/src/executor/mod.rs b/third_party/rust/tokio-0.1.11/src/executor/mod.rs new file mode 100644 index 0000000000..e1e47ae1d6 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/executor/mod.rs @@ -0,0 +1,145 @@ +//! Task execution utilities. +//! +//! In the Tokio execution model, futures are lazy. When a future is created, no +//! work is performed. In order for the work defined by the future to happen, +//! the future must be submitted to an executor. A future that is submitted to +//! an executor is called a "task". +//! +//! The executor is responsible for ensuring that [`Future::poll`] is +//! called whenever the task is [notified]. Notification happens when the +//! internal state of a task transitions from "not ready" to ready. For +//! example, a socket might have received data and a call to `read` will now be +//! able to succeed. +//! +//! The specific strategy used to manage the tasks is left up to the +//! executor. There are two main flavors of executors: single-threaded and +//! multi-threaded. Tokio provides implementation for both of these in the +//! [`runtime`] module. +//! +//! # `Executor` trait. +//! +//! This module provides the [`Executor`] trait (re-exported from +//! [`tokio-executor`]), which describes the API that all executors must +//! implement. +//! +//! A free [`spawn`] function is provided that allows spawning futures onto the +//! default executor (tracked via a thread-local variable) without referencing a +//! handle. It is expected that all executors will set a value for the default +//! executor. This value will often be set to the executor itself, but it is +//! possible that the default executor might be set to a different executor. +//! +//! For example, a single threaded executor might set the default executor to a +//! thread pool instead of itself, allowing futures to spawn new tasks onto the +//! thread pool when those tasks are `Send`. +//! +//! [`Future::poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll +//! [notified]: https://docs.rs/futures/0.1/futures/executor/trait.Notify.html#tymethod.notify +//! [`runtime`]: ../runtime/index.html +//! [`tokio-executor`]: https://docs.rs/tokio-executor/0.1 +//! [`Executor`]: trait.Executor.html +//! [`spawn`]: fn.spawn.html + +#[deprecated( + since = "0.1.8", + note = "use tokio-current-thread crate or functions in tokio::runtime::current_thread instead", +)] +#[doc(hidden)] +pub mod current_thread; + +#[deprecated(since = "0.1.8", note = "use tokio-threadpool crate instead")] +#[doc(hidden)] +/// Re-exports of [`tokio-threadpool`], deprecated in favor of the crate. +/// +/// [`tokio-threadpool`]: https://docs.rs/tokio-threadpool/0.1 +pub mod thread_pool { + pub use tokio_threadpool::{ + Builder, + Sender, + Shutdown, + ThreadPool, + }; +} + +pub use tokio_executor::{Executor, DefaultExecutor, SpawnError}; + +use futures::{Future, IntoFuture}; +use futures::future::{self, FutureResult}; + +/// Return value from the `spawn` function. +/// +/// Currently this value doesn't actually provide any functionality. However, it +/// provides a way to add functionality later without breaking backwards +/// compatibility. +/// +/// This also implements `IntoFuture` so that it can be used as the return value +/// in a `for_each` loop. +/// +/// See [`spawn`] for more details. +/// +/// [`spawn`]: fn.spawn.html +#[derive(Debug)] +pub struct Spawn(()); + +/// Spawns a future on the default executor. +/// +/// In order for a future to do work, it must be spawned on an executor. The +/// `spawn` function is the easiest way to do this. It spawns a future on the +/// [default executor] for the current execution context (tracked using a +/// thread-local variable). +/// +/// The default executor is **usually** a thread pool. +/// +/// # Examples +/// +/// In this example, a server is started and `spawn` is used to start a new task +/// that processes each received connection. +/// +/// ```rust +/// # extern crate tokio; +/// # extern crate futures; +/// # use futures::{Future, Stream}; +/// use tokio::net::TcpListener; +/// +/// # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> { +/// # unimplemented!(); +/// # } +/// # fn dox() { +/// # let addr = "127.0.0.1:8080".parse().unwrap(); +/// let listener = TcpListener::bind(&addr).unwrap(); +/// +/// let server = listener.incoming() +/// .map_err(|e| println!("error = {:?}", e)) +/// .for_each(|socket| { +/// tokio::spawn(process(socket)) +/// }); +/// +/// tokio::run(server); +/// # } +/// # pub fn main() {} +/// ``` +/// +/// [default executor]: struct.DefaultExecutor.html +/// +/// # Panics +/// +/// This function will panic if the default executor is not set or if spawning +/// onto the default executor returns an error. To avoid the panic, use +/// [`DefaultExecutor`]. +/// +/// [`DefaultExecutor`]: struct.DefaultExecutor.html +pub fn spawn<F>(f: F) -> Spawn +where F: Future<Item = (), Error = ()> + 'static + Send +{ + ::tokio_executor::spawn(f); + Spawn(()) +} + +impl IntoFuture for Spawn { + type Future = FutureResult<(), ()>; + type Item = (); + type Error = (); + + fn into_future(self) -> Self::Future { + future::ok(()) + } +} diff --git a/third_party/rust/tokio-0.1.11/src/fs.rs b/third_party/rust/tokio-0.1.11/src/fs.rs new file mode 100644 index 0000000000..689a601368 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/fs.rs @@ -0,0 +1,12 @@ +//! Asynchronous filesystem manipulation operations. +//! +//! This module contains basic methods and types for manipulating the contents +//! of the local filesystem from within the context of the Tokio runtime. +//! +//! Unlike *most* other Tokio APIs, the filesystem APIs **must** be used from +//! the context of the Tokio runtime as they require Tokio specific features to +//! function. + +pub use tokio_fs::{create_dir, create_dir_all, file, hard_link, metadata, os, read_dir, read_link}; +pub use tokio_fs::{remove_dir, remove_file, rename, set_permissions, symlink_metadata, File}; +pub use tokio_fs::OpenOptions; diff --git a/third_party/rust/tokio-0.1.11/src/io.rs b/third_party/rust/tokio-0.1.11/src/io.rs new file mode 100644 index 0000000000..1d6bfd3a70 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/io.rs @@ -0,0 +1,93 @@ +//! Asynchronous I/O. +//! +//! This module is the asynchronous version of `std::io`. Primarily, it +//! defines two traits, [`AsyncRead`] and [`AsyncWrite`], which extend the +//! `Read` and `Write` traits of the standard library. +//! +//! # AsyncRead and AsyncWrite +//! +//! [`AsyncRead`] and [`AsyncWrite`] must only be implemented for +//! non-blocking I/O types that integrate with the futures type system. In +//! other words, these types must never block the thread, and instead the +//! current task is notified when the I/O resource is ready. +//! +//! # Standard input and output +//! +//! Tokio provides asynchronous APIs to standard [input], [output], and [error]. +//! These APIs are very similar to the ones provided by `std`, but they also +//! implement [`AsyncRead`] and [`AsyncWrite`]. +//! +//! Unlike *most* other Tokio APIs, the standard input / output APIs +//! **must** be used from the context of the Tokio runtime as they require +//! Tokio specific features to function. +//! +//! [input]: fn.stdin.html +//! [output]: fn.stdout.html +//! [error]: fn.stderr.html +//! +//! # Utility functions +//! +//! Utilities functions are provided for working with [`AsyncRead`] / +//! [`AsyncWrite`] types. For example, [`copy`] asynchronously copies all +//! data from a source to a destination. +//! +//! # `std` re-exports +//! +//! Additionally, [`Read`], [`Write`], [`Error`], [`ErrorKind`], and +//! [`Result`] are re-exported from `std::io` for ease of use. +//! +//! [`AsyncRead`]: trait.AsyncRead.html +//! [`AsyncWrite`]: trait.AsyncWrite.html +//! [`copy`]: fn.copy.html +//! [`Read`]: trait.Read.html +//! [`Write`]: trait.Write.html +//! [`Error`]: struct.Error.html +//! [`ErrorKind`]: enum.ErrorKind.html +//! [`Result`]: type.Result.html + +pub use tokio_io::{ + AsyncRead, + AsyncWrite, +}; + +// standard input, output, and error +pub use tokio_fs::{ + stdin, + Stdin, + stdout, + Stdout, + stderr, + Stderr, +}; + +// Utils +pub use tokio_io::io::{ + copy, + Copy, + flush, + Flush, + lines, + Lines, + read_exact, + ReadExact, + read_to_end, + ReadToEnd, + read_until, + ReadUntil, + ReadHalf, + shutdown, + Shutdown, + write_all, + WriteAll, + WriteHalf, +}; + +// Re-export io::Error so that users don't have to deal +// with conflicts when `use`ing `futures::io` and `std::io`. +pub use ::std::io::{ + Error, + ErrorKind, + Result, + Read, + Write, +}; diff --git a/third_party/rust/tokio-0.1.11/src/lib.rs b/third_party/rust/tokio-0.1.11/src/lib.rs new file mode 100644 index 0000000000..f652e53a90 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/lib.rs @@ -0,0 +1,120 @@ +#![doc(html_root_url = "https://docs.rs/tokio/0.1.11")] +#![deny(missing_docs, warnings, missing_debug_implementations)] +#![cfg_attr(feature = "async-await-preview", feature( + async_await, + await_macro, + futures_api, + ))] + +//! A runtime for writing reliable, asynchronous, and slim applications. +//! +//! Tokio is an event-driven, non-blocking I/O platform for writing asynchronous +//! applications with the Rust programming language. At a high level, it +//! provides a few major components: +//! +//! * A multi threaded, work-stealing based task [scheduler][runtime]. +//! * A [reactor] backed by the operating system's event queue (epoll, kqueue, +//! IOCP, etc...). +//! * Asynchronous [TCP and UDP][net] sockets. +//! * Asynchronous [filesystem][fs] operations. +//! * [Timer][timer] API for scheduling work in the future. +//! +//! Tokio is built using [futures] as the abstraction for managing the +//! complexity of asynchronous programming. +//! +//! Guide level documentation is found on the [website]. +//! +//! [website]: https://tokio.rs/docs/getting-started/hello-world/ +//! [futures]: http://docs.rs/futures/0.1 +//! +//! # Examples +//! +//! A simple TCP echo server: +//! +//! ```no_run +//! extern crate tokio; +//! +//! use tokio::prelude::*; +//! use tokio::io::copy; +//! use tokio::net::TcpListener; +//! +//! fn main() { +//! // Bind the server's socket. +//! let addr = "127.0.0.1:12345".parse().unwrap(); +//! let listener = TcpListener::bind(&addr) +//! .expect("unable to bind TCP listener"); +//! +//! // Pull out a stream of sockets for incoming connections +//! let server = listener.incoming() +//! .map_err(|e| eprintln!("accept failed = {:?}", e)) +//! .for_each(|sock| { +//! // Split up the reading and writing parts of the +//! // socket. +//! let (reader, writer) = sock.split(); +//! +//! // A future that echos the data and returns how +//! // many bytes were copied... +//! let bytes_copied = copy(reader, writer); +//! +//! // ... after which we'll print what happened. +//! let handle_conn = bytes_copied.map(|amt| { +//! println!("wrote {:?} bytes", amt) +//! }).map_err(|err| { +//! eprintln!("IO error {:?}", err) +//! }); +//! +//! // Spawn the future as a concurrent task. +//! tokio::spawn(handle_conn) +//! }); +//! +//! // Start the Tokio runtime +//! tokio::run(server); +//! } +//! ``` + +extern crate bytes; +#[macro_use] +extern crate futures; +extern crate mio; +extern crate tokio_current_thread; +extern crate tokio_io; +extern crate tokio_executor; +extern crate tokio_codec; +extern crate tokio_fs; +extern crate tokio_reactor; +extern crate tokio_threadpool; +extern crate tokio_timer; +extern crate tokio_tcp; +extern crate tokio_udp; + +#[cfg(feature = "async-await-preview")] +extern crate tokio_async_await; + +#[cfg(unix)] +extern crate tokio_uds; + +pub mod clock; +pub mod codec; +pub mod executor; +pub mod fs; +pub mod io; +pub mod net; +pub mod prelude; +pub mod reactor; +pub mod runtime; +pub mod timer; +pub mod util; + +pub use executor::spawn; +pub use runtime::run; + +// ===== Experimental async/await support ===== + +#[cfg(feature = "async-await-preview")] +mod async_await; + +#[cfg(feature = "async-await-preview")] +pub use async_await::{run_async, spawn_async}; + +#[cfg(feature = "async-await-preview")] +pub use tokio_async_await::await; diff --git a/third_party/rust/tokio-0.1.11/src/net.rs b/third_party/rust/tokio-0.1.11/src/net.rs new file mode 100644 index 0000000000..79810b6a73 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/net.rs @@ -0,0 +1,85 @@ +//! TCP/UDP/Unix bindings for `tokio`. +//! +//! This module contains the TCP/UDP/Unix networking types, similar to the standard +//! library, which can be used to implement networking protocols. +//! +//! # Organization +//! +//! * [`TcpListener`] and [`TcpStream`] provide functionality for communication over TCP +//! * [`UdpSocket`] and [`UdpFramed`] provide functionality for communication over UDP +//! * [`UnixListener`] and [`UnixStream`] provide functionality for communication over a +//! Unix Domain Socket **(available on Unix only)** +//! +//! [`TcpListener`]: struct.TcpListener.html +//! [`TcpStream`]: struct.TcpStream.html +//! [`UdpSocket`]: struct.UdpSocket.html +//! [`UdpFramed`]: struct.UdpFramed.html +//! [`UnixListener`]: struct.UnixListener.html +//! [`UnixStream`]: struct.UnixStream.html + +pub mod tcp { + //! TCP bindings for `tokio`. + //! + //! Connecting to an address, via TCP, can be done using [`TcpStream`]'s + //! [`connect`] method, which returns [`ConnectFuture`]. `ConnectFuture` + //! implements a future which returns a `TcpStream`. + //! + //! To listen on an address [`TcpListener`] can be used. `TcpListener`'s + //! [`incoming`][incoming_method] method can be used to accept new connections. + //! It return the [`Incoming`] struct, which implements a stream which returns + //! `TcpStream`s. + //! + //! [`TcpStream`]: struct.TcpStream.html + //! [`connect`]: struct.TcpStream.html#method.connect + //! [`ConnectFuture`]: struct.ConnectFuture.html + //! [`TcpListener`]: struct.TcpListener.html + //! [incoming_method]: struct.TcpListener.html#method.incoming + //! [`Incoming`]: struct.Incoming.html + pub use tokio_tcp::{ConnectFuture, Incoming, TcpListener, TcpStream}; +} +pub use self::tcp::{TcpListener, TcpStream}; + +#[deprecated(note = "use `tokio::net::tcp::ConnectFuture` instead")] +#[doc(hidden)] +pub type ConnectFuture = self::tcp::ConnectFuture; +#[deprecated(note = "use `tokio::net::tcp::Incoming` instead")] +#[doc(hidden)] +pub type Incoming = self::tcp::Incoming; + +pub mod udp { + //! UDP bindings for `tokio`. + //! + //! The main struct for UDP is the [`UdpSocket`], which represents a UDP socket. + //! Reading and writing to it can be done using futures, which return the + //! [`RecvDgram`] and [`SendDgram`] structs respectively. + //! + //! For convenience it's also possible to convert raw datagrams into higher-level + //! frames. + //! + //! [`UdpSocket`]: struct.UdpSocket.html + //! [`RecvDgram`]: struct.RecvDgram.html + //! [`SendDgram`]: struct.SendDgram.html + //! [`UdpFramed`]: struct.UdpFramed.html + //! [`framed`]: struct.UdpSocket.html#method.framed + pub use tokio_udp::{RecvDgram, SendDgram, UdpFramed, UdpSocket}; +} +pub use self::udp::{UdpFramed, UdpSocket}; + +#[deprecated(note = "use `tokio::net::udp::RecvDgram` instead")] +#[doc(hidden)] +pub type RecvDgram<T> = self::udp::RecvDgram<T>; +#[deprecated(note = "use `tokio::net::udp::SendDgram` instead")] +#[doc(hidden)] +pub type SendDgram<T> = self::udp::SendDgram<T>; + +#[cfg(unix)] +pub mod unix { + //! Unix domain socket bindings for `tokio` (only available on unix systems). + + pub use tokio_uds::{ + ConnectFuture, Incoming, RecvDgram, SendDgram, UCred, UnixDatagram, UnixListener, + UnixStream, + }; +} +#[cfg(unix)] +pub use self::unix::{UnixListener, UnixStream}; diff --git a/third_party/rust/tokio-0.1.11/src/prelude.rs b/third_party/rust/tokio-0.1.11/src/prelude.rs new file mode 100644 index 0000000000..ecd82fe40c --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/prelude.rs @@ -0,0 +1,54 @@ +//! A "prelude" for users of the `tokio` crate. +//! +//! This prelude is similar to the standard library's prelude in that you'll +//! almost always want to import its entire contents, but unlike the standard +//! library's prelude you'll have to do so manually: +//! +//! ``` +//! use tokio::prelude::*; +//! ``` +//! +//! The prelude may grow over time as additional items see ubiquitous use. + +pub use tokio_io::{ + AsyncRead, + AsyncWrite, +}; + +pub use util::{ + FutureExt, + StreamExt, +}; + +pub use ::std::io::{ + Read, + Write, +}; + +pub use futures::{ + Future, + future, + Stream, + stream, + Sink, + IntoFuture, + Async, + AsyncSink, + Poll, + task, +}; + +#[cfg(feature = "async-await-preview")] +#[doc(inline)] +pub use tokio_async_await::{ + io::{ + AsyncReadExt, + AsyncWriteExt, + }, + sink::{ + SinkExt, + }, + stream::{ + StreamExt as StreamAsyncExt, + }, +}; diff --git a/third_party/rust/tokio-0.1.11/src/reactor/mod.rs b/third_party/rust/tokio-0.1.11/src/reactor/mod.rs new file mode 100644 index 0000000000..a7263fd83c --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/reactor/mod.rs @@ -0,0 +1,149 @@ +//! Event loop that drives Tokio I/O resources. +//! +//! This module contains [`Reactor`], which is the event loop that drives all +//! Tokio I/O resources. It is the reactor's job to receive events from the +//! operating system ([epoll], [kqueue], [IOCP], etc...) and forward them to +//! waiting tasks. It is the bridge between operating system and the futures +//! model. +//! +//! # Overview +//! +//! When using Tokio, all operations are asynchronous and represented by +//! futures. These futures, representing the application logic, are scheduled by +//! an executor (see [runtime model] for more details). Executors wait for +//! notifications before scheduling the future for execution time, i.e., nothing +//! happens until an event is received indicating that the task can make +//! progress. +//! +//! The reactor receives events from the operating system and notifies the +//! executor. +//! +//! Let's start with a basic example, establishing a TCP connection. +//! +//! ```rust +//! # extern crate tokio; +//! # fn dox() { +//! use tokio::prelude::*; +//! use tokio::net::TcpStream; +//! +//! let addr = "93.184.216.34:9243".parse().unwrap(); +//! +//! let connect_future = TcpStream::connect(&addr); +//! +//! let task = connect_future +//! .and_then(|socket| { +//! println!("successfully connected"); +//! Ok(()) +//! }) +//! .map_err(|e| println!("failed to connect; err={:?}", e)); +//! +//! tokio::run(task); +//! # } +//! # fn main() {} +//! ``` +//! +//! Establishing a TCP connection usually cannot be completed immediately. +//! [`TcpStream::connect`] does not block the current thread. Instead, it +//! returns a [future][connect-future] that resolves once the TCP connection has +//! been established. The connect future itself has no way of knowing when the +//! TCP connection has been established. +//! +//! Before returning the future, [`TcpStream::connect`] registers the socket +//! with a reactor. This registration process, handled by [`Registration`], is +//! what links the [`TcpStream`] with the [`Reactor`] instance. At this point, +//! the reactor starts listening for connection events from the operating system +//! for that socket. +//! +//! Once the connect future is passed to [`tokio::run`], it is spawned onto a +//! thread pool. The thread pool waits until it is notified that the connection +//! has completed. +//! +//! When the TCP connection is established, the reactor receives an event from +//! the operating system. It then notifies the thread pool, telling it that the +//! connect future can complete. At this point, the thread pool will schedule +//! the task to run on one of its worker threads. This results in the `and_then` +//! closure to get executed. +//! +//! ## Lazy registration +//! +//! Notice how the snippet above does not explicitly reference a reactor. When +//! [`TcpStream::connect`] is called, it registers the socket with a reactor, +//! but no reactor is specified. This works because the registration process +//! mentioned above is actually lazy. It doesn't *actually* happen in the +//! [`connect`] function. Instead, the registration is established the first +//! time that the task is polled (again, see [runtime model]). +//! +//! A reactor instance is automatically made available when using the Tokio +//! [runtime], which is done using [`tokio::run`]. The Tokio runtime's executor +//! sets a thread-local variable referencing the associated [`Reactor`] instance +//! and [`Handle::current`] (used by [`Registration`]) returns the reference. +//! +//! ## Implementation +//! +//! The reactor implementation uses [`mio`] to interface with the operating +//! system's event queue. A call to [`Reactor::poll`] results in a single +//! call to [`Poll::poll`] which in turn results in a single call to the +//! operating system's selector. +//! +//! The reactor maintains state for each registered I/O resource. This tracks +//! the executor task to notify when events are provided by the operating +//! system's selector. This state is stored in a `Sync` data structure and +//! referenced by [`Registration`]. When the [`Registration`] instance is +//! dropped, this state is cleaned up. Because the state is stored in a `Sync` +//! data structure, the [`Registration`] instance is able to be moved to other +//! threads. +//! +//! By default, a runtime's default reactor runs on a background thread. This +//! ensures that application code cannot significantly impact the reactor's +//! responsiveness. +//! +//! ## Integrating with the reactor +//! +//! Tokio comes with a number of I/O resources, like TCP and UDP sockets, that +//! automatically integrate with the reactor. However, library authors or +//! applications may wish to implement their own resources that are also backed +//! by the reactor. +//! +//! There are a couple of ways to do this. +//! +//! If the custom I/O resource implements [`mio::Evented`] and implements +//! [`std::io::Read`] and / or [`std::io::Write`], then [`PollEvented`] is the +//! most suited. +//! +//! Otherwise, [`Registration`] can be used directly. This provides the lowest +//! level primitive needed for integrating with the reactor: a stream of +//! readiness events. +//! +//! [`Reactor`]: struct.Reactor.html +//! [`Registration`]: struct.Registration.html +//! [runtime model]: https://tokio.rs/docs/getting-started/runtime-model/ +//! [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html +//! [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2 +//! [IOCP]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx +//! [`TcpStream::connect`]: ../net/struct.TcpStream.html#method.connect +//! [`connect`]: ../net/struct.TcpStream.html#method.connect +//! [connect-future]: ../net/struct.ConnectFuture.html +//! [`tokio::run`]: ../runtime/fn.run.html +//! [`TcpStream`]: ../net/struct.TcpStream.html +//! [runtime]: ../runtime +//! [`Handle::current`]: struct.Handle.html#method.current +//! [`mio`]: https://github.com/carllerche/mio +//! [`Reactor::poll`]: struct.Reactor.html#method.poll +//! [`Poll::poll`]: https://docs.rs/mio/0.6/mio/struct.Poll.html#method.poll +//! [`mio::Evented`]: https://docs.rs/mio/0.6/mio/trait.Evented.html +//! [`PollEvented`]: struct.PollEvented.html +//! [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html +//! [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html + +pub use tokio_reactor::{ + Reactor, + Handle, + Background, + Turn, + Registration, + PollEvented as PollEvented2, +}; + +mod poll_evented; +#[allow(deprecated)] +pub use self::poll_evented::PollEvented; diff --git a/third_party/rust/tokio-0.1.11/src/reactor/poll_evented.rs b/third_party/rust/tokio-0.1.11/src/reactor/poll_evented.rs new file mode 100644 index 0000000000..d5f6750b6b --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/reactor/poll_evented.rs @@ -0,0 +1,539 @@ +//! Readiness tracking streams, backing I/O objects. +//! +//! This module contains the core type which is used to back all I/O on object +//! in `tokio-core`. The `PollEvented` type is the implementation detail of +//! all I/O. Each `PollEvented` manages registration with a reactor, +//! acquisition of a token, and tracking of the readiness state on the +//! underlying I/O primitive. + +#![allow(deprecated, warnings)] + +use std::fmt; +use std::io::{self, Read, Write}; +use std::sync::Mutex; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; + +use futures::{task, Async, Poll}; +use mio::event::Evented; +use mio::Ready; +use tokio_io::{AsyncRead, AsyncWrite}; + +use reactor::{Handle, Registration}; + +#[deprecated(since = "0.1.2", note = "PollEvented2 instead")] +#[doc(hidden)] +pub struct PollEvented<E> { + io: E, + inner: Inner, + handle: Handle, +} + +struct Inner { + registration: Mutex<Registration>, + + /// Currently visible read readiness + read_readiness: AtomicUsize, + + /// Currently visible write readiness + write_readiness: AtomicUsize, +} + +impl<E: fmt::Debug> fmt::Debug for PollEvented<E> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("PollEvented") + .field("io", &self.io) + .finish() + } +} + +impl<E> PollEvented<E> { + /// Creates a new readiness stream associated with the provided + /// `loop_handle` and for the given `source`. + pub fn new(io: E, handle: &Handle) -> io::Result<PollEvented<E>> + where E: Evented, + { + let registration = Registration::new(); + registration.register(&io)?; + + Ok(PollEvented { + io: io, + inner: Inner { + registration: Mutex::new(registration), + read_readiness: AtomicUsize::new(0), + write_readiness: AtomicUsize::new(0), + }, + handle: handle.clone(), + }) + } + + /// Tests to see if this source is ready to be read from or not. + /// + /// If this stream is not ready for a read then `Async::NotReady` will be + /// returned and the current task will be scheduled to receive a + /// notification when the stream is readable again. In other words, this + /// method is only safe to call from within the context of a future's task, + /// typically done in a `Future::poll` method. + /// + /// This is mostly equivalent to `self.poll_ready(Ready::readable())`. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn poll_read(&mut self) -> Async<()> { + if self.poll_read2().is_ready() { + return ().into(); + } + + Async::NotReady + } + + fn poll_read2(&self) -> Async<Ready> { + let r = self.inner.registration.lock().unwrap(); + + // Load the cached readiness + match self.inner.read_readiness.load(Relaxed) { + 0 => {} + mut n => { + // Check what's new with the reactor. + if let Some(ready) = r.take_read_ready().unwrap() { + n |= ready2usize(ready); + self.inner.read_readiness.store(n, Relaxed); + } + + return usize2ready(n).into(); + } + } + + let ready = match r.poll_read_ready().unwrap() { + Async::Ready(r) => r, + _ => return Async::NotReady, + }; + + // Cache the value + self.inner.read_readiness.store(ready2usize(ready), Relaxed); + + ready.into() + } + + /// Tests to see if this source is ready to be written to or not. + /// + /// If this stream is not ready for a write then `Async::NotReady` will be returned + /// and the current task will be scheduled to receive a notification when + /// the stream is writable again. In other words, this method is only safe + /// to call from within the context of a future's task, typically done in a + /// `Future::poll` method. + /// + /// This is mostly equivalent to `self.poll_ready(Ready::writable())`. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn poll_write(&mut self) -> Async<()> { + let r = self.inner.registration.lock().unwrap(); + + match self.inner.write_readiness.load(Relaxed) { + 0 => {} + mut n => { + // Check what's new with the reactor. + if let Some(ready) = r.take_write_ready().unwrap() { + n |= ready2usize(ready); + self.inner.write_readiness.store(n, Relaxed); + } + + return ().into(); + } + } + + let ready = match r.poll_write_ready().unwrap() { + Async::Ready(r) => r, + _ => return Async::NotReady, + }; + + // Cache the value + self.inner.write_readiness.store(ready2usize(ready), Relaxed); + + ().into() + } + + /// Test to see whether this source fulfills any condition listed in `mask` + /// provided. + /// + /// The `mask` given here is a mio `Ready` set of possible events. This can + /// contain any events like read/write but also platform-specific events + /// such as hup and error. The `mask` indicates events that are interested + /// in being ready. + /// + /// If any event in `mask` is ready then it is returned through + /// `Async::Ready`. The `Ready` set returned is guaranteed to not be empty + /// and contains all events that are currently ready in the `mask` provided. + /// + /// If no events are ready in the `mask` provided then the current task is + /// scheduled to receive a notification when any of them become ready. If + /// the `writable` event is contained within `mask` then this + /// `PollEvented`'s `write` task will be blocked and otherwise the `read` + /// task will be blocked. This is generally only relevant if you're working + /// with this `PollEvented` object on multiple tasks. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn poll_ready(&mut self, mask: Ready) -> Async<Ready> { + let mut ret = Ready::empty(); + + if mask.is_empty() { + return ret.into(); + } + + if mask.is_writable() { + if self.poll_write().is_ready() { + ret = Ready::writable(); + } + } + + let mask = mask - Ready::writable(); + + if !mask.is_empty() { + if let Async::Ready(v) = self.poll_read2() { + ret |= v & mask; + } + } + + if ret.is_empty() { + if mask.is_writable() { + let _ = self.need_write(); + } + + if mask.is_readable() { + let _ = self.need_read(); + } + + Async::NotReady + } else { + ret.into() + } + } + + /// Indicates to this source of events that the corresponding I/O object is + /// no longer readable, but it needs to be. + /// + /// This function, like `poll_read`, is only safe to call from the context + /// of a future's task (typically in a `Future::poll` implementation). It + /// informs this readiness stream that the underlying object is no longer + /// readable, typically because a "would block" error was seen. + /// + /// *All* readiness bits associated with this stream except the writable bit + /// will be reset when this method is called. The current task is then + /// scheduled to receive a notification whenever anything changes other than + /// the writable bit. Note that this typically just means the readable bit + /// is used here, but if you're using a custom I/O object for events like + /// hup/error this may also be relevant. + /// + /// Note that it is also only valid to call this method if `poll_read` + /// previously indicated that the object is readable. That is, this function + /// must always be paired with calls to `poll_read` previously. + /// + /// # Errors + /// + /// This function will return an error if the `Reactor` that this `PollEvented` + /// is associated with has gone away (been destroyed). The error means that + /// the ambient futures task could not be scheduled to receive a + /// notification and typically means that the error should be propagated + /// outwards. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn need_read(&mut self) -> io::Result<()> { + self.inner.read_readiness.store(0, Relaxed); + + if self.poll_read().is_ready() { + // Notify the current task + task::current().notify(); + } + + Ok(()) + } + + /// Indicates to this source of events that the corresponding I/O object is + /// no longer writable, but it needs to be. + /// + /// This function, like `poll_write`, is only safe to call from the context + /// of a future's task (typically in a `Future::poll` implementation). It + /// informs this readiness stream that the underlying object is no longer + /// writable, typically because a "would block" error was seen. + /// + /// The flag indicating that this stream is writable is unset and the + /// current task is scheduled to receive a notification when the stream is + /// then again writable. + /// + /// Note that it is also only valid to call this method if `poll_write` + /// previously indicated that the object is writable. That is, this function + /// must always be paired with calls to `poll_write` previously. + /// + /// # Errors + /// + /// This function will return an error if the `Reactor` that this `PollEvented` + /// is associated with has gone away (been destroyed). The error means that + /// the ambient futures task could not be scheduled to receive a + /// notification and typically means that the error should be propagated + /// outwards. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn need_write(&mut self) -> io::Result<()> { + self.inner.write_readiness.store(0, Relaxed); + + if self.poll_write().is_ready() { + // Notify the current task + task::current().notify(); + } + + Ok(()) + } + + /// Returns a reference to the event loop handle that this readiness stream + /// is associated with. + pub fn handle(&self) -> &Handle { + &self.handle + } + + /// Returns a shared reference to the underlying I/O object this readiness + /// stream is wrapping. + pub fn get_ref(&self) -> &E { + &self.io + } + + /// Returns a mutable reference to the underlying I/O object this readiness + /// stream is wrapping. + pub fn get_mut(&mut self) -> &mut E { + &mut self.io + } + + /// Consumes the `PollEvented` and returns the underlying I/O object + pub fn into_inner(self) -> E { + self.io + } + + /// Deregisters this source of events from the reactor core specified. + /// + /// This method can optionally be called to unregister the underlying I/O + /// object with the event loop that the `handle` provided points to. + /// Typically this method is not required as this automatically happens when + /// `E` is dropped, but for some use cases the `E` object doesn't represent + /// an owned reference, so dropping it won't automatically unregister with + /// the event loop. + /// + /// This consumes `self` as it will no longer provide events after the + /// method is called, and will likely return an error if this `PollEvented` + /// was created on a separate event loop from the `handle` specified. + pub fn deregister(&self) -> io::Result<()> + where E: Evented, + { + self.inner.registration.lock().unwrap() + .deregister(&self.io) + } +} + +impl<E: Read> Read for PollEvented<E> { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + if let Async::NotReady = self.poll_read() { + return Err(io::ErrorKind::WouldBlock.into()) + } + + let r = self.get_mut().read(buf); + + if is_wouldblock(&r) { + self.need_read()?; + } + + return r + } +} + +impl<E: Write> Write for PollEvented<E> { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + if let Async::NotReady = self.poll_write() { + return Err(io::ErrorKind::WouldBlock.into()) + } + + let r = self.get_mut().write(buf); + + if is_wouldblock(&r) { + self.need_write()?; + } + + return r + } + + fn flush(&mut self) -> io::Result<()> { + if let Async::NotReady = self.poll_write() { + return Err(io::ErrorKind::WouldBlock.into()) + } + + let r = self.get_mut().flush(); + + if is_wouldblock(&r) { + self.need_write()?; + } + + return r + } +} + +impl<E: Read> AsyncRead for PollEvented<E> { +} + +impl<E: Write> AsyncWrite for PollEvented<E> { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) + } +} + +fn is_wouldblock<T>(r: &io::Result<T>) -> bool { + match *r { + Ok(_) => false, + Err(ref e) => e.kind() == io::ErrorKind::WouldBlock, + } +} + +const READ: usize = 1 << 0; +const WRITE: usize = 1 << 1; + +fn ready2usize(ready: Ready) -> usize { + let mut bits = 0; + if ready.is_readable() { + bits |= READ; + } + if ready.is_writable() { + bits |= WRITE; + } + bits | platform::ready2usize(ready) +} + +fn usize2ready(bits: usize) -> Ready { + let mut ready = Ready::empty(); + if bits & READ != 0 { + ready.insert(Ready::readable()); + } + if bits & WRITE != 0 { + ready.insert(Ready::writable()); + } + ready | platform::usize2ready(bits) +} + +#[cfg(unix)] +mod platform { + use mio::Ready; + use mio::unix::UnixReady; + + const HUP: usize = 1 << 2; + const ERROR: usize = 1 << 3; + const AIO: usize = 1 << 4; + const LIO: usize = 1 << 5; + + #[cfg(any(target_os = "dragonfly", target_os = "freebsd"))] + fn is_aio(ready: &Ready) -> bool { + UnixReady::from(*ready).is_aio() + } + + #[cfg(not(any(target_os = "dragonfly", target_os = "freebsd")))] + fn is_aio(_ready: &Ready) -> bool { + false + } + + #[cfg(target_os = "freebsd")] + fn is_lio(ready: &Ready) -> bool { + UnixReady::from(*ready).is_lio() + } + + #[cfg(not(target_os = "freebsd"))] + fn is_lio(_ready: &Ready) -> bool { + false + } + + pub fn ready2usize(ready: Ready) -> usize { + let ready = UnixReady::from(ready); + let mut bits = 0; + if is_aio(&ready) { + bits |= AIO; + } + if is_lio(&ready) { + bits |= LIO; + } + if ready.is_error() { + bits |= ERROR; + } + if ready.is_hup() { + bits |= HUP; + } + bits + } + + #[cfg(any(target_os = "dragonfly", target_os = "freebsd", target_os = "ios", + target_os = "macos"))] + fn usize2ready_aio(ready: &mut UnixReady) { + ready.insert(UnixReady::aio()); + } + + #[cfg(not(any(target_os = "dragonfly", + target_os = "freebsd", target_os = "ios", target_os = "macos")))] + fn usize2ready_aio(_ready: &mut UnixReady) { + // aio not available here → empty + } + + #[cfg(target_os = "freebsd")] + fn usize2ready_lio(ready: &mut UnixReady) { + ready.insert(UnixReady::lio()); + } + + #[cfg(not(target_os = "freebsd"))] + fn usize2ready_lio(_ready: &mut UnixReady) { + // lio not available here → empty + } + + pub fn usize2ready(bits: usize) -> Ready { + let mut ready = UnixReady::from(Ready::empty()); + if bits & AIO != 0 { + usize2ready_aio(&mut ready); + } + if bits & LIO != 0 { + usize2ready_lio(&mut ready); + } + if bits & HUP != 0 { + ready.insert(UnixReady::hup()); + } + if bits & ERROR != 0 { + ready.insert(UnixReady::error()); + } + ready.into() + } +} + +#[cfg(windows)] +mod platform { + use mio::Ready; + + pub fn all() -> Ready { + // No platform-specific Readinesses for Windows + Ready::empty() + } + + pub fn hup() -> Ready { + Ready::empty() + } + + pub fn ready2usize(_r: Ready) -> usize { + 0 + } + + pub fn usize2ready(_r: usize) -> Ready { + Ready::empty() + } +} diff --git a/third_party/rust/tokio-0.1.11/src/runtime/builder.rs b/third_party/rust/tokio-0.1.11/src/runtime/builder.rs new file mode 100644 index 0000000000..43eb5ddee1 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/builder.rs @@ -0,0 +1,261 @@ +use runtime::{Inner, Runtime}; + +use reactor::Reactor; + +use std::io; + +use tokio_reactor; +use tokio_threadpool::Builder as ThreadPoolBuilder; +use tokio_threadpool::park::DefaultPark; +use tokio_timer::clock::{self, Clock}; +use tokio_timer::timer::{self, Timer}; + +/// Builds Tokio Runtime with custom configuration values. +/// +/// Methods can be chained in order to set the configuration values. The +/// Runtime is constructed by calling [`build`]. +/// +/// New instances of `Builder` are obtained via [`Builder::new`]. +/// +/// See function level documentation for details on the various configuration +/// settings. +/// +/// [`build`]: #method.build +/// [`Builder::new`]: #method.new +/// +/// # Examples +/// +/// ``` +/// # extern crate tokio; +/// # extern crate tokio_threadpool; +/// # use tokio::runtime::Builder; +/// +/// # pub fn main() { +/// // create and configure ThreadPool +/// let mut threadpool_builder = tokio_threadpool::Builder::new(); +/// threadpool_builder +/// .name_prefix("my-runtime-worker-") +/// .pool_size(4); +/// +/// // build Runtime +/// let runtime = Builder::new() +/// .threadpool_builder(threadpool_builder) +/// .build(); +/// // ... call runtime.run(...) +/// # let _ = runtime; +/// # } +/// ``` +#[derive(Debug)] +pub struct Builder { + /// Thread pool specific builder + threadpool_builder: ThreadPoolBuilder, + + /// The clock to use + clock: Clock, +} + +impl Builder { + /// Returns a new runtime builder initialized with default configuration + /// values. + /// + /// Configuration methods can be chained on the return value. + pub fn new() -> Builder { + let mut threadpool_builder = ThreadPoolBuilder::new(); + threadpool_builder.name_prefix("tokio-runtime-worker-"); + + Builder { + threadpool_builder, + clock: Clock::new(), + } + } + + /// Set the `Clock` instance that will be used by the runtime. + pub fn clock(&mut self, clock: Clock) -> &mut Self { + self.clock = clock; + self + } + + /// Set builder to set up the thread pool instance. + #[deprecated( + since="0.1.9", + note="use the `core_threads`, `blocking_threads`, `name_prefix`, \ + and `stack_size` functions on `runtime::Builder`, instead")] + #[doc(hidden)] + pub fn threadpool_builder(&mut self, val: ThreadPoolBuilder) -> &mut Self { + self.threadpool_builder = val; + self + } + + /// Set the maximum number of worker threads for the `Runtime`'s thread pool. + /// + /// This must be a number between 1 and 32,768 though it is advised to keep + /// this value on the smaller side. + /// + /// The default value is the number of cores available to the system. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let mut rt = runtime::Builder::new() + /// .core_threads(4) + /// .build() + /// .unwrap(); + /// # } + /// ``` + pub fn core_threads(&mut self, val: usize) -> &mut Self { + self.threadpool_builder.pool_size(val); + self + } + + /// Set the maximum number of concurrent blocking sections in the `Runtime`'s + /// thread pool. + /// + /// When the maximum concurrent `blocking` calls is reached, any further + /// calls to `blocking` will return `NotReady` and the task is notified once + /// previously in-flight calls to `blocking` return. + /// + /// This must be a number between 1 and 32,768 though it is advised to keep + /// this value on the smaller side. + /// + /// The default value is 100. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let mut rt = runtime::Builder::new() + /// .blocking_threads(200) + /// .build(); + /// # } + /// ``` + pub fn blocking_threads(&mut self, val: usize) -> &mut Self { + self.threadpool_builder.max_blocking(val); + self + } + + /// Set name prefix of threads spawned by the `Runtime`'s thread pool. + /// + /// Thread name prefix is used for generating thread names. For example, if + /// prefix is `my-pool-`, then threads in the pool will get names like + /// `my-pool-1` etc. + /// + /// The default prefix is "tokio-runtime-worker-". + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let mut rt = runtime::Builder::new() + /// .name_prefix("my-pool-") + /// .build(); + /// # } + /// ``` + pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self { + self.threadpool_builder.name_prefix(val); + self + } + + /// Set the stack size (in bytes) for worker threads. + /// + /// The actual stack size may be greater than this value if the platform + /// specifies minimal stack size. + /// + /// The default stack size for spawned threads is 2 MiB, though this + /// particular stack size is subject to change in the future. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let mut rt = runtime::Builder::new() + /// .stack_size(32 * 1024) + /// .build(); + /// # } + /// ``` + pub fn stack_size(&mut self, val: usize) -> &mut Self { + self.threadpool_builder.stack_size(val); + self + } + + /// Create the configured `Runtime`. + /// + /// The returned `ThreadPool` instance is ready to spawn tasks. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # use tokio::runtime::Builder; + /// # pub fn main() { + /// let runtime = Builder::new().build().unwrap(); + /// // ... call runtime.run(...) + /// # let _ = runtime; + /// # } + /// ``` + pub fn build(&mut self) -> io::Result<Runtime> { + use std::collections::HashMap; + use std::sync::{Arc, Mutex}; + + // Get a handle to the clock for the runtime. + let clock1 = self.clock.clone(); + let clock2 = clock1.clone(); + + let timers = Arc::new(Mutex::new(HashMap::<_, timer::Handle>::new())); + let t1 = timers.clone(); + + // Spawn a reactor on a background thread. + let reactor = Reactor::new()?.background()?; + + // Get a handle to the reactor. + let reactor_handle = reactor.handle().clone(); + + let pool = self.threadpool_builder + .around_worker(move |w, enter| { + let timer_handle = t1.lock().unwrap() + .get(w.id()).unwrap() + .clone(); + + tokio_reactor::with_default(&reactor_handle, enter, |enter| { + clock::with_default(&clock1, enter, |enter| { + timer::with_default(&timer_handle, enter, |_| { + w.run(); + }); + }) + }); + }) + .custom_park(move |worker_id| { + // Create a new timer + let timer = Timer::new_with_now(DefaultPark::new(), clock2.clone()); + + timers.lock().unwrap() + .insert(worker_id.clone(), timer.handle()); + + timer + }) + .build(); + + Ok(Runtime { + inner: Some(Inner { + reactor, + pool, + }), + }) + } +} diff --git a/third_party/rust/tokio-0.1.11/src/runtime/current_thread/builder.rs b/third_party/rust/tokio-0.1.11/src/runtime/current_thread/builder.rs new file mode 100644 index 0000000000..72960fadf2 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/current_thread/builder.rs @@ -0,0 +1,88 @@ +use executor::current_thread::CurrentThread; +use runtime::current_thread::Runtime; + +use tokio_reactor::Reactor; +use tokio_timer::clock::Clock; +use tokio_timer::timer::Timer; + +use std::io; + +/// Builds a Single-threaded runtime with custom configuration values. +/// +/// Methods can be chained in order to set the configuration values. The +/// Runtime is constructed by calling [`build`]. +/// +/// New instances of `Builder` are obtained via [`Builder::new`]. +/// +/// See function level documentation for details on the various configuration +/// settings. +/// +/// [`build`]: #method.build +/// [`Builder::new`]: #method.new +/// +/// # Examples +/// +/// ``` +/// extern crate tokio; +/// extern crate tokio_timer; +/// +/// use tokio::runtime::current_thread::Builder; +/// use tokio_timer::clock::Clock; +/// +/// # pub fn main() { +/// // build Runtime +/// let runtime = Builder::new() +/// .clock(Clock::new()) +/// .build(); +/// // ... call runtime.run(...) +/// # let _ = runtime; +/// # } +/// ``` +#[derive(Debug)] +pub struct Builder { + /// The clock to use + clock: Clock, +} + +impl Builder { + /// Returns a new runtime builder initialized with default configuration + /// values. + /// + /// Configuration methods can be chained on the return value. + pub fn new() -> Builder { + Builder { + clock: Clock::new(), + } + } + + /// Set the `Clock` instance that will be used by the runtime. + pub fn clock(&mut self, clock: Clock) -> &mut Self { + self.clock = clock; + self + } + + /// Create the configured `Runtime`. + pub fn build(&mut self) -> io::Result<Runtime> { + // We need a reactor to receive events about IO objects from kernel + let reactor = Reactor::new()?; + let reactor_handle = reactor.handle(); + + // Place a timer wheel on top of the reactor. If there are no timeouts to fire, it'll let the + // reactor pick up some new external events. + let timer = Timer::new_with_now(reactor, self.clock.clone()); + let timer_handle = timer.handle(); + + // And now put a single-threaded executor on top of the timer. When there are no futures ready + // to do something, it'll let the timer or the reactor to generate some new stimuli for the + // futures to continue in their life. + let executor = CurrentThread::new_with_park(timer); + + let runtime = Runtime::new2( + reactor_handle, + timer_handle, + self.clock.clone(), + executor); + + Ok(runtime) + } +} diff --git a/third_party/rust/tokio-0.1.11/src/runtime/current_thread/mod.rs b/third_party/rust/tokio-0.1.11/src/runtime/current_thread/mod.rs new file mode 100644 index 0000000000..dca41711e8 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/current_thread/mod.rs @@ -0,0 +1,92 @@ +//! A runtime implementation that runs everything on the current thread. +//! +//! [`current_thread::Runtime`][rt] is similar to the primary +//! [`Runtime`][concurrent-rt] except that it runs all components on the current +//! thread instead of using a thread pool. This means that it is able to spawn +//! futures that do not implement `Send`. +//! +//! Same as the default [`Runtime`][concurrent-rt], the +//! [`current_thread::Runtime`][rt] includes: +//! +//! * A [reactor] to drive I/O resources. +//! * An [executor] to execute tasks that use these I/O resources. +//! * A [timer] for scheduling work to run after a set period of time. +//! +//! Note that [`current_thread::Runtime`][rt] does not implement `Send` itself +//! and cannot be safely moved to other threads. +//! +//! # Spawning from other threads +//! +//! While [`current_thread::Runtime`][rt] does not implement `Send` and cannot +//! safely be moved to other threads, it provides a `Handle` that can be sent +//! to other threads and allows to spawn new tasks from there. +//! +//! For example: +//! +//! ``` +//! # extern crate tokio; +//! # extern crate futures; +//! use tokio::runtime::current_thread::Runtime; +//! use tokio::prelude::*; +//! use std::thread; +//! +//! # fn main() { +//! let mut runtime = Runtime::new().unwrap(); +//! let handle = runtime.handle(); +//! +//! thread::spawn(move || { +//! handle.spawn(future::ok(())); +//! }).join().unwrap(); +//! +//! # /* +//! runtime.run().unwrap(); +//! # */ +//! # } +//! ``` +//! +//! # Examples +//! +//! Creating a new `Runtime` and running a future `f` until its completion and +//! returning its result. +//! +//! ``` +//! use tokio::runtime::current_thread::Runtime; +//! use tokio::prelude::*; +//! +//! let mut runtime = Runtime::new().unwrap(); +//! +//! // Use the runtime... +//! // runtime.block_on(f); // where f is a future +//! ``` +//! +//! [rt]: struct.Runtime.html +//! [concurrent-rt]: ../struct.Runtime.html +//! [chan]: https://docs.rs/futures/0.1/futures/sync/mpsc/fn.channel.html +//! [reactor]: ../../reactor/struct.Reactor.html +//! [executor]: https://tokio.rs/docs/getting-started/runtime-model/#executors +//! [timer]: ../../timer/index.html + +mod builder; +mod runtime; + +pub use self::builder::Builder; +pub use self::runtime::{Runtime, Handle}; +pub use tokio_current_thread::spawn; +pub use tokio_current_thread::TaskExecutor; + +use futures::Future; + +/// Run the provided future to completion using a runtime running on the current thread. +/// +/// This first creates a new [`Runtime`], and calls [`Runtime::block_on`] with the provided future, +/// which blocks the current thread until the provided future completes. It then calls +/// [`Runtime::run`] to wait for any other spawned futures to resolve. +pub fn block_on_all<F>(future: F) -> Result<F::Item, F::Error> +where + F: Future, +{ + let mut r = Runtime::new().expect("failed to start runtime on current thread"); + let v = r.block_on(future)?; + r.run().expect("failed to resolve remaining futures"); + Ok(v) +} diff --git a/third_party/rust/tokio-0.1.11/src/runtime/current_thread/runtime.rs b/third_party/rust/tokio-0.1.11/src/runtime/current_thread/runtime.rs new file mode 100644 index 0000000000..262cb1e72d --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/current_thread/runtime.rs @@ -0,0 +1,234 @@ +use tokio_current_thread::{self as current_thread, CurrentThread}; +use tokio_current_thread::Handle as ExecutorHandle; +use runtime::current_thread::Builder; + +use tokio_reactor::{self, Reactor}; +use tokio_timer::clock::{self, Clock}; +use tokio_timer::timer::{self, Timer}; +use tokio_executor; + +use futures::{future, Future}; + +use std::fmt; +use std::error::Error; +use std::io; + +/// Single-threaded runtime provides a way to start reactor +/// and executor on the current thread. +/// +/// See [module level][mod] documentation for more details. +/// +/// [mod]: index.html +#[derive(Debug)] +pub struct Runtime { + reactor_handle: tokio_reactor::Handle, + timer_handle: timer::Handle, + clock: Clock, + executor: CurrentThread<Timer<Reactor>>, +} + +/// Handle to spawn a future on the corresponding `CurrentThread` runtime instance +#[derive(Debug, Clone)] +pub struct Handle(ExecutorHandle); + +impl Handle { + /// Spawn a future onto the `CurrentThread` runtime instance corresponding to this handle + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the `CurrentThread` + /// instance of the `Handle` does not exist anymore. + pub fn spawn<F>(&self, future: F) -> Result<(), tokio_executor::SpawnError> + where F: Future<Item = (), Error = ()> + Send + 'static { + self.0.spawn(future) + } + + /// Provides a best effort **hint** to whether or not `spawn` will succeed. + /// + /// This function may return both false positives **and** false negatives. + /// If `status` returns `Ok`, then a call to `spawn` will *probably* + /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will + /// *probably* fail, but may succeed. + /// + /// This allows a caller to avoid creating the task if the call to `spawn` + /// has a high likelihood of failing. + pub fn status(&self) -> Result<(), tokio_executor::SpawnError> { + self.0.status() + } +} + +impl<T> future::Executor<T> for Handle +where T: Future<Item = (), Error = ()> + Send + 'static, +{ + fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> { + if let Err(e) = self.status() { + let kind = if e.is_at_capacity() { + future::ExecuteErrorKind::NoCapacity + } else { + future::ExecuteErrorKind::Shutdown + }; + + return Err(future::ExecuteError::new(kind, future)); + } + + let _ = self.spawn(future); + Ok(()) + } +} + +/// Error returned by the `run` function. +#[derive(Debug)] +pub struct RunError { + inner: current_thread::RunError, +} + +impl fmt::Display for RunError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.inner) + } +} + +impl Error for RunError { + fn description(&self) -> &str { + self.inner.description() + } + fn cause(&self) -> Option<&Error> { + self.inner.cause() + } +} + +impl Runtime { + /// Returns a new runtime initialized with default configuration values. + pub fn new() -> io::Result<Runtime> { + Builder::new().build() + } + + pub(super) fn new2( + reactor_handle: tokio_reactor::Handle, + timer_handle: timer::Handle, + clock: Clock, + executor: CurrentThread<Timer<Reactor>>) -> Runtime + { + Runtime { + reactor_handle, + timer_handle, + clock, + executor, + } + } + + /// Get a new handle to spawn futures on the single-threaded Tokio runtime + /// + /// Different to the runtime itself, the handle can be sent to different + /// threads. + pub fn handle(&self) -> Handle { + Handle(self.executor.handle().clone()) + } + + /// Spawn a future onto the single-threaded Tokio runtime. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ```rust + /// # extern crate tokio; + /// # extern crate futures; + /// # use futures::{future, Future, Stream}; + /// use tokio::runtime::current_thread::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let mut rt = Runtime::new().unwrap(); + /// + /// // Spawn a future onto the runtime + /// rt.spawn(future::lazy(|| { + /// println!("running on the runtime"); + /// Ok(()) + /// })); + /// # } + /// # pub fn main() {} + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the executor + /// is currently at capacity and is unable to spawn a new future. + pub fn spawn<F>(&mut self, future: F) -> &mut Self + where F: Future<Item = (), Error = ()> + 'static, + { + self.executor.spawn(future); + self + } + + /// Runs the provided future, blocking the current thread until the future + /// completes. + /// + /// This function can be used to synchronously block the current thread + /// until the provided `future` has resolved either successfully or with an + /// error. The result of the future is then returned from this function + /// call. + /// + /// Note that this function will **also** execute any spawned futures on the + /// current thread, but will **not** block until these other spawned futures + /// have completed. Once the function returns, any uncompleted futures + /// remain pending in the `Runtime` instance. These futures will not run + /// until `block_on` or `run` is called again. + /// + /// The caller is responsible for ensuring that other spawned futures + /// complete execution by calling `block_on` or `run`. + pub fn block_on<F>(&mut self, f: F) -> Result<F::Item, F::Error> + where F: Future + { + self.enter(|executor| { + // Run the provided future + let ret = executor.block_on(f); + ret.map_err(|e| e.into_inner().expect("unexpected execution error")) + }) + } + + /// Run the executor to completion, blocking the thread until **all** + /// spawned futures have completed. + pub fn run(&mut self) -> Result<(), RunError> { + self.enter(|executor| executor.run()) + .map_err(|e| RunError { + inner: e, + }) + } + + fn enter<F, R>(&mut self, f: F) -> R + where F: FnOnce(&mut current_thread::Entered<Timer<Reactor>>) -> R + { + let Runtime { + ref reactor_handle, + ref timer_handle, + ref clock, + ref mut executor, + .. + } = *self; + + // Binds an executor to this thread + let mut enter = tokio_executor::enter().expect("Multiple executors at once"); + + // This will set the default handle and timer to use inside the closure + // and run the future. + tokio_reactor::with_default(&reactor_handle, &mut enter, |enter| { + clock::with_default(clock, enter, |enter| { + timer::with_default(&timer_handle, enter, |enter| { + // The TaskExecutor is a fake executor that looks into the + // current single-threaded executor when used. This is a trick, + // because we need two mutable references to the executor (one + // to run the provided future, another to install as the default + // one). We use the fake one here as the default one. + let mut default_executor = current_thread::TaskExecutor::current(); + tokio_executor::with_default(&mut default_executor, enter, |enter| { + let mut executor = executor.enter(enter); + f(&mut executor) + }) + }) + }) + }) + } +} diff --git a/third_party/rust/tokio-0.1.11/src/runtime/mod.rs b/third_party/rust/tokio-0.1.11/src/runtime/mod.rs new file mode 100644 index 0000000000..9ff0cc4c2f --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/mod.rs @@ -0,0 +1,496 @@ +//! A batteries included runtime for applications using Tokio. +//! +//! Applications using Tokio require some runtime support in order to work: +//! +//! * A [reactor] to drive I/O resources. +//! * An [executor] to execute tasks that use these I/O resources. +//! * A [timer] for scheduling work to run after a set period of time. +//! +//! While it is possible to setup each component manually, this involves a bunch +//! of boilerplate. +//! +//! [`Runtime`] bundles all of these various runtime components into a single +//! handle that can be started and shutdown together, eliminating the necessary +//! boilerplate to run a Tokio application. +//! +//! Most applications wont need to use [`Runtime`] directly. Instead, they will +//! use the [`run`] function, which uses [`Runtime`] under the hood. +//! +//! Creating a [`Runtime`] does the following: +//! +//! * Spawn a background thread running a [`Reactor`] instance. +//! * Start a [`ThreadPool`] for executing futures. +//! * Run an instance of [`Timer`] **per** thread pool worker thread. +//! +//! The thread pool uses a work-stealing strategy and is configured to start a +//! worker thread for each CPU core available on the system. This tends to be +//! the ideal setup for Tokio applications. +//! +//! A timer per thread pool worker thread is used to minimize the amount of +//! synchronization that is required for working with the timer. +//! +//! # Usage +//! +//! Most applications will use the [`run`] function. This takes a future to +//! "seed" the application, blocking the thread until the runtime becomes +//! [idle]. +//! +//! ```rust +//! # extern crate tokio; +//! # extern crate futures; +//! # use futures::{Future, Stream}; +//! use tokio::net::TcpListener; +//! +//! # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> { +//! # unimplemented!(); +//! # } +//! # fn dox() { +//! # let addr = "127.0.0.1:8080".parse().unwrap(); +//! let listener = TcpListener::bind(&addr).unwrap(); +//! +//! let server = listener.incoming() +//! .map_err(|e| println!("error = {:?}", e)) +//! .for_each(|socket| { +//! tokio::spawn(process(socket)) +//! }); +//! +//! tokio::run(server); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! In this function, the `run` function blocks until the runtime becomes idle. +//! See [`shutdown_on_idle`][idle] for more shutdown details. +//! +//! From within the context of the runtime, additional tasks are spawned using +//! the [`tokio::spawn`] function. Futures spawned using this function will be +//! executed on the same thread pool used by the [`Runtime`]. +//! +//! A [`Runtime`] instance can also be used directly. +//! +//! ```rust +//! # extern crate tokio; +//! # extern crate futures; +//! # use futures::{Future, Stream}; +//! use tokio::runtime::Runtime; +//! use tokio::net::TcpListener; +//! +//! # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> { +//! # unimplemented!(); +//! # } +//! # fn dox() { +//! # let addr = "127.0.0.1:8080".parse().unwrap(); +//! let listener = TcpListener::bind(&addr).unwrap(); +//! +//! let server = listener.incoming() +//! .map_err(|e| println!("error = {:?}", e)) +//! .for_each(|socket| { +//! tokio::spawn(process(socket)) +//! }); +//! +//! // Create the runtime +//! let mut rt = Runtime::new().unwrap(); +//! +//! // Spawn the server task +//! rt.spawn(server); +//! +//! // Wait until the runtime becomes idle and shut it down. +//! rt.shutdown_on_idle() +//! .wait().unwrap(); +//! # } +//! # pub fn main() {} +//! ``` +//! +//! [reactor]: ../reactor/struct.Reactor.html +//! [executor]: https://tokio.rs/docs/getting-started/runtime-model/#executors +//! [timer]: ../timer/index.html +//! [`Runtime`]: struct.Runtime.html +//! [`Reactor`]: ../reactor/struct.Reactor.html +//! [`ThreadPool`]: ../executor/thread_pool/struct.ThreadPool.html +//! [`run`]: fn.run.html +//! [idle]: struct.Runtime.html#method.shutdown_on_idle +//! [`tokio::spawn`]: ../executor/fn.spawn.html +//! [`Timer`]: https://docs.rs/tokio-timer/0.2/tokio_timer/timer/struct.Timer.html + +mod builder; +pub mod current_thread; +mod shutdown; +mod task_executor; + +pub use self::builder::Builder; +pub use self::shutdown::Shutdown; +pub use self::task_executor::TaskExecutor; + +use reactor::{Background, Handle}; + +use std::io; + +use tokio_executor::enter; +use tokio_threadpool as threadpool; + +use futures; +use futures::future::Future; + +/// Handle to the Tokio runtime. +/// +/// The Tokio runtime includes a reactor as well as an executor for running +/// tasks. +/// +/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However, +/// most users will use [`tokio::run`], which uses a `Runtime` internally. +/// +/// See [module level][mod] documentation for more details. +/// +/// [mod]: index.html +/// [`new`]: #method.new +/// [`Builder`]: struct.Builder.html +/// [`tokio::run`]: fn.run.html +#[derive(Debug)] +pub struct Runtime { + inner: Option<Inner>, +} + +#[derive(Debug)] +struct Inner { + /// Reactor running on a background thread. + reactor: Background, + + /// Task execution pool. + pool: threadpool::ThreadPool, +} + +// ===== impl Runtime ===== + +/// Start the Tokio runtime using the supplied future to bootstrap execution. +/// +/// This function is used to bootstrap the execution of a Tokio application. It +/// does the following: +/// +/// * Start the Tokio runtime using a default configuration. +/// * Spawn the given future onto the thread pool. +/// * Block the current thread until the runtime shuts down. +/// +/// Note that the function will not return immediately once `future` has +/// completed. Instead it waits for the entire runtime to become idle. +/// +/// See the [module level][mod] documentation for more details. +/// +/// # Examples +/// +/// ```rust +/// # extern crate tokio; +/// # extern crate futures; +/// # use futures::{Future, Stream}; +/// use tokio::net::TcpListener; +/// +/// # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> { +/// # unimplemented!(); +/// # } +/// # fn dox() { +/// # let addr = "127.0.0.1:8080".parse().unwrap(); +/// let listener = TcpListener::bind(&addr).unwrap(); +/// +/// let server = listener.incoming() +/// .map_err(|e| println!("error = {:?}", e)) +/// .for_each(|socket| { +/// tokio::spawn(process(socket)) +/// }); +/// +/// tokio::run(server); +/// # } +/// # pub fn main() {} +/// ``` +/// +/// # Panics +/// +/// This function panics if called from the context of an executor. +/// +/// [mod]: ../index.html +pub fn run<F>(future: F) +where F: Future<Item = (), Error = ()> + Send + 'static, +{ + let mut runtime = Runtime::new().unwrap(); + runtime.spawn(future); + enter().expect("nested tokio::run") + .block_on(runtime.shutdown_on_idle()) + .unwrap(); +} + +impl Runtime { + /// Create a new runtime instance with default configuration values. + /// + /// This results in a reactor, thread pool, and timer being initialized. The + /// thread pool will not spawn any worker threads until it needs to, i.e. + /// tasks are scheduled to run. + /// + /// Most users will not need to call this function directly, instead they + /// will use [`tokio::run`](fn.run.html). + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// Creating a new `Runtime` with default configuration values. + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::prelude::*; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// + /// // Shutdown the runtime + /// rt.shutdown_now() + /// .wait().unwrap(); + /// ``` + /// + /// [mod]: index.html + pub fn new() -> io::Result<Self> { + Builder::new().build() + } + + #[deprecated(since = "0.1.5", note = "use `reactor` instead")] + #[doc(hidden)] + pub fn handle(&self) -> &Handle { + self.reactor() + } + + /// Return a reference to the reactor handle for this runtime instance. + /// + /// The returned handle reference can be cloned in order to get an owned + /// value of the handle. This handle can be used to initialize I/O resources + /// (like TCP or UDP sockets) that will not be used on the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// let reactor_handle = rt.reactor().clone(); + /// + /// // use `reactor_handle` + /// ``` + pub fn reactor(&self) -> &Handle { + self.inner().reactor.handle() + } + + /// Return a handle to the runtime's executor. + /// + /// The returned handle can be used to spawn tasks that run on this runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// let executor_handle = rt.executor(); + /// + /// // use `executor_handle` + /// ``` + pub fn executor(&self) -> TaskExecutor { + let inner = self.inner().pool.sender().clone(); + TaskExecutor { inner } + } + + /// Spawn a future onto the Tokio runtime. + /// + /// This spawns the given future onto the runtime's executor, usually a + /// thread pool. The thread pool is then responsible for polling the future + /// until it completes. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ```rust + /// # extern crate tokio; + /// # extern crate futures; + /// # use futures::{future, Future, Stream}; + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let mut rt = Runtime::new().unwrap(); + /// + /// // Spawn a future onto the runtime + /// rt.spawn(future::lazy(|| { + /// println!("now running on a worker thread"); + /// Ok(()) + /// })); + /// # } + /// # pub fn main() {} + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the executor + /// is currently at capacity and is unable to spawn a new future. + pub fn spawn<F>(&mut self, future: F) -> &mut Self + where F: Future<Item = (), Error = ()> + Send + 'static, + { + self.inner_mut().pool.sender().spawn(future).unwrap(); + self + } + + /// Run a future to completion on the Tokio runtime. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers which + /// the future spawns internally will be executed on the runtime. + /// + /// This method should not be called from an asynchronous context. + /// + /// # Panics + /// + /// This function panics if the executor is at capacity, if the provided + /// future panics, or if called within an asynchronous execution context. + pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E> + where + F: Send + 'static + Future<Item = R, Error = E>, + R: Send + 'static, + E: Send + 'static, + { + let (tx, rx) = futures::sync::oneshot::channel(); + self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!()))); + rx.wait().unwrap() + } + + /// Run a future to completion on the Tokio runtime, then wait for all + /// background futures to complete too. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, waiting for background futures to complete, and yielding + /// its resolved result. Any tasks or timers which the future spawns + /// internally will be executed on the runtime and waited for completion. + /// + /// This method should not be called from an asynchronous context. + /// + /// # Panics + /// + /// This function panics if the executor is at capacity, if the provided + /// future panics, or if called within an asynchronous execution context. + pub fn block_on_all<F, R, E>(mut self, future: F) -> Result<R, E> + where + F: Send + 'static + Future<Item = R, Error = E>, + R: Send + 'static, + E: Send + 'static, + { + let res = self.block_on(future); + self.shutdown_on_idle().wait().unwrap(); + res + } + + /// Signals the runtime to shutdown once it becomes idle. + /// + /// Returns a future that completes once the shutdown operation has + /// completed. + /// + /// This function can be used to perform a graceful shutdown of the runtime. + /// + /// The runtime enters an idle state once **all** of the following occur. + /// + /// * The thread pool has no tasks to execute, i.e., all tasks that were + /// spawned have completed. + /// * The reactor is not managing any I/O resources. + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::prelude::*; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// + /// // Shutdown the runtime + /// rt.shutdown_on_idle() + /// .wait().unwrap(); + /// ``` + /// + /// [mod]: index.html + pub fn shutdown_on_idle(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + + let inner = Box::new({ + let pool = inner.pool; + let reactor = inner.reactor; + + pool.shutdown_on_idle().and_then(|_| { + reactor.shutdown_on_idle() + }) + }); + + Shutdown { inner } + } + + /// Signals the runtime to shutdown immediately. + /// + /// Returns a future that completes once the shutdown operation has + /// completed. + /// + /// This function will forcibly shutdown the runtime, causing any + /// in-progress work to become canceled. The shutdown steps are: + /// + /// * Drain any scheduled work queues. + /// * Drop any futures that have not yet completed. + /// * Drop the reactor. + /// + /// Once the reactor has dropped, any outstanding I/O resources bound to + /// that reactor will no longer function. Calling any method on them will + /// result in an error. + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::prelude::*; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// + /// // Shutdown the runtime + /// rt.shutdown_now() + /// .wait().unwrap(); + /// ``` + /// + /// [mod]: index.html + pub fn shutdown_now(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + Shutdown::shutdown_now(inner) + } + + fn inner(&self) -> &Inner { + self.inner.as_ref().unwrap() + } + + fn inner_mut(&mut self) -> &mut Inner { + self.inner.as_mut().unwrap() + } +} + +impl Drop for Runtime { + fn drop(&mut self) { + if let Some(inner) = self.inner.take() { + let shutdown = Shutdown::shutdown_now(inner); + let _ = shutdown.wait(); + } + } +} diff --git a/third_party/rust/tokio-0.1.11/src/runtime/shutdown.rs b/third_party/rust/tokio-0.1.11/src/runtime/shutdown.rs new file mode 100644 index 0000000000..1aca557277 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/shutdown.rs @@ -0,0 +1,46 @@ +use runtime::Inner; + +use std::fmt; + +use futures::{Future, Poll}; + +/// A future that resolves when the Tokio `Runtime` is shut down. +pub struct Shutdown { + pub(super) inner: Box<Future<Item = (), Error = ()> + Send>, +} + +impl Shutdown { + pub(super) fn shutdown_now(inner: Inner) -> Self { + let inner = Box::new({ + let pool = inner.pool; + let reactor = inner.reactor; + + pool.shutdown_now().and_then(|_| { + reactor.shutdown_now() + .then(|_| { + Ok(()) + }) + }) + }); + + Shutdown { inner } + } +} + +impl Future for Shutdown { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + try_ready!(self.inner.poll()); + Ok(().into()) + } +} + +impl fmt::Debug for Shutdown { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Shutdown") + .field("inner", &"Box<Future<Item = (), Error = ()>>") + .finish() + } +} diff --git a/third_party/rust/tokio-0.1.11/src/runtime/task_executor.rs b/third_party/rust/tokio-0.1.11/src/runtime/task_executor.rs new file mode 100644 index 0000000000..e213201ab0 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/runtime/task_executor.rs @@ -0,0 +1,75 @@ + +use tokio_threadpool::Sender; + +use futures::future::{self, Future}; + +/// Executes futures on the runtime +/// +/// All futures spawned using this executor will be submitted to the associated +/// Runtime's executor. This executor is usually a thread pool. +/// +/// For more details, see the [module level](index.html) documentation. +#[derive(Debug, Clone)] +pub struct TaskExecutor { + pub(super) inner: Sender, +} + +impl TaskExecutor { + /// Spawn a future onto the Tokio runtime. + /// + /// This spawns the given future onto the runtime's executor, usually a + /// thread pool. The thread pool is then responsible for polling the future + /// until it completes. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ```rust + /// # extern crate tokio; + /// # extern crate futures; + /// # use futures::{future, Future, Stream}; + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let mut rt = Runtime::new().unwrap(); + /// let executor = rt.executor(); + /// + /// // Spawn a future onto the runtime + /// executor.spawn(future::lazy(|| { + /// println!("now running on a worker thread"); + /// Ok(()) + /// })); + /// # } + /// # pub fn main() {} + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the executor + /// is currently at capacity and is unable to spawn a new future. + pub fn spawn<F>(&self, future: F) + where F: Future<Item = (), Error = ()> + Send + 'static, + { + self.inner.spawn(future).unwrap(); + } +} + +impl<T> future::Executor<T> for TaskExecutor +where T: Future<Item = (), Error = ()> + Send + 'static, +{ + fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> { + self.inner.execute(future) + } +} + +impl ::executor::Executor for TaskExecutor { + fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>) + -> Result<(), ::executor::SpawnError> + { + self.inner.spawn(future) + } +} diff --git a/third_party/rust/tokio-0.1.11/src/timer.rs b/third_party/rust/tokio-0.1.11/src/timer.rs new file mode 100644 index 0000000000..fc85a2a724 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/timer.rs @@ -0,0 +1,102 @@ +//! Utilities for tracking time. +//! +//! This module provides a number of types for executing code after a set period +//! of time. +//! +//! * [`Delay`][Delay] is a future that does no work and completes at a specific `Instant` +//! in time. +//! +//! * [`Interval`][Interval] is a stream yielding a value at a fixed period. It +//! is initialized with a `Duration` and repeatedly yields each time the +//! duration elapses. +//! +//! * [`Timeout`][Timeout]: Wraps a future or stream, setting an upper bound to the +//! amount of time it is allowed to execute. If the future or stream does not +//! complete in time, then it is canceled and an error is returned. +//! +//! * [`DelayQueue`]: A queue where items are returned once the requested delay +//! has expired. +//! +//! These types are sufficient for handling a large number of scenarios +//! involving time. +//! +//! These types must be used from within the context of the +//! [`Runtime`][runtime] or a timer context must be setup explicitly. See the +//! [`tokio-timer`][tokio-timer] crate for more details on how to setup a timer +//! context. +//! +//! # Examples +//! +//! Wait 100ms and print "Hello World!" +//! +//! ``` +//! use tokio::prelude::*; +//! use tokio::timer::Delay; +//! +//! use std::time::{Duration, Instant}; +//! +//! let when = Instant::now() + Duration::from_millis(100); +//! +//! tokio::run({ +//! Delay::new(when) +//! .map_err(|e| panic!("timer failed; err={:?}", e)) +//! .and_then(|_| { +//! println!("Hello world!"); +//! Ok(()) +//! }) +//! }) +//! ``` +//! +//! Require that an operation takes no more than 300ms. Note that this uses the +//! [`timeout`][ext] function on the [`FutureExt`][ext] trait. This trait is +//! included in the prelude. +//! +//! ``` +//! # extern crate futures; +//! # extern crate tokio; +//! use tokio::prelude::*; +//! +//! use std::time::{Duration, Instant}; +//! +//! fn long_op() -> Box<Future<Item = (), Error = ()> + Send> { +//! // ... +//! # Box::new(futures::future::ok(())) +//! } +//! +//! # fn main() { +//! tokio::run({ +//! long_op() +//! .timeout(Duration::from_millis(300)) +//! .map_err(|e| { +//! println!("operation timed out"); +//! }) +//! }) +//! # } +//! ``` +//! +//! [runtime]: ../runtime/struct.Runtime.html +//! [tokio-timer]: https://docs.rs/tokio-timer +//! [ext]: ../util/trait.FutureExt.html#method.timeout +//! [Timeout]: struct.Timeout.html +//! [Delay]: struct.Delay.html +//! [Interval]: struct.Interval.html +//! [`DelayQueue`]: struct.DelayQueue.html + +pub use tokio_timer::{ + delay_queue, + DelayQueue, + Error, + Interval, + Delay, + Timeout, + timeout, +}; + +#[deprecated(since = "0.1.8", note = "use Timeout instead")] +#[allow(deprecated)] +#[doc(hidden)] +pub type Deadline<T> = ::tokio_timer::Deadline<T>; +#[deprecated(since = "0.1.8", note = "use Timeout instead")] +#[allow(deprecated)] +#[doc(hidden)] +pub type DeadlineError<T> = ::tokio_timer::DeadlineError<T>; diff --git a/third_party/rust/tokio-0.1.11/src/util/future.rs b/third_party/rust/tokio-0.1.11/src/util/future.rs new file mode 100644 index 0000000000..92097ad9ea --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/util/future.rs @@ -0,0 +1,87 @@ +#[allow(deprecated)] +use tokio_timer::Deadline; +use tokio_timer::Timeout; + +use futures::Future; + +use std::time::{Instant, Duration}; + + +/// An extension trait for `Future` that provides a variety of convenient +/// combinator functions. +/// +/// Currently, there only is a [`timeout`] function, but this will increase +/// over time. +/// +/// Users are not expected to implement this trait. All types that implement +/// `Future` already implement `FutureExt`. +/// +/// This trait can be imported directly or via the Tokio prelude: `use +/// tokio::prelude::*`. +/// +/// [`timeout`]: #method.timeout +pub trait FutureExt: Future { + + /// Creates a new future which allows `self` until `timeout`. + /// + /// This combinator creates a new future which wraps the receiving future + /// with a timeout. The returned future is allowed to execute until it + /// completes or `timeout` has elapsed, whichever happens first. + /// + /// If the future completes before `timeout` then the future will resolve + /// with that item. Otherwise the future will resolve to an error. + /// + /// The future is guaranteed to be polled at least once, even if `timeout` + /// is set to zero. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// use tokio::prelude::*; + /// use std::time::Duration; + /// # use futures::future::{self, FutureResult}; + /// + /// # fn long_future() -> FutureResult<(), ()> { + /// # future::ok(()) + /// # } + /// # + /// # fn main() { + /// let future = long_future() + /// .timeout(Duration::from_secs(1)) + /// .map_err(|e| println!("error = {:?}", e)); + /// + /// tokio::run(future); + /// # } + /// ``` + fn timeout(self, timeout: Duration) -> Timeout<Self> + where Self: Sized, + { + Timeout::new(self, timeout) + } + + #[deprecated(since = "0.1.8", note = "use `timeout` instead")] + #[allow(deprecated)] + #[doc(hidden)] + fn deadline(self, deadline: Instant) -> Deadline<Self> + where Self: Sized, + { + Deadline::new(self, deadline) + } +} + +impl<T: ?Sized> FutureExt for T where T: Future {} + +#[cfg(test)] +mod test { + use super::*; + use prelude::future; + + #[test] + fn timeout_polls_at_least_once() { + let base_future = future::result::<(), ()>(Ok(())); + let timeouted_future = base_future.timeout(Duration::new(0, 0)); + assert!(timeouted_future.wait().is_ok()); + } +} diff --git a/third_party/rust/tokio-0.1.11/src/util/mod.rs b/third_party/rust/tokio-0.1.11/src/util/mod.rs new file mode 100644 index 0000000000..3ebd1fc708 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/util/mod.rs @@ -0,0 +1,14 @@ +//! Utilities for working with Tokio. +//! +//! This module contains utilities that are useful for working with Tokio. +//! Currently, this only includes [`FutureExt`] and [`StreamExt`], but this +//! may grow over time. +//! +//! [`FutureExt`]: trait.FutureExt.html +//! [`StreamExt`]: trait.StreamExt.html + +mod future; +mod stream; + +pub use self::future::FutureExt; +pub use self::stream::StreamExt; diff --git a/third_party/rust/tokio-0.1.11/src/util/stream.rs b/third_party/rust/tokio-0.1.11/src/util/stream.rs new file mode 100644 index 0000000000..ef268483c0 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/src/util/stream.rs @@ -0,0 +1,62 @@ +use tokio_timer::Timeout; + +use futures::Stream; + +use std::time::Duration; + + +/// An extension trait for `Stream` that provides a variety of convenient +/// combinator functions. +/// +/// Currently, there only is a [`timeout`] function, but this will increase +/// over time. +/// +/// Users are not expected to implement this trait. All types that implement +/// `Stream` already implement `StreamExt`. +/// +/// This trait can be imported directly or via the Tokio prelude: `use +/// tokio::prelude::*`. +/// +/// [`timeout`]: #method.timeout +pub trait StreamExt: Stream { + + /// Creates a new stream which allows `self` until `timeout`. + /// + /// This combinator creates a new stream which wraps the receiving stream + /// with a timeout. For each item, the returned stream is allowed to execute + /// until it completes or `timeout` has elapsed, whichever happens first. + /// + /// If an item completes before `timeout` then the stream will yield + /// with that item. Otherwise the stream will yield to an error. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// use tokio::prelude::*; + /// use std::time::Duration; + /// # use futures::future::{self, FutureResult}; + /// + /// # fn long_future() -> FutureResult<(), ()> { + /// # future::ok(()) + /// # } + /// # + /// # fn main() { + /// let stream = long_future() + /// .into_stream() + /// .timeout(Duration::from_secs(1)) + /// .for_each(|i| future::ok(println!("item = {:?}", i))) + /// .map_err(|e| println!("error = {:?}", e)); + /// + /// tokio::run(stream); + /// # } + /// ``` + fn timeout(self, timeout: Duration) -> Timeout<Self> + where Self: Sized, + { + Timeout::new(self, timeout) + } +} + +impl<T: ?Sized> StreamExt for T where T: Stream {} diff --git a/third_party/rust/tokio-0.1.11/tests/buffered.rs b/third_party/rust/tokio-0.1.11/tests/buffered.rs new file mode 100644 index 0000000000..3605eba38a --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/buffered.rs @@ -0,0 +1,63 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio; +extern crate tokio_io; + +use std::net::TcpStream; +use std::thread; +use std::io::{Read, Write, BufReader, BufWriter}; + +use futures::Future; +use futures::stream::Stream; +use tokio_io::io::copy; +use tokio::net::TcpListener; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +#[test] +fn echo_server() { + const N: usize = 1024; + drop(env_logger::try_init()); + + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); + let addr = t!(srv.local_addr()); + + let msg = "foo bar baz"; + let t = thread::spawn(move || { + let mut s = t!(TcpStream::connect(&addr)); + + let t2 = thread::spawn(move || { + let mut s = t!(TcpStream::connect(&addr)); + let mut b = vec![0; msg.len() * N]; + t!(s.read_exact(&mut b)); + b + }); + + let mut expected = Vec::<u8>::new(); + for _i in 0..N { + expected.extend(msg.as_bytes()); + assert_eq!(t!(s.write(msg.as_bytes())), msg.len()); + } + (expected, t2) + }); + + let clients = srv.incoming().take(2).collect(); + let copied = clients.and_then(|clients| { + let mut clients = clients.into_iter(); + let a = BufReader::new(clients.next().unwrap()); + let b = BufWriter::new(clients.next().unwrap()); + copy(a, b) + }); + + let (amt, _, _) = t!(copied.wait()); + let (expected, t2) = t.join().unwrap(); + let actual = t2.join().unwrap(); + + assert!(expected == actual); + assert_eq!(amt, msg.len() as u64 * 1024); +} diff --git a/third_party/rust/tokio-0.1.11/tests/clock.rs b/third_party/rust/tokio-0.1.11/tests/clock.rs new file mode 100644 index 0000000000..6e9d9121fc --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/clock.rs @@ -0,0 +1,69 @@ +extern crate futures; +extern crate tokio; +extern crate tokio_timer; +extern crate env_logger; + +use tokio::prelude::*; +use tokio::runtime::{self, current_thread}; +use tokio::timer::*; +use tokio_timer::clock::Clock; + +use std::sync::mpsc; +use std::time::{Duration, Instant}; + +struct MockNow(Instant); + +impl tokio_timer::clock::Now for MockNow { + fn now(&self) -> Instant { + self.0 + } +} + +#[test] +fn clock_and_timer_concurrent() { + let _ = env_logger::try_init(); + + let when = Instant::now() + Duration::from_millis(5_000); + let clock = Clock::new_with_now(MockNow(when)); + + let mut rt = runtime::Builder::new() + .clock(clock) + .build() + .unwrap(); + + let (tx, rx) = mpsc::channel(); + + rt.spawn({ + Delay::new(when) + .map_err(|e| panic!("unexpected error; err={:?}", e)) + .and_then(move |_| { + assert!(Instant::now() < when); + tx.send(()).unwrap(); + Ok(()) + }) + }); + + rx.recv().unwrap(); +} + +#[test] +fn clock_and_timer_single_threaded() { + let _ = env_logger::try_init(); + + let when = Instant::now() + Duration::from_millis(5_000); + let clock = Clock::new_with_now(MockNow(when)); + + let mut rt = current_thread::Builder::new() + .clock(clock) + .build() + .unwrap(); + + rt.block_on({ + Delay::new(when) + .map_err(|e| panic!("unexpected error; err={:?}", e)) + .and_then(move |_| { + assert!(Instant::now() < when); + Ok(()) + }) + }).unwrap(); +} diff --git a/third_party/rust/tokio-0.1.11/tests/drop-core.rs b/third_party/rust/tokio-0.1.11/tests/drop-core.rs new file mode 100644 index 0000000000..75ac9b7eb1 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/drop-core.rs @@ -0,0 +1,42 @@ +extern crate tokio; +extern crate futures; + +use std::thread; +use std::net; + +use futures::future; +use futures::prelude::*; +use futures::sync::oneshot; +use tokio::net::TcpListener; +use tokio::reactor::Reactor; + +#[test] +fn tcp_doesnt_block() { + let core = Reactor::new().unwrap(); + let handle = core.handle(); + let listener = net::TcpListener::bind("127.0.0.1:0").unwrap(); + let listener = TcpListener::from_std(listener, &handle).unwrap(); + drop(core); + assert!(listener.incoming().wait().next().unwrap().is_err()); +} + +#[test] +fn drop_wakes() { + let core = Reactor::new().unwrap(); + let handle = core.handle(); + let listener = net::TcpListener::bind("127.0.0.1:0").unwrap(); + let listener = TcpListener::from_std(listener, &handle).unwrap(); + let (tx, rx) = oneshot::channel::<()>(); + let t = thread::spawn(move || { + let incoming = listener.incoming(); + let new_socket = incoming.into_future().map_err(|_| ()); + let drop_tx = future::lazy(|| { + drop(tx); + future::ok(()) + }); + assert!(new_socket.join(drop_tx).wait().is_err()); + }); + drop(rx.wait()); + drop(core); + t.join().unwrap(); +} diff --git a/third_party/rust/tokio-0.1.11/tests/global.rs b/third_party/rust/tokio-0.1.11/tests/global.rs new file mode 100644 index 0000000000..d3bc09315a --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/global.rs @@ -0,0 +1,136 @@ +extern crate futures; +extern crate tokio; +extern crate tokio_io; +extern crate env_logger; + +use std::{io, thread}; +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; + +use futures::prelude::*; +use tokio::net::{TcpStream, TcpListener}; +use tokio::runtime::Runtime; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +#[test] +fn hammer_old() { + let _ = env_logger::try_init(); + + let threads = (0..10).map(|_| { + thread::spawn(|| { + let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap())); + let addr = t!(srv.local_addr()); + let mine = TcpStream::connect(&addr); + let theirs = srv.incoming().into_future() + .map(|(s, _)| s.unwrap()) + .map_err(|(s, _)| s); + let (mine, theirs) = t!(mine.join(theirs).wait()); + + assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr())); + assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr())); + }) + }).collect::<Vec<_>>(); + for thread in threads { + thread.join().unwrap(); + } +} + +struct Rd(Arc<TcpStream>); +struct Wr(Arc<TcpStream>); + +impl io::Read for Rd { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + <&TcpStream>::read(&mut &*self.0, dst) + } +} + +impl tokio_io::AsyncRead for Rd { +} + +impl io::Write for Wr { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + <&TcpStream>::write(&mut &*self.0, src) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl tokio_io::AsyncWrite for Wr { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) + } +} + +#[test] +fn hammer_split() { + use tokio_io::io; + + const N: usize = 100; + const ITER: usize = 10; + + let _ = env_logger::try_init(); + + for _ in 0..ITER { + let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap())); + let addr = t!(srv.local_addr()); + + let cnt = Arc::new(AtomicUsize::new(0)); + + let mut rt = Runtime::new().unwrap(); + + fn split(socket: TcpStream, cnt: Arc<AtomicUsize>) { + let socket = Arc::new(socket); + let rd = Rd(socket.clone()); + let wr = Wr(socket); + + let cnt2 = cnt.clone(); + + let rd = io::read(rd, vec![0; 1]) + .map(move |_| { + cnt2.fetch_add(1, Relaxed); + }) + .map_err(|e| panic!("read error = {:?}", e)); + + let wr = io::write_all(wr, b"1") + .map(move |_| { + cnt.fetch_add(1, Relaxed); + }) + .map_err(move |e| panic!("write error = {:?}", e)); + + tokio::spawn(rd); + tokio::spawn(wr); + } + + rt.spawn({ + let cnt = cnt.clone(); + srv.incoming() + .map_err(|e| panic!("accept error = {:?}", e)) + .take(N as u64) + .for_each(move |socket| { + split(socket, cnt.clone()); + Ok(()) + }) + }); + + for _ in 0..N { + rt.spawn({ + let cnt = cnt.clone(); + TcpStream::connect(&addr) + .map_err(move |e| panic!("connect error = {:?}", e)) + .map(move |socket| split(socket, cnt)) + }); + } + + rt.shutdown_on_idle().wait().unwrap(); + assert_eq!(N * 4, cnt.load(Relaxed)); + } +} diff --git a/third_party/rust/tokio-0.1.11/tests/length_delimited.rs b/third_party/rust/tokio-0.1.11/tests/length_delimited.rs new file mode 100644 index 0000000000..318f35ef33 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/length_delimited.rs @@ -0,0 +1,564 @@ +extern crate tokio; +extern crate futures; +extern crate bytes; + +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::codec::*; + +use bytes::Bytes; +use futures::{Stream, Sink, Poll}; +use futures::Async::*; + +use std::io; +use std::collections::VecDeque; + +macro_rules! mock { + ($($x:expr,)*) => {{ + let mut v = VecDeque::new(); + v.extend(vec![$($x),*]); + Mock { calls: v } + }}; +} + + +#[test] +fn read_empty_io_yields_nothing() { + let mut io = FramedRead::new(mock!(), LengthDelimitedCodec::new()); + + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_one_packet() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + }, LengthDelimitedCodec::new()); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_one_packet_little_endian() { + let mut io = length_delimited::Builder::new() + .little_endian() + .new_read(mock! { + Ok(b"\x09\x00\x00\x00abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_one_packet_native_endian() { + let data = if cfg!(target_endian = "big") { + b"\x00\x00\x00\x09abcdefghi" + } else { + b"\x09\x00\x00\x00abcdefghi" + }; + let mut io = length_delimited::Builder::new() + .native_endian() + .new_read(mock! { + Ok(data[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_multi_frame_one_packet() { + let mut data: Vec<u8> = vec![]; + data.extend_from_slice(b"\x00\x00\x00\x09abcdefghi"); + data.extend_from_slice(b"\x00\x00\x00\x03123"); + data.extend_from_slice(b"\x00\x00\x00\x0bhello world"); + + let mut io = FramedRead::new(mock! { + Ok(data.into()), + }, LengthDelimitedCodec::new()); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_multi_packet() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00"[..].into()), + Ok(b"\x00\x09abc"[..].into()), + Ok(b"defghi"[..].into()), + }, LengthDelimitedCodec::new()); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_multi_frame_multi_packet() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00"[..].into()), + Ok(b"\x00\x09abc"[..].into()), + Ok(b"defghi"[..].into()), + Ok(b"\x00\x00\x00\x0312"[..].into()), + Ok(b"3\x00\x00\x00\x0bhello world"[..].into()), + }, LengthDelimitedCodec::new()); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_multi_packet_wait() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00"[..].into()), + Err(would_block()), + Ok(b"\x00\x09abc"[..].into()), + Err(would_block()), + Ok(b"defghi"[..].into()), + Err(would_block()), + }, LengthDelimitedCodec::new()); + + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_multi_frame_multi_packet_wait() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00"[..].into()), + Err(would_block()), + Ok(b"\x00\x09abc"[..].into()), + Err(would_block()), + Ok(b"defghi"[..].into()), + Err(would_block()), + Ok(b"\x00\x00\x00\x0312"[..].into()), + Err(would_block()), + Ok(b"3\x00\x00\x00\x0bhello world"[..].into()), + Err(would_block()), + }, LengthDelimitedCodec::new()); + + + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into()))); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_incomplete_head() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00"[..].into()), + }, LengthDelimitedCodec::new()); + + assert!(io.poll().is_err()); +} + +#[test] +fn read_incomplete_head_multi() { + let mut io = FramedRead::new(mock! { + Err(would_block()), + Ok(b"\x00"[..].into()), + Err(would_block()), + }, LengthDelimitedCodec::new()); + + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert!(io.poll().is_err()); +} + +#[test] +fn read_incomplete_payload() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00\x00\x09ab"[..].into()), + Err(would_block()), + Ok(b"cd"[..].into()), + Err(would_block()), + }, LengthDelimitedCodec::new()); + + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert!(io.poll().is_err()); +} + +#[test] +fn read_max_frame_len() { + let mut io = length_delimited::Builder::new() + .max_frame_length(5) + .new_read(mock! { + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData); +} + +#[test] +fn read_update_max_frame_len_at_rest() { + let mut io = length_delimited::Builder::new() + .new_read(mock! { + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + io.decoder_mut().set_max_frame_length(5); + assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData); +} + +#[test] +fn read_update_max_frame_len_in_flight() { + let mut io = length_delimited::Builder::new() + .new_read(mock! { + Ok(b"\x00\x00\x00\x09abcd"[..].into()), + Err(would_block()), + Ok(b"efghi"[..].into()), + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), NotReady); + io.decoder_mut().set_max_frame_length(5); + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData); +} + +#[test] +fn read_one_byte_length_field() { + let mut io = length_delimited::Builder::new() + .length_field_length(1) + .new_read(mock! { + Ok(b"\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_header_offset() { + let mut io = length_delimited::Builder::new() + .length_field_length(2) + .length_field_offset(4) + .new_read(mock! { + Ok(b"zzzz\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_multi_frame_one_packet_skip_none_adjusted() { + let mut data: Vec<u8> = vec![]; + data.extend_from_slice(b"xx\x00\x09abcdefghi"); + data.extend_from_slice(b"yy\x00\x03123"); + data.extend_from_slice(b"zz\x00\x0bhello world"); + + let mut io = length_delimited::Builder::new() + .length_field_length(2) + .length_field_offset(2) + .num_skip(0) + .length_adjustment(4) + .new_read(mock! { + Ok(data.into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"xx\x00\x09abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"yy\x00\x03123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"zz\x00\x0bhello world"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_multi_frame_one_packet_length_includes_head() { + let mut data: Vec<u8> = vec![]; + data.extend_from_slice(b"\x00\x0babcdefghi"); + data.extend_from_slice(b"\x00\x05123"); + data.extend_from_slice(b"\x00\x0dhello world"); + + let mut io = length_delimited::Builder::new() + .length_field_length(2) + .length_adjustment(-2) + .new_read(mock! { + Ok(data.into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn write_single_frame_length_adjusted() { + let mut io = length_delimited::Builder::new() + .length_adjustment(-2) + .new_write(mock! { + Ok(b"\x00\x00\x00\x0b"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }); + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_nothing_yields_nothing() { + let mut io = FramedWrite::new( + mock!(), + LengthDelimitedCodec::new() + ); + assert!(io.poll_complete().unwrap().is_ready()); +} + +#[test] +fn write_single_frame_one_packet() { + let mut io = FramedWrite::new(mock! { + Ok(b"\x00\x00\x00\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }, LengthDelimitedCodec::new()); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_multi_frame_one_packet() { + let mut io = FramedWrite::new(mock! { + Ok(b"\x00\x00\x00\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(b"\x00\x00\x00\x03"[..].into()), + Ok(b"123"[..].into()), + Ok(b"\x00\x00\x00\x0b"[..].into()), + Ok(b"hello world"[..].into()), + Ok(Flush), + }, LengthDelimitedCodec::new()); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.start_send(Bytes::from("123")).unwrap().is_ready()); + assert!(io.start_send(Bytes::from("hello world")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_multi_frame_multi_packet() { + let mut io = FramedWrite::new(mock! { + Ok(b"\x00\x00\x00\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + Ok(b"\x00\x00\x00\x03"[..].into()), + Ok(b"123"[..].into()), + Ok(Flush), + Ok(b"\x00\x00\x00\x0b"[..].into()), + Ok(b"hello world"[..].into()), + Ok(Flush), + }, LengthDelimitedCodec::new()); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.start_send(Bytes::from("123")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.start_send(Bytes::from("hello world")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_frame_would_block() { + let mut io = FramedWrite::new(mock! { + Err(would_block()), + Ok(b"\x00\x00"[..].into()), + Err(would_block()), + Ok(b"\x00\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }, LengthDelimitedCodec::new()); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(!io.poll_complete().unwrap().is_ready()); + assert!(!io.poll_complete().unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_frame_little_endian() { + let mut io = length_delimited::Builder::new() + .little_endian() + .new_write(mock! { + Ok(b"\x09\x00\x00\x00"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + + +#[test] +fn write_single_frame_with_short_length_field() { + let mut io = length_delimited::Builder::new() + .length_field_length(1) + .new_write(mock! { + Ok(b"\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_max_frame_len() { + let mut io = length_delimited::Builder::new() + .max_frame_length(5) + .new_write(mock! { }); + + assert_eq!(io.start_send(Bytes::from("abcdef")).unwrap_err().kind(), io::ErrorKind::InvalidInput); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_update_max_frame_len_at_rest() { + let mut io = length_delimited::Builder::new() + .new_write(mock! { + Ok(b"\x00\x00\x00\x06"[..].into()), + Ok(b"abcdef"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + io.encoder_mut().set_max_frame_length(5); + assert_eq!(io.start_send(Bytes::from("abcdef")).unwrap_err().kind(), io::ErrorKind::InvalidInput); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_update_max_frame_len_in_flight() { + let mut io = length_delimited::Builder::new() + .new_write(mock! { + Ok(b"\x00\x00\x00\x06"[..].into()), + Ok(b"ab"[..].into()), + Err(would_block()), + Ok(b"cdef"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready()); + assert!(!io.poll_complete().unwrap().is_ready()); + io.encoder_mut().set_max_frame_length(5); + assert!(io.poll_complete().unwrap().is_ready()); + assert_eq!(io.start_send(Bytes::from("abcdef")).unwrap_err().kind(), io::ErrorKind::InvalidInput); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_zero() { + let mut io = length_delimited::Builder::new() + .new_write(mock! { }); + + assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready()); + assert_eq!(io.poll_complete().unwrap_err().kind(), io::ErrorKind::WriteZero); + assert!(io.get_ref().calls.is_empty()); +} + +// ===== Test utils ===== + +fn would_block() -> io::Error { + io::Error::new(io::ErrorKind::WouldBlock, "would block") +} + +struct Mock { + calls: VecDeque<io::Result<Op>>, +} + +enum Op { + Data(Vec<u8>), + Flush, +} + +use self::Op::*; + +impl io::Read for Mock { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + match self.calls.pop_front() { + Some(Ok(Op::Data(data))) => { + debug_assert!(dst.len() >= data.len()); + dst[..data.len()].copy_from_slice(&data[..]); + Ok(data.len()) + } + Some(Ok(_)) => panic!(), + Some(Err(e)) => Err(e), + None => Ok(0), + } + } +} + +impl AsyncRead for Mock { +} + +impl io::Write for Mock { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + match self.calls.pop_front() { + Some(Ok(Op::Data(data))) => { + let len = data.len(); + assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src); + assert_eq!(&data[..], &src[..len]); + Ok(len) + } + Some(Ok(_)) => panic!(), + Some(Err(e)) => Err(e), + None => Ok(0), + } + } + + fn flush(&mut self) -> io::Result<()> { + match self.calls.pop_front() { + Some(Ok(Op::Flush)) => { + Ok(()) + } + Some(Ok(_)) => panic!(), + Some(Err(e)) => Err(e), + None => Ok(()), + } + } +} + +impl AsyncWrite for Mock { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(Ready(())) + } +} + +impl<'a> From<&'a [u8]> for Op { + fn from(src: &'a [u8]) -> Op { + Op::Data(src.into()) + } +} + +impl From<Vec<u8>> for Op { + fn from(src: Vec<u8>) -> Op { + Op::Data(src) + } +} diff --git a/third_party/rust/tokio-0.1.11/tests/line-frames.rs b/third_party/rust/tokio-0.1.11/tests/line-frames.rs new file mode 100644 index 0000000000..e36d5a73ee --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/line-frames.rs @@ -0,0 +1,88 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio; +extern crate tokio_codec; +extern crate tokio_io; +extern crate tokio_threadpool; +extern crate bytes; + +use std::io; +use std::net::Shutdown; + +use bytes::{BytesMut, BufMut}; +use futures::{Future, Stream, Sink}; +use tokio::net::{TcpListener, TcpStream}; +use tokio_codec::{Encoder, Decoder}; +use tokio_io::io::{write_all, read}; +use tokio_threadpool::Builder; + +pub struct LineCodec; + +impl Decoder for LineCodec { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> { + match buf.iter().position(|&b| b == b'\n') { + Some(i) => Ok(Some(buf.split_to(i + 1).into())), + None => Ok(None), + } + } + + fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> { + if buf.len() == 0 { + Ok(None) + } else { + let amt = buf.len(); + Ok(Some(buf.split_to(amt))) + } + } +} + +impl Encoder for LineCodec { + type Item = BytesMut; + type Error = io::Error; + + fn encode(&mut self, item: BytesMut, into: &mut BytesMut) -> io::Result<()> { + into.put(&item[..]); + Ok(()) + } +} + +#[test] +fn echo() { + drop(env_logger::try_init()); + + let pool = Builder::new() + .pool_size(1) + .build(); + + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let addr = listener.local_addr().unwrap(); + let sender = pool.sender().clone(); + let srv = listener.incoming().for_each(move |socket| { + let (sink, stream) = LineCodec.framed(socket).split(); + sender.spawn(sink.send_all(stream).map(|_| ()).map_err(|_| ())).unwrap(); + Ok(()) + }); + + pool.sender().spawn(srv.map_err(|e| panic!("srv error: {}", e))).unwrap(); + + let client = TcpStream::connect(&addr); + let client = client.wait().unwrap(); + let (client, _) = write_all(client, b"a\n").wait().unwrap(); + let (client, buf, amt) = read(client, vec![0; 1024]).wait().unwrap(); + assert_eq!(amt, 2); + assert_eq!(&buf[..2], b"a\n"); + + let (client, _) = write_all(client, b"\n").wait().unwrap(); + let (client, buf, amt) = read(client, buf).wait().unwrap(); + assert_eq!(amt, 1); + assert_eq!(&buf[..1], b"\n"); + + let (client, _) = write_all(client, b"b").wait().unwrap(); + client.shutdown(Shutdown::Write).unwrap(); + let (_client, buf, amt) = read(client, buf).wait().unwrap(); + assert_eq!(amt, 1); + assert_eq!(&buf[..1], b"b"); +} diff --git a/third_party/rust/tokio-0.1.11/tests/pipe-hup.rs b/third_party/rust/tokio-0.1.11/tests/pipe-hup.rs new file mode 100644 index 0000000000..a23ae7f6ba --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/pipe-hup.rs @@ -0,0 +1,88 @@ +#![cfg(unix)] + +extern crate env_logger; +extern crate futures; +extern crate libc; +extern crate mio; +extern crate tokio; +extern crate tokio_io; + +use std::fs::File; +use std::io::{self, Write}; +use std::os::unix::io::{AsRawFd, FromRawFd}; +use std::thread; +use std::time::Duration; + +use mio::event::Evented; +use mio::unix::{UnixReady, EventedFd}; +use mio::{PollOpt, Ready, Token}; +use tokio::reactor::{Handle, PollEvented2}; +use tokio_io::io::read_to_end; +use futures::Future; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +struct MyFile(File); + +impl MyFile { + fn new(file: File) -> MyFile { + unsafe { + let r = libc::fcntl(file.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK); + assert!(r != -1, "fcntl error: {}", io::Error::last_os_error()); + } + MyFile(file) + } +} + +impl io::Read for MyFile { + fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> { + self.0.read(bytes) + } +} + +impl Evented for MyFile { + fn register(&self, poll: &mio::Poll, token: Token, interest: Ready, opts: PollOpt) + -> io::Result<()> { + let hup: Ready = UnixReady::hup().into(); + EventedFd(&self.0.as_raw_fd()).register(poll, token, interest | hup, opts) + } + fn reregister(&self, poll: &mio::Poll, token: Token, interest: Ready, opts: PollOpt) + -> io::Result<()> { + let hup: Ready = UnixReady::hup().into(); + EventedFd(&self.0.as_raw_fd()).reregister(poll, token, interest | hup, opts) + } + fn deregister(&self, poll: &mio::Poll) -> io::Result<()> { + EventedFd(&self.0.as_raw_fd()).deregister(poll) + } +} + +#[test] +fn hup() { + drop(env_logger::try_init()); + + let handle = Handle::default(); + unsafe { + let mut pipes = [0; 2]; + assert!(libc::pipe(pipes.as_mut_ptr()) != -1, + "pipe error: {}", io::Error::last_os_error()); + let read = File::from_raw_fd(pipes[0]); + let mut write = File::from_raw_fd(pipes[1]); + let t = thread::spawn(move || { + write.write_all(b"Hello!\n").unwrap(); + write.write_all(b"Good bye!\n").unwrap(); + thread::sleep(Duration::from_millis(100)); + }); + + let source = PollEvented2::new_with_handle(MyFile::new(read), &handle).unwrap(); + + let reader = read_to_end(source, Vec::new()); + let (_, content) = t!(reader.wait()); + assert_eq!(&b"Hello!\nGood bye!\n"[..], &content[..]); + t.join().unwrap(); + } +} diff --git a/third_party/rust/tokio-0.1.11/tests/reactor.rs b/third_party/rust/tokio-0.1.11/tests/reactor.rs new file mode 100644 index 0000000000..1bac13ad40 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/reactor.rs @@ -0,0 +1,89 @@ +extern crate futures; +extern crate tokio_executor; +extern crate tokio_reactor; +extern crate tokio_tcp; + +use tokio_reactor::Reactor; +use tokio_tcp::TcpListener; + +use futures::{Future, Stream}; +use futures::executor::{spawn, Notify, Spawn}; + +use std::mem; +use std::net::TcpStream; +use std::sync::{Arc, Mutex}; + +#[test] +fn test_drop_on_notify() { + // When the reactor receives a kernel notification, it notifies the + // task that holds the associated socket. If this notification results in + // the task being dropped, the socket will also be dropped. + // + // Previously, there was a deadlock scenario where the reactor, while + // notifying, held a lock and the task being dropped attempted to acquire + // that same lock in order to clean up state. + // + // To simulate this case, we create a fake executor that does nothing when + // the task is notified. This simulates an executor in the process of + // shutting down. Then, when the task handle is dropped, the task itself is + // dropped. + + struct MyNotify; + + type Task = Mutex<Spawn<Box<Future<Item = (), Error = ()>>>>; + + impl Notify for MyNotify { + fn notify(&self, _: usize) { + // Do nothing + } + + fn clone_id(&self, id: usize) -> usize { + let ptr = id as *const Task; + let task = unsafe { Arc::from_raw(ptr) }; + + mem::forget(task.clone()); + mem::forget(task); + + id + } + + fn drop_id(&self, id: usize) { + let ptr = id as *const Task; + let _ = unsafe { Arc::from_raw(ptr) }; + } + } + + let addr = "127.0.0.1:0".parse().unwrap(); + let mut reactor = Reactor::new().unwrap(); + + // Create a listener + let listener = TcpListener::bind(&addr).unwrap(); + let addr = listener.local_addr().unwrap(); + + // Define a task that just drains the listener + let task = Box::new({ + listener.incoming() + .for_each(|_| Ok(())) + .map_err(|_| panic!()) + }) as Box<Future<Item = (), Error = ()>>; + + let task = Arc::new(Mutex::new(spawn(task))); + let notify = Arc::new(MyNotify); + + let mut enter = tokio_executor::enter().unwrap(); + + tokio_reactor::with_default(&reactor.handle(), &mut enter, |_| { + let id = &*task as *const Task as usize; + + task.lock().unwrap() + .poll_future_notify(¬ify, id) + .unwrap(); + }); + + drop(task); + + // Establish a connection to the acceptor + let _s = TcpStream::connect(&addr).unwrap(); + + reactor.turn(None).unwrap(); +} diff --git a/third_party/rust/tokio-0.1.11/tests/runtime.rs b/third_party/rust/tokio-0.1.11/tests/runtime.rs new file mode 100644 index 0000000000..66d10b9510 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/runtime.rs @@ -0,0 +1,404 @@ +extern crate tokio; +extern crate env_logger; +extern crate futures; + +use futures::sync::oneshot; +use std::sync::{Arc, Mutex}; +use std::thread; +use tokio::io; +use tokio::net::{TcpStream, TcpListener}; +use tokio::prelude::future::lazy; +use tokio::prelude::*; +use tokio::runtime::Runtime; + +// this import is used in all child modules that have it in scope +// from importing super::*, but the compiler doesn't realise that +// and warns about it. +pub use futures::future::Executor; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +fn create_client_server_future() -> Box<Future<Item=(), Error=()> + Send> { + let server = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap())); + let addr = t!(server.local_addr()); + let client = TcpStream::connect(&addr); + + let server = server.incoming().take(1) + .map_err(|e| panic!("accept err = {:?}", e)) + .for_each(|socket| { + tokio::spawn({ + io::write_all(socket, b"hello") + .map(|_| ()) + .map_err(|e| panic!("write err = {:?}", e)) + }) + }) + .map(|_| ()); + + let client = client + .map_err(|e| panic!("connect err = {:?}", e)) + .and_then(|client| { + // Read all + io::read_to_end(client, vec![]) + .map(|_| ()) + .map_err(|e| panic!("read err = {:?}", e)) + }); + + let future = server.join(client) + .map(|_| ()); + Box::new(future) +} + +#[test] +fn runtime_tokio_run() { + let _ = env_logger::try_init(); + + tokio::run(create_client_server_future()); +} + +#[test] +fn runtime_single_threaded() { + let _ = env_logger::try_init(); + + let mut runtime = tokio::runtime::current_thread::Runtime::new() + .unwrap(); + runtime.block_on(create_client_server_future()).unwrap(); + runtime.run().unwrap(); +} + +#[test] +fn runtime_single_threaded_block_on() { + let _ = env_logger::try_init(); + + tokio::runtime::current_thread::block_on_all(create_client_server_future()).unwrap(); +} + +mod runtime_single_threaded_block_on_all { + use super::*; + + fn test<F>(spawn: F) + where + F: Fn(Box<Future<Item=(), Error=()> + Send>), + { + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let msg = tokio::runtime::current_thread::block_on_all(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + + // Spawn! + spawn(Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); + + Ok::<_, ()>("hello") + })).unwrap(); + + assert_eq!(2, *cnt.lock().unwrap()); + assert_eq!(msg, "hello"); + } + + #[test] + fn spawn() { + test(|f| { tokio::spawn(f); }) + } + + #[test] + fn execute() { + test(|f| { + tokio::executor::DefaultExecutor::current() + .execute(f) + .unwrap(); + }) + } +} + +mod runtime_single_threaded_racy { + use super::*; + fn test<F>(spawn: F) + where + F: Fn( + tokio::runtime::current_thread::Handle, + Box<Future<Item=(), Error=()> + Send>, + ), + { + let (trigger, exit) = futures::sync::oneshot::channel(); + let (handle_tx, handle_rx) = ::std::sync::mpsc::channel(); + let jh = ::std::thread::spawn(move || { + let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap(); + handle_tx.send(rt.handle()).unwrap(); + + // don't exit until we are told to + rt.block_on(exit.map_err(|_| ())).unwrap(); + + // run until all spawned futures (incl. the "exit" signal future) have completed. + rt.run().unwrap(); + }); + + let (tx, rx) = futures::sync::oneshot::channel(); + + let handle = handle_rx.recv().unwrap(); + spawn(handle, Box::new(futures::future::lazy(move || { + tx.send(()).unwrap(); + Ok(()) + }))); + + // signal runtime thread to exit + trigger.send(()).unwrap(); + + // wait for runtime thread to exit + jh.join().unwrap(); + + assert_eq!(rx.wait().unwrap(), ()); + } + + #[test] + fn spawn() { + test(|handle, f| { handle.spawn(f).unwrap(); }) + } + + #[test] + fn execute() { + test(|handle, f| { handle.execute(f).unwrap(); }) + } +} + +mod runtime_multi_threaded { + use super::*; + fn test<F>(spawn: F) + where + F: Fn(&mut Runtime) + Send + 'static, + { + let _ = env_logger::try_init(); + + let mut runtime = tokio::runtime::Builder::new() + .build() + .unwrap(); + spawn(&mut runtime); + runtime.shutdown_on_idle().wait().unwrap(); + } + + #[test] + fn spawn() { + test(|rt| { rt.spawn(create_client_server_future()); }); + } + + #[test] + fn execute() { + test(|rt| { rt.executor().execute(create_client_server_future()).unwrap(); }); + } +} + + +#[test] +fn block_on_timer() { + use std::time::{Duration, Instant}; + use tokio::timer::{Delay, Error}; + + fn after_1s<T>(x: T) -> Box<Future<Item = T, Error = Error> + Send> + where + T: Send + 'static, + { + Box::new(Delay::new(Instant::now() + Duration::from_millis(100)).map(move |_| x)) + } + + let mut runtime = Runtime::new().unwrap(); + assert_eq!(runtime.block_on(after_1s(42)).unwrap(), 42); + runtime.shutdown_on_idle().wait().unwrap(); +} + +mod from_block_on { + use super::*; + + fn test<F>(spawn: F) + where + F: Fn(Box<Future<Item=(), Error=()> + Send>) + Send + 'static, + { + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let mut runtime = Runtime::new().unwrap(); + let msg = runtime + .block_on(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + + // Spawn! + spawn(Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); + + Ok::<_, ()>("hello") + })) + .unwrap(); + + runtime.shutdown_on_idle().wait().unwrap(); + assert_eq!(2, *cnt.lock().unwrap()); + assert_eq!(msg, "hello"); + } + + #[test] + fn execute() { + test(|f| { + tokio::executor::DefaultExecutor::current() + .execute(f) + .unwrap(); + }) + } + + #[test] + fn spawn() { + test(|f| { + tokio::spawn(f); + }) + } +} + +#[test] +fn block_waits() { + let (tx, rx) = oneshot::channel(); + + thread::spawn(|| { + use std::time::Duration; + thread::sleep(Duration::from_millis(1000)); + tx.send(()).unwrap(); + }); + + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let mut runtime = Runtime::new().unwrap(); + runtime + .block_on(rx.then(move |_| { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<_, ()>(()) + })) + .unwrap(); + + assert_eq!(1, *cnt.lock().unwrap()); + runtime.shutdown_on_idle().wait().unwrap(); +} + +mod many { + use super::*; + + const ITER: usize = 200; + fn test<F>(spawn: F) + where + F: Fn(&mut Runtime, Box<Future<Item=(), Error=()> + Send>), + { + let cnt = Arc::new(Mutex::new(0)); + let mut runtime = Runtime::new().unwrap(); + + for _ in 0..ITER { + let c = cnt.clone(); + spawn(&mut runtime, Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); + } + + runtime.shutdown_on_idle().wait().unwrap(); + assert_eq!(ITER, *cnt.lock().unwrap()); + } + + #[test] + fn spawn() { + test(|rt, f| { rt.spawn(f); }) + } + + #[test] + fn execute() { + test(|rt, f| { + rt.executor() + .execute(f) + .unwrap(); + }) + } +} + + +mod from_block_on_all { + use super::*; + + fn test<F>(spawn: F) + where + F: Fn(Box<Future<Item=(), Error=()> + Send>) + Send + 'static, + { + let cnt = Arc::new(Mutex::new(0)); + let c = cnt.clone(); + + let runtime = Runtime::new().unwrap(); + let msg = runtime + .block_on_all(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + + // Spawn! + spawn(Box::new(lazy(move || { + { + let mut x = c.lock().unwrap(); + *x = 1 + *x; + } + Ok::<(), ()>(()) + }))); + + Ok::<_, ()>("hello") + })) + .unwrap(); + + assert_eq!(2, *cnt.lock().unwrap()); + assert_eq!(msg, "hello"); + } + + #[test] + fn execute() { + test(|f| { + tokio::executor::DefaultExecutor::current() + .execute(f) + .unwrap(); + }) + } + + #[test] + fn spawn() { + test(|f| { tokio::spawn(f); }) + } +} + +#[test] +fn run_in_run() { + use std::panic; + + tokio::run(lazy(|| { + panic::catch_unwind(|| { + tokio::run(lazy(|| { Ok::<(), ()>(()) })) + }).unwrap_err(); + Ok::<(), ()>(()) + })); +} diff --git a/third_party/rust/tokio-0.1.11/tests/timer.rs b/third_party/rust/tokio-0.1.11/tests/timer.rs new file mode 100644 index 0000000000..72a5595d76 --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/timer.rs @@ -0,0 +1,116 @@ +extern crate futures; +extern crate tokio; +extern crate tokio_io; +extern crate env_logger; + +use tokio::prelude::*; +use tokio::timer::*; + +use std::sync::mpsc; +use std::time::{Duration, Instant}; + +#[test] +fn timer_with_runtime() { + let _ = env_logger::try_init(); + + let when = Instant::now() + Duration::from_millis(100); + let (tx, rx) = mpsc::channel(); + + tokio::run({ + Delay::new(when) + .map_err(|e| panic!("unexpected error; err={:?}", e)) + .and_then(move |_| { + assert!(Instant::now() >= when); + tx.send(()).unwrap(); + Ok(()) + }) + }); + + rx.recv().unwrap(); +} + +#[test] +fn starving() { + use futures::{task, Poll, Async}; + + let _ = env_logger::try_init(); + + struct Starve(Delay, u64); + + impl Future for Starve { + type Item = u64; + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, ()> { + if self.0.poll().unwrap().is_ready() { + return Ok(self.1.into()); + } + + self.1 += 1; + + task::current().notify(); + + Ok(Async::NotReady) + } + } + + let when = Instant::now() + Duration::from_millis(20); + let starve = Starve(Delay::new(when), 0); + + let (tx, rx) = mpsc::channel(); + + tokio::run({ + starve + .and_then(move |_ticks| { + assert!(Instant::now() >= when); + tx.send(()).unwrap(); + Ok(()) + }) + }); + + rx.recv().unwrap(); +} + +#[test] +fn deadline() { + use futures::future; + + let _ = env_logger::try_init(); + + let when = Instant::now() + Duration::from_millis(20); + let (tx, rx) = mpsc::channel(); + + #[allow(deprecated)] + tokio::run({ + future::empty::<(), ()>() + .deadline(when) + .then(move |res| { + assert!(res.is_err()); + tx.send(()).unwrap(); + Ok(()) + }) + }); + + rx.recv().unwrap(); +} + +#[test] +fn timeout() { + use futures::future; + + let _ = env_logger::try_init(); + + let (tx, rx) = mpsc::channel(); + + tokio::run({ + future::empty::<(), ()>() + .timeout(Duration::from_millis(20)) + .then(move |res| { + assert!(res.is_err()); + tx.send(()).unwrap(); + Ok(()) + }) + }); + + rx.recv().unwrap(); +} |