diff options
Diffstat (limited to 'third_party/rust/mio-named-pipes')
-rw-r--r-- | third_party/rust/mio-named-pipes/.cargo-checksum.json | 1 | ||||
-rw-r--r-- | third_party/rust/mio-named-pipes/Cargo.toml | 25 | ||||
-rw-r--r-- | third_party/rust/mio-named-pipes/LICENSE-APACHE | 201 | ||||
-rw-r--r-- | third_party/rust/mio-named-pipes/LICENSE-MIT | 25 | ||||
-rw-r--r-- | third_party/rust/mio-named-pipes/README.md | 49 | ||||
-rw-r--r-- | third_party/rust/mio-named-pipes/appveyor.yml | 18 | ||||
-rw-r--r-- | third_party/rust/mio-named-pipes/src/from_raw_arc.rs | 116 | ||||
-rw-r--r-- | third_party/rust/mio-named-pipes/src/lib.rs | 717 | ||||
-rw-r--r-- | third_party/rust/mio-named-pipes/tests/smoke.rs | 274 |
9 files changed, 1426 insertions, 0 deletions
diff --git a/third_party/rust/mio-named-pipes/.cargo-checksum.json b/third_party/rust/mio-named-pipes/.cargo-checksum.json new file mode 100644 index 0000000000..a846e81bc8 --- /dev/null +++ b/third_party/rust/mio-named-pipes/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"Cargo.toml":"2e062a21894df50c23cbe4d8ecec92bc7fcd84a71760b034f8052b4dd2dffce3","LICENSE-APACHE":"a60eea817514531668d7e00765731449fe14d059d3249e0bc93b36de45f759f2","LICENSE-MIT":"378f5840b258e2779c39418f3f2d7b2ba96f1c7917dd6be0713f88305dbda397","README.md":"7b08962c6e08aa32089a76af7ba08553fbba42be7efae2e8f935baa7e34394ca","appveyor.yml":"0bd9b8e5a94a36972b37227cc59984fc6ec01b4ee4b617ef20d0e3acd19f44b1","src/from_raw_arc.rs":"db036a55fc797eab8d5f55caecd49aab960221bbad5f24c903fae28194e19e0e","src/lib.rs":"b890af801e60352d039a5d4939b8bbdc75f23c7230c7112c4800afac7532cca8","tests/smoke.rs":"567a8913569c03a3603a26a026b7ba4302ce8a33efa6a01b67a3cf531adf9419"},"package":null}
\ No newline at end of file diff --git a/third_party/rust/mio-named-pipes/Cargo.toml b/third_party/rust/mio-named-pipes/Cargo.toml new file mode 100644 index 0000000000..ab9410c523 --- /dev/null +++ b/third_party/rust/mio-named-pipes/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "mio-named-pipes" +version = "0.1.6" +authors = ["Alex Crichton <alex@alexcrichton.com>"] +license = "MIT/Apache-2.0" +readme = "README.md" +repository = "https://github.com/alexcrichton/mio-named-pipes" +homepage = "https://github.com/alexcrichton/mio-named-pipes" +documentation = "https://docs.rs/mio-named-pipes/0.1/x86_64-pc-windows-msvc/mio_named_pipes/" +description = """ +Windows named pipe bindings for mio. +""" + +[target.'cfg(windows)'.dependencies] +log = "0.4" +mio = "0.6.5" +miow = "0.3" + +[target.'cfg(windows)'.dependencies.winapi] +version = "0.3" +features = ["winerror", "ioapiset", "minwinbase", "winbase"] + +[dev-dependencies] +env_logger = { version = "0.4", default-features = false } +rand = "0.4" diff --git a/third_party/rust/mio-named-pipes/LICENSE-APACHE b/third_party/rust/mio-named-pipes/LICENSE-APACHE new file mode 100644 index 0000000000..16fe87b06e --- /dev/null +++ b/third_party/rust/mio-named-pipes/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/third_party/rust/mio-named-pipes/LICENSE-MIT b/third_party/rust/mio-named-pipes/LICENSE-MIT new file mode 100644 index 0000000000..39e0ed6602 --- /dev/null +++ b/third_party/rust/mio-named-pipes/LICENSE-MIT @@ -0,0 +1,25 @@ +Copyright (c) 2014 Alex Crichton + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/third_party/rust/mio-named-pipes/README.md b/third_party/rust/mio-named-pipes/README.md new file mode 100644 index 0000000000..7290c61ae1 --- /dev/null +++ b/third_party/rust/mio-named-pipes/README.md @@ -0,0 +1,49 @@ +# mio-named-pipes + +[![Build status](https://ci.appveyor.com/api/projects/status/y0ct01srewnhhesn?svg=true)](https://ci.appveyor.com/project/alexcrichton/mio-named-pipes) + +[Documentation](https://docs.rs/mio-named-pipes/0.1/x86_64-pc-windows-msvc/mio_named_pipes/) + +A library for integrating Windows [Named Pipes] with [mio]. + +[Named Pipes]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365590(v=vs.85).aspx +[mio]: https://github.com/carllerche/mio + +```toml +# Cargo.toml +[dependencies] +mio-named-pipes = "0.1" +mio = "0.6" +``` + +## Usage + +The primary type, `NamedPipe`, can be constructed with `NamedPipe::new` or +through the `IntoRawHandle` type. All operations on `NamedPipe` are nonblocking +and will return an I/O error if they'd block (with the error indicating so). + +Typically you can use a `NamedPipe` in the same way you would a TCP socket on +Windows with mio. + +> **Note**: Named pipes on Windows do not have a zero-cost abstraction when +> working with the mio interface (readiness, not completion). As a result, this +> library internally has some buffer management that hasn't been optimized yet. +> It's recommended you benchmark this library for your application, and feel +> free to contact me if anything looks awry. + +# License + +This project is licensed under either of + + * Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or + http://www.apache.org/licenses/LICENSE-2.0) + * MIT license ([LICENSE-MIT](LICENSE-MIT) or + http://opensource.org/licenses/MIT) + +at your option. + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in Serde by you, as defined in the Apache-2.0 license, shall be +dual licensed as above, without any additional terms or conditions. diff --git a/third_party/rust/mio-named-pipes/appveyor.yml b/third_party/rust/mio-named-pipes/appveyor.yml new file mode 100644 index 0000000000..ed39b90386 --- /dev/null +++ b/third_party/rust/mio-named-pipes/appveyor.yml @@ -0,0 +1,18 @@ +environment: + matrix: + - TARGET: x86_64-pc-windows-msvc + GH_TOKEN: + secure: nHB4fVo+y/Aak+L0nYfrT8Rcs8OfUNm0F2xcIVFVYJ9ehf0CzvCmSMUvWguM0kKp + +install: + - curl -sSf -o rustup-init.exe https://win.rustup.rs/ + - rustup-init.exe -y --default-host %TARGET% + - set PATH=%PATH%;C:\Users\appveyor\.cargo\bin;C:\MinGW\bin + + - rustc -vV + - cargo -vV + +build: false + +test_script: + - cargo test diff --git a/third_party/rust/mio-named-pipes/src/from_raw_arc.rs b/third_party/rust/mio-named-pipes/src/from_raw_arc.rs new file mode 100644 index 0000000000..0f828c6e47 --- /dev/null +++ b/third_party/rust/mio-named-pipes/src/from_raw_arc.rs @@ -0,0 +1,116 @@ +//! A "Manual Arc" which allows manually frobbing the reference count +//! +//! This module contains a copy of the `Arc` found in the standard library, +//! stripped down to the bare bones of what we actually need. The reason this is +//! done is for the ability to concretely know the memory layout of the `Inner` +//! structure of the arc pointer itself (e.g. `ArcInner` in the standard +//! library). +//! +//! We do some unsafe casting from `*mut OVERLAPPED` to a `FromRawArc<T>` to +//! ensure that data lives for the length of an I/O operation, but this means +//! that we have to know the layouts of the structures involved. This +//! representation primarily guarantees that the data, `T` is at the front of +//! the inner pointer always. +//! +//! Note that we're missing out on some various optimizations implemented in the +//! standard library: +//! +//! * The size of `FromRawArc` is actually two words because of the drop flag +//! * The compiler doesn't understand that the pointer in `FromRawArc` is never +//! null, so Option<FromRawArc<T>> is not a nullable pointer. + +use std::ops::Deref; +use std::mem; +use std::sync::atomic::{self, AtomicUsize, Ordering}; + +pub struct FromRawArc<T> { + _inner: *mut Inner<T>, +} + +unsafe impl<T: Sync + Send> Send for FromRawArc<T> { } +unsafe impl<T: Sync + Send> Sync for FromRawArc<T> { } + +#[repr(C)] +struct Inner<T> { + data: T, + cnt: AtomicUsize, +} + +impl<T> FromRawArc<T> { + pub fn new(data: T) -> FromRawArc<T> { + let x = Box::new(Inner { + data: data, + cnt: AtomicUsize::new(1), + }); + FromRawArc { _inner: unsafe { mem::transmute(x) } } + } + + pub unsafe fn from_raw(ptr: *mut T) -> FromRawArc<T> { + // Note that if we could use `mem::transmute` here to get a libstd Arc + // (guaranteed) then we could just use std::sync::Arc, but this is the + // crucial reason this currently exists. + FromRawArc { _inner: ptr as *mut Inner<T> } + } +} + +impl<T> Clone for FromRawArc<T> { + fn clone(&self) -> FromRawArc<T> { + // Atomic ordering of Relaxed lifted from libstd, but the general idea + // is that you need synchronization to communicate this increment to + // another thread, so this itself doesn't need to be synchronized. + unsafe { + (*self._inner).cnt.fetch_add(1, Ordering::Relaxed); + } + FromRawArc { _inner: self._inner } + } +} + +impl<T> Deref for FromRawArc<T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &(*self._inner).data } + } +} + +impl<T> Drop for FromRawArc<T> { + fn drop(&mut self) { + unsafe { + // Atomic orderings lifted from the standard library + if (*self._inner).cnt.fetch_sub(1, Ordering::Release) != 1 { + return + } + atomic::fence(Ordering::Acquire); + drop(mem::transmute::<_, Box<T>>(self._inner)); + } + } +} + +#[cfg(test)] +mod tests { + use super::FromRawArc; + + #[test] + fn smoke() { + let a = FromRawArc::new(1); + assert_eq!(*a, 1); + assert_eq!(*a.clone(), 1); + } + + #[test] + fn drops() { + struct A<'a>(&'a mut bool); + impl<'a> Drop for A<'a> { + fn drop(&mut self) { + *self.0 = true; + } + } + let mut a = false; + { + let a = FromRawArc::new(A(&mut a)); + drop(a.clone()); + assert!(!*a.0); + } + assert!(a); + } +} diff --git a/third_party/rust/mio-named-pipes/src/lib.rs b/third_party/rust/mio-named-pipes/src/lib.rs new file mode 100644 index 0000000000..92b2fc96d2 --- /dev/null +++ b/third_party/rust/mio-named-pipes/src/lib.rs @@ -0,0 +1,717 @@ +//! Windows named pipes bindings for mio. +//! +//! This crate implements bindings for named pipes for the mio crate. This +//! crate compiles on all platforms but only contains anything on Windows. +//! Currently this crate requires mio 0.6.2. +//! +//! On Windows, mio is implemented with an IOCP object at the heart of its +//! `Poll` implementation. For named pipes, this means that all I/O is done in +//! an overlapped fashion and the named pipes themselves are registered with +//! mio's internal IOCP object. Essentially, this crate is using IOCP for +//! bindings with named pipes. +//! +//! Note, though, that IOCP is a *completion* based model whereas mio expects a +//! *readiness* based model. As a result this crate, like with TCP objects in +//! mio, has internal buffering to translate the completion model to a readiness +//! model. This means that this crate is not a zero-cost binding over named +//! pipes on Windows, but rather approximates the performance of mio's TCP +//! implementation on Windows. +//! +//! # Trait implementations +//! +//! The `Read` and `Write` traits are implemented for `NamedPipe` and for +//! `&NamedPipe`. This represents that a named pipe can be concurrently read and +//! written to and also can be read and written to at all. Typically a named +//! pipe needs to be connected to a client before it can be read or written, +//! however. +//! +//! Note that for I/O operations on a named pipe to succeed then the named pipe +//! needs to be associated with an event loop. Until this happens all I/O +//! operations will return a "would block" error. +//! +//! # Managing connections +//! +//! The `NamedPipe` type supports a `connect` method to connect to a client and +//! a `disconnect` method to disconnect from that client. These two methods only +//! work once a named pipe is associated with an event loop. +//! +//! The `connect` method will succeed asynchronously and a completion can be +//! detected once the object receives a writable notification. +//! +//! # Named pipe clients +//! +//! Currently to create a client of a named pipe server then you can use the +//! `OpenOptions` type in the standard library to create a `File` that connects +//! to a named pipe. Afterwards you can use the `into_raw_handle` method coupled +//! with the `NamedPipe::from_raw_handle` method to convert that to a named pipe +//! that can operate asynchronously. Don't forget to pass the +//! `FILE_FLAG_OVERLAPPED` flag when opening the `File`. + +#![cfg(windows)] +#![deny(missing_docs)] + +#[macro_use] +extern crate log; +extern crate mio; +extern crate miow; +extern crate winapi; + +use std::ffi::OsStr; +use std::fmt; +use std::io::prelude::*; +use std::io; +use std::mem; +use std::os::windows::io::*; +use std::slice; +use std::sync::Mutex; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering::SeqCst; + +use mio::windows; +use mio::{Registration, Poll, Token, PollOpt, Ready, Evented, SetReadiness}; +use miow::iocp::CompletionStatus; +use miow::pipe; +use winapi::shared::winerror::*; +use winapi::um::ioapiset::*; +use winapi::um::minwinbase::*; + +mod from_raw_arc; +use from_raw_arc::FromRawArc; + +macro_rules! offset_of { + ($t:ty, $($field:ident).+) => ( + &(*(0 as *const $t)).$($field).+ as *const _ as usize + ) +} + +macro_rules! overlapped2arc { + ($e:expr, $t:ty, $($field:ident).+) => ({ + let offset = offset_of!($t, $($field).+); + debug_assert!(offset < mem::size_of::<$t>()); + FromRawArc::from_raw(($e as usize - offset) as *mut $t) + }) +} + +fn would_block() -> io::Error { + io::ErrorKind::WouldBlock.into() +} + +/// Representation of a named pipe on Windows. +/// +/// This structure internally contains a `HANDLE` which represents the named +/// pipe, and also maintains state associated with the mio event loop and active +/// I/O operations that have been scheduled to translate IOCP to a readiness +/// model. +pub struct NamedPipe { + registered: AtomicBool, + ready_registration: Registration, + poll_registration: windows::Binding, + inner: FromRawArc<Inner>, +} + +struct Inner { + handle: pipe::NamedPipe, + readiness: SetReadiness, + + connect: windows::Overlapped, + connecting: AtomicBool, + + read: windows::Overlapped, + write: windows::Overlapped, + + io: Mutex<Io>, + + pool: Mutex<BufferPool>, +} + +struct Io { + read: State, + write: State, + connect_error: Option<io::Error>, +} + +enum State { + None, + Pending(Vec<u8>, usize), + Ok(Vec<u8>, usize), + Err(io::Error), +} + +fn _assert_kinds() { + fn _assert_send<T: Send>() {} + fn _assert_sync<T: Sync>() {} + _assert_send::<NamedPipe>(); + _assert_sync::<NamedPipe>(); +} + +impl NamedPipe { + /// Creates a new named pipe at the specified `addr` given a "reasonable + /// set" of initial configuration options. + /// + /// Currently the configuration options are the [same as miow]. To change + /// these options, you can create a custom named pipe yourself and then use + /// the `FromRawHandle` constructor to convert that type to an instance of a + /// `NamedPipe` in this crate. + /// + /// [same as miow]: https://docs.rs/miow/0.1.4/x86_64-pc-windows-msvc/miow/pipe/struct.NamedPipe.html#method.new + pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> { + NamedPipe::_new(addr.as_ref()) + } + + fn _new(addr: &OsStr) -> io::Result<NamedPipe> { + let pipe = pipe::NamedPipe::new(addr)?; + unsafe { Ok(NamedPipe::from_raw_handle(pipe.into_raw_handle())) } + } + + /// Attempts to call `ConnectNamedPipe`, if possible. + /// + /// This function will attempt to connect this pipe to a client in an + /// asynchronous fashion. If the function immediately establishes a + /// connection to a client then `Ok(())` is returned. Otherwise if a + /// connection attempt was issued and is now in progress then a "would + /// block" error is returned. + /// + /// When the connection is finished then this object will be flagged as + /// being ready for a write, or otherwise in the writable state. + /// + /// # Errors + /// + /// This function will return a "would block" error if the pipe has not yet + /// been registered with an event loop, if the connection operation has + /// previously been issued but has not yet completed, or if the connect + /// itself was issued and didn't finish immediately. + /// + /// Normal I/O errors from the call to `ConnectNamedPipe` are returned + /// immediately. + pub fn connect(&self) -> io::Result<()> { + // Make sure we're associated with an IOCP object + if !self.registered() { + return Err(would_block()) + } + + // "Acquire the connecting lock" or otherwise just make sure we're the + // only operation that's using the `connect` overlapped instance. + if self.inner.connecting.swap(true, SeqCst) { + return Err(would_block()) + } + + // Now that we've flagged ourselves in the connecting state, issue the + // connection attempt. Afterwards interpret the return value and set + // internal state accordingly. + let res = unsafe { + let overlapped = self.inner.connect.as_mut_ptr() as *mut _; + self.inner.handle.connect_overlapped(overlapped) + }; + + match res { + // The connection operation finished immediately, so let's schedule + // reads/writes and such. + Ok(true) => { + trace!("connect done immediately"); + self.inner.connecting.store(false, SeqCst); + Inner::post_register(&self.inner); + Ok(()) + } + + // If the overlapped operation was successful and didn't finish + // immediately then we forget a copy of the arc we hold + // internally. This ensures that when the completion status comes + // in for the I/O operation finishing it'll have a reference + // associated with it and our data will still be valid. The + // `connect_done` function will "reify" this forgotten pointer to + // drop the refcount on the other side. + Ok(false) => { + trace!("connect in progress"); + mem::forget(self.inner.clone()); + Err(would_block()) + } + + // TODO: are we sure no IOCP notification comes in here? + Err(e) => { + trace!("connect error: {}", e); + self.inner.connecting.store(false, SeqCst); + Err(e) + } + } + } + + /// Takes any internal error that has happened after the last I/O operation + /// which hasn't been retrieved yet. + /// + /// This is particularly useful when detecting failed attempts to `connect`. + /// After a completed `connect` flags this pipe as writable then callers + /// must invoke this method to determine whether the connection actually + /// succeeded. If this function returns `None` then a client is connected, + /// otherwise it returns an error of what happened and a client shouldn't be + /// connected. + pub fn take_error(&self) -> io::Result<Option<io::Error>> { + Ok(self.inner.io.lock().unwrap().connect_error.take()) + } + + /// Disconnects this named pipe from a connected client. + /// + /// This function will disconnect the pipe from a connected client, if any, + /// transitively calling the `DisconnectNamedPipe` function. If the + /// disconnection is successful then this object will no longer be readable + /// or writable. + /// + /// After a `disconnect` is issued, then a `connect` may be called again to + /// connect to another client. + pub fn disconnect(&self) -> io::Result<()> { + self.inner.handle.disconnect()?; + self.inner + .readiness + .set_readiness(Ready::empty()) + .expect("event loop seems gone"); + Ok(()) + } + + fn registered(&self) -> bool { + self.registered.load(SeqCst) + } +} + +impl Read for NamedPipe { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + <&NamedPipe as Read>::read(&mut &*self, buf) + } +} + +impl Write for NamedPipe { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + <&NamedPipe as Write>::write(&mut &*self, buf) + } + + fn flush(&mut self) -> io::Result<()> { + <&NamedPipe as Write>::flush(&mut &*self) + } +} + +impl<'a> Read for &'a NamedPipe { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + // Make sure we're registered + if !self.registered() { + return Err(would_block()) + } + + let mut state = self.inner.io.lock().unwrap(); + match mem::replace(&mut state.read, State::None) { + // In theory not possible with `ready_registration` checked above, + // but return would block for now. + State::None => Err(would_block()), + + // A read is in flight, still waiting for it to finish + State::Pending(buf, amt) => { + state.read = State::Pending(buf, amt); + Err(would_block()) + } + + // We previously read something into `data`, try to copy out some + // data. If we copy out all the data schedule a new read and + // otherwise store the buffer to get read later. + State::Ok(data, cur) => { + let n = { + let mut remaining = &data[cur..]; + remaining.read(buf)? + }; + let next = cur + n; + if next != data.len() { + state.read = State::Ok(data, next); + } else { + self.inner.put_buffer(data); + Inner::schedule_read(&self.inner, &mut state); + } + Ok(n) + } + + // Looks like an in-flight read hit an error, return that here while + // we schedule a new one. + State::Err(e) => { + Inner::schedule_read(&self.inner, &mut state); + if e.raw_os_error() == Some(ERROR_BROKEN_PIPE as i32) { + Ok(0) + } else { + Err(e) + } + } + } + } +} + +impl<'a> Write for &'a NamedPipe { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + // Make sure we're registered + if !self.registered() { + return Err(would_block()) + } + + // Make sure there's no writes pending + let mut io = self.inner.io.lock().unwrap(); + match io.write { + State::None => {} + _ => return Err(would_block()) + } + + // Move `buf` onto the heap and fire off the write + let mut owned_buf = self.inner.get_buffer(); + owned_buf.extend(buf); + Inner::schedule_write(&self.inner, owned_buf, 0, &mut io); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + // TODO: `FlushFileBuffers` somehow? + Ok(()) + } +} + +impl Evented for NamedPipe { + fn register(&self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt) -> io::Result<()> { + // First, register the handle with the event loop + unsafe { + self.poll_registration + .register_handle(&self.inner.handle, token, poll)?; + } + poll.register(&self.ready_registration, token, interest, opts)?; + self.registered.store(true, SeqCst); + Inner::post_register(&self.inner); + Ok(()) + } + + fn reregister(&self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt) -> io::Result<()> { + // Validate `Poll` and that we were previously registered + unsafe { + self.poll_registration + .reregister_handle(&self.inner.handle, token, poll)?; + } + + // At this point we should for sure have `ready_registration` unless + // we're racing with `register` above, so just return a bland error if + // the borrow fails. + poll.reregister(&self.ready_registration, token, interest, opts)?; + + Inner::post_register(&self.inner); + + Ok(()) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + // Validate `Poll` and deregister ourselves + unsafe { + self.poll_registration + .deregister_handle(&self.inner.handle, poll)?; + } + poll.deregister(&self.ready_registration) + } +} + +impl AsRawHandle for NamedPipe { + fn as_raw_handle(&self) -> RawHandle { + self.inner.handle.as_raw_handle() + } +} + +impl FromRawHandle for NamedPipe { + unsafe fn from_raw_handle(handle: RawHandle) -> NamedPipe { + let (r, s) = Registration::new2(); + NamedPipe { + registered: AtomicBool::new(false), + ready_registration: r, + poll_registration: windows::Binding::new(), + inner: FromRawArc::new(Inner { + handle: pipe::NamedPipe::from_raw_handle(handle), + readiness: s, + connecting: AtomicBool::new(false), + // transmutes to straddle winapi versions (mio 0.6 is on an + // older winapi) + connect: windows::Overlapped::new(mem::transmute(connect_done as fn(_))), + read: windows::Overlapped::new(mem::transmute(read_done as fn(_))), + write: windows::Overlapped::new(mem::transmute(write_done as fn(_))), + io: Mutex::new(Io { + read: State::None, + write: State::None, + connect_error: None, + }), + pool: Mutex::new(BufferPool::with_capacity(2)), + }), + } + } +} + +impl fmt::Debug for NamedPipe { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.inner.handle.fmt(f) + } +} + +impl Drop for NamedPipe { + fn drop(&mut self) { + // Cancel pending reads/connects, but don't cancel writes to ensure that + // everything is flushed out. + unsafe { + if self.inner.connecting.load(SeqCst) { + drop(cancel(&self.inner.handle, &self.inner.connect)); + } + let io = self.inner.io.lock().unwrap(); + match io.read { + State::Pending(..) => { + drop(cancel(&self.inner.handle, &self.inner.read)); + } + _ => {} + } + } + } +} + +impl Inner { + /// Schedules a read to happen in the background, executing an overlapped + /// operation. + /// + /// This function returns `true` if a normal error happens or if the read + /// is scheduled in the background. If the pipe is no longer connected + /// (ERROR_PIPE_LISTENING) then `false` is returned and no read is + /// scheduled. + fn schedule_read(me: &FromRawArc<Inner>, io: &mut Io) -> bool { + // Check to see if a read is already scheduled/completed + match io.read { + State::None => {} + _ => return true, + } + + // Turn off our read readiness + let ready = me.readiness.readiness(); + me.readiness.set_readiness(ready & !Ready::readable()) + .expect("event loop seems gone"); + + // Allocate a buffer and schedule the read. + let mut buf = me.get_buffer(); + let e = unsafe { + let overlapped = me.read.as_mut_ptr() as *mut _; + let slice = slice::from_raw_parts_mut(buf.as_mut_ptr(), + buf.capacity()); + me.handle.read_overlapped(slice, overlapped) + }; + + match e { + // See `connect` above for the rationale behind `forget` + Ok(e) => { + trace!("schedule read success: {:?}", e); + io.read = State::Pending(buf, 0); // 0 is ignored on read side + mem::forget(me.clone()); + true + } + + // If ERROR_PIPE_LISTENING happens then it's not a real read error, + // we just need to wait for a connect. + Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_LISTENING as i32) => { + false + } + + // If some other error happened, though, we're now readable to give + // out the error. + Err(e) => { + trace!("schedule read error: {}", e); + io.read = State::Err(e); + me.readiness.set_readiness(ready | Ready::readable()) + .expect("event loop still seems gone"); + true + } + } + } + + fn schedule_write(me: &FromRawArc<Inner>, + buf: Vec<u8>, + pos: usize, + io: &mut Io) { + // Very similar to `schedule_read` above, just done for the write half. + let ready = me.readiness.readiness(); + me.readiness.set_readiness(ready & !Ready::writable()) + .expect("event loop seems gone"); + + let e = unsafe { + let overlapped = me.write.as_mut_ptr() as *mut _; + me.handle.write_overlapped(&buf[pos..], overlapped) + }; + + match e { + // See `connect` above for the rationale behind `forget` + Ok(e) => { + trace!("schedule write success: {:?}", e); + io.write = State::Pending(buf, pos); + mem::forget(me.clone()) + } + Err(e) => { + trace!("schedule write error: {}", e); + io.write = State::Err(e); + me.add_readiness(Ready::writable()); + } + } + } + + fn add_readiness(&self, ready: Ready) { + self.readiness.set_readiness(ready | self.readiness.readiness()) + .expect("event loop still seems gone"); + } + + fn post_register(me: &FromRawArc<Inner>) { + let mut io = me.io.lock().unwrap(); + if Inner::schedule_read(&me, &mut io) { + if let State::None = io.write { + me.add_readiness(Ready::writable()); + } + } + } + + fn get_buffer(&self) -> Vec<u8> { + self.pool.lock().unwrap().get(8 * 1024) + } + + fn put_buffer(&self, buf: Vec<u8>) { + self.pool.lock().unwrap().put(buf) + } +} + +unsafe fn cancel<T: AsRawHandle>(handle: &T, + overlapped: &windows::Overlapped) -> io::Result<()> { + let ret = CancelIoEx(handle.as_raw_handle(), overlapped.as_mut_ptr() as *mut _); + if ret == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } +} + +fn connect_done(status: &OVERLAPPED_ENTRY) { + let status = CompletionStatus::from_entry(status); + trace!("connect done"); + + // Acquire the `FromRawArc<Inner>`. Note that we should be guaranteed that + // the refcount is available to us due to the `mem::forget` in + // `connect` above. + let me = unsafe { + overlapped2arc!(status.overlapped(), Inner, connect) + }; + + // Flag ourselves as no longer using the `connect` overlapped instances. + let prev = me.connecting.swap(false, SeqCst); + assert!(prev, "wasn't previously connecting"); + + // Stash away our connect error if one happened + debug_assert_eq!(status.bytes_transferred(), 0); + unsafe { + match me.handle.result(status.overlapped()) { + Ok(n) => debug_assert_eq!(n, 0), + Err(e) => me.io.lock().unwrap().connect_error = Some(e), + } + } + + // We essentially just finished a registration, so kick off a + // read and register write readiness. + Inner::post_register(&me); +} + +fn read_done(status: &OVERLAPPED_ENTRY) { + let status = CompletionStatus::from_entry(status); + trace!("read finished, bytes={}", status.bytes_transferred()); + + // Acquire the `FromRawArc<Inner>`. Note that we should be guaranteed that + // the refcount is available to us due to the `mem::forget` in + // `schedule_read` above. + let me = unsafe { + overlapped2arc!(status.overlapped(), Inner, read) + }; + + // Move from the `Pending` to `Ok` state. + let mut io = me.io.lock().unwrap(); + let mut buf = match mem::replace(&mut io.read, State::None) { + State::Pending(buf, _) => buf, + _ => unreachable!(), + }; + unsafe { + match me.handle.result(status.overlapped()) { + Ok(n) => { + debug_assert_eq!(status.bytes_transferred() as usize, n); + buf.set_len(status.bytes_transferred() as usize); + io.read = State::Ok(buf, 0); + } + Err(e) => { + debug_assert_eq!(status.bytes_transferred(), 0); + io.read = State::Err(e); + } + } + } + + // Flag our readiness that we've got data. + me.add_readiness(Ready::readable()); +} + +fn write_done(status: &OVERLAPPED_ENTRY) { + let status = CompletionStatus::from_entry(status); + trace!("write finished, bytes={}", status.bytes_transferred()); + // Acquire the `FromRawArc<Inner>`. Note that we should be guaranteed that + // the refcount is available to us due to the `mem::forget` in + // `schedule_write` above. + let me = unsafe { + overlapped2arc!(status.overlapped(), Inner, write) + }; + + // Make the state change out of `Pending`. If we wrote the entire buffer + // then we're writable again and otherwise we schedule another write. + let mut io = me.io.lock().unwrap(); + let (buf, pos) = match mem::replace(&mut io.write, State::None) { + State::Pending(buf, pos) => (buf, pos), + _ => unreachable!(), + }; + + unsafe { + match me.handle.result(status.overlapped()) { + Ok(n) => { + debug_assert_eq!(status.bytes_transferred() as usize, n); + let new_pos = pos + (status.bytes_transferred() as usize); + if new_pos == buf.len() { + me.put_buffer(buf); + me.add_readiness(Ready::writable()); + } else { + Inner::schedule_write(&me, buf, new_pos, &mut io); + } + } + Err(e) => { + debug_assert_eq!(status.bytes_transferred(), 0); + io.write = State::Err(e); + me.add_readiness(Ready::writable()); + } + } + } +} + +// Based on https://github.com/tokio-rs/mio/blob/13d5fc9/src/sys/windows/buffer_pool.rs +struct BufferPool { + pool: Vec<Vec<u8>>, +} + +impl BufferPool { + fn with_capacity(cap: usize) -> BufferPool { + BufferPool { + pool: Vec::with_capacity(cap), + } + } + + fn get(&mut self, default_cap: usize) -> Vec<u8> { + self.pool.pop().unwrap_or_else(|| Vec::with_capacity(default_cap)) + } + + fn put(&mut self, mut buf: Vec<u8>) { + if self.pool.len() < self.pool.capacity() { + buf.clear(); + self.pool.push(buf); + } + } +} diff --git a/third_party/rust/mio-named-pipes/tests/smoke.rs b/third_party/rust/mio-named-pipes/tests/smoke.rs new file mode 100644 index 0000000000..1035941369 --- /dev/null +++ b/third_party/rust/mio-named-pipes/tests/smoke.rs @@ -0,0 +1,274 @@ +extern crate mio; +extern crate mio_named_pipes; +extern crate env_logger; +extern crate rand; +extern crate winapi; + +#[macro_use] +extern crate log; + +use std::fs::OpenOptions; +use std::io::prelude::*; +use std::io; +use std::os::windows::fs::*; +use std::os::windows::io::*; +use std::time::Duration; + +use mio::{Poll, Ready, Token, PollOpt, Events}; +use mio_named_pipes::NamedPipe; +use rand::Rng; +use winapi::um::winbase::*; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {}", stringify!($e), e), + }) +} + +fn server() -> (NamedPipe, String) { + let num: u64 = rand::thread_rng().gen(); + let name = format!(r"\\.\pipe\my-pipe-{}", num); + let pipe = t!(NamedPipe::new(&name)); + (pipe, name) +} + +fn client(name: &str) -> NamedPipe { + let mut opts = OpenOptions::new(); + opts.read(true) + .write(true) + .custom_flags(FILE_FLAG_OVERLAPPED); + let file = t!(opts.open(name)); + unsafe { + NamedPipe::from_raw_handle(file.into_raw_handle()) + } +} + +fn pipe() -> (NamedPipe, NamedPipe) { + let (pipe, name) = server(); + (pipe, client(&name)) +} + +#[test] +fn writable_after_register() { + drop(env_logger::init()); + + let (server, client) = pipe(); + let poll = t!(Poll::new()); + t!(poll.register(&server, + Token(0), + Ready::writable() | Ready::readable(), + PollOpt::edge())); + t!(poll.register(&client, + Token(1), + Ready::writable(), + PollOpt::edge())); + + let mut events = Events::with_capacity(128); + t!(poll.poll(&mut events, None)); + + let events = events.iter().collect::<Vec<_>>(); + debug!("events {:?}", events); + assert!(events.iter().any(|e| { + e.token() == Token(0) && e.readiness() == Ready::writable() + })); + assert!(events.iter().any(|e| { + e.token() == Token(1) && e.readiness() == Ready::writable() + })); +} + +#[test] +fn write_then_read() { + drop(env_logger::init()); + + let (mut server, mut client) = pipe(); + let poll = t!(Poll::new()); + t!(poll.register(&server, + Token(0), + Ready::readable() | Ready::writable(), + PollOpt::edge())); + t!(poll.register(&client, + Token(1), + Ready::readable() | Ready::writable(), + PollOpt::edge())); + + let mut events = Events::with_capacity(128); + t!(poll.poll(&mut events, None)); + + assert_eq!(t!(client.write(b"1234")), 4); + + loop { + t!(poll.poll(&mut events, None)); + let events = events.iter().collect::<Vec<_>>(); + debug!("events {:?}", events); + if let Some(event) = events.iter().find(|e| e.token() == Token(0)) { + if event.readiness().is_readable() { + break + } + } + } + + let mut buf = [0; 10]; + assert_eq!(t!(server.read(&mut buf)), 4); + assert_eq!(&buf[..4], b"1234"); +} + +#[test] +fn connect_before_client() { + drop(env_logger::init()); + + let (server, name) = server(); + let poll = t!(Poll::new()); + t!(poll.register(&server, + Token(0), + Ready::readable() | Ready::writable(), + PollOpt::edge())); + + let mut events = Events::with_capacity(128); + t!(poll.poll(&mut events, Some(Duration::new(0, 0)))); + let e = events.iter().collect::<Vec<_>>(); + debug!("events {:?}", e); + assert_eq!(e.len(), 0); + assert_eq!(server.connect().err().unwrap().kind(), + io::ErrorKind::WouldBlock); + + let client = client(&name); + t!(poll.register(&client, + Token(1), + Ready::readable() | Ready::writable(), + PollOpt::edge())); + loop { + t!(poll.poll(&mut events, None)); + let e = events.iter().collect::<Vec<_>>(); + debug!("events {:?}", e); + if let Some(event) = e.iter().find(|e| e.token() == Token(0)) { + if event.readiness().is_writable() { + break + } + } + } +} + +#[test] +fn connect_after_client() { + drop(env_logger::init()); + + let (server, name) = server(); + let poll = t!(Poll::new()); + t!(poll.register(&server, + Token(0), + Ready::readable() | Ready::writable(), + PollOpt::edge())); + + let mut events = Events::with_capacity(128); + t!(poll.poll(&mut events, Some(Duration::new(0, 0)))); + let e = events.iter().collect::<Vec<_>>(); + debug!("events {:?}", e); + assert_eq!(e.len(), 0); + + let client = client(&name); + t!(poll.register(&client, + Token(1), + Ready::readable() | Ready::writable(), + PollOpt::edge())); + t!(server.connect()); + loop { + t!(poll.poll(&mut events, None)); + let e = events.iter().collect::<Vec<_>>(); + debug!("events {:?}", e); + if let Some(event) = e.iter().find(|e| e.token() == Token(0)) { + if event.readiness().is_writable() { + break + } + } + } +} + +#[test] +fn write_then_drop() { + drop(env_logger::init()); + + let (mut server, mut client) = pipe(); + let poll = t!(Poll::new()); + t!(poll.register(&server, + Token(0), + Ready::readable() | Ready::writable(), + PollOpt::edge())); + t!(poll.register(&client, + Token(1), + Ready::readable() | Ready::writable(), + PollOpt::edge())); + assert_eq!(t!(client.write(b"1234")), 4); + drop(client); + + let mut events = Events::with_capacity(128); + + loop { + t!(poll.poll(&mut events, None)); + let events = events.iter().collect::<Vec<_>>(); + debug!("events {:?}", events); + if let Some(event) = events.iter().find(|e| e.token() == Token(0)) { + if event.readiness().is_readable() { + break + } + } + } + + let mut buf = [0; 10]; + assert_eq!(t!(server.read(&mut buf)), 4); + assert_eq!(&buf[..4], b"1234"); +} + +#[test] +fn connect_twice() { + drop(env_logger::init()); + + let (mut server, name) = server(); + let c1 = client(&name); + let poll = t!(Poll::new()); + t!(poll.register(&server, + Token(0), + Ready::readable() | Ready::writable(), + PollOpt::edge())); + t!(poll.register(&c1, + Token(1), + Ready::readable() | Ready::writable(), + PollOpt::edge())); + drop(c1); + + let mut events = Events::with_capacity(128); + + loop { + t!(poll.poll(&mut events, None)); + let events = events.iter().collect::<Vec<_>>(); + debug!("events {:?}", events); + if let Some(event) = events.iter().find(|e| e.token() == Token(0)) { + if event.readiness().is_readable() { + break + } + } + } + + let mut buf = [0; 10]; + assert_eq!(t!(server.read(&mut buf)), 0); + t!(server.disconnect()); + assert_eq!(server.connect().err().unwrap().kind(), + io::ErrorKind::WouldBlock); + + let c2 = client(&name); + t!(poll.register(&c2, + Token(2), + Ready::readable() | Ready::writable(), + PollOpt::edge())); + + loop { + t!(poll.poll(&mut events, None)); + let events = events.iter().collect::<Vec<_>>(); + debug!("events {:?}", events); + if let Some(event) = events.iter().find(|e| e.token() == Token(0)) { + if event.readiness().is_writable() { + break + } + } + } +} |