diff options
Diffstat (limited to 'third_party/rust/mio-extras')
-rw-r--r-- | third_party/rust/mio-extras/.cargo-checksum.json | 1 | ||||
-rw-r--r-- | third_party/rust/mio-extras/CHANGELOG.md | 37 | ||||
-rw-r--r-- | third_party/rust/mio-extras/Cargo.toml | 40 | ||||
-rw-r--r-- | third_party/rust/mio-extras/LICENSE-APACHE | 201 | ||||
-rw-r--r-- | third_party/rust/mio-extras/LICENSE-MIT | 25 | ||||
-rw-r--r-- | third_party/rust/mio-extras/README.md | 30 | ||||
-rw-r--r-- | third_party/rust/mio-extras/src/channel.rs | 431 | ||||
-rw-r--r-- | third_party/rust/mio-extras/src/lib.rs | 33 | ||||
-rw-r--r-- | third_party/rust/mio-extras/src/timer.rs | 751 | ||||
-rw-r--r-- | third_party/rust/mio-extras/test/mod.rs | 45 | ||||
-rw-r--r-- | third_party/rust/mio-extras/test/test_poll_channel.rs | 362 | ||||
-rw-r--r-- | third_party/rust/mio-extras/test/test_timer.rs | 308 |
12 files changed, 2264 insertions, 0 deletions
diff --git a/third_party/rust/mio-extras/.cargo-checksum.json b/third_party/rust/mio-extras/.cargo-checksum.json new file mode 100644 index 0000000000..cb15205f9b --- /dev/null +++ b/third_party/rust/mio-extras/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"CHANGELOG.md":"499f8d84e8bee37198044301a5fdfe6a811dd19b62c2c771616185a7543b8334","Cargo.toml":"a4d1b4f5de2908b805a42b31472dfd5fee30a49dc4f575a174c581c7b57b25b7","LICENSE-APACHE":"406e5cbaa2ad1178c300cf28ac5258e8d0db0de4f061e78db559d30e6f38e25c","LICENSE-MIT":"8aa414e6c821efd8be6bade07368a5d9f51f5cc55718bc54e10a59eb826b8d58","README.md":"fa2642be7bd670014c5e25bafbee73b8be0667ddbd193c1cc344a71d7f59463f","src/channel.rs":"a9fb5bcf889b03766821011e94b30a351b80501523c4a9fe5c45796eae218968","src/lib.rs":"2ed1572d3255208681d017265df7f642eb4898b1c2dace91676935f55e03eb04","src/timer.rs":"a1e71e38ab983291557d534ce2454a0ba5872652f4e7c4161131ba5150ec8d57","test/mod.rs":"aa3afc2582f00e5e2a2e5b87d12eb9810b0eed3248b48abef7094fd8d02d9c41","test/test_poll_channel.rs":"508815e265ae44328fb3d7c98cdf210815a9946bde291dd896de81df0394de37","test/test_timer.rs":"d04b6f57e9a395ce190022c0158cc498805758101e9fdad18b63829eb9bb6510"},"package":"52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19"}
\ No newline at end of file diff --git a/third_party/rust/mio-extras/CHANGELOG.md b/third_party/rust/mio-extras/CHANGELOG.md new file mode 100644 index 0000000000..354b87584c --- /dev/null +++ b/third_party/rust/mio-extras/CHANGELOG.md @@ -0,0 +1,37 @@ +## 2.0.6 (7 Dec 2019) + +- fix license metadata in `Cargo.toml` (thanks @ignatenkobrain) + +## 2.0.5 (18 Jun 2018) + +- update `lazycell` from 0.6 -> 1.0 + +## 2.0.4 (7 Apr 2018) + +- Bump mio dependency (fixes minimal-versions build) + +## 2.0.3 (28 Dec 2017) + +- update `log` from 0.3 -> 0.4 + +## 2.0.2 + +- More docs tidying. + +## 2.0.1 + +- Another try at documenting the timer interface. + +## 2.0.0 + +- Remove channel implementation details from the API. Specifically, the + following are no longer public: + - `ctl_pair()` + - `SenderCtl` + - `ReceiverCtl` +- Document all APIs + +## 1.0.0 + +- Initial release. Essentially identical to + [mio-more](https://github.com/carllerche/mio-more). diff --git a/third_party/rust/mio-extras/Cargo.toml b/third_party/rust/mio-extras/Cargo.toml new file mode 100644 index 0000000000..d902a46019 --- /dev/null +++ b/third_party/rust/mio-extras/Cargo.toml @@ -0,0 +1,40 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies +# +# If you believe there's an error in this file please file an +# issue against the rust-lang/cargo repository. If you're +# editing this file be aware that the upstream Cargo.toml +# will likely look very different (and much more reasonable) + +[package] +edition = "2018" +name = "mio-extras" +version = "2.0.6" +authors = ["Carl Lerche <me@carllerche.com>", "David Hotham"] +exclude = [".gitignore"] +description = "Extra components for use with Mio" +documentation = "https://docs.rs/mio-extras" +readme = "README.md" +keywords = ["io", "async", "non-blocking"] +categories = ["asynchronous"] +license = "MIT OR Apache-2.0" +repository = "https://github.com/dimbleby/mio-extras" + +[[test]] +name = "test" +path = "test/mod.rs" +[dependencies.lazycell] +version = "1" + +[dependencies.log] +version = "0.4" + +[dependencies.mio] +version = "0.6.14" + +[dependencies.slab] +version = "0.4" diff --git a/third_party/rust/mio-extras/LICENSE-APACHE b/third_party/rust/mio-extras/LICENSE-APACHE new file mode 100644 index 0000000000..a6e8ded657 --- /dev/null +++ b/third_party/rust/mio-extras/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright 2017 Mio authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/third_party/rust/mio-extras/LICENSE-MIT b/third_party/rust/mio-extras/LICENSE-MIT new file mode 100644 index 0000000000..4cf193e73e --- /dev/null +++ b/third_party/rust/mio-extras/LICENSE-MIT @@ -0,0 +1,25 @@ +Copyright (c) 2017 Mio authors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/third_party/rust/mio-extras/README.md b/third_party/rust/mio-extras/README.md new file mode 100644 index 0000000000..8ed136a079 --- /dev/null +++ b/third_party/rust/mio-extras/README.md @@ -0,0 +1,30 @@ +# mio-extras + +Extra components for use with [Mio](https://github.com/tokio-rs/mio): + +- a channel that implements `Evented` +- a timer that implements `Evented` + +[![Build Status](https://travis-ci.org/dimbleby/mio-extras.svg?branch=master)](https://travis-ci.org/dimbleby/mio-extras) +[![crates.io](http://meritbadge.herokuapp.com/mio-extras)](https://crates.io/crates/mio-extras) + +[Documentation](https://docs.rs/mio-extras). + +## History and maintenance + +This repository is forked from +[`mio-more`](https://github.com/carllerche/mio-more), which is unmaintained. + +I don't intend to do very much with this except for routine maintenance - bug +fixes, updating dependencies, and suchlike. + +However if you have some code that you think belongs here, then by all means +raise an issue or open a pull request. + +# License + +`mio-extras` is primarily distributed under the terms of both the MIT license +and the Apache License (Version 2.0), with portions covered by various BSD-like +licenses. + +See LICENSE-APACHE, and LICENSE-MIT for details. diff --git a/third_party/rust/mio-extras/src/channel.rs b/third_party/rust/mio-extras/src/channel.rs new file mode 100644 index 0000000000..561317ecbf --- /dev/null +++ b/third_party/rust/mio-extras/src/channel.rs @@ -0,0 +1,431 @@ +//! Thread safe communication channel implementing `Evented` +use lazycell::{AtomicLazyCell, LazyCell}; +use mio::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token}; +use std::any::Any; +use std::error; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{mpsc, Arc}; +use std::{fmt, io}; + +/// Creates a new asynchronous channel, where the `Receiver` can be registered +/// with `Poll`. +pub fn channel<T>() -> (Sender<T>, Receiver<T>) { + let (tx_ctl, rx_ctl) = ctl_pair(); + let (tx, rx) = mpsc::channel(); + + let tx = Sender { tx, ctl: tx_ctl }; + + let rx = Receiver { rx, ctl: rx_ctl }; + + (tx, rx) +} + +/// Creates a new synchronous, bounded channel where the `Receiver` can be +/// registered with `Poll`. +pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) { + let (tx_ctl, rx_ctl) = ctl_pair(); + let (tx, rx) = mpsc::sync_channel(bound); + + let tx = SyncSender { tx, ctl: tx_ctl }; + + let rx = Receiver { rx, ctl: rx_ctl }; + + (tx, rx) +} + +fn ctl_pair() -> (SenderCtl, ReceiverCtl) { + let inner = Arc::new(Inner { + pending: AtomicUsize::new(0), + senders: AtomicUsize::new(1), + set_readiness: AtomicLazyCell::new(), + }); + + let tx = SenderCtl { + inner: Arc::clone(&inner), + }; + + let rx = ReceiverCtl { + registration: LazyCell::new(), + inner, + }; + + (tx, rx) +} + +/// Tracks messages sent on a channel in order to update readiness. +struct SenderCtl { + inner: Arc<Inner>, +} + +/// Tracks messages received on a channel in order to track readiness. +struct ReceiverCtl { + registration: LazyCell<Registration>, + inner: Arc<Inner>, +} + +/// The sending half of a channel. +pub struct Sender<T> { + tx: mpsc::Sender<T>, + ctl: SenderCtl, +} + +/// The sending half of a synchronous channel. +pub struct SyncSender<T> { + tx: mpsc::SyncSender<T>, + ctl: SenderCtl, +} + +/// The receiving half of a channel. +pub struct Receiver<T> { + rx: mpsc::Receiver<T>, + ctl: ReceiverCtl, +} + +/// An error returned from the `Sender::send` or `SyncSender::send` function. +pub enum SendError<T> { + /// An IO error. + Io(io::Error), + + /// The receiving half of the channel has disconnected. + Disconnected(T), +} + +/// An error returned from the `SyncSender::try_send` function. +pub enum TrySendError<T> { + /// An IO error. + Io(io::Error), + + /// Data could not be sent because it would require the callee to block. + Full(T), + + /// The receiving half of the channel has disconnected. + Disconnected(T), +} + +struct Inner { + // The number of outstanding messages for the receiver to read + pending: AtomicUsize, + // The number of sender handles + senders: AtomicUsize, + // The set readiness handle + set_readiness: AtomicLazyCell<SetReadiness>, +} + +impl<T> Sender<T> { + /// Attempts to send a value on this channel, returning it back if it could not be sent. + pub fn send(&self, t: T) -> Result<(), SendError<T>> { + self.tx.send(t).map_err(SendError::from).and_then(|_| { + self.ctl.inc()?; + Ok(()) + }) + } +} + +impl<T> Clone for Sender<T> { + fn clone(&self) -> Sender<T> { + Sender { + tx: self.tx.clone(), + ctl: self.ctl.clone(), + } + } +} + +impl<T> SyncSender<T> { + /// Sends a value on this synchronous channel. + /// + /// This function will *block* until space in the internal buffer becomes + /// available or a receiver is available to hand off the message to. + pub fn send(&self, t: T) -> Result<(), SendError<T>> { + self.tx.send(t).map_err(From::from).and_then(|_| { + self.ctl.inc()?; + Ok(()) + }) + } + + /// Attempts to send a value on this channel without blocking. + /// + /// This method differs from `send` by returning immediately if the channel's + /// buffer is full or no receiver is waiting to acquire some data. + pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> { + self.tx.try_send(t).map_err(From::from).and_then(|_| { + self.ctl.inc()?; + Ok(()) + }) + } +} + +impl<T> Clone for SyncSender<T> { + fn clone(&self) -> SyncSender<T> { + SyncSender { + tx: self.tx.clone(), + ctl: self.ctl.clone(), + } + } +} + +impl<T> Receiver<T> { + /// Attempts to return a pending value on this receiver without blocking. + pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> { + self.rx.try_recv().and_then(|res| { + let _ = self.ctl.dec(); + Ok(res) + }) + } +} + +impl<T> Evented for Receiver<T> { + fn register( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + self.ctl.register(poll, token, interest, opts) + } + + fn reregister( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + self.ctl.reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + self.ctl.deregister(poll) + } +} + +/* + * + * ===== SenderCtl / ReceiverCtl ===== + * + */ + +impl SenderCtl { + /// Call to track that a message has been sent + fn inc(&self) -> io::Result<()> { + let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire); + + if 0 == cnt { + // Toggle readiness to readable + if let Some(set_readiness) = self.inner.set_readiness.borrow() { + set_readiness.set_readiness(Ready::readable())?; + } + } + + Ok(()) + } +} + +impl Clone for SenderCtl { + fn clone(&self) -> SenderCtl { + self.inner.senders.fetch_add(1, Ordering::Relaxed); + SenderCtl { + inner: Arc::clone(&self.inner), + } + } +} + +impl Drop for SenderCtl { + fn drop(&mut self) { + if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 { + let _ = self.inc(); + } + } +} + +impl ReceiverCtl { + fn dec(&self) -> io::Result<()> { + let first = self.inner.pending.load(Ordering::Acquire); + + if first == 1 { + // Unset readiness + if let Some(set_readiness) = self.inner.set_readiness.borrow() { + set_readiness.set_readiness(Ready::empty())?; + } + } + + // Decrement + let second = self.inner.pending.fetch_sub(1, Ordering::AcqRel); + + if first == 1 && second > 1 { + // There are still pending messages. Since readiness was + // previously unset, it must be reset here + if let Some(set_readiness) = self.inner.set_readiness.borrow() { + set_readiness.set_readiness(Ready::readable())?; + } + } + + Ok(()) + } +} + +impl Evented for ReceiverCtl { + fn register( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + if self.registration.borrow().is_some() { + return Err(io::Error::new( + io::ErrorKind::Other, + "receiver already registered", + )); + } + + let (registration, set_readiness) = Registration::new2(); + poll.register(®istration, token, interest, opts)?; + + if self.inner.pending.load(Ordering::Relaxed) > 0 { + // TODO: Don't drop readiness + let _ = set_readiness.set_readiness(Ready::readable()); + } + + self.registration + .fill(registration) + .expect("unexpected state encountered"); + self.inner + .set_readiness + .fill(set_readiness) + .expect("unexpected state encountered"); + + Ok(()) + } + + fn reregister( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + match self.registration.borrow() { + Some(registration) => poll.reregister(registration, token, interest, opts), + None => Err(io::Error::new( + io::ErrorKind::Other, + "receiver not registered", + )), + } + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + match self.registration.borrow() { + Some(registration) => poll.deregister(registration), + None => Err(io::Error::new( + io::ErrorKind::Other, + "receiver not registered", + )), + } + } +} + +/* + * + * ===== Error conversions ===== + * + */ + +impl<T> From<mpsc::SendError<T>> for SendError<T> { + fn from(src: mpsc::SendError<T>) -> SendError<T> { + SendError::Disconnected(src.0) + } +} + +impl<T> From<io::Error> for SendError<T> { + fn from(src: io::Error) -> SendError<T> { + SendError::Io(src) + } +} + +impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> { + fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> { + match src { + mpsc::TrySendError::Full(v) => TrySendError::Full(v), + mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v), + } + } +} + +impl<T> From<mpsc::SendError<T>> for TrySendError<T> { + fn from(src: mpsc::SendError<T>) -> TrySendError<T> { + TrySendError::Disconnected(src.0) + } +} + +impl<T> From<io::Error> for TrySendError<T> { + fn from(src: io::Error) -> TrySendError<T> { + TrySendError::Io(src) + } +} + +/* + * + * ===== Implement Error, Debug and Display for Errors ===== + * + */ + +impl<T: Any> error::Error for SendError<T> { + fn description(&self) -> &str { + match *self { + SendError::Io(ref io_err) => io_err.description(), + SendError::Disconnected(..) => "Disconnected", + } + } +} + +impl<T: Any> error::Error for TrySendError<T> { + fn description(&self) -> &str { + match *self { + TrySendError::Io(ref io_err) => io_err.description(), + TrySendError::Full(..) => "Full", + TrySendError::Disconnected(..) => "Disconnected", + } + } +} + +impl<T> fmt::Debug for SendError<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + format_send_error(self, f) + } +} + +impl<T> fmt::Display for SendError<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + format_send_error(self, f) + } +} + +impl<T> fmt::Debug for TrySendError<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + format_try_send_error(self, f) + } +} + +impl<T> fmt::Display for TrySendError<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + format_try_send_error(self, f) + } +} + +#[inline] +fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result { + match *e { + SendError::Io(ref io_err) => write!(f, "{}", io_err), + SendError::Disconnected(..) => write!(f, "Disconnected"), + } +} + +#[inline] +fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result { + match *e { + TrySendError::Io(ref io_err) => write!(f, "{}", io_err), + TrySendError::Full(..) => write!(f, "Full"), + TrySendError::Disconnected(..) => write!(f, "Disconnected"), + } +} diff --git a/third_party/rust/mio-extras/src/lib.rs b/third_party/rust/mio-extras/src/lib.rs new file mode 100644 index 0000000000..69a000556c --- /dev/null +++ b/third_party/rust/mio-extras/src/lib.rs @@ -0,0 +1,33 @@ +//! Extra components for use with Mio. +#![deny(missing_docs)] +extern crate lazycell; +extern crate mio; +extern crate slab; + +#[macro_use] +extern crate log; + +pub mod channel; +pub mod timer; + +// Conversion utilities +mod convert { + use std::time::Duration; + + const NANOS_PER_MILLI: u32 = 1_000_000; + const MILLIS_PER_SEC: u64 = 1_000; + + /// Convert a `Duration` to milliseconds, rounding up and saturating at + /// `u64::MAX`. + /// + /// The saturating is fine because `u64::MAX` milliseconds are still many + /// million years. + pub fn millis(duration: Duration) -> u64 { + // Round up. + let millis = (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI; + duration + .as_secs() + .saturating_mul(MILLIS_PER_SEC) + .saturating_add(u64::from(millis)) + } +} diff --git a/third_party/rust/mio-extras/src/timer.rs b/third_party/rust/mio-extras/src/timer.rs new file mode 100644 index 0000000000..876026c99c --- /dev/null +++ b/third_party/rust/mio-extras/src/timer.rs @@ -0,0 +1,751 @@ +//! Timer optimized for I/O related operations +use crate::convert; +use lazycell::LazyCell; +use mio::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token}; +use slab::Slab; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use std::{cmp, fmt, io, iter, thread, u64, usize}; + +/// A timer. +/// +/// Typical usage goes like this: +/// +/// * register the timer with a `mio::Poll`. +/// * set a timeout, by calling `Timer::set_timeout`. Here you provide some +/// state to be associated with this timeout. +/// * poll the `Poll`, to learn when a timeout has occurred. +/// * retrieve state associated with the timeout by calling `Timer::poll`. +/// +/// You can omit use of the `Poll` altogether, if you like, and just poll the +/// `Timer` directly. +pub struct Timer<T> { + // Size of each tick in milliseconds + tick_ms: u64, + // Slab of timeout entries + entries: Slab<Entry<T>>, + // Timeout wheel. Each tick, the timer will look at the next slot for + // timeouts that match the current tick. + wheel: Vec<WheelEntry>, + // Tick 0's time instant + start: Instant, + // The current tick + tick: Tick, + // The next entry to possibly timeout + next: Token, + // Masks the target tick to get the slot + mask: u64, + // Set on registration with Poll + inner: LazyCell<Inner>, +} + +/// Used to create a `Timer`. +pub struct Builder { + // Approximate duration of each tick + tick: Duration, + // Number of slots in the timer wheel + num_slots: usize, + // Max number of timeouts that can be in flight at a given time. + capacity: usize, +} + +/// A timeout, as returned by `Timer::set_timeout`. +/// +/// Use this as the argument to `Timer::cancel_timeout`, to cancel this timeout. +#[derive(Clone, Debug)] +pub struct Timeout { + // Reference into the timer entry slab + token: Token, + // Tick that it should match up with + tick: u64, +} + +struct Inner { + registration: Registration, + set_readiness: SetReadiness, + wakeup_state: WakeupState, + wakeup_thread: thread::JoinHandle<()>, +} + +impl Drop for Inner { + fn drop(&mut self) { + // 1. Set wakeup state to TERMINATE_THREAD + self.wakeup_state.store(TERMINATE_THREAD, Ordering::Release); + // 2. Wake him up + self.wakeup_thread.thread().unpark(); + } +} + +#[derive(Copy, Clone, Debug)] +struct WheelEntry { + next_tick: Tick, + head: Token, +} + +// Doubly linked list of timer entries. Allows for efficient insertion / +// removal of timeouts. +struct Entry<T> { + state: T, + links: EntryLinks, +} + +#[derive(Copy, Clone)] +struct EntryLinks { + tick: Tick, + prev: Token, + next: Token, +} + +type Tick = u64; + +const TICK_MAX: Tick = u64::MAX; + +// Manages communication with wakeup thread +type WakeupState = Arc<AtomicUsize>; + +const TERMINATE_THREAD: usize = 0; +const EMPTY: Token = Token(usize::MAX); + +impl Builder { + /// Set the tick duration. Default is 100ms. + pub fn tick_duration(mut self, duration: Duration) -> Builder { + self.tick = duration; + self + } + + /// Set the number of slots. Default is 256. + pub fn num_slots(mut self, num_slots: usize) -> Builder { + self.num_slots = num_slots; + self + } + + /// Set the capacity. Default is 65536. + pub fn capacity(mut self, capacity: usize) -> Builder { + self.capacity = capacity; + self + } + + /// Build a `Timer` with the parameters set on this `Builder`. + pub fn build<T>(self) -> Timer<T> { + Timer::new( + convert::millis(self.tick), + self.num_slots, + self.capacity, + Instant::now(), + ) + } +} + +impl Default for Builder { + fn default() -> Builder { + Builder { + tick: Duration::from_millis(100), + num_slots: 1 << 8, + capacity: 1 << 16, + } + } +} + +impl<T> Timer<T> { + fn new(tick_ms: u64, num_slots: usize, capacity: usize, start: Instant) -> Timer<T> { + let num_slots = num_slots.next_power_of_two(); + let capacity = capacity.next_power_of_two(); + let mask = (num_slots as u64) - 1; + let wheel = iter::repeat(WheelEntry { + next_tick: TICK_MAX, + head: EMPTY, + }) + .take(num_slots) + .collect(); + + Timer { + tick_ms, + entries: Slab::with_capacity(capacity), + wheel, + start, + tick: 0, + next: EMPTY, + mask, + inner: LazyCell::new(), + } + } + + /// Set a timeout. + /// + /// When the timeout occurs, the given state becomes available via `poll`. + pub fn set_timeout(&mut self, delay_from_now: Duration, state: T) -> Timeout { + let delay_from_start = self.start.elapsed() + delay_from_now; + self.set_timeout_at(delay_from_start, state) + } + + fn set_timeout_at(&mut self, delay_from_start: Duration, state: T) -> Timeout { + let mut tick = duration_to_tick(delay_from_start, self.tick_ms); + trace!( + "setting timeout; delay={:?}; tick={:?}; current-tick={:?}", + delay_from_start, + tick, + self.tick + ); + + // Always target at least 1 tick in the future + if tick <= self.tick { + tick = self.tick + 1; + } + + self.insert(tick, state) + } + + fn insert(&mut self, tick: Tick, state: T) -> Timeout { + // Get the slot for the requested tick + let slot = (tick & self.mask) as usize; + let curr = self.wheel[slot]; + + // Insert the new entry + let entry = Entry::new(state, tick, curr.head); + let token = Token(self.entries.insert(entry)); + + if curr.head != EMPTY { + // If there was a previous entry, set its prev pointer to the new + // entry + self.entries[curr.head.into()].links.prev = token; + } + + // Update the head slot + self.wheel[slot] = WheelEntry { + next_tick: cmp::min(tick, curr.next_tick), + head: token, + }; + + self.schedule_readiness(tick); + + trace!("inserted timout; slot={}; token={:?}", slot, token); + + // Return the new timeout + Timeout { token, tick } + } + + /// Cancel a timeout. + /// + /// If the timeout has not yet occurred, the return value holds the + /// associated state. + pub fn cancel_timeout(&mut self, timeout: &Timeout) -> Option<T> { + let links = match self.entries.get(timeout.token.into()) { + Some(e) => e.links, + None => return None, + }; + + // Sanity check + if links.tick != timeout.tick { + return None; + } + + self.unlink(&links, timeout.token); + Some(self.entries.remove(timeout.token.into()).state) + } + + /// Poll for an expired timer. + /// + /// The return value holds the state associated with the first expired + /// timer, if any. + pub fn poll(&mut self) -> Option<T> { + let target_tick = current_tick(self.start, self.tick_ms); + self.poll_to(target_tick) + } + + fn poll_to(&mut self, mut target_tick: Tick) -> Option<T> { + trace!( + "tick_to; target_tick={}; current_tick={}", + target_tick, + self.tick + ); + + if target_tick < self.tick { + target_tick = self.tick; + } + + while self.tick <= target_tick { + let curr = self.next; + + trace!("ticking; curr={:?}", curr); + + if curr == EMPTY { + self.tick += 1; + + let slot = self.slot_for(self.tick); + self.next = self.wheel[slot].head; + + // Handle the case when a slot has a single timeout which gets + // canceled before the timeout expires. In this case, the + // slot's head is EMPTY but there is a value for next_tick. Not + // resetting next_tick here causes the timer to get stuck in a + // loop. + if self.next == EMPTY { + self.wheel[slot].next_tick = TICK_MAX; + } + } else { + let slot = self.slot_for(self.tick); + + if curr == self.wheel[slot].head { + self.wheel[slot].next_tick = TICK_MAX; + } + + let links = self.entries[curr.into()].links; + + if links.tick <= self.tick { + trace!("triggering; token={:?}", curr); + + // Unlink will also advance self.next + self.unlink(&links, curr); + + // Remove and return the token + return Some(self.entries.remove(curr.into()).state); + } else { + let next_tick = self.wheel[slot].next_tick; + self.wheel[slot].next_tick = cmp::min(next_tick, links.tick); + self.next = links.next; + } + } + } + + // No more timeouts to poll + if let Some(inner) = self.inner.borrow() { + trace!("unsetting readiness"); + let _ = inner.set_readiness.set_readiness(Ready::empty()); + + if let Some(tick) = self.next_tick() { + self.schedule_readiness(tick); + } + } + + None + } + + fn unlink(&mut self, links: &EntryLinks, token: Token) { + trace!( + "unlinking timeout; slot={}; token={:?}", + self.slot_for(links.tick), + token + ); + + if links.prev == EMPTY { + let slot = self.slot_for(links.tick); + self.wheel[slot].head = links.next; + } else { + self.entries[links.prev.into()].links.next = links.next; + } + + if links.next != EMPTY { + self.entries[links.next.into()].links.prev = links.prev; + + if token == self.next { + self.next = links.next; + } + } else if token == self.next { + self.next = EMPTY; + } + } + + fn schedule_readiness(&self, tick: Tick) { + if let Some(inner) = self.inner.borrow() { + // Coordinate setting readiness w/ the wakeup thread + let mut curr = inner.wakeup_state.load(Ordering::Acquire); + + loop { + if curr as Tick <= tick { + // Nothing to do, wakeup is already scheduled + return; + } + + // Attempt to move the wakeup time forward + trace!("advancing the wakeup time; target={}; curr={}", tick, curr); + let actual = + inner + .wakeup_state + .compare_and_swap(curr, tick as usize, Ordering::Release); + + if actual == curr { + // Signal to the wakeup thread that the wakeup time has + // been changed. + trace!("unparking wakeup thread"); + inner.wakeup_thread.thread().unpark(); + return; + } + + curr = actual; + } + } + } + + // Next tick containing a timeout + fn next_tick(&self) -> Option<Tick> { + if self.next != EMPTY { + let slot = self.slot_for(self.entries[self.next.into()].links.tick); + + if self.wheel[slot].next_tick == self.tick { + // There is data ready right now + return Some(self.tick); + } + } + + self.wheel.iter().map(|e| e.next_tick).min() + } + + fn slot_for(&self, tick: Tick) -> usize { + (self.mask & tick) as usize + } +} + +impl<T> Default for Timer<T> { + fn default() -> Timer<T> { + Builder::default().build() + } +} + +impl<T> Evented for Timer<T> { + fn register( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + if self.inner.borrow().is_some() { + return Err(io::Error::new( + io::ErrorKind::Other, + "timer already registered", + )); + } + + let (registration, set_readiness) = Registration::new2(); + poll.register(®istration, token, interest, opts)?; + let wakeup_state = Arc::new(AtomicUsize::new(usize::MAX)); + let thread_handle = spawn_wakeup_thread( + Arc::clone(&wakeup_state), + set_readiness.clone(), + self.start, + self.tick_ms, + ); + + self.inner + .fill(Inner { + registration, + set_readiness, + wakeup_state, + wakeup_thread: thread_handle, + }) + .expect("timer already registered"); + + if let Some(next_tick) = self.next_tick() { + self.schedule_readiness(next_tick); + } + + Ok(()) + } + + fn reregister( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> io::Result<()> { + match self.inner.borrow() { + Some(inner) => poll.reregister(&inner.registration, token, interest, opts), + None => Err(io::Error::new( + io::ErrorKind::Other, + "receiver not registered", + )), + } + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + match self.inner.borrow() { + Some(inner) => poll.deregister(&inner.registration), + None => Err(io::Error::new( + io::ErrorKind::Other, + "receiver not registered", + )), + } + } +} + +impl fmt::Debug for Inner { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Inner") + .field("registration", &self.registration) + .field("wakeup_state", &self.wakeup_state.load(Ordering::Relaxed)) + .finish() + } +} + +fn spawn_wakeup_thread( + state: WakeupState, + set_readiness: SetReadiness, + start: Instant, + tick_ms: u64, +) -> thread::JoinHandle<()> { + thread::spawn(move || { + let mut sleep_until_tick = state.load(Ordering::Acquire) as Tick; + + loop { + if sleep_until_tick == TERMINATE_THREAD as Tick { + return; + } + + let now_tick = current_tick(start, tick_ms); + + trace!( + "wakeup thread: sleep_until_tick={:?}; now_tick={:?}", + sleep_until_tick, + now_tick + ); + + if now_tick < sleep_until_tick { + // Calling park_timeout with u64::MAX leads to undefined + // behavior in pthread, causing the park to return immediately + // and causing the thread to tightly spin. Instead of u64::MAX + // on large values, simply use a blocking park. + match tick_ms.checked_mul(sleep_until_tick - now_tick) { + Some(sleep_duration) => { + trace!( + "sleeping; tick_ms={}; now_tick={}; sleep_until_tick={}; duration={:?}", + tick_ms, + now_tick, + sleep_until_tick, + sleep_duration + ); + thread::park_timeout(Duration::from_millis(sleep_duration)); + } + None => { + trace!( + "sleeping; tick_ms={}; now_tick={}; blocking sleep", + tick_ms, + now_tick + ); + thread::park(); + } + } + sleep_until_tick = state.load(Ordering::Acquire) as Tick; + } else { + let actual = + state.compare_and_swap(sleep_until_tick as usize, usize::MAX, Ordering::AcqRel) + as Tick; + + if actual == sleep_until_tick { + trace!("setting readiness from wakeup thread"); + let _ = set_readiness.set_readiness(Ready::readable()); + sleep_until_tick = usize::MAX as Tick; + } else { + sleep_until_tick = actual as Tick; + } + } + } + }) +} + +fn duration_to_tick(elapsed: Duration, tick_ms: u64) -> Tick { + // Calculate tick rounding up to the closest one + let elapsed_ms = convert::millis(elapsed); + elapsed_ms.saturating_add(tick_ms / 2) / tick_ms +} + +fn current_tick(start: Instant, tick_ms: u64) -> Tick { + duration_to_tick(start.elapsed(), tick_ms) +} + +impl<T> Entry<T> { + fn new(state: T, tick: u64, next: Token) -> Entry<T> { + Entry { + state, + links: EntryLinks { + tick, + prev: EMPTY, + next, + }, + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::time::{Duration, Instant}; + + #[test] + pub fn test_timeout_next_tick() { + let mut t = timer(); + let mut tick; + + t.set_timeout_at(Duration::from_millis(100), "a"); + + tick = ms_to_tick(&t, 50); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 100); + assert_eq!(Some("a"), t.poll_to(tick)); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 150); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 200); + assert_eq!(None, t.poll_to(tick)); + + assert_eq!(count(&t), 0); + } + + #[test] + pub fn test_clearing_timeout() { + let mut t = timer(); + let mut tick; + + let to = t.set_timeout_at(Duration::from_millis(100), "a"); + assert_eq!("a", t.cancel_timeout(&to).unwrap()); + + tick = ms_to_tick(&t, 100); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 200); + assert_eq!(None, t.poll_to(tick)); + + assert_eq!(count(&t), 0); + } + + #[test] + pub fn test_multiple_timeouts_same_tick() { + let mut t = timer(); + let mut tick; + + t.set_timeout_at(Duration::from_millis(100), "a"); + t.set_timeout_at(Duration::from_millis(100), "b"); + + let mut rcv = vec![]; + + tick = ms_to_tick(&t, 100); + rcv.push(t.poll_to(tick).unwrap()); + rcv.push(t.poll_to(tick).unwrap()); + + assert_eq!(None, t.poll_to(tick)); + + rcv.sort(); + assert!(rcv == ["a", "b"], "actual={:?}", rcv); + + tick = ms_to_tick(&t, 200); + assert_eq!(None, t.poll_to(tick)); + + assert_eq!(count(&t), 0); + } + + #[test] + pub fn test_multiple_timeouts_diff_tick() { + let mut t = timer(); + let mut tick; + + t.set_timeout_at(Duration::from_millis(110), "a"); + t.set_timeout_at(Duration::from_millis(220), "b"); + t.set_timeout_at(Duration::from_millis(230), "c"); + t.set_timeout_at(Duration::from_millis(440), "d"); + t.set_timeout_at(Duration::from_millis(560), "e"); + + tick = ms_to_tick(&t, 100); + assert_eq!(Some("a"), t.poll_to(tick)); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 200); + assert_eq!(Some("c"), t.poll_to(tick)); + assert_eq!(Some("b"), t.poll_to(tick)); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 300); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 400); + assert_eq!(Some("d"), t.poll_to(tick)); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 500); + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 600); + assert_eq!(Some("e"), t.poll_to(tick)); + assert_eq!(None, t.poll_to(tick)); + } + + #[test] + pub fn test_catching_up() { + let mut t = timer(); + + t.set_timeout_at(Duration::from_millis(110), "a"); + t.set_timeout_at(Duration::from_millis(220), "b"); + t.set_timeout_at(Duration::from_millis(230), "c"); + t.set_timeout_at(Duration::from_millis(440), "d"); + + let tick = ms_to_tick(&t, 600); + assert_eq!(Some("a"), t.poll_to(tick)); + assert_eq!(Some("c"), t.poll_to(tick)); + assert_eq!(Some("b"), t.poll_to(tick)); + assert_eq!(Some("d"), t.poll_to(tick)); + assert_eq!(None, t.poll_to(tick)); + } + + #[test] + pub fn test_timeout_hash_collision() { + let mut t = timer(); + let mut tick; + + t.set_timeout_at(Duration::from_millis(100), "a"); + t.set_timeout_at(Duration::from_millis(100 + TICK * SLOTS as u64), "b"); + + tick = ms_to_tick(&t, 100); + assert_eq!(Some("a"), t.poll_to(tick)); + assert_eq!(1, count(&t)); + + tick = ms_to_tick(&t, 200); + assert_eq!(None, t.poll_to(tick)); + assert_eq!(1, count(&t)); + + tick = ms_to_tick(&t, 100 + TICK * SLOTS as u64); + assert_eq!(Some("b"), t.poll_to(tick)); + assert_eq!(0, count(&t)); + } + + #[test] + pub fn test_clearing_timeout_between_triggers() { + let mut t = timer(); + let mut tick; + + let a = t.set_timeout_at(Duration::from_millis(100), "a"); + let _ = t.set_timeout_at(Duration::from_millis(100), "b"); + let _ = t.set_timeout_at(Duration::from_millis(200), "c"); + + tick = ms_to_tick(&t, 100); + assert_eq!(Some("b"), t.poll_to(tick)); + assert_eq!(2, count(&t)); + + t.cancel_timeout(&a); + assert_eq!(1, count(&t)); + + assert_eq!(None, t.poll_to(tick)); + + tick = ms_to_tick(&t, 200); + assert_eq!(Some("c"), t.poll_to(tick)); + assert_eq!(0, count(&t)); + } + + const TICK: u64 = 100; + const SLOTS: usize = 16; + const CAPACITY: usize = 32; + + fn count<T>(timer: &Timer<T>) -> usize { + timer.entries.len() + } + + fn timer() -> Timer<&'static str> { + Timer::new(TICK, SLOTS, CAPACITY, Instant::now()) + } + + fn ms_to_tick<T>(timer: &Timer<T>, ms: u64) -> u64 { + ms / timer.tick_ms + } +} diff --git a/third_party/rust/mio-extras/test/mod.rs b/third_party/rust/mio-extras/test/mod.rs new file mode 100644 index 0000000000..217069466a --- /dev/null +++ b/third_party/rust/mio-extras/test/mod.rs @@ -0,0 +1,45 @@ +extern crate mio; +extern crate mio_extras; + +use mio::event::Event; +use mio::{Events, Poll}; +use std::time::Duration; + +mod test_poll_channel; +mod test_timer; + +pub fn expect_events( + poll: &Poll, + event_buffer: &mut Events, + poll_try_count: usize, + mut expected: Vec<Event>, +) { + const MS: u64 = 1_000; + + for _ in 0..poll_try_count { + poll.poll(event_buffer, Some(Duration::from_millis(MS))) + .unwrap(); + for event in event_buffer.iter() { + let pos_opt = match expected.iter().position(|exp_event| { + (event.token() == exp_event.token()) + && event.readiness().contains(exp_event.readiness()) + }) { + Some(x) => Some(x), + None => None, + }; + if let Some(pos) = pos_opt { + expected.remove(pos); + } + } + + if expected.is_empty() { + break; + } + } + + assert!( + expected.is_empty(), + "The following expected events were not found: {:?}", + expected + ); +} diff --git a/third_party/rust/mio-extras/test/test_poll_channel.rs b/third_party/rust/mio-extras/test/test_poll_channel.rs new file mode 100644 index 0000000000..7314f26661 --- /dev/null +++ b/third_party/rust/mio-extras/test/test_poll_channel.rs @@ -0,0 +1,362 @@ +use crate::expect_events; +use mio::event::Event; +use mio::{Events, Poll, PollOpt, Ready, Token}; +use mio_extras::channel; +use std::sync::mpsc::TryRecvError; +use std::thread; +use std::time::Duration; + +#[test] +pub fn test_poll_channel_edge() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let (tx, rx) = channel::channel(); + + poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge()) + .unwrap(); + + // Wait, but nothing should happen + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); + + // Push the value + tx.send("hello").unwrap(); + + // Polling will contain the event + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(1, num); + + let event = events.iter().next().unwrap(); + assert_eq!(event.token(), Token(123)); + assert_eq!(event.readiness(), Ready::readable()); + + // Poll again and there should be no events + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); + + // Read the value + assert_eq!("hello", rx.try_recv().unwrap()); + + // Poll again, nothing + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); + + // Push a value + tx.send("goodbye").unwrap(); + + // Have an event + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(1, num); + + let event = events.iter().next().unwrap(); + assert_eq!(event.token(), Token(123)); + assert_eq!(event.readiness(), Ready::readable()); + + // Read the value + rx.try_recv().unwrap(); + + // Drop the sender half + drop(tx); + + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(1, num); + + let event = events.iter().next().unwrap(); + assert_eq!(event.token(), Token(123)); + assert_eq!(event.readiness(), Ready::readable()); + + match rx.try_recv() { + Err(TryRecvError::Disconnected) => {} + no => panic!("unexpected value {:?}", no), + } +} + +#[test] +pub fn test_poll_channel_oneshot() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let (tx, rx) = channel::channel(); + + poll.register( + &rx, + Token(123), + Ready::readable(), + PollOpt::edge() | PollOpt::oneshot(), + ) + .unwrap(); + + // Wait, but nothing should happen + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); + + // Push the value + tx.send("hello").unwrap(); + + // Polling will contain the event + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(1, num); + + let event = events.iter().next().unwrap(); + assert_eq!(event.token(), Token(123)); + assert_eq!(event.readiness(), Ready::readable()); + + // Poll again and there should be no events + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); + + // Read the value + assert_eq!("hello", rx.try_recv().unwrap()); + + // Poll again, nothing + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); + + // Push a value + tx.send("goodbye").unwrap(); + + // Poll again, nothing + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); + + // Reregistering will re-trigger the notification + for _ in 0..3 { + poll.reregister( + &rx, + Token(123), + Ready::readable(), + PollOpt::edge() | PollOpt::oneshot(), + ) + .unwrap(); + + // Have an event + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(1, num); + + let event = events.iter().next().unwrap(); + assert_eq!(event.token(), Token(123)); + assert_eq!(event.readiness(), Ready::readable()); + } + + // Get the value + assert_eq!("goodbye", rx.try_recv().unwrap()); + + poll.reregister( + &rx, + Token(123), + Ready::readable(), + PollOpt::edge() | PollOpt::oneshot(), + ) + .unwrap(); + + // Have an event + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); + + poll.reregister( + &rx, + Token(123), + Ready::readable(), + PollOpt::edge() | PollOpt::oneshot(), + ) + .unwrap(); + + // Have an event + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); +} + +#[test] +pub fn test_poll_channel_level() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let (tx, rx) = channel::channel(); + + poll.register(&rx, Token(123), Ready::readable(), PollOpt::level()) + .unwrap(); + + // Wait, but nothing should happen + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); + + // Push the value + tx.send("hello").unwrap(); + + // Polling will contain the event + for i in 0..5 { + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert!(1 == num, "actually got {} on iteration {}", num, i); + + let event = events.iter().next().unwrap(); + assert_eq!(event.token(), Token(123)); + assert_eq!(event.readiness(), Ready::readable()); + } + + // Read the value + assert_eq!("hello", rx.try_recv().unwrap()); + + // Wait, but nothing should happen + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); +} + +#[test] +pub fn test_poll_channel_writable() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let (tx, rx) = channel::channel(); + + poll.register(&rx, Token(123), Ready::writable(), PollOpt::edge()) + .unwrap(); + + // Wait, but nothing should happen + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); + + // Push the value + tx.send("hello").unwrap(); + + // Wait, but nothing should happen + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); +} + +#[test] +pub fn test_dropping_receive_before_poll() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let (tx, rx) = channel::channel(); + + poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge()) + .unwrap(); + + // Push the value + tx.send("hello").unwrap(); + + // Drop the receive end + drop(rx); + + // Wait, but nothing should happen + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); +} + +#[test] +pub fn test_mixing_channel_with_socket() { + use mio::net::{TcpListener, TcpStream}; + + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let (tx, rx) = channel::channel(); + + // Create the listener + let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + + // Register the listener with `Poll` + poll.register(&l, Token(0), Ready::readable(), PollOpt::edge()) + .unwrap(); + poll.register(&rx, Token(1), Ready::readable(), PollOpt::edge()) + .unwrap(); + + // Push a value onto the channel + tx.send("hello").unwrap(); + + // Connect a TCP socket + let s1 = TcpStream::connect(&l.local_addr().unwrap()).unwrap(); + + // Register the socket + poll.register(&s1, Token(2), Ready::readable(), PollOpt::edge()) + .unwrap(); + + // Sleep a bit to ensure it arrives at dest + thread::sleep(Duration::from_millis(250)); + + expect_events( + &poll, + &mut events, + 2, + vec![ + Event::new(Ready::empty(), Token(0)), + Event::new(Ready::empty(), Token(1)), + ], + ); +} + +#[test] +pub fn test_sending_from_other_thread_while_polling() { + const ITERATIONS: usize = 20; + const THREADS: usize = 5; + + // Make sure to run multiple times + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + + for _ in 0..ITERATIONS { + let (tx, rx) = channel::channel(); + poll.register(&rx, Token(0), Ready::readable(), PollOpt::edge()) + .unwrap(); + + for _ in 0..THREADS { + let tx = tx.clone(); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(50)); + tx.send("ping").unwrap(); + }); + } + + let mut recv = 0; + + while recv < THREADS { + let num = poll.poll(&mut events, None).unwrap(); + + if num != 0 { + assert_eq!(1, num); + assert_eq!(events.iter().next().unwrap().token(), Token(0)); + + while let Ok(_) = rx.try_recv() { + recv += 1; + } + } + } + } +} diff --git a/third_party/rust/mio-extras/test/test_timer.rs b/third_party/rust/mio-extras/test/test_timer.rs new file mode 100644 index 0000000000..ac49833523 --- /dev/null +++ b/third_party/rust/mio-extras/test/test_timer.rs @@ -0,0 +1,308 @@ +use mio::{Events, Poll, PollOpt, Ready, Token}; +use mio_extras::timer::{self, Timer}; + +use std::thread; +use std::time::Duration; + +#[test] +fn test_basic_timer_without_poll() { + let mut timer = Timer::default(); + + // Set the timeout + timer.set_timeout(Duration::from_millis(200), "hello"); + + // Nothing when polled immediately + assert!(timer.poll().is_none()); + + // Wait for the timeout + thread::sleep(Duration::from_millis(250)); + + assert_eq!(Some("hello"), timer.poll()); + assert!(timer.poll().is_none()); +} + +#[test] +fn test_basic_timer_with_poll_edge_set_timeout_after_register() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let mut timer = Timer::default(); + + poll.register(&timer, Token(0), Ready::readable(), PollOpt::edge()) + .unwrap(); + timer.set_timeout(Duration::from_millis(200), "hello"); + + let elapsed = elapsed(|| { + let num = poll.poll(&mut events, None).unwrap(); + + assert_eq!(num, 1); + let event = events.iter().next().unwrap(); + assert_eq!(Token(0), event.token()); + assert_eq!(Ready::readable(), event.readiness()); + }); + + assert!(is_about(200, elapsed), "actual={:?}", elapsed); + assert_eq!("hello", timer.poll().unwrap()); + assert_eq!(None, timer.poll()); +} + +#[test] +fn test_basic_timer_with_poll_edge_set_timeout_before_register() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let mut timer = Timer::default(); + + timer.set_timeout(Duration::from_millis(200), "hello"); + poll.register(&timer, Token(0), Ready::readable(), PollOpt::edge()) + .unwrap(); + + let elapsed = elapsed(|| { + let num = poll.poll(&mut events, None).unwrap(); + + assert_eq!(num, 1); + let event = events.iter().next().unwrap(); + assert_eq!(Token(0), event.token()); + assert_eq!(Ready::readable(), event.readiness()); + }); + + assert!(is_about(200, elapsed), "actual={:?}", elapsed); + assert_eq!("hello", timer.poll().unwrap()); + assert_eq!(None, timer.poll()); +} + +#[test] +fn test_setting_later_timeout_then_earlier_one() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let mut timer = Timer::default(); + + poll.register(&timer, Token(0), Ready::readable(), PollOpt::edge()) + .unwrap(); + + timer.set_timeout(Duration::from_millis(600), "hello"); + timer.set_timeout(Duration::from_millis(200), "world"); + + let elapsed = elapsed(|| { + let num = poll.poll(&mut events, None).unwrap(); + + assert_eq!(num, 1); + let event = events.iter().next().unwrap(); + assert_eq!(Token(0), event.token()); + assert_eq!(Ready::readable(), event.readiness()); + }); + + assert!(is_about(200, elapsed), "actual={:?}", elapsed); + assert_eq!("world", timer.poll().unwrap()); + assert_eq!(None, timer.poll()); + + let elapsed = self::elapsed(|| { + let num = poll.poll(&mut events, None).unwrap(); + + assert_eq!(num, 1); + let event = events.iter().next().unwrap(); + assert_eq!(Token(0), event.token()); + assert_eq!(Ready::readable(), event.readiness()); + }); + + assert!(is_about(400, elapsed), "actual={:?}", elapsed); + assert_eq!("hello", timer.poll().unwrap()); + assert_eq!(None, timer.poll()); +} + +#[test] +fn test_timer_with_looping_wheel() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let mut timer = timer::Builder::default().num_slots(2).build(); + + poll.register(&timer, Token(0), Ready::readable(), PollOpt::edge()) + .unwrap(); + + const TOKENS: &[&str] = &["hello", "world", "some", "thing"]; + + for (i, msg) in TOKENS.iter().enumerate() { + timer.set_timeout(Duration::from_millis(500 * (i as u64 + 1)), msg); + } + + for msg in TOKENS { + let elapsed = elapsed(|| { + let num = poll.poll(&mut events, None).unwrap(); + + assert_eq!(num, 1); + let event = events.iter().next().unwrap(); + assert_eq!(Token(0), event.token()); + assert_eq!(Ready::readable(), event.readiness()); + }); + + assert!( + is_about(500, elapsed), + "actual={:?}; msg={:?}", + elapsed, + msg + ); + assert_eq!(Some(msg), timer.poll()); + assert_eq!(None, timer.poll()); + } +} + +#[test] +fn test_edge_without_polling() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let mut timer = Timer::default(); + + poll.register(&timer, Token(0), Ready::readable(), PollOpt::edge()) + .unwrap(); + + timer.set_timeout(Duration::from_millis(400), "hello"); + + let ms = elapsed(|| { + let num = poll.poll(&mut events, None).unwrap(); + assert_eq!(num, 1); + let event = events.iter().next().unwrap(); + assert_eq!(Token(0), event.token()); + assert_eq!(Ready::readable(), event.readiness()); + }); + + assert!(is_about(400, ms), "actual={:?}", ms); + + let ms = elapsed(|| { + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(num, 0); + }); + + assert!(is_about(300, ms), "actual={:?}", ms); +} + +#[test] +fn test_level_triggered() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let mut timer = Timer::default(); + + poll.register(&timer, Token(0), Ready::readable(), PollOpt::level()) + .unwrap(); + + timer.set_timeout(Duration::from_millis(400), "hello"); + + let ms = elapsed(|| { + let num = poll.poll(&mut events, None).unwrap(); + assert_eq!(num, 1); + let event = events.iter().next().unwrap(); + assert_eq!(Token(0), event.token()); + assert_eq!(Ready::readable(), event.readiness()); + }); + + assert!(is_about(400, ms), "actual={:?}", ms); + + let ms = elapsed(|| { + let num = poll.poll(&mut events, None).unwrap(); + assert_eq!(num, 1); + let event = events.iter().next().unwrap(); + assert_eq!(Token(0), event.token()); + assert_eq!(Ready::readable(), event.readiness()); + }); + + assert!(is_about(0, ms), "actual={:?}", ms); +} + +#[test] +fn test_edge_oneshot_triggered() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let mut timer = Timer::default(); + + poll.register( + &timer, + Token(0), + Ready::readable(), + PollOpt::edge() | PollOpt::oneshot(), + ) + .unwrap(); + + timer.set_timeout(Duration::from_millis(200), "hello"); + + let ms = elapsed(|| { + let num = poll.poll(&mut events, None).unwrap(); + assert_eq!(num, 1); + }); + + assert!(is_about(200, ms), "actual={:?}", ms); + + let ms = elapsed(|| { + let num = poll + .poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(num, 0); + }); + + assert!(is_about(300, ms), "actual={:?}", ms); + + poll.reregister( + &timer, + Token(0), + Ready::readable(), + PollOpt::edge() | PollOpt::oneshot(), + ) + .unwrap(); + + let ms = elapsed(|| { + let num = poll.poll(&mut events, None).unwrap(); + assert_eq!(num, 1); + }); + + assert!(is_about(0, ms)); +} + +#[test] +fn test_cancel_timeout() { + use std::time::Instant; + + let mut timer: Timer<u32> = Default::default(); + let timeout = timer.set_timeout(Duration::from_millis(200), 1); + timer.cancel_timeout(&timeout); + + let poll = Poll::new().unwrap(); + poll.register(&timer, Token(0), Ready::readable(), PollOpt::edge()) + .unwrap(); + + let mut events = Events::with_capacity(16); + + let now = Instant::now(); + let dur = Duration::from_millis(500); + let mut i = 0; + + while Instant::now() - now < dur { + if i > 10 { + panic!("iterated too many times"); + } + + i += 1; + + let elapsed = Instant::now() - now; + + poll.poll(&mut events, Some(dur - elapsed)).unwrap(); + + while let Some(_) = timer.poll() { + panic!("did not expect to receive timeout"); + } + } +} + +fn elapsed<F: FnMut()>(mut f: F) -> u64 { + use std::time::Instant; + + let now = Instant::now(); + + f(); + + let elapsed = now.elapsed(); + elapsed.as_secs() * 1000 + u64::from(elapsed.subsec_millis()) +} + +fn is_about(expect: u64, val: u64) -> bool { + const WINDOW: i64 = 200; + + ((expect as i64) - (val as i64)).abs() <= WINDOW +} |