summaryrefslogtreecommitdiffstats
path: root/third_party/rust/mio-extras
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/mio-extras')
-rw-r--r--third_party/rust/mio-extras/.cargo-checksum.json1
-rw-r--r--third_party/rust/mio-extras/CHANGELOG.md37
-rw-r--r--third_party/rust/mio-extras/Cargo.toml40
-rw-r--r--third_party/rust/mio-extras/LICENSE-APACHE201
-rw-r--r--third_party/rust/mio-extras/LICENSE-MIT25
-rw-r--r--third_party/rust/mio-extras/README.md30
-rw-r--r--third_party/rust/mio-extras/src/channel.rs431
-rw-r--r--third_party/rust/mio-extras/src/lib.rs33
-rw-r--r--third_party/rust/mio-extras/src/timer.rs751
-rw-r--r--third_party/rust/mio-extras/test/mod.rs45
-rw-r--r--third_party/rust/mio-extras/test/test_poll_channel.rs362
-rw-r--r--third_party/rust/mio-extras/test/test_timer.rs308
12 files changed, 2264 insertions, 0 deletions
diff --git a/third_party/rust/mio-extras/.cargo-checksum.json b/third_party/rust/mio-extras/.cargo-checksum.json
new file mode 100644
index 0000000000..cb15205f9b
--- /dev/null
+++ b/third_party/rust/mio-extras/.cargo-checksum.json
@@ -0,0 +1 @@
+{"files":{"CHANGELOG.md":"499f8d84e8bee37198044301a5fdfe6a811dd19b62c2c771616185a7543b8334","Cargo.toml":"a4d1b4f5de2908b805a42b31472dfd5fee30a49dc4f575a174c581c7b57b25b7","LICENSE-APACHE":"406e5cbaa2ad1178c300cf28ac5258e8d0db0de4f061e78db559d30e6f38e25c","LICENSE-MIT":"8aa414e6c821efd8be6bade07368a5d9f51f5cc55718bc54e10a59eb826b8d58","README.md":"fa2642be7bd670014c5e25bafbee73b8be0667ddbd193c1cc344a71d7f59463f","src/channel.rs":"a9fb5bcf889b03766821011e94b30a351b80501523c4a9fe5c45796eae218968","src/lib.rs":"2ed1572d3255208681d017265df7f642eb4898b1c2dace91676935f55e03eb04","src/timer.rs":"a1e71e38ab983291557d534ce2454a0ba5872652f4e7c4161131ba5150ec8d57","test/mod.rs":"aa3afc2582f00e5e2a2e5b87d12eb9810b0eed3248b48abef7094fd8d02d9c41","test/test_poll_channel.rs":"508815e265ae44328fb3d7c98cdf210815a9946bde291dd896de81df0394de37","test/test_timer.rs":"d04b6f57e9a395ce190022c0158cc498805758101e9fdad18b63829eb9bb6510"},"package":"52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19"} \ No newline at end of file
diff --git a/third_party/rust/mio-extras/CHANGELOG.md b/third_party/rust/mio-extras/CHANGELOG.md
new file mode 100644
index 0000000000..354b87584c
--- /dev/null
+++ b/third_party/rust/mio-extras/CHANGELOG.md
@@ -0,0 +1,37 @@
+## 2.0.6 (7 Dec 2019)
+
+- fix license metadata in `Cargo.toml` (thanks @ignatenkobrain)
+
+## 2.0.5 (18 Jun 2018)
+
+- update `lazycell` from 0.6 -> 1.0
+
+## 2.0.4 (7 Apr 2018)
+
+- Bump mio dependency (fixes minimal-versions build)
+
+## 2.0.3 (28 Dec 2017)
+
+- update `log` from 0.3 -> 0.4
+
+## 2.0.2
+
+- More docs tidying.
+
+## 2.0.1
+
+- Another try at documenting the timer interface.
+
+## 2.0.0
+
+- Remove channel implementation details from the API. Specifically, the
+ following are no longer public:
+ - `ctl_pair()`
+ - `SenderCtl`
+ - `ReceiverCtl`
+- Document all APIs
+
+## 1.0.0
+
+- Initial release. Essentially identical to
+ [mio-more](https://github.com/carllerche/mio-more).
diff --git a/third_party/rust/mio-extras/Cargo.toml b/third_party/rust/mio-extras/Cargo.toml
new file mode 100644
index 0000000000..d902a46019
--- /dev/null
+++ b/third_party/rust/mio-extras/Cargo.toml
@@ -0,0 +1,40 @@
+# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
+#
+# When uploading crates to the registry Cargo will automatically
+# "normalize" Cargo.toml files for maximal compatibility
+# with all versions of Cargo and also rewrite `path` dependencies
+# to registry (e.g., crates.io) dependencies
+#
+# If you believe there's an error in this file please file an
+# issue against the rust-lang/cargo repository. If you're
+# editing this file be aware that the upstream Cargo.toml
+# will likely look very different (and much more reasonable)
+
+[package]
+edition = "2018"
+name = "mio-extras"
+version = "2.0.6"
+authors = ["Carl Lerche <me@carllerche.com>", "David Hotham"]
+exclude = [".gitignore"]
+description = "Extra components for use with Mio"
+documentation = "https://docs.rs/mio-extras"
+readme = "README.md"
+keywords = ["io", "async", "non-blocking"]
+categories = ["asynchronous"]
+license = "MIT OR Apache-2.0"
+repository = "https://github.com/dimbleby/mio-extras"
+
+[[test]]
+name = "test"
+path = "test/mod.rs"
+[dependencies.lazycell]
+version = "1"
+
+[dependencies.log]
+version = "0.4"
+
+[dependencies.mio]
+version = "0.6.14"
+
+[dependencies.slab]
+version = "0.4"
diff --git a/third_party/rust/mio-extras/LICENSE-APACHE b/third_party/rust/mio-extras/LICENSE-APACHE
new file mode 100644
index 0000000000..a6e8ded657
--- /dev/null
+++ b/third_party/rust/mio-extras/LICENSE-APACHE
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+Copyright 2017 Mio authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
diff --git a/third_party/rust/mio-extras/LICENSE-MIT b/third_party/rust/mio-extras/LICENSE-MIT
new file mode 100644
index 0000000000..4cf193e73e
--- /dev/null
+++ b/third_party/rust/mio-extras/LICENSE-MIT
@@ -0,0 +1,25 @@
+Copyright (c) 2017 Mio authors
+
+Permission is hereby granted, free of charge, to any
+person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the
+Software without restriction, including without
+limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software
+is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/third_party/rust/mio-extras/README.md b/third_party/rust/mio-extras/README.md
new file mode 100644
index 0000000000..8ed136a079
--- /dev/null
+++ b/third_party/rust/mio-extras/README.md
@@ -0,0 +1,30 @@
+# mio-extras
+
+Extra components for use with [Mio](https://github.com/tokio-rs/mio):
+
+- a channel that implements `Evented`
+- a timer that implements `Evented`
+
+[![Build Status](https://travis-ci.org/dimbleby/mio-extras.svg?branch=master)](https://travis-ci.org/dimbleby/mio-extras)
+[![crates.io](http://meritbadge.herokuapp.com/mio-extras)](https://crates.io/crates/mio-extras)
+
+[Documentation](https://docs.rs/mio-extras).
+
+## History and maintenance
+
+This repository is forked from
+[`mio-more`](https://github.com/carllerche/mio-more), which is unmaintained.
+
+I don't intend to do very much with this except for routine maintenance - bug
+fixes, updating dependencies, and suchlike.
+
+However if you have some code that you think belongs here, then by all means
+raise an issue or open a pull request.
+
+# License
+
+`mio-extras` is primarily distributed under the terms of both the MIT license
+and the Apache License (Version 2.0), with portions covered by various BSD-like
+licenses.
+
+See LICENSE-APACHE, and LICENSE-MIT for details.
diff --git a/third_party/rust/mio-extras/src/channel.rs b/third_party/rust/mio-extras/src/channel.rs
new file mode 100644
index 0000000000..561317ecbf
--- /dev/null
+++ b/third_party/rust/mio-extras/src/channel.rs
@@ -0,0 +1,431 @@
+//! Thread safe communication channel implementing `Evented`
+use lazycell::{AtomicLazyCell, LazyCell};
+use mio::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
+use std::any::Any;
+use std::error;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{mpsc, Arc};
+use std::{fmt, io};
+
+/// Creates a new asynchronous channel, where the `Receiver` can be registered
+/// with `Poll`.
+pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
+ let (tx_ctl, rx_ctl) = ctl_pair();
+ let (tx, rx) = mpsc::channel();
+
+ let tx = Sender { tx, ctl: tx_ctl };
+
+ let rx = Receiver { rx, ctl: rx_ctl };
+
+ (tx, rx)
+}
+
+/// Creates a new synchronous, bounded channel where the `Receiver` can be
+/// registered with `Poll`.
+pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
+ let (tx_ctl, rx_ctl) = ctl_pair();
+ let (tx, rx) = mpsc::sync_channel(bound);
+
+ let tx = SyncSender { tx, ctl: tx_ctl };
+
+ let rx = Receiver { rx, ctl: rx_ctl };
+
+ (tx, rx)
+}
+
+fn ctl_pair() -> (SenderCtl, ReceiverCtl) {
+ let inner = Arc::new(Inner {
+ pending: AtomicUsize::new(0),
+ senders: AtomicUsize::new(1),
+ set_readiness: AtomicLazyCell::new(),
+ });
+
+ let tx = SenderCtl {
+ inner: Arc::clone(&inner),
+ };
+
+ let rx = ReceiverCtl {
+ registration: LazyCell::new(),
+ inner,
+ };
+
+ (tx, rx)
+}
+
+/// Tracks messages sent on a channel in order to update readiness.
+struct SenderCtl {
+ inner: Arc<Inner>,
+}
+
+/// Tracks messages received on a channel in order to track readiness.
+struct ReceiverCtl {
+ registration: LazyCell<Registration>,
+ inner: Arc<Inner>,
+}
+
+/// The sending half of a channel.
+pub struct Sender<T> {
+ tx: mpsc::Sender<T>,
+ ctl: SenderCtl,
+}
+
+/// The sending half of a synchronous channel.
+pub struct SyncSender<T> {
+ tx: mpsc::SyncSender<T>,
+ ctl: SenderCtl,
+}
+
+/// The receiving half of a channel.
+pub struct Receiver<T> {
+ rx: mpsc::Receiver<T>,
+ ctl: ReceiverCtl,
+}
+
+/// An error returned from the `Sender::send` or `SyncSender::send` function.
+pub enum SendError<T> {
+ /// An IO error.
+ Io(io::Error),
+
+ /// The receiving half of the channel has disconnected.
+ Disconnected(T),
+}
+
+/// An error returned from the `SyncSender::try_send` function.
+pub enum TrySendError<T> {
+ /// An IO error.
+ Io(io::Error),
+
+ /// Data could not be sent because it would require the callee to block.
+ Full(T),
+
+ /// The receiving half of the channel has disconnected.
+ Disconnected(T),
+}
+
+struct Inner {
+ // The number of outstanding messages for the receiver to read
+ pending: AtomicUsize,
+ // The number of sender handles
+ senders: AtomicUsize,
+ // The set readiness handle
+ set_readiness: AtomicLazyCell<SetReadiness>,
+}
+
+impl<T> Sender<T> {
+ /// Attempts to send a value on this channel, returning it back if it could not be sent.
+ pub fn send(&self, t: T) -> Result<(), SendError<T>> {
+ self.tx.send(t).map_err(SendError::from).and_then(|_| {
+ self.ctl.inc()?;
+ Ok(())
+ })
+ }
+}
+
+impl<T> Clone for Sender<T> {
+ fn clone(&self) -> Sender<T> {
+ Sender {
+ tx: self.tx.clone(),
+ ctl: self.ctl.clone(),
+ }
+ }
+}
+
+impl<T> SyncSender<T> {
+ /// Sends a value on this synchronous channel.
+ ///
+ /// This function will *block* until space in the internal buffer becomes
+ /// available or a receiver is available to hand off the message to.
+ pub fn send(&self, t: T) -> Result<(), SendError<T>> {
+ self.tx.send(t).map_err(From::from).and_then(|_| {
+ self.ctl.inc()?;
+ Ok(())
+ })
+ }
+
+ /// Attempts to send a value on this channel without blocking.
+ ///
+ /// This method differs from `send` by returning immediately if the channel's
+ /// buffer is full or no receiver is waiting to acquire some data.
+ pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
+ self.tx.try_send(t).map_err(From::from).and_then(|_| {
+ self.ctl.inc()?;
+ Ok(())
+ })
+ }
+}
+
+impl<T> Clone for SyncSender<T> {
+ fn clone(&self) -> SyncSender<T> {
+ SyncSender {
+ tx: self.tx.clone(),
+ ctl: self.ctl.clone(),
+ }
+ }
+}
+
+impl<T> Receiver<T> {
+ /// Attempts to return a pending value on this receiver without blocking.
+ pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
+ self.rx.try_recv().and_then(|res| {
+ let _ = self.ctl.dec();
+ Ok(res)
+ })
+ }
+}
+
+impl<T> Evented for Receiver<T> {
+ fn register(
+ &self,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt,
+ ) -> io::Result<()> {
+ self.ctl.register(poll, token, interest, opts)
+ }
+
+ fn reregister(
+ &self,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt,
+ ) -> io::Result<()> {
+ self.ctl.reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ self.ctl.deregister(poll)
+ }
+}
+
+/*
+ *
+ * ===== SenderCtl / ReceiverCtl =====
+ *
+ */
+
+impl SenderCtl {
+ /// Call to track that a message has been sent
+ fn inc(&self) -> io::Result<()> {
+ let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire);
+
+ if 0 == cnt {
+ // Toggle readiness to readable
+ if let Some(set_readiness) = self.inner.set_readiness.borrow() {
+ set_readiness.set_readiness(Ready::readable())?;
+ }
+ }
+
+ Ok(())
+ }
+}
+
+impl Clone for SenderCtl {
+ fn clone(&self) -> SenderCtl {
+ self.inner.senders.fetch_add(1, Ordering::Relaxed);
+ SenderCtl {
+ inner: Arc::clone(&self.inner),
+ }
+ }
+}
+
+impl Drop for SenderCtl {
+ fn drop(&mut self) {
+ if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 {
+ let _ = self.inc();
+ }
+ }
+}
+
+impl ReceiverCtl {
+ fn dec(&self) -> io::Result<()> {
+ let first = self.inner.pending.load(Ordering::Acquire);
+
+ if first == 1 {
+ // Unset readiness
+ if let Some(set_readiness) = self.inner.set_readiness.borrow() {
+ set_readiness.set_readiness(Ready::empty())?;
+ }
+ }
+
+ // Decrement
+ let second = self.inner.pending.fetch_sub(1, Ordering::AcqRel);
+
+ if first == 1 && second > 1 {
+ // There are still pending messages. Since readiness was
+ // previously unset, it must be reset here
+ if let Some(set_readiness) = self.inner.set_readiness.borrow() {
+ set_readiness.set_readiness(Ready::readable())?;
+ }
+ }
+
+ Ok(())
+ }
+}
+
+impl Evented for ReceiverCtl {
+ fn register(
+ &self,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt,
+ ) -> io::Result<()> {
+ if self.registration.borrow().is_some() {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "receiver already registered",
+ ));
+ }
+
+ let (registration, set_readiness) = Registration::new2();
+ poll.register(&registration, token, interest, opts)?;
+
+ if self.inner.pending.load(Ordering::Relaxed) > 0 {
+ // TODO: Don't drop readiness
+ let _ = set_readiness.set_readiness(Ready::readable());
+ }
+
+ self.registration
+ .fill(registration)
+ .expect("unexpected state encountered");
+ self.inner
+ .set_readiness
+ .fill(set_readiness)
+ .expect("unexpected state encountered");
+
+ Ok(())
+ }
+
+ fn reregister(
+ &self,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt,
+ ) -> io::Result<()> {
+ match self.registration.borrow() {
+ Some(registration) => poll.reregister(registration, token, interest, opts),
+ None => Err(io::Error::new(
+ io::ErrorKind::Other,
+ "receiver not registered",
+ )),
+ }
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ match self.registration.borrow() {
+ Some(registration) => poll.deregister(registration),
+ None => Err(io::Error::new(
+ io::ErrorKind::Other,
+ "receiver not registered",
+ )),
+ }
+ }
+}
+
+/*
+ *
+ * ===== Error conversions =====
+ *
+ */
+
+impl<T> From<mpsc::SendError<T>> for SendError<T> {
+ fn from(src: mpsc::SendError<T>) -> SendError<T> {
+ SendError::Disconnected(src.0)
+ }
+}
+
+impl<T> From<io::Error> for SendError<T> {
+ fn from(src: io::Error) -> SendError<T> {
+ SendError::Io(src)
+ }
+}
+
+impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
+ fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> {
+ match src {
+ mpsc::TrySendError::Full(v) => TrySendError::Full(v),
+ mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
+ }
+ }
+}
+
+impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
+ fn from(src: mpsc::SendError<T>) -> TrySendError<T> {
+ TrySendError::Disconnected(src.0)
+ }
+}
+
+impl<T> From<io::Error> for TrySendError<T> {
+ fn from(src: io::Error) -> TrySendError<T> {
+ TrySendError::Io(src)
+ }
+}
+
+/*
+ *
+ * ===== Implement Error, Debug and Display for Errors =====
+ *
+ */
+
+impl<T: Any> error::Error for SendError<T> {
+ fn description(&self) -> &str {
+ match *self {
+ SendError::Io(ref io_err) => io_err.description(),
+ SendError::Disconnected(..) => "Disconnected",
+ }
+ }
+}
+
+impl<T: Any> error::Error for TrySendError<T> {
+ fn description(&self) -> &str {
+ match *self {
+ TrySendError::Io(ref io_err) => io_err.description(),
+ TrySendError::Full(..) => "Full",
+ TrySendError::Disconnected(..) => "Disconnected",
+ }
+ }
+}
+
+impl<T> fmt::Debug for SendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ format_send_error(self, f)
+ }
+}
+
+impl<T> fmt::Display for SendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ format_send_error(self, f)
+ }
+}
+
+impl<T> fmt::Debug for TrySendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ format_try_send_error(self, f)
+ }
+}
+
+impl<T> fmt::Display for TrySendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ format_try_send_error(self, f)
+ }
+}
+
+#[inline]
+fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
+ match *e {
+ SendError::Io(ref io_err) => write!(f, "{}", io_err),
+ SendError::Disconnected(..) => write!(f, "Disconnected"),
+ }
+}
+
+#[inline]
+fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
+ match *e {
+ TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
+ TrySendError::Full(..) => write!(f, "Full"),
+ TrySendError::Disconnected(..) => write!(f, "Disconnected"),
+ }
+}
diff --git a/third_party/rust/mio-extras/src/lib.rs b/third_party/rust/mio-extras/src/lib.rs
new file mode 100644
index 0000000000..69a000556c
--- /dev/null
+++ b/third_party/rust/mio-extras/src/lib.rs
@@ -0,0 +1,33 @@
+//! Extra components for use with Mio.
+#![deny(missing_docs)]
+extern crate lazycell;
+extern crate mio;
+extern crate slab;
+
+#[macro_use]
+extern crate log;
+
+pub mod channel;
+pub mod timer;
+
+// Conversion utilities
+mod convert {
+ use std::time::Duration;
+
+ const NANOS_PER_MILLI: u32 = 1_000_000;
+ const MILLIS_PER_SEC: u64 = 1_000;
+
+ /// Convert a `Duration` to milliseconds, rounding up and saturating at
+ /// `u64::MAX`.
+ ///
+ /// The saturating is fine because `u64::MAX` milliseconds are still many
+ /// million years.
+ pub fn millis(duration: Duration) -> u64 {
+ // Round up.
+ let millis = (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI;
+ duration
+ .as_secs()
+ .saturating_mul(MILLIS_PER_SEC)
+ .saturating_add(u64::from(millis))
+ }
+}
diff --git a/third_party/rust/mio-extras/src/timer.rs b/third_party/rust/mio-extras/src/timer.rs
new file mode 100644
index 0000000000..876026c99c
--- /dev/null
+++ b/third_party/rust/mio-extras/src/timer.rs
@@ -0,0 +1,751 @@
+//! Timer optimized for I/O related operations
+use crate::convert;
+use lazycell::LazyCell;
+use mio::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
+use slab::Slab;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+use std::{cmp, fmt, io, iter, thread, u64, usize};
+
+/// A timer.
+///
+/// Typical usage goes like this:
+///
+/// * register the timer with a `mio::Poll`.
+/// * set a timeout, by calling `Timer::set_timeout`. Here you provide some
+/// state to be associated with this timeout.
+/// * poll the `Poll`, to learn when a timeout has occurred.
+/// * retrieve state associated with the timeout by calling `Timer::poll`.
+///
+/// You can omit use of the `Poll` altogether, if you like, and just poll the
+/// `Timer` directly.
+pub struct Timer<T> {
+ // Size of each tick in milliseconds
+ tick_ms: u64,
+ // Slab of timeout entries
+ entries: Slab<Entry<T>>,
+ // Timeout wheel. Each tick, the timer will look at the next slot for
+ // timeouts that match the current tick.
+ wheel: Vec<WheelEntry>,
+ // Tick 0's time instant
+ start: Instant,
+ // The current tick
+ tick: Tick,
+ // The next entry to possibly timeout
+ next: Token,
+ // Masks the target tick to get the slot
+ mask: u64,
+ // Set on registration with Poll
+ inner: LazyCell<Inner>,
+}
+
+/// Used to create a `Timer`.
+pub struct Builder {
+ // Approximate duration of each tick
+ tick: Duration,
+ // Number of slots in the timer wheel
+ num_slots: usize,
+ // Max number of timeouts that can be in flight at a given time.
+ capacity: usize,
+}
+
+/// A timeout, as returned by `Timer::set_timeout`.
+///
+/// Use this as the argument to `Timer::cancel_timeout`, to cancel this timeout.
+#[derive(Clone, Debug)]
+pub struct Timeout {
+ // Reference into the timer entry slab
+ token: Token,
+ // Tick that it should match up with
+ tick: u64,
+}
+
+struct Inner {
+ registration: Registration,
+ set_readiness: SetReadiness,
+ wakeup_state: WakeupState,
+ wakeup_thread: thread::JoinHandle<()>,
+}
+
+impl Drop for Inner {
+ fn drop(&mut self) {
+ // 1. Set wakeup state to TERMINATE_THREAD
+ self.wakeup_state.store(TERMINATE_THREAD, Ordering::Release);
+ // 2. Wake him up
+ self.wakeup_thread.thread().unpark();
+ }
+}
+
+#[derive(Copy, Clone, Debug)]
+struct WheelEntry {
+ next_tick: Tick,
+ head: Token,
+}
+
+// Doubly linked list of timer entries. Allows for efficient insertion /
+// removal of timeouts.
+struct Entry<T> {
+ state: T,
+ links: EntryLinks,
+}
+
+#[derive(Copy, Clone)]
+struct EntryLinks {
+ tick: Tick,
+ prev: Token,
+ next: Token,
+}
+
+type Tick = u64;
+
+const TICK_MAX: Tick = u64::MAX;
+
+// Manages communication with wakeup thread
+type WakeupState = Arc<AtomicUsize>;
+
+const TERMINATE_THREAD: usize = 0;
+const EMPTY: Token = Token(usize::MAX);
+
+impl Builder {
+ /// Set the tick duration. Default is 100ms.
+ pub fn tick_duration(mut self, duration: Duration) -> Builder {
+ self.tick = duration;
+ self
+ }
+
+ /// Set the number of slots. Default is 256.
+ pub fn num_slots(mut self, num_slots: usize) -> Builder {
+ self.num_slots = num_slots;
+ self
+ }
+
+ /// Set the capacity. Default is 65536.
+ pub fn capacity(mut self, capacity: usize) -> Builder {
+ self.capacity = capacity;
+ self
+ }
+
+ /// Build a `Timer` with the parameters set on this `Builder`.
+ pub fn build<T>(self) -> Timer<T> {
+ Timer::new(
+ convert::millis(self.tick),
+ self.num_slots,
+ self.capacity,
+ Instant::now(),
+ )
+ }
+}
+
+impl Default for Builder {
+ fn default() -> Builder {
+ Builder {
+ tick: Duration::from_millis(100),
+ num_slots: 1 << 8,
+ capacity: 1 << 16,
+ }
+ }
+}
+
+impl<T> Timer<T> {
+ fn new(tick_ms: u64, num_slots: usize, capacity: usize, start: Instant) -> Timer<T> {
+ let num_slots = num_slots.next_power_of_two();
+ let capacity = capacity.next_power_of_two();
+ let mask = (num_slots as u64) - 1;
+ let wheel = iter::repeat(WheelEntry {
+ next_tick: TICK_MAX,
+ head: EMPTY,
+ })
+ .take(num_slots)
+ .collect();
+
+ Timer {
+ tick_ms,
+ entries: Slab::with_capacity(capacity),
+ wheel,
+ start,
+ tick: 0,
+ next: EMPTY,
+ mask,
+ inner: LazyCell::new(),
+ }
+ }
+
+ /// Set a timeout.
+ ///
+ /// When the timeout occurs, the given state becomes available via `poll`.
+ pub fn set_timeout(&mut self, delay_from_now: Duration, state: T) -> Timeout {
+ let delay_from_start = self.start.elapsed() + delay_from_now;
+ self.set_timeout_at(delay_from_start, state)
+ }
+
+ fn set_timeout_at(&mut self, delay_from_start: Duration, state: T) -> Timeout {
+ let mut tick = duration_to_tick(delay_from_start, self.tick_ms);
+ trace!(
+ "setting timeout; delay={:?}; tick={:?}; current-tick={:?}",
+ delay_from_start,
+ tick,
+ self.tick
+ );
+
+ // Always target at least 1 tick in the future
+ if tick <= self.tick {
+ tick = self.tick + 1;
+ }
+
+ self.insert(tick, state)
+ }
+
+ fn insert(&mut self, tick: Tick, state: T) -> Timeout {
+ // Get the slot for the requested tick
+ let slot = (tick & self.mask) as usize;
+ let curr = self.wheel[slot];
+
+ // Insert the new entry
+ let entry = Entry::new(state, tick, curr.head);
+ let token = Token(self.entries.insert(entry));
+
+ if curr.head != EMPTY {
+ // If there was a previous entry, set its prev pointer to the new
+ // entry
+ self.entries[curr.head.into()].links.prev = token;
+ }
+
+ // Update the head slot
+ self.wheel[slot] = WheelEntry {
+ next_tick: cmp::min(tick, curr.next_tick),
+ head: token,
+ };
+
+ self.schedule_readiness(tick);
+
+ trace!("inserted timout; slot={}; token={:?}", slot, token);
+
+ // Return the new timeout
+ Timeout { token, tick }
+ }
+
+ /// Cancel a timeout.
+ ///
+ /// If the timeout has not yet occurred, the return value holds the
+ /// associated state.
+ pub fn cancel_timeout(&mut self, timeout: &Timeout) -> Option<T> {
+ let links = match self.entries.get(timeout.token.into()) {
+ Some(e) => e.links,
+ None => return None,
+ };
+
+ // Sanity check
+ if links.tick != timeout.tick {
+ return None;
+ }
+
+ self.unlink(&links, timeout.token);
+ Some(self.entries.remove(timeout.token.into()).state)
+ }
+
+ /// Poll for an expired timer.
+ ///
+ /// The return value holds the state associated with the first expired
+ /// timer, if any.
+ pub fn poll(&mut self) -> Option<T> {
+ let target_tick = current_tick(self.start, self.tick_ms);
+ self.poll_to(target_tick)
+ }
+
+ fn poll_to(&mut self, mut target_tick: Tick) -> Option<T> {
+ trace!(
+ "tick_to; target_tick={}; current_tick={}",
+ target_tick,
+ self.tick
+ );
+
+ if target_tick < self.tick {
+ target_tick = self.tick;
+ }
+
+ while self.tick <= target_tick {
+ let curr = self.next;
+
+ trace!("ticking; curr={:?}", curr);
+
+ if curr == EMPTY {
+ self.tick += 1;
+
+ let slot = self.slot_for(self.tick);
+ self.next = self.wheel[slot].head;
+
+ // Handle the case when a slot has a single timeout which gets
+ // canceled before the timeout expires. In this case, the
+ // slot's head is EMPTY but there is a value for next_tick. Not
+ // resetting next_tick here causes the timer to get stuck in a
+ // loop.
+ if self.next == EMPTY {
+ self.wheel[slot].next_tick = TICK_MAX;
+ }
+ } else {
+ let slot = self.slot_for(self.tick);
+
+ if curr == self.wheel[slot].head {
+ self.wheel[slot].next_tick = TICK_MAX;
+ }
+
+ let links = self.entries[curr.into()].links;
+
+ if links.tick <= self.tick {
+ trace!("triggering; token={:?}", curr);
+
+ // Unlink will also advance self.next
+ self.unlink(&links, curr);
+
+ // Remove and return the token
+ return Some(self.entries.remove(curr.into()).state);
+ } else {
+ let next_tick = self.wheel[slot].next_tick;
+ self.wheel[slot].next_tick = cmp::min(next_tick, links.tick);
+ self.next = links.next;
+ }
+ }
+ }
+
+ // No more timeouts to poll
+ if let Some(inner) = self.inner.borrow() {
+ trace!("unsetting readiness");
+ let _ = inner.set_readiness.set_readiness(Ready::empty());
+
+ if let Some(tick) = self.next_tick() {
+ self.schedule_readiness(tick);
+ }
+ }
+
+ None
+ }
+
+ fn unlink(&mut self, links: &EntryLinks, token: Token) {
+ trace!(
+ "unlinking timeout; slot={}; token={:?}",
+ self.slot_for(links.tick),
+ token
+ );
+
+ if links.prev == EMPTY {
+ let slot = self.slot_for(links.tick);
+ self.wheel[slot].head = links.next;
+ } else {
+ self.entries[links.prev.into()].links.next = links.next;
+ }
+
+ if links.next != EMPTY {
+ self.entries[links.next.into()].links.prev = links.prev;
+
+ if token == self.next {
+ self.next = links.next;
+ }
+ } else if token == self.next {
+ self.next = EMPTY;
+ }
+ }
+
+ fn schedule_readiness(&self, tick: Tick) {
+ if let Some(inner) = self.inner.borrow() {
+ // Coordinate setting readiness w/ the wakeup thread
+ let mut curr = inner.wakeup_state.load(Ordering::Acquire);
+
+ loop {
+ if curr as Tick <= tick {
+ // Nothing to do, wakeup is already scheduled
+ return;
+ }
+
+ // Attempt to move the wakeup time forward
+ trace!("advancing the wakeup time; target={}; curr={}", tick, curr);
+ let actual =
+ inner
+ .wakeup_state
+ .compare_and_swap(curr, tick as usize, Ordering::Release);
+
+ if actual == curr {
+ // Signal to the wakeup thread that the wakeup time has
+ // been changed.
+ trace!("unparking wakeup thread");
+ inner.wakeup_thread.thread().unpark();
+ return;
+ }
+
+ curr = actual;
+ }
+ }
+ }
+
+ // Next tick containing a timeout
+ fn next_tick(&self) -> Option<Tick> {
+ if self.next != EMPTY {
+ let slot = self.slot_for(self.entries[self.next.into()].links.tick);
+
+ if self.wheel[slot].next_tick == self.tick {
+ // There is data ready right now
+ return Some(self.tick);
+ }
+ }
+
+ self.wheel.iter().map(|e| e.next_tick).min()
+ }
+
+ fn slot_for(&self, tick: Tick) -> usize {
+ (self.mask & tick) as usize
+ }
+}
+
+impl<T> Default for Timer<T> {
+ fn default() -> Timer<T> {
+ Builder::default().build()
+ }
+}
+
+impl<T> Evented for Timer<T> {
+ fn register(
+ &self,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt,
+ ) -> io::Result<()> {
+ if self.inner.borrow().is_some() {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "timer already registered",
+ ));
+ }
+
+ let (registration, set_readiness) = Registration::new2();
+ poll.register(&registration, token, interest, opts)?;
+ let wakeup_state = Arc::new(AtomicUsize::new(usize::MAX));
+ let thread_handle = spawn_wakeup_thread(
+ Arc::clone(&wakeup_state),
+ set_readiness.clone(),
+ self.start,
+ self.tick_ms,
+ );
+
+ self.inner
+ .fill(Inner {
+ registration,
+ set_readiness,
+ wakeup_state,
+ wakeup_thread: thread_handle,
+ })
+ .expect("timer already registered");
+
+ if let Some(next_tick) = self.next_tick() {
+ self.schedule_readiness(next_tick);
+ }
+
+ Ok(())
+ }
+
+ fn reregister(
+ &self,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt,
+ ) -> io::Result<()> {
+ match self.inner.borrow() {
+ Some(inner) => poll.reregister(&inner.registration, token, interest, opts),
+ None => Err(io::Error::new(
+ io::ErrorKind::Other,
+ "receiver not registered",
+ )),
+ }
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ match self.inner.borrow() {
+ Some(inner) => poll.deregister(&inner.registration),
+ None => Err(io::Error::new(
+ io::ErrorKind::Other,
+ "receiver not registered",
+ )),
+ }
+ }
+}
+
+impl fmt::Debug for Inner {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Inner")
+ .field("registration", &self.registration)
+ .field("wakeup_state", &self.wakeup_state.load(Ordering::Relaxed))
+ .finish()
+ }
+}
+
+fn spawn_wakeup_thread(
+ state: WakeupState,
+ set_readiness: SetReadiness,
+ start: Instant,
+ tick_ms: u64,
+) -> thread::JoinHandle<()> {
+ thread::spawn(move || {
+ let mut sleep_until_tick = state.load(Ordering::Acquire) as Tick;
+
+ loop {
+ if sleep_until_tick == TERMINATE_THREAD as Tick {
+ return;
+ }
+
+ let now_tick = current_tick(start, tick_ms);
+
+ trace!(
+ "wakeup thread: sleep_until_tick={:?}; now_tick={:?}",
+ sleep_until_tick,
+ now_tick
+ );
+
+ if now_tick < sleep_until_tick {
+ // Calling park_timeout with u64::MAX leads to undefined
+ // behavior in pthread, causing the park to return immediately
+ // and causing the thread to tightly spin. Instead of u64::MAX
+ // on large values, simply use a blocking park.
+ match tick_ms.checked_mul(sleep_until_tick - now_tick) {
+ Some(sleep_duration) => {
+ trace!(
+ "sleeping; tick_ms={}; now_tick={}; sleep_until_tick={}; duration={:?}",
+ tick_ms,
+ now_tick,
+ sleep_until_tick,
+ sleep_duration
+ );
+ thread::park_timeout(Duration::from_millis(sleep_duration));
+ }
+ None => {
+ trace!(
+ "sleeping; tick_ms={}; now_tick={}; blocking sleep",
+ tick_ms,
+ now_tick
+ );
+ thread::park();
+ }
+ }
+ sleep_until_tick = state.load(Ordering::Acquire) as Tick;
+ } else {
+ let actual =
+ state.compare_and_swap(sleep_until_tick as usize, usize::MAX, Ordering::AcqRel)
+ as Tick;
+
+ if actual == sleep_until_tick {
+ trace!("setting readiness from wakeup thread");
+ let _ = set_readiness.set_readiness(Ready::readable());
+ sleep_until_tick = usize::MAX as Tick;
+ } else {
+ sleep_until_tick = actual as Tick;
+ }
+ }
+ }
+ })
+}
+
+fn duration_to_tick(elapsed: Duration, tick_ms: u64) -> Tick {
+ // Calculate tick rounding up to the closest one
+ let elapsed_ms = convert::millis(elapsed);
+ elapsed_ms.saturating_add(tick_ms / 2) / tick_ms
+}
+
+fn current_tick(start: Instant, tick_ms: u64) -> Tick {
+ duration_to_tick(start.elapsed(), tick_ms)
+}
+
+impl<T> Entry<T> {
+ fn new(state: T, tick: u64, next: Token) -> Entry<T> {
+ Entry {
+ state,
+ links: EntryLinks {
+ tick,
+ prev: EMPTY,
+ next,
+ },
+ }
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use std::time::{Duration, Instant};
+
+ #[test]
+ pub fn test_timeout_next_tick() {
+ let mut t = timer();
+ let mut tick;
+
+ t.set_timeout_at(Duration::from_millis(100), "a");
+
+ tick = ms_to_tick(&t, 50);
+ assert_eq!(None, t.poll_to(tick));
+
+ tick = ms_to_tick(&t, 100);
+ assert_eq!(Some("a"), t.poll_to(tick));
+ assert_eq!(None, t.poll_to(tick));
+
+ tick = ms_to_tick(&t, 150);
+ assert_eq!(None, t.poll_to(tick));
+
+ tick = ms_to_tick(&t, 200);
+ assert_eq!(None, t.poll_to(tick));
+
+ assert_eq!(count(&t), 0);
+ }
+
+ #[test]
+ pub fn test_clearing_timeout() {
+ let mut t = timer();
+ let mut tick;
+
+ let to = t.set_timeout_at(Duration::from_millis(100), "a");
+ assert_eq!("a", t.cancel_timeout(&to).unwrap());
+
+ tick = ms_to_tick(&t, 100);
+ assert_eq!(None, t.poll_to(tick));
+
+ tick = ms_to_tick(&t, 200);
+ assert_eq!(None, t.poll_to(tick));
+
+ assert_eq!(count(&t), 0);
+ }
+
+ #[test]
+ pub fn test_multiple_timeouts_same_tick() {
+ let mut t = timer();
+ let mut tick;
+
+ t.set_timeout_at(Duration::from_millis(100), "a");
+ t.set_timeout_at(Duration::from_millis(100), "b");
+
+ let mut rcv = vec![];
+
+ tick = ms_to_tick(&t, 100);
+ rcv.push(t.poll_to(tick).unwrap());
+ rcv.push(t.poll_to(tick).unwrap());
+
+ assert_eq!(None, t.poll_to(tick));
+
+ rcv.sort();
+ assert!(rcv == ["a", "b"], "actual={:?}", rcv);
+
+ tick = ms_to_tick(&t, 200);
+ assert_eq!(None, t.poll_to(tick));
+
+ assert_eq!(count(&t), 0);
+ }
+
+ #[test]
+ pub fn test_multiple_timeouts_diff_tick() {
+ let mut t = timer();
+ let mut tick;
+
+ t.set_timeout_at(Duration::from_millis(110), "a");
+ t.set_timeout_at(Duration::from_millis(220), "b");
+ t.set_timeout_at(Duration::from_millis(230), "c");
+ t.set_timeout_at(Duration::from_millis(440), "d");
+ t.set_timeout_at(Duration::from_millis(560), "e");
+
+ tick = ms_to_tick(&t, 100);
+ assert_eq!(Some("a"), t.poll_to(tick));
+ assert_eq!(None, t.poll_to(tick));
+
+ tick = ms_to_tick(&t, 200);
+ assert_eq!(Some("c"), t.poll_to(tick));
+ assert_eq!(Some("b"), t.poll_to(tick));
+ assert_eq!(None, t.poll_to(tick));
+
+ tick = ms_to_tick(&t, 300);
+ assert_eq!(None, t.poll_to(tick));
+
+ tick = ms_to_tick(&t, 400);
+ assert_eq!(Some("d"), t.poll_to(tick));
+ assert_eq!(None, t.poll_to(tick));
+
+ tick = ms_to_tick(&t, 500);
+ assert_eq!(None, t.poll_to(tick));
+
+ tick = ms_to_tick(&t, 600);
+ assert_eq!(Some("e"), t.poll_to(tick));
+ assert_eq!(None, t.poll_to(tick));
+ }
+
+ #[test]
+ pub fn test_catching_up() {
+ let mut t = timer();
+
+ t.set_timeout_at(Duration::from_millis(110), "a");
+ t.set_timeout_at(Duration::from_millis(220), "b");
+ t.set_timeout_at(Duration::from_millis(230), "c");
+ t.set_timeout_at(Duration::from_millis(440), "d");
+
+ let tick = ms_to_tick(&t, 600);
+ assert_eq!(Some("a"), t.poll_to(tick));
+ assert_eq!(Some("c"), t.poll_to(tick));
+ assert_eq!(Some("b"), t.poll_to(tick));
+ assert_eq!(Some("d"), t.poll_to(tick));
+ assert_eq!(None, t.poll_to(tick));
+ }
+
+ #[test]
+ pub fn test_timeout_hash_collision() {
+ let mut t = timer();
+ let mut tick;
+
+ t.set_timeout_at(Duration::from_millis(100), "a");
+ t.set_timeout_at(Duration::from_millis(100 + TICK * SLOTS as u64), "b");
+
+ tick = ms_to_tick(&t, 100);
+ assert_eq!(Some("a"), t.poll_to(tick));
+ assert_eq!(1, count(&t));
+
+ tick = ms_to_tick(&t, 200);
+ assert_eq!(None, t.poll_to(tick));
+ assert_eq!(1, count(&t));
+
+ tick = ms_to_tick(&t, 100 + TICK * SLOTS as u64);
+ assert_eq!(Some("b"), t.poll_to(tick));
+ assert_eq!(0, count(&t));
+ }
+
+ #[test]
+ pub fn test_clearing_timeout_between_triggers() {
+ let mut t = timer();
+ let mut tick;
+
+ let a = t.set_timeout_at(Duration::from_millis(100), "a");
+ let _ = t.set_timeout_at(Duration::from_millis(100), "b");
+ let _ = t.set_timeout_at(Duration::from_millis(200), "c");
+
+ tick = ms_to_tick(&t, 100);
+ assert_eq!(Some("b"), t.poll_to(tick));
+ assert_eq!(2, count(&t));
+
+ t.cancel_timeout(&a);
+ assert_eq!(1, count(&t));
+
+ assert_eq!(None, t.poll_to(tick));
+
+ tick = ms_to_tick(&t, 200);
+ assert_eq!(Some("c"), t.poll_to(tick));
+ assert_eq!(0, count(&t));
+ }
+
+ const TICK: u64 = 100;
+ const SLOTS: usize = 16;
+ const CAPACITY: usize = 32;
+
+ fn count<T>(timer: &Timer<T>) -> usize {
+ timer.entries.len()
+ }
+
+ fn timer() -> Timer<&'static str> {
+ Timer::new(TICK, SLOTS, CAPACITY, Instant::now())
+ }
+
+ fn ms_to_tick<T>(timer: &Timer<T>, ms: u64) -> u64 {
+ ms / timer.tick_ms
+ }
+}
diff --git a/third_party/rust/mio-extras/test/mod.rs b/third_party/rust/mio-extras/test/mod.rs
new file mode 100644
index 0000000000..217069466a
--- /dev/null
+++ b/third_party/rust/mio-extras/test/mod.rs
@@ -0,0 +1,45 @@
+extern crate mio;
+extern crate mio_extras;
+
+use mio::event::Event;
+use mio::{Events, Poll};
+use std::time::Duration;
+
+mod test_poll_channel;
+mod test_timer;
+
+pub fn expect_events(
+ poll: &Poll,
+ event_buffer: &mut Events,
+ poll_try_count: usize,
+ mut expected: Vec<Event>,
+) {
+ const MS: u64 = 1_000;
+
+ for _ in 0..poll_try_count {
+ poll.poll(event_buffer, Some(Duration::from_millis(MS)))
+ .unwrap();
+ for event in event_buffer.iter() {
+ let pos_opt = match expected.iter().position(|exp_event| {
+ (event.token() == exp_event.token())
+ && event.readiness().contains(exp_event.readiness())
+ }) {
+ Some(x) => Some(x),
+ None => None,
+ };
+ if let Some(pos) = pos_opt {
+ expected.remove(pos);
+ }
+ }
+
+ if expected.is_empty() {
+ break;
+ }
+ }
+
+ assert!(
+ expected.is_empty(),
+ "The following expected events were not found: {:?}",
+ expected
+ );
+}
diff --git a/third_party/rust/mio-extras/test/test_poll_channel.rs b/third_party/rust/mio-extras/test/test_poll_channel.rs
new file mode 100644
index 0000000000..7314f26661
--- /dev/null
+++ b/third_party/rust/mio-extras/test/test_poll_channel.rs
@@ -0,0 +1,362 @@
+use crate::expect_events;
+use mio::event::Event;
+use mio::{Events, Poll, PollOpt, Ready, Token};
+use mio_extras::channel;
+use std::sync::mpsc::TryRecvError;
+use std::thread;
+use std::time::Duration;
+
+#[test]
+pub fn test_poll_channel_edge() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let (tx, rx) = channel::channel();
+
+ poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge())
+ .unwrap();
+
+ // Wait, but nothing should happen
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(0, num);
+
+ // Push the value
+ tx.send("hello").unwrap();
+
+ // Polling will contain the event
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(1, num);
+
+ let event = events.iter().next().unwrap();
+ assert_eq!(event.token(), Token(123));
+ assert_eq!(event.readiness(), Ready::readable());
+
+ // Poll again and there should be no events
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(0, num);
+
+ // Read the value
+ assert_eq!("hello", rx.try_recv().unwrap());
+
+ // Poll again, nothing
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(0, num);
+
+ // Push a value
+ tx.send("goodbye").unwrap();
+
+ // Have an event
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(1, num);
+
+ let event = events.iter().next().unwrap();
+ assert_eq!(event.token(), Token(123));
+ assert_eq!(event.readiness(), Ready::readable());
+
+ // Read the value
+ rx.try_recv().unwrap();
+
+ // Drop the sender half
+ drop(tx);
+
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(1, num);
+
+ let event = events.iter().next().unwrap();
+ assert_eq!(event.token(), Token(123));
+ assert_eq!(event.readiness(), Ready::readable());
+
+ match rx.try_recv() {
+ Err(TryRecvError::Disconnected) => {}
+ no => panic!("unexpected value {:?}", no),
+ }
+}
+
+#[test]
+pub fn test_poll_channel_oneshot() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let (tx, rx) = channel::channel();
+
+ poll.register(
+ &rx,
+ Token(123),
+ Ready::readable(),
+ PollOpt::edge() | PollOpt::oneshot(),
+ )
+ .unwrap();
+
+ // Wait, but nothing should happen
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(0, num);
+
+ // Push the value
+ tx.send("hello").unwrap();
+
+ // Polling will contain the event
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(1, num);
+
+ let event = events.iter().next().unwrap();
+ assert_eq!(event.token(), Token(123));
+ assert_eq!(event.readiness(), Ready::readable());
+
+ // Poll again and there should be no events
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(0, num);
+
+ // Read the value
+ assert_eq!("hello", rx.try_recv().unwrap());
+
+ // Poll again, nothing
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(0, num);
+
+ // Push a value
+ tx.send("goodbye").unwrap();
+
+ // Poll again, nothing
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(0, num);
+
+ // Reregistering will re-trigger the notification
+ for _ in 0..3 {
+ poll.reregister(
+ &rx,
+ Token(123),
+ Ready::readable(),
+ PollOpt::edge() | PollOpt::oneshot(),
+ )
+ .unwrap();
+
+ // Have an event
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(1, num);
+
+ let event = events.iter().next().unwrap();
+ assert_eq!(event.token(), Token(123));
+ assert_eq!(event.readiness(), Ready::readable());
+ }
+
+ // Get the value
+ assert_eq!("goodbye", rx.try_recv().unwrap());
+
+ poll.reregister(
+ &rx,
+ Token(123),
+ Ready::readable(),
+ PollOpt::edge() | PollOpt::oneshot(),
+ )
+ .unwrap();
+
+ // Have an event
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(0, num);
+
+ poll.reregister(
+ &rx,
+ Token(123),
+ Ready::readable(),
+ PollOpt::edge() | PollOpt::oneshot(),
+ )
+ .unwrap();
+
+ // Have an event
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(0, num);
+}
+
+#[test]
+pub fn test_poll_channel_level() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let (tx, rx) = channel::channel();
+
+ poll.register(&rx, Token(123), Ready::readable(), PollOpt::level())
+ .unwrap();
+
+ // Wait, but nothing should happen
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(0, num);
+
+ // Push the value
+ tx.send("hello").unwrap();
+
+ // Polling will contain the event
+ for i in 0..5 {
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert!(1 == num, "actually got {} on iteration {}", num, i);
+
+ let event = events.iter().next().unwrap();
+ assert_eq!(event.token(), Token(123));
+ assert_eq!(event.readiness(), Ready::readable());
+ }
+
+ // Read the value
+ assert_eq!("hello", rx.try_recv().unwrap());
+
+ // Wait, but nothing should happen
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(0, num);
+}
+
+#[test]
+pub fn test_poll_channel_writable() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let (tx, rx) = channel::channel();
+
+ poll.register(&rx, Token(123), Ready::writable(), PollOpt::edge())
+ .unwrap();
+
+ // Wait, but nothing should happen
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(0, num);
+
+ // Push the value
+ tx.send("hello").unwrap();
+
+ // Wait, but nothing should happen
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(0, num);
+}
+
+#[test]
+pub fn test_dropping_receive_before_poll() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let (tx, rx) = channel::channel();
+
+ poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge())
+ .unwrap();
+
+ // Push the value
+ tx.send("hello").unwrap();
+
+ // Drop the receive end
+ drop(rx);
+
+ // Wait, but nothing should happen
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(0, num);
+}
+
+#[test]
+pub fn test_mixing_channel_with_socket() {
+ use mio::net::{TcpListener, TcpStream};
+
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let (tx, rx) = channel::channel();
+
+ // Create the listener
+ let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+
+ // Register the listener with `Poll`
+ poll.register(&l, Token(0), Ready::readable(), PollOpt::edge())
+ .unwrap();
+ poll.register(&rx, Token(1), Ready::readable(), PollOpt::edge())
+ .unwrap();
+
+ // Push a value onto the channel
+ tx.send("hello").unwrap();
+
+ // Connect a TCP socket
+ let s1 = TcpStream::connect(&l.local_addr().unwrap()).unwrap();
+
+ // Register the socket
+ poll.register(&s1, Token(2), Ready::readable(), PollOpt::edge())
+ .unwrap();
+
+ // Sleep a bit to ensure it arrives at dest
+ thread::sleep(Duration::from_millis(250));
+
+ expect_events(
+ &poll,
+ &mut events,
+ 2,
+ vec![
+ Event::new(Ready::empty(), Token(0)),
+ Event::new(Ready::empty(), Token(1)),
+ ],
+ );
+}
+
+#[test]
+pub fn test_sending_from_other_thread_while_polling() {
+ const ITERATIONS: usize = 20;
+ const THREADS: usize = 5;
+
+ // Make sure to run multiple times
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+
+ for _ in 0..ITERATIONS {
+ let (tx, rx) = channel::channel();
+ poll.register(&rx, Token(0), Ready::readable(), PollOpt::edge())
+ .unwrap();
+
+ for _ in 0..THREADS {
+ let tx = tx.clone();
+
+ thread::spawn(move || {
+ thread::sleep(Duration::from_millis(50));
+ tx.send("ping").unwrap();
+ });
+ }
+
+ let mut recv = 0;
+
+ while recv < THREADS {
+ let num = poll.poll(&mut events, None).unwrap();
+
+ if num != 0 {
+ assert_eq!(1, num);
+ assert_eq!(events.iter().next().unwrap().token(), Token(0));
+
+ while let Ok(_) = rx.try_recv() {
+ recv += 1;
+ }
+ }
+ }
+ }
+}
diff --git a/third_party/rust/mio-extras/test/test_timer.rs b/third_party/rust/mio-extras/test/test_timer.rs
new file mode 100644
index 0000000000..ac49833523
--- /dev/null
+++ b/third_party/rust/mio-extras/test/test_timer.rs
@@ -0,0 +1,308 @@
+use mio::{Events, Poll, PollOpt, Ready, Token};
+use mio_extras::timer::{self, Timer};
+
+use std::thread;
+use std::time::Duration;
+
+#[test]
+fn test_basic_timer_without_poll() {
+ let mut timer = Timer::default();
+
+ // Set the timeout
+ timer.set_timeout(Duration::from_millis(200), "hello");
+
+ // Nothing when polled immediately
+ assert!(timer.poll().is_none());
+
+ // Wait for the timeout
+ thread::sleep(Duration::from_millis(250));
+
+ assert_eq!(Some("hello"), timer.poll());
+ assert!(timer.poll().is_none());
+}
+
+#[test]
+fn test_basic_timer_with_poll_edge_set_timeout_after_register() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let mut timer = Timer::default();
+
+ poll.register(&timer, Token(0), Ready::readable(), PollOpt::edge())
+ .unwrap();
+ timer.set_timeout(Duration::from_millis(200), "hello");
+
+ let elapsed = elapsed(|| {
+ let num = poll.poll(&mut events, None).unwrap();
+
+ assert_eq!(num, 1);
+ let event = events.iter().next().unwrap();
+ assert_eq!(Token(0), event.token());
+ assert_eq!(Ready::readable(), event.readiness());
+ });
+
+ assert!(is_about(200, elapsed), "actual={:?}", elapsed);
+ assert_eq!("hello", timer.poll().unwrap());
+ assert_eq!(None, timer.poll());
+}
+
+#[test]
+fn test_basic_timer_with_poll_edge_set_timeout_before_register() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let mut timer = Timer::default();
+
+ timer.set_timeout(Duration::from_millis(200), "hello");
+ poll.register(&timer, Token(0), Ready::readable(), PollOpt::edge())
+ .unwrap();
+
+ let elapsed = elapsed(|| {
+ let num = poll.poll(&mut events, None).unwrap();
+
+ assert_eq!(num, 1);
+ let event = events.iter().next().unwrap();
+ assert_eq!(Token(0), event.token());
+ assert_eq!(Ready::readable(), event.readiness());
+ });
+
+ assert!(is_about(200, elapsed), "actual={:?}", elapsed);
+ assert_eq!("hello", timer.poll().unwrap());
+ assert_eq!(None, timer.poll());
+}
+
+#[test]
+fn test_setting_later_timeout_then_earlier_one() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let mut timer = Timer::default();
+
+ poll.register(&timer, Token(0), Ready::readable(), PollOpt::edge())
+ .unwrap();
+
+ timer.set_timeout(Duration::from_millis(600), "hello");
+ timer.set_timeout(Duration::from_millis(200), "world");
+
+ let elapsed = elapsed(|| {
+ let num = poll.poll(&mut events, None).unwrap();
+
+ assert_eq!(num, 1);
+ let event = events.iter().next().unwrap();
+ assert_eq!(Token(0), event.token());
+ assert_eq!(Ready::readable(), event.readiness());
+ });
+
+ assert!(is_about(200, elapsed), "actual={:?}", elapsed);
+ assert_eq!("world", timer.poll().unwrap());
+ assert_eq!(None, timer.poll());
+
+ let elapsed = self::elapsed(|| {
+ let num = poll.poll(&mut events, None).unwrap();
+
+ assert_eq!(num, 1);
+ let event = events.iter().next().unwrap();
+ assert_eq!(Token(0), event.token());
+ assert_eq!(Ready::readable(), event.readiness());
+ });
+
+ assert!(is_about(400, elapsed), "actual={:?}", elapsed);
+ assert_eq!("hello", timer.poll().unwrap());
+ assert_eq!(None, timer.poll());
+}
+
+#[test]
+fn test_timer_with_looping_wheel() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let mut timer = timer::Builder::default().num_slots(2).build();
+
+ poll.register(&timer, Token(0), Ready::readable(), PollOpt::edge())
+ .unwrap();
+
+ const TOKENS: &[&str] = &["hello", "world", "some", "thing"];
+
+ for (i, msg) in TOKENS.iter().enumerate() {
+ timer.set_timeout(Duration::from_millis(500 * (i as u64 + 1)), msg);
+ }
+
+ for msg in TOKENS {
+ let elapsed = elapsed(|| {
+ let num = poll.poll(&mut events, None).unwrap();
+
+ assert_eq!(num, 1);
+ let event = events.iter().next().unwrap();
+ assert_eq!(Token(0), event.token());
+ assert_eq!(Ready::readable(), event.readiness());
+ });
+
+ assert!(
+ is_about(500, elapsed),
+ "actual={:?}; msg={:?}",
+ elapsed,
+ msg
+ );
+ assert_eq!(Some(msg), timer.poll());
+ assert_eq!(None, timer.poll());
+ }
+}
+
+#[test]
+fn test_edge_without_polling() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let mut timer = Timer::default();
+
+ poll.register(&timer, Token(0), Ready::readable(), PollOpt::edge())
+ .unwrap();
+
+ timer.set_timeout(Duration::from_millis(400), "hello");
+
+ let ms = elapsed(|| {
+ let num = poll.poll(&mut events, None).unwrap();
+ assert_eq!(num, 1);
+ let event = events.iter().next().unwrap();
+ assert_eq!(Token(0), event.token());
+ assert_eq!(Ready::readable(), event.readiness());
+ });
+
+ assert!(is_about(400, ms), "actual={:?}", ms);
+
+ let ms = elapsed(|| {
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(num, 0);
+ });
+
+ assert!(is_about(300, ms), "actual={:?}", ms);
+}
+
+#[test]
+fn test_level_triggered() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let mut timer = Timer::default();
+
+ poll.register(&timer, Token(0), Ready::readable(), PollOpt::level())
+ .unwrap();
+
+ timer.set_timeout(Duration::from_millis(400), "hello");
+
+ let ms = elapsed(|| {
+ let num = poll.poll(&mut events, None).unwrap();
+ assert_eq!(num, 1);
+ let event = events.iter().next().unwrap();
+ assert_eq!(Token(0), event.token());
+ assert_eq!(Ready::readable(), event.readiness());
+ });
+
+ assert!(is_about(400, ms), "actual={:?}", ms);
+
+ let ms = elapsed(|| {
+ let num = poll.poll(&mut events, None).unwrap();
+ assert_eq!(num, 1);
+ let event = events.iter().next().unwrap();
+ assert_eq!(Token(0), event.token());
+ assert_eq!(Ready::readable(), event.readiness());
+ });
+
+ assert!(is_about(0, ms), "actual={:?}", ms);
+}
+
+#[test]
+fn test_edge_oneshot_triggered() {
+ let poll = Poll::new().unwrap();
+ let mut events = Events::with_capacity(1024);
+ let mut timer = Timer::default();
+
+ poll.register(
+ &timer,
+ Token(0),
+ Ready::readable(),
+ PollOpt::edge() | PollOpt::oneshot(),
+ )
+ .unwrap();
+
+ timer.set_timeout(Duration::from_millis(200), "hello");
+
+ let ms = elapsed(|| {
+ let num = poll.poll(&mut events, None).unwrap();
+ assert_eq!(num, 1);
+ });
+
+ assert!(is_about(200, ms), "actual={:?}", ms);
+
+ let ms = elapsed(|| {
+ let num = poll
+ .poll(&mut events, Some(Duration::from_millis(300)))
+ .unwrap();
+ assert_eq!(num, 0);
+ });
+
+ assert!(is_about(300, ms), "actual={:?}", ms);
+
+ poll.reregister(
+ &timer,
+ Token(0),
+ Ready::readable(),
+ PollOpt::edge() | PollOpt::oneshot(),
+ )
+ .unwrap();
+
+ let ms = elapsed(|| {
+ let num = poll.poll(&mut events, None).unwrap();
+ assert_eq!(num, 1);
+ });
+
+ assert!(is_about(0, ms));
+}
+
+#[test]
+fn test_cancel_timeout() {
+ use std::time::Instant;
+
+ let mut timer: Timer<u32> = Default::default();
+ let timeout = timer.set_timeout(Duration::from_millis(200), 1);
+ timer.cancel_timeout(&timeout);
+
+ let poll = Poll::new().unwrap();
+ poll.register(&timer, Token(0), Ready::readable(), PollOpt::edge())
+ .unwrap();
+
+ let mut events = Events::with_capacity(16);
+
+ let now = Instant::now();
+ let dur = Duration::from_millis(500);
+ let mut i = 0;
+
+ while Instant::now() - now < dur {
+ if i > 10 {
+ panic!("iterated too many times");
+ }
+
+ i += 1;
+
+ let elapsed = Instant::now() - now;
+
+ poll.poll(&mut events, Some(dur - elapsed)).unwrap();
+
+ while let Some(_) = timer.poll() {
+ panic!("did not expect to receive timeout");
+ }
+ }
+}
+
+fn elapsed<F: FnMut()>(mut f: F) -> u64 {
+ use std::time::Instant;
+
+ let now = Instant::now();
+
+ f();
+
+ let elapsed = now.elapsed();
+ elapsed.as_secs() * 1000 + u64::from(elapsed.subsec_millis())
+}
+
+fn is_about(expect: u64, val: u64) -> bool {
+ const WINDOW: i64 = 200;
+
+ ((expect as i64) - (val as i64)).abs() <= WINDOW
+}