summaryrefslogtreecommitdiffstats
path: root/third_party/rust/mio/src/sys
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
commit2aa4a82499d4becd2284cdb482213d541b8804dd (patch)
treeb80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/mio/src/sys
parentInitial commit. (diff)
downloadfirefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz
firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/mio/src/sys')
-rw-r--r--third_party/rust/mio/src/sys/fuchsia/awakener.rs73
-rw-r--r--third_party/rust/mio/src/sys/fuchsia/eventedfd.rs263
-rw-r--r--third_party/rust/mio/src/sys/fuchsia/handles.rs78
-rw-r--r--third_party/rust/mio/src/sys/fuchsia/mod.rs177
-rw-r--r--third_party/rust/mio/src/sys/fuchsia/net.rs444
-rw-r--r--third_party/rust/mio/src/sys/fuchsia/ready.rs181
-rw-r--r--third_party/rust/mio/src/sys/fuchsia/selector.rs353
-rw-r--r--third_party/rust/mio/src/sys/mod.rs56
-rw-r--r--third_party/rust/mio/src/sys/unix/awakener.rs74
-rw-r--r--third_party/rust/mio/src/sys/unix/dlsym.rs47
-rw-r--r--third_party/rust/mio/src/sys/unix/epoll.rs260
-rw-r--r--third_party/rust/mio/src/sys/unix/eventedfd.rs107
-rw-r--r--third_party/rust/mio/src/sys/unix/io.rs107
-rw-r--r--third_party/rust/mio/src/sys/unix/kqueue.rs360
-rw-r--r--third_party/rust/mio/src/sys/unix/mod.rs95
-rw-r--r--third_party/rust/mio/src/sys/unix/ready.rs499
-rw-r--r--third_party/rust/mio/src/sys/unix/tcp.rs286
-rw-r--r--third_party/rust/mio/src/sys/unix/udp.rs181
-rw-r--r--third_party/rust/mio/src/sys/unix/uds.rs262
-rw-r--r--third_party/rust/mio/src/sys/unix/uio.rs44
-rw-r--r--third_party/rust/mio/src/sys/windows/awakener.rs66
-rw-r--r--third_party/rust/mio/src/sys/windows/buffer_pool.rs20
-rw-r--r--third_party/rust/mio/src/sys/windows/from_raw_arc.rs116
-rw-r--r--third_party/rust/mio/src/sys/windows/mod.rs187
-rw-r--r--third_party/rust/mio/src/sys/windows/selector.rs534
-rw-r--r--third_party/rust/mio/src/sys/windows/tcp.rs851
-rw-r--r--third_party/rust/mio/src/sys/windows/udp.rs413
27 files changed, 6134 insertions, 0 deletions
diff --git a/third_party/rust/mio/src/sys/fuchsia/awakener.rs b/third_party/rust/mio/src/sys/fuchsia/awakener.rs
new file mode 100644
index 0000000000..19bc762429
--- /dev/null
+++ b/third_party/rust/mio/src/sys/fuchsia/awakener.rs
@@ -0,0 +1,73 @@
+use {io, poll, Evented, Ready, Poll, PollOpt, Token};
+use zircon;
+use std::sync::{Arc, Mutex, Weak};
+
+pub struct Awakener {
+ /// Token and weak reference to the port on which Awakener was registered.
+ ///
+ /// When `Awakener::wakeup` is called, these are used to send a wakeup message to the port.
+ inner: Mutex<Option<(Token, Weak<zircon::Port>)>>,
+}
+
+impl Awakener {
+ /// Create a new `Awakener`.
+ pub fn new() -> io::Result<Awakener> {
+ Ok(Awakener {
+ inner: Mutex::new(None)
+ })
+ }
+
+ /// Send a wakeup signal to the `Selector` on which the `Awakener` was registered.
+ pub fn wakeup(&self) -> io::Result<()> {
+ let inner_locked = self.inner.lock().unwrap();
+ let &(token, ref weak_port) =
+ inner_locked.as_ref().expect("Called wakeup on unregistered awakener.");
+
+ let port = weak_port.upgrade().expect("Tried to wakeup a closed port.");
+
+ let status = 0; // arbitrary
+ let packet = zircon::Packet::from_user_packet(
+ token.0 as u64, status, zircon::UserPacket::from_u8_array([0; 32]));
+
+ Ok(port.queue(&packet)?)
+ }
+
+ pub fn cleanup(&self) {}
+}
+
+impl Evented for Awakener {
+ fn register(&self,
+ poll: &Poll,
+ token: Token,
+ _events: Ready,
+ _opts: PollOpt) -> io::Result<()>
+ {
+ let mut inner_locked = self.inner.lock().unwrap();
+ if inner_locked.is_some() {
+ panic!("Called register on already-registered Awakener.");
+ }
+ *inner_locked = Some((token, Arc::downgrade(poll::selector(poll).port())));
+
+ Ok(())
+ }
+
+ fn reregister(&self,
+ poll: &Poll,
+ token: Token,
+ _events: Ready,
+ _opts: PollOpt) -> io::Result<()>
+ {
+ let mut inner_locked = self.inner.lock().unwrap();
+ *inner_locked = Some((token, Arc::downgrade(poll::selector(poll).port())));
+
+ Ok(())
+ }
+
+ fn deregister(&self, _poll: &Poll) -> io::Result<()>
+ {
+ let mut inner_locked = self.inner.lock().unwrap();
+ *inner_locked = None;
+
+ Ok(())
+ }
+} \ No newline at end of file
diff --git a/third_party/rust/mio/src/sys/fuchsia/eventedfd.rs b/third_party/rust/mio/src/sys/fuchsia/eventedfd.rs
new file mode 100644
index 0000000000..e23d0c4a1e
--- /dev/null
+++ b/third_party/rust/mio/src/sys/fuchsia/eventedfd.rs
@@ -0,0 +1,263 @@
+use {io, poll, Evented, Ready, Poll, PollOpt, Token};
+use libc;
+use zircon;
+use zircon::AsHandleRef;
+use sys::fuchsia::{DontDrop, poll_opts_to_wait_async, sys};
+use std::mem;
+use std::os::unix::io::RawFd;
+use std::sync::{Arc, Mutex};
+
+/// Properties of an `EventedFd`'s current registration
+#[derive(Debug)]
+pub struct EventedFdRegistration {
+ token: Token,
+ handle: DontDrop<zircon::Handle>,
+ rereg_signals: Option<(zircon::Signals, zircon::WaitAsyncOpts)>,
+}
+
+impl EventedFdRegistration {
+ unsafe fn new(token: Token,
+ raw_handle: sys::zx_handle_t,
+ rereg_signals: Option<(zircon::Signals, zircon::WaitAsyncOpts)>,
+ ) -> Self
+ {
+ EventedFdRegistration {
+ token: token,
+ handle: DontDrop::new(zircon::Handle::from_raw(raw_handle)),
+ rereg_signals: rereg_signals
+ }
+ }
+
+ pub fn rereg_signals(&self) -> Option<(zircon::Signals, zircon::WaitAsyncOpts)> {
+ self.rereg_signals
+ }
+}
+
+/// An event-ed file descriptor. The file descriptor is owned by this structure.
+#[derive(Debug)]
+pub struct EventedFdInner {
+ /// Properties of the current registration.
+ registration: Mutex<Option<EventedFdRegistration>>,
+
+ /// Owned file descriptor.
+ ///
+ /// `fd` is closed on `Drop`, so modifying `fd` is a memory-unsafe operation.
+ fd: RawFd,
+
+ /// Owned `fdio_t` pointer.
+ fdio: *const sys::fdio_t,
+}
+
+impl EventedFdInner {
+ pub fn rereg_for_level(&self, port: &zircon::Port) {
+ let registration_opt = self.registration.lock().unwrap();
+ if let Some(ref registration) = *registration_opt {
+ if let Some((rereg_signals, rereg_opts)) = registration.rereg_signals {
+ let _res =
+ registration
+ .handle.inner_ref()
+ .wait_async_handle(
+ port,
+ registration.token.0 as u64,
+ rereg_signals,
+ rereg_opts);
+ }
+ }
+ }
+
+ pub fn registration(&self) -> &Mutex<Option<EventedFdRegistration>> {
+ &self.registration
+ }
+
+ pub fn fdio(&self) -> &sys::fdio_t {
+ unsafe { &*self.fdio }
+ }
+}
+
+impl Drop for EventedFdInner {
+ fn drop(&mut self) {
+ unsafe {
+ sys::__fdio_release(self.fdio);
+ let _ = libc::close(self.fd);
+ }
+ }
+}
+
+// `EventedInner` must be manually declared `Send + Sync` because it contains a `RawFd` and a
+// `*const sys::fdio_t`. These are only used to make thread-safe system calls, so accessing
+// them is entirely thread-safe.
+//
+// Note: one minor exception to this are the calls to `libc::close` and `__fdio_release`, which
+// happen on `Drop`. These accesses are safe because `drop` can only be called at most once from
+// a single thread, and after it is called no other functions can be called on the `EventedFdInner`.
+unsafe impl Sync for EventedFdInner {}
+unsafe impl Send for EventedFdInner {}
+
+#[derive(Clone, Debug)]
+pub struct EventedFd {
+ pub inner: Arc<EventedFdInner>
+}
+
+impl EventedFd {
+ pub unsafe fn new(fd: RawFd) -> Self {
+ let fdio = sys::__fdio_fd_to_io(fd);
+ assert!(fdio != ::std::ptr::null(), "FileDescriptor given to EventedFd must be valid.");
+
+ EventedFd {
+ inner: Arc::new(EventedFdInner {
+ registration: Mutex::new(None),
+ fd: fd,
+ fdio: fdio,
+ })
+ }
+ }
+
+ fn handle_and_signals_for_events(&self, interest: Ready, opts: PollOpt)
+ -> (sys::zx_handle_t, zircon::Signals)
+ {
+ let epoll_events = ioevent_to_epoll(interest, opts);
+
+ unsafe {
+ let mut raw_handle: sys::zx_handle_t = mem::uninitialized();
+ let mut signals: sys::zx_signals_t = mem::uninitialized();
+ sys::__fdio_wait_begin(self.inner.fdio, epoll_events, &mut raw_handle, &mut signals);
+
+ (raw_handle, signals)
+ }
+ }
+
+ fn register_with_lock(
+ &self,
+ registration: &mut Option<EventedFdRegistration>,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt) -> io::Result<()>
+ {
+ if registration.is_some() {
+ return Err(io::Error::new(
+ io::ErrorKind::AlreadyExists,
+ "Called register on an already registered file descriptor."));
+ }
+
+ let (raw_handle, signals) = self.handle_and_signals_for_events(interest, opts);
+
+ let needs_rereg = opts.is_level() && !opts.is_oneshot();
+
+ // If we need to reregister, then each registration should be `oneshot`
+ let opts = opts | if needs_rereg { PollOpt::oneshot() } else { PollOpt::empty() };
+
+ let rereg_signals = if needs_rereg {
+ Some((signals, poll_opts_to_wait_async(opts)))
+ } else {
+ None
+ };
+
+ *registration = Some(
+ unsafe { EventedFdRegistration::new(token, raw_handle, rereg_signals) }
+ );
+
+ // We don't have ownership of the handle, so we can't drop it
+ let handle = DontDrop::new(unsafe { zircon::Handle::from_raw(raw_handle) });
+
+ let registered = poll::selector(poll)
+ .register_fd(handle.inner_ref(), self, token, signals, opts);
+
+ if registered.is_err() {
+ *registration = None;
+ }
+
+ registered
+ }
+
+ fn deregister_with_lock(
+ &self,
+ registration: &mut Option<EventedFdRegistration>,
+ poll: &Poll) -> io::Result<()>
+ {
+ let old_registration = if let Some(old_reg) = registration.take() {
+ old_reg
+ } else {
+ return Err(io::Error::new(
+ io::ErrorKind::NotFound,
+ "Called rereregister on an unregistered file descriptor."))
+ };
+
+ poll::selector(poll)
+ .deregister_fd(old_registration.handle.inner_ref(), old_registration.token)
+ }
+}
+
+impl Evented for EventedFd {
+ fn register(&self,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt) -> io::Result<()>
+ {
+ self.register_with_lock(
+ &mut *self.inner.registration.lock().unwrap(),
+ poll,
+ token,
+ interest,
+ opts)
+ }
+
+ fn reregister(&self,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt) -> io::Result<()>
+ {
+ // Take out the registration lock
+ let mut registration_lock = self.inner.registration.lock().unwrap();
+
+ // Deregister
+ self.deregister_with_lock(&mut *registration_lock, poll)?;
+
+ self.register_with_lock(
+ &mut *registration_lock,
+ poll,
+ token,
+ interest,
+ opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ let mut registration_lock = self.inner.registration.lock().unwrap();
+ self.deregister_with_lock(&mut *registration_lock, poll)
+ }
+}
+
+fn ioevent_to_epoll(interest: Ready, opts: PollOpt) -> u32 {
+ use event_imp::ready_from_usize;
+ const HUP: usize = 0b01000;
+
+ let mut kind = 0;
+
+ if interest.is_readable() {
+ kind |= libc::EPOLLIN;
+ }
+
+ if interest.is_writable() {
+ kind |= libc::EPOLLOUT;
+ }
+
+ if interest.contains(ready_from_usize(HUP)) {
+ kind |= libc::EPOLLRDHUP;
+ }
+
+ if opts.is_edge() {
+ kind |= libc::EPOLLET;
+ }
+
+ if opts.is_oneshot() {
+ kind |= libc::EPOLLONESHOT;
+ }
+
+ if opts.is_level() {
+ kind &= !libc::EPOLLET;
+ }
+
+ kind as u32
+}
diff --git a/third_party/rust/mio/src/sys/fuchsia/handles.rs b/third_party/rust/mio/src/sys/fuchsia/handles.rs
new file mode 100644
index 0000000000..ae6f07f6d9
--- /dev/null
+++ b/third_party/rust/mio/src/sys/fuchsia/handles.rs
@@ -0,0 +1,78 @@
+use {io, poll, Evented, Ready, Poll, PollOpt, Token};
+use zircon_sys::zx_handle_t;
+use std::sync::Mutex;
+
+/// Wrapper for registering a `HandleBase` type with mio.
+#[derive(Debug)]
+pub struct EventedHandle {
+ /// The handle to be registered.
+ handle: zx_handle_t,
+
+ /// The current `Token` with which the handle is registered with mio.
+ token: Mutex<Option<Token>>,
+}
+
+impl EventedHandle {
+ /// Create a new `EventedHandle` which can be registered with mio
+ /// in order to receive event notifications.
+ ///
+ /// The underlying handle must not be dropped while the
+ /// `EventedHandle` still exists.
+ pub unsafe fn new(handle: zx_handle_t) -> Self {
+ EventedHandle {
+ handle: handle,
+ token: Mutex::new(None),
+ }
+ }
+
+ /// Get the underlying handle being registered.
+ pub fn get_handle(&self) -> zx_handle_t {
+ self.handle
+ }
+}
+
+impl Evented for EventedHandle {
+ fn register(&self,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt) -> io::Result<()>
+ {
+ let mut this_token = self.token.lock().unwrap();
+ {
+ poll::selector(poll).register_handle(self.handle, token, interest, opts)?;
+ *this_token = Some(token);
+ }
+ Ok(())
+ }
+
+ fn reregister(&self,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt) -> io::Result<()>
+ {
+ let mut this_token = self.token.lock().unwrap();
+ {
+ poll::selector(poll).deregister_handle(self.handle, token)?;
+ *this_token = None;
+ poll::selector(poll).register_handle(self.handle, token, interest, opts)?;
+ *this_token = Some(token);
+ }
+ Ok(())
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ let mut this_token = self.token.lock().unwrap();
+ let token = if let Some(token) = *this_token { token } else {
+ return Err(io::Error::new(
+ io::ErrorKind::NotFound,
+ "Attempted to deregister an unregistered handle."))
+ };
+ {
+ poll::selector(poll).deregister_handle(self.handle, token)?;
+ *this_token = None;
+ }
+ Ok(())
+ }
+}
diff --git a/third_party/rust/mio/src/sys/fuchsia/mod.rs b/third_party/rust/mio/src/sys/fuchsia/mod.rs
new file mode 100644
index 0000000000..10728fc8dc
--- /dev/null
+++ b/third_party/rust/mio/src/sys/fuchsia/mod.rs
@@ -0,0 +1,177 @@
+use {io, Ready, PollOpt};
+use libc;
+use zircon;
+use std::mem;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use std::ops::{Deref, DerefMut};
+use std::os::unix::io::RawFd;
+
+mod awakener;
+mod handles;
+mod eventedfd;
+mod net;
+mod ready;
+mod selector;
+
+use self::eventedfd::{EventedFd, EventedFdInner};
+use self::ready::assert_fuchsia_ready_repr;
+
+pub use self::awakener::Awakener;
+pub use self::handles::EventedHandle;
+pub use self::net::{TcpListener, TcpStream, UdpSocket};
+pub use self::selector::{Events, Selector};
+pub use self::ready::{FuchsiaReady, zx_signals_t};
+
+// Set non-blocking (workaround since the std version doesn't work in fuchsia)
+// TODO: fix the std version and replace this
+pub fn set_nonblock(fd: RawFd) -> io::Result<()> {
+ cvt(unsafe { libc::fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK) }).map(|_| ())
+}
+
+/// Workaround until fuchsia's recv_from is fixed
+unsafe fn recv_from(fd: RawFd, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ let flags = 0;
+
+ let n = cvt(
+ libc::recv(fd,
+ buf.as_mut_ptr() as *mut libc::c_void,
+ buf.len(),
+ flags)
+ )?;
+
+ // random address-- we don't use it
+ let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
+ Ok((n as usize, addr))
+}
+
+mod sys {
+ #![allow(non_camel_case_types)]
+ use std::os::unix::io::RawFd;
+ pub use zircon_sys::{zx_handle_t, zx_signals_t};
+
+ // 17 fn pointers we don't need for mio :)
+ pub type fdio_ops_t = [usize; 17];
+
+ pub type atomic_int_fast32_t = usize; // TODO: https://github.com/rust-lang/libc/issues/631
+
+ #[repr(C)]
+ pub struct fdio_t {
+ pub ops: *const fdio_ops_t,
+ pub magic: u32,
+ pub refcount: atomic_int_fast32_t,
+ pub dupcount: u32,
+ pub flags: u32,
+ }
+
+ #[link(name="fdio")]
+ extern {
+ pub fn __fdio_fd_to_io(fd: RawFd) -> *const fdio_t;
+ pub fn __fdio_release(io: *const fdio_t);
+
+ pub fn __fdio_wait_begin(
+ io: *const fdio_t,
+ events: u32,
+ handle_out: &mut zx_handle_t,
+ signals_out: &mut zx_signals_t,
+ );
+ pub fn __fdio_wait_end(
+ io: *const fdio_t,
+ signals: zx_signals_t,
+ events_out: &mut u32,
+ );
+ }
+}
+
+fn epoll_event_to_ready(epoll: u32) -> Ready {
+ let epoll = epoll as i32; // casts the bits directly
+ let mut kind = Ready::empty();
+
+ if (epoll & libc::EPOLLIN) != 0 || (epoll & libc::EPOLLPRI) != 0 {
+ kind = kind | Ready::readable();
+ }
+
+ if (epoll & libc::EPOLLOUT) != 0 {
+ kind = kind | Ready::writable();
+ }
+
+ kind
+
+ /* TODO: support?
+ // EPOLLHUP - Usually means a socket error happened
+ if (epoll & libc::EPOLLERR) != 0 {
+ kind = kind | UnixReady::error();
+ }
+
+ if (epoll & libc::EPOLLRDHUP) != 0 || (epoll & libc::EPOLLHUP) != 0 {
+ kind = kind | UnixReady::hup();
+ }
+ */
+}
+
+fn poll_opts_to_wait_async(poll_opts: PollOpt) -> zircon::WaitAsyncOpts {
+ if poll_opts.is_oneshot() {
+ zircon::WaitAsyncOpts::Once
+ } else {
+ zircon::WaitAsyncOpts::Repeating
+ }
+}
+
+trait IsMinusOne {
+ fn is_minus_one(&self) -> bool;
+}
+
+impl IsMinusOne for i32 {
+ fn is_minus_one(&self) -> bool { *self == -1 }
+}
+
+impl IsMinusOne for isize {
+ fn is_minus_one(&self) -> bool { *self == -1 }
+}
+
+fn cvt<T: IsMinusOne>(t: T) -> ::io::Result<T> {
+ use std::io;
+
+ if t.is_minus_one() {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(t)
+ }
+}
+
+/// Utility type to prevent the type inside of it from being dropped.
+#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
+struct DontDrop<T>(Option<T>);
+
+impl<T> DontDrop<T> {
+ fn new(t: T) -> DontDrop<T> {
+ DontDrop(Some(t))
+ }
+
+ fn inner_ref(&self) -> &T {
+ self.0.as_ref().unwrap()
+ }
+
+ fn inner_mut(&mut self) -> &mut T {
+ self.0.as_mut().unwrap()
+ }
+}
+
+impl<T> Deref for DontDrop<T> {
+ type Target = T;
+ fn deref(&self) -> &Self::Target {
+ self.inner_ref()
+ }
+}
+
+impl<T> DerefMut for DontDrop<T> {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ self.inner_mut()
+ }
+}
+
+impl<T> Drop for DontDrop<T> {
+ fn drop(&mut self) {
+ let inner = self.0.take();
+ mem::forget(inner);
+ }
+}
diff --git a/third_party/rust/mio/src/sys/fuchsia/net.rs b/third_party/rust/mio/src/sys/fuchsia/net.rs
new file mode 100644
index 0000000000..d43ad27bb5
--- /dev/null
+++ b/third_party/rust/mio/src/sys/fuchsia/net.rs
@@ -0,0 +1,444 @@
+use {io, Evented, Ready, Poll, PollOpt, Token};
+use iovec::IoVec;
+use iovec::unix as iovec;
+use libc;
+use net2::TcpStreamExt;
+#[allow(unused_imports)] // only here for Rust 1.8
+use net2::UdpSocketExt;
+use sys::fuchsia::{recv_from, set_nonblock, EventedFd, DontDrop};
+use std::cmp;
+use std::io::{Read, Write};
+use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
+use std::os::unix::io::AsRawFd;
+use std::time::Duration;
+
+#[derive(Debug)]
+pub struct TcpStream {
+ io: DontDrop<net::TcpStream>,
+ evented_fd: EventedFd,
+}
+
+impl TcpStream {
+ pub fn connect(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> {
+ try!(set_nonblock(stream.as_raw_fd()));
+
+ let connected = stream.connect(addr);
+ match connected {
+ Ok(..) => {}
+ Err(ref e) if e.raw_os_error() == Some(libc::EINPROGRESS) => {}
+ Err(e) => return Err(e),
+ }
+
+ let evented_fd = unsafe { EventedFd::new(stream.as_raw_fd()) };
+
+ return Ok(TcpStream {
+ io: DontDrop::new(stream),
+ evented_fd: evented_fd,
+ })
+ }
+
+ pub fn from_stream(stream: net::TcpStream) -> TcpStream {
+ let evented_fd = unsafe { EventedFd::new(stream.as_raw_fd()) };
+
+ TcpStream {
+ io: DontDrop::new(stream),
+ evented_fd: evented_fd,
+ }
+ }
+
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.io.peer_addr()
+ }
+
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.io.local_addr()
+ }
+
+ pub fn try_clone(&self) -> io::Result<TcpStream> {
+ self.io.try_clone().map(|s| {
+ let evented_fd = unsafe { EventedFd::new(s.as_raw_fd()) };
+ TcpStream {
+ io: DontDrop::new(s),
+ evented_fd: evented_fd,
+ }
+ })
+ }
+
+ pub fn shutdown(&self, how: net::Shutdown) -> io::Result<()> {
+ self.io.shutdown(how)
+ }
+
+ pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
+ self.io.set_nodelay(nodelay)
+ }
+
+ pub fn nodelay(&self) -> io::Result<bool> {
+ self.io.nodelay()
+ }
+
+ pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> {
+ self.io.set_recv_buffer_size(size)
+ }
+
+ pub fn recv_buffer_size(&self) -> io::Result<usize> {
+ self.io.recv_buffer_size()
+ }
+
+ pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> {
+ self.io.set_send_buffer_size(size)
+ }
+
+ pub fn send_buffer_size(&self) -> io::Result<usize> {
+ self.io.send_buffer_size()
+ }
+
+ pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> {
+ self.io.set_keepalive(keepalive)
+ }
+
+ pub fn keepalive(&self) -> io::Result<Option<Duration>> {
+ self.io.keepalive()
+ }
+
+ pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
+ self.io.set_ttl(ttl)
+ }
+
+ pub fn ttl(&self) -> io::Result<u32> {
+ self.io.ttl()
+ }
+
+ pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
+ self.io.set_only_v6(only_v6)
+ }
+
+ pub fn only_v6(&self) -> io::Result<bool> {
+ self.io.only_v6()
+ }
+
+ pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
+ self.io.set_linger(dur)
+ }
+
+ pub fn linger(&self) -> io::Result<Option<Duration>> {
+ self.io.linger()
+ }
+
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.io.take_error()
+ }
+
+ pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io.peek(buf)
+ }
+
+ pub fn readv(&self, bufs: &mut [&mut IoVec]) -> io::Result<usize> {
+ unsafe {
+ let slice = iovec::as_os_slice_mut(bufs);
+ let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
+ let rc = libc::readv(self.io.as_raw_fd(),
+ slice.as_ptr(),
+ len as libc::c_int);
+ if rc < 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(rc as usize)
+ }
+ }
+ }
+
+ pub fn writev(&self, bufs: &[&IoVec]) -> io::Result<usize> {
+ unsafe {
+ let slice = iovec::as_os_slice(bufs);
+ let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
+ let rc = libc::writev(self.io.as_raw_fd(),
+ slice.as_ptr(),
+ len as libc::c_int);
+ if rc < 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(rc as usize)
+ }
+ }
+ }
+}
+
+impl<'a> Read for &'a TcpStream {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io.inner_ref().read(buf)
+ }
+}
+
+impl<'a> Write for &'a TcpStream {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.io.inner_ref().write(buf)
+ }
+ fn flush(&mut self) -> io::Result<()> {
+ self.io.inner_ref().flush()
+ }
+}
+
+impl Evented for TcpStream {
+ fn register(&self,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt) -> io::Result<()>
+ {
+ self.evented_fd.register(poll, token, interest, opts)
+ }
+
+ fn reregister(&self,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt) -> io::Result<()>
+ {
+ self.evented_fd.reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ self.evented_fd.deregister(poll)
+ }
+}
+
+#[derive(Debug)]
+pub struct TcpListener {
+ io: DontDrop<net::TcpListener>,
+ evented_fd: EventedFd,
+}
+
+impl TcpListener {
+ pub fn new(inner: net::TcpListener) -> io::Result<TcpListener> {
+ set_nonblock(inner.as_raw_fd())?;
+
+ let evented_fd = unsafe { EventedFd::new(inner.as_raw_fd()) };
+
+ Ok(TcpListener {
+ io: DontDrop::new(inner),
+ evented_fd: evented_fd,
+ })
+ }
+
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.io.local_addr()
+ }
+
+ pub fn try_clone(&self) -> io::Result<TcpListener> {
+ self.io.try_clone().map(|io| {
+ let evented_fd = unsafe { EventedFd::new(io.as_raw_fd()) };
+ TcpListener {
+ io: DontDrop::new(io),
+ evented_fd: evented_fd,
+ }
+ })
+ }
+
+ pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
+ self.io.accept().and_then(|(s, a)| {
+ set_nonblock(s.as_raw_fd())?;
+ let evented_fd = unsafe { EventedFd::new(s.as_raw_fd()) };
+ return Ok((TcpStream {
+ io: DontDrop::new(s),
+ evented_fd: evented_fd,
+ }, a))
+ })
+ }
+
+ #[allow(deprecated)]
+ pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
+ self.io.set_only_v6(only_v6)
+ }
+
+ #[allow(deprecated)]
+ pub fn only_v6(&self) -> io::Result<bool> {
+ self.io.only_v6()
+ }
+
+ pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
+ self.io.set_ttl(ttl)
+ }
+
+ pub fn ttl(&self) -> io::Result<u32> {
+ self.io.ttl()
+ }
+
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.io.take_error()
+ }
+}
+
+impl Evented for TcpListener {
+ fn register(&self,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt) -> io::Result<()>
+ {
+ self.evented_fd.register(poll, token, interest, opts)
+ }
+
+ fn reregister(&self,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt) -> io::Result<()>
+ {
+ self.evented_fd.reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ self.evented_fd.deregister(poll)
+ }
+}
+
+#[derive(Debug)]
+pub struct UdpSocket {
+ io: DontDrop<net::UdpSocket>,
+ evented_fd: EventedFd,
+}
+
+impl UdpSocket {
+ pub fn new(socket: net::UdpSocket) -> io::Result<UdpSocket> {
+ set_nonblock(socket.as_raw_fd())?;
+
+ let evented_fd = unsafe { EventedFd::new(socket.as_raw_fd()) };
+
+ Ok(UdpSocket {
+ io: DontDrop::new(socket),
+ evented_fd: evented_fd,
+ })
+ }
+
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.io.local_addr()
+ }
+
+ pub fn try_clone(&self) -> io::Result<UdpSocket> {
+ self.io.try_clone().and_then(|io| {
+ UdpSocket::new(io)
+ })
+ }
+
+ pub fn send_to(&self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
+ self.io.send_to(buf, target)
+ }
+
+ pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ unsafe { recv_from(self.io.as_raw_fd(), buf) }
+ }
+
+ pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
+ self.io.send(buf)
+ }
+
+ pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io.recv(buf)
+ }
+
+ pub fn connect(&self, addr: SocketAddr)
+ -> io::Result<()> {
+ self.io.connect(addr)
+ }
+
+ pub fn broadcast(&self) -> io::Result<bool> {
+ self.io.broadcast()
+ }
+
+ pub fn set_broadcast(&self, on: bool) -> io::Result<()> {
+ self.io.set_broadcast(on)
+ }
+
+ pub fn multicast_loop_v4(&self) -> io::Result<bool> {
+ self.io.multicast_loop_v4()
+ }
+
+ pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> {
+ self.io.set_multicast_loop_v4(on)
+ }
+
+ pub fn multicast_ttl_v4(&self) -> io::Result<u32> {
+ self.io.multicast_ttl_v4()
+ }
+
+ pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> {
+ self.io.set_multicast_ttl_v4(ttl)
+ }
+
+ pub fn multicast_loop_v6(&self) -> io::Result<bool> {
+ self.io.multicast_loop_v6()
+ }
+
+ pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> {
+ self.io.set_multicast_loop_v6(on)
+ }
+
+ pub fn ttl(&self) -> io::Result<u32> {
+ self.io.ttl()
+ }
+
+ pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
+ self.io.set_ttl(ttl)
+ }
+
+ pub fn join_multicast_v4(&self,
+ multiaddr: &Ipv4Addr,
+ interface: &Ipv4Addr) -> io::Result<()> {
+ self.io.join_multicast_v4(multiaddr, interface)
+ }
+
+ pub fn join_multicast_v6(&self,
+ multiaddr: &Ipv6Addr,
+ interface: u32) -> io::Result<()> {
+ self.io.join_multicast_v6(multiaddr, interface)
+ }
+
+ pub fn leave_multicast_v4(&self,
+ multiaddr: &Ipv4Addr,
+ interface: &Ipv4Addr) -> io::Result<()> {
+ self.io.leave_multicast_v4(multiaddr, interface)
+ }
+
+ pub fn leave_multicast_v6(&self,
+ multiaddr: &Ipv6Addr,
+ interface: u32) -> io::Result<()> {
+ self.io.leave_multicast_v6(multiaddr, interface)
+ }
+
+ pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
+ self.io.set_only_v6(only_v6)
+ }
+
+ pub fn only_v6(&self) -> io::Result<bool> {
+ self.io.only_v6()
+ }
+
+
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.io.take_error()
+ }
+}
+
+impl Evented for UdpSocket {
+ fn register(&self,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt) -> io::Result<()>
+ {
+ self.evented_fd.register(poll, token, interest, opts)
+ }
+
+ fn reregister(&self,
+ poll: &Poll,
+ token: Token,
+ interest: Ready,
+ opts: PollOpt) -> io::Result<()>
+ {
+ self.evented_fd.reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ self.evented_fd.deregister(poll)
+ }
+}
diff --git a/third_party/rust/mio/src/sys/fuchsia/ready.rs b/third_party/rust/mio/src/sys/fuchsia/ready.rs
new file mode 100644
index 0000000000..97854f8c07
--- /dev/null
+++ b/third_party/rust/mio/src/sys/fuchsia/ready.rs
@@ -0,0 +1,181 @@
+use event_imp::{Ready, ready_as_usize, ready_from_usize};
+pub use zircon_sys::{
+ zx_signals_t,
+ ZX_OBJECT_READABLE,
+ ZX_OBJECT_WRITABLE,
+};
+use std::ops;
+
+// The following impls are valid because Fuchsia and mio both represent
+// "readable" as `1 << 0` and "writable" as `1 << 2`.
+// We define this assertion here and call it from `Selector::new`,
+// since `Selector:;new` is guaranteed to be called during a standard mio runtime,
+// unlike the functions in this file.
+#[inline]
+pub fn assert_fuchsia_ready_repr() {
+ debug_assert!(
+ ZX_OBJECT_READABLE.bits() as usize == ready_as_usize(Ready::readable()),
+ "Zircon ZX_OBJECT_READABLE should have the same repr as Ready::readable()"
+ );
+ debug_assert!(
+ ZX_OBJECT_WRITABLE.bits() as usize == ready_as_usize(Ready::writable()),
+ "Zircon ZX_OBJECT_WRITABLE should have the same repr as Ready::writable()"
+ );
+}
+
+/// Fuchsia specific extensions to `Ready`
+///
+/// Provides additional readiness event kinds that are available on Fuchsia.
+///
+/// Conversion traits are implemented between `Ready` and `FuchsiaReady`.
+///
+/// For high level documentation on polling and readiness, see [`Poll`].
+///
+/// [`Poll`]: struct.Poll.html
+#[derive(Debug, Copy, PartialEq, Eq, Clone, PartialOrd, Ord)]
+pub struct FuchsiaReady(Ready);
+
+impl FuchsiaReady {
+ /// Returns the `FuchsiaReady` as raw zircon signals.
+ /// This function is just a more explicit, non-generic version of
+ /// `FuchsiaReady::into`.
+ #[inline]
+ pub fn into_zx_signals(self) -> zx_signals_t {
+ zx_signals_t::from_bits_truncate(ready_as_usize(self.0) as u32)
+ }
+}
+
+impl Into<zx_signals_t> for FuchsiaReady {
+ #[inline]
+ fn into(self) -> zx_signals_t {
+ self.into_zx_signals()
+ }
+}
+
+impl From<zx_signals_t> for FuchsiaReady {
+ #[inline]
+ fn from(src: zx_signals_t) -> Self {
+ FuchsiaReady(src.into())
+ }
+}
+
+impl From<zx_signals_t> for Ready {
+ #[inline]
+ fn from(src: zx_signals_t) -> Self {
+ ready_from_usize(src.bits() as usize)
+ }
+}
+
+impl From<Ready> for FuchsiaReady {
+ #[inline]
+ fn from(src: Ready) -> FuchsiaReady {
+ FuchsiaReady(src)
+ }
+}
+
+impl From<FuchsiaReady> for Ready {
+ #[inline]
+ fn from(src: FuchsiaReady) -> Ready {
+ src.0
+ }
+}
+
+impl ops::Deref for FuchsiaReady {
+ type Target = Ready;
+
+ #[inline]
+ fn deref(&self) -> &Ready {
+ &self.0
+ }
+}
+
+impl ops::DerefMut for FuchsiaReady {
+ #[inline]
+ fn deref_mut(&mut self) -> &mut Ready {
+ &mut self.0
+ }
+}
+
+impl ops::BitOr for FuchsiaReady {
+ type Output = FuchsiaReady;
+
+ #[inline]
+ fn bitor(self, other: FuchsiaReady) -> FuchsiaReady {
+ (self.0 | other.0).into()
+ }
+}
+
+impl ops::BitXor for FuchsiaReady {
+ type Output = FuchsiaReady;
+
+ #[inline]
+ fn bitxor(self, other: FuchsiaReady) -> FuchsiaReady {
+ (self.0 ^ other.0).into()
+ }
+}
+
+impl ops::BitAnd for FuchsiaReady {
+ type Output = FuchsiaReady;
+
+ #[inline]
+ fn bitand(self, other: FuchsiaReady) -> FuchsiaReady {
+ (self.0 & other.0).into()
+ }
+}
+
+impl ops::Sub for FuchsiaReady {
+ type Output = FuchsiaReady;
+
+ #[inline]
+ fn sub(self, other: FuchsiaReady) -> FuchsiaReady {
+ (self.0 & !other.0).into()
+ }
+}
+
+#[deprecated(since = "0.6.10", note = "removed")]
+#[cfg(feature = "with-deprecated")]
+#[doc(hidden)]
+impl ops::Not for FuchsiaReady {
+ type Output = FuchsiaReady;
+
+ #[inline]
+ fn not(self) -> FuchsiaReady {
+ (!self.0).into()
+ }
+}
+
+impl ops::BitOr<zx_signals_t> for FuchsiaReady {
+ type Output = FuchsiaReady;
+
+ #[inline]
+ fn bitor(self, other: zx_signals_t) -> FuchsiaReady {
+ self | FuchsiaReady::from(other)
+ }
+}
+
+impl ops::BitXor<zx_signals_t> for FuchsiaReady {
+ type Output = FuchsiaReady;
+
+ #[inline]
+ fn bitxor(self, other: zx_signals_t) -> FuchsiaReady {
+ self ^ FuchsiaReady::from(other)
+ }
+}
+
+impl ops::BitAnd<zx_signals_t> for FuchsiaReady {
+ type Output = FuchsiaReady;
+
+ #[inline]
+ fn bitand(self, other: zx_signals_t) -> FuchsiaReady {
+ self & FuchsiaReady::from(other)
+ }
+}
+
+impl ops::Sub<zx_signals_t> for FuchsiaReady {
+ type Output = FuchsiaReady;
+
+ #[inline]
+ fn sub(self, other: zx_signals_t) -> FuchsiaReady {
+ self - FuchsiaReady::from(other)
+ }
+}
diff --git a/third_party/rust/mio/src/sys/fuchsia/selector.rs b/third_party/rust/mio/src/sys/fuchsia/selector.rs
new file mode 100644
index 0000000000..27226ac5ff
--- /dev/null
+++ b/third_party/rust/mio/src/sys/fuchsia/selector.rs
@@ -0,0 +1,353 @@
+use {io, Event, PollOpt, Ready, Token};
+use sys::fuchsia::{
+ assert_fuchsia_ready_repr,
+ epoll_event_to_ready,
+ poll_opts_to_wait_async,
+ EventedFd,
+ EventedFdInner,
+ FuchsiaReady,
+};
+use zircon;
+use zircon::AsHandleRef;
+use zircon_sys::zx_handle_t;
+use std::collections::hash_map;
+use std::fmt;
+use std::mem;
+use std::sync::atomic::{AtomicBool, AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
+use std::sync::{Arc, Mutex, Weak};
+use std::time::Duration;
+use sys;
+
+/// The kind of registration-- file descriptor or handle.
+///
+/// The last bit of a token is set to indicate the type of the registration.
+#[derive(Copy, Clone, Eq, PartialEq)]
+enum RegType {
+ Fd,
+ Handle,
+}
+
+fn key_from_token_and_type(token: Token, reg_type: RegType) -> io::Result<u64> {
+ let key = token.0 as u64;
+ let msb = 1u64 << 63;
+ if (key & msb) != 0 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "Most-significant bit of token must remain unset."));
+ }
+
+ Ok(match reg_type {
+ RegType::Fd => key,
+ RegType::Handle => key | msb,
+ })
+}
+
+fn token_and_type_from_key(key: u64) -> (Token, RegType) {
+ let msb = 1u64 << 63;
+ (
+ Token((key & !msb) as usize),
+ if (key & msb) == 0 {
+ RegType::Fd
+ } else {
+ RegType::Handle
+ }
+ )
+}
+
+/// Each Selector has a globally unique(ish) ID associated with it. This ID
+/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
+/// registered with the `Selector`. If a type that is previously associated with
+/// a `Selector` attempts to register itself with a different `Selector`, the
+/// operation will return with an error. This matches windows behavior.
+static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
+
+pub struct Selector {
+ id: usize,
+
+ /// Zircon object on which the handles have been registered, and on which events occur
+ port: Arc<zircon::Port>,
+
+ /// Whether or not `tokens_to_rereg` contains any elements. This is a best-effort attempt
+ /// used to prevent having to lock `tokens_to_rereg` when it is empty.
+ has_tokens_to_rereg: AtomicBool,
+
+ /// List of `Token`s corresponding to registrations that need to be reregistered before the
+ /// next `port::wait`. This is necessary to provide level-triggered behavior for
+ /// `Async::repeating` registrations.
+ ///
+ /// When a level-triggered `Async::repeating` event is seen, its token is added to this list so
+ /// that it will be reregistered before the next `port::wait` call, making `port::wait` return
+ /// immediately if the signal was high during the reregistration.
+ ///
+ /// Note: when used at the same time, the `tokens_to_rereg` lock should be taken out _before_
+ /// `token_to_fd`.
+ tokens_to_rereg: Mutex<Vec<Token>>,
+
+ /// Map from tokens to weak references to `EventedFdInner`-- a structure describing a
+ /// file handle, its associated `fdio` object, and its current registration.
+ token_to_fd: Mutex<hash_map::HashMap<Token, Weak<EventedFdInner>>>,
+}
+
+impl Selector {
+ pub fn new() -> io::Result<Selector> {
+ // Assertion from fuchsia/ready.rs to make sure that FuchsiaReady's representation is
+ // compatible with Ready.
+ assert_fuchsia_ready_repr();
+
+ let port = Arc::new(
+ zircon::Port::create(zircon::PortOpts::Default)?
+ );
+
+ // offset by 1 to avoid choosing 0 as the id of a selector
+ let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
+
+ let has_tokens_to_rereg = AtomicBool::new(false);
+ let tokens_to_rereg = Mutex::new(Vec::new());
+ let token_to_fd = Mutex::new(hash_map::HashMap::new());
+
+ Ok(Selector {
+ id: id,
+ port: port,
+ has_tokens_to_rereg: has_tokens_to_rereg,
+ tokens_to_rereg: tokens_to_rereg,
+ token_to_fd: token_to_fd,
+ })
+ }
+
+ pub fn id(&self) -> usize {
+ self.id
+ }
+
+ /// Returns a reference to the underlying port `Arc`.
+ pub fn port(&self) -> &Arc<zircon::Port> { &self.port }
+
+ /// Reregisters all registrations pointed to by the `tokens_to_rereg` list
+ /// if `has_tokens_to_rereg`.
+ fn reregister_handles(&self) -> io::Result<()> {
+ // We use `Ordering::Acquire` to make sure that we see all `tokens_to_rereg`
+ // written before the store using `Ordering::Release`.
+ if self.has_tokens_to_rereg.load(Ordering::Acquire) {
+ let mut tokens = self.tokens_to_rereg.lock().unwrap();
+ let token_to_fd = self.token_to_fd.lock().unwrap();
+ for token in tokens.drain(0..) {
+ if let Some(eventedfd) = token_to_fd.get(&token)
+ .and_then(|h| h.upgrade()) {
+ eventedfd.rereg_for_level(&self.port);
+ }
+ }
+ self.has_tokens_to_rereg.store(false, Ordering::Release);
+ }
+ Ok(())
+ }
+
+ pub fn select(&self,
+ evts: &mut Events,
+ _awakener: Token,
+ timeout: Option<Duration>) -> io::Result<bool>
+ {
+ evts.clear();
+
+ self.reregister_handles()?;
+
+ let deadline = match timeout {
+ Some(duration) => {
+ let nanos = duration.as_secs().saturating_mul(1_000_000_000)
+ .saturating_add(duration.subsec_nanos() as u64);
+
+ zircon::deadline_after(nanos)
+ }
+ None => zircon::ZX_TIME_INFINITE,
+ };
+
+ let packet = match self.port.wait(deadline) {
+ Ok(packet) => packet,
+ Err(zircon::Status::ErrTimedOut) => return Ok(false),
+ Err(e) => Err(e)?,
+ };
+
+ let observed_signals = match packet.contents() {
+ zircon::PacketContents::SignalOne(signal_packet) => {
+ signal_packet.observed()
+ }
+ zircon::PacketContents::SignalRep(signal_packet) => {
+ signal_packet.observed()
+ }
+ zircon::PacketContents::User(_user_packet) => {
+ // User packets are only ever sent by an Awakener
+ return Ok(true);
+ }
+ };
+
+ let key = packet.key();
+ let (token, reg_type) = token_and_type_from_key(key);
+
+ match reg_type {
+ RegType::Handle => {
+ // We can return immediately-- no lookup or registration necessary.
+ evts.events.push(Event::new(Ready::from(observed_signals), token));
+ Ok(false)
+ },
+ RegType::Fd => {
+ // Convert the signals to epoll events using __fdio_wait_end,
+ // and add to reregistration list if necessary.
+ let events: u32;
+ {
+ let handle = if let Some(handle) =
+ self.token_to_fd.lock().unwrap()
+ .get(&token)
+ .and_then(|h| h.upgrade()) {
+ handle
+ } else {
+ // This handle is apparently in the process of removal.
+ // It has been removed from the list, but port_cancel has not been called.
+ return Ok(false);
+ };
+
+ events = unsafe {
+ let mut events: u32 = mem::uninitialized();
+ sys::fuchsia::sys::__fdio_wait_end(handle.fdio(), observed_signals, &mut events);
+ events
+ };
+
+ // If necessary, queue to be reregistered before next port_await
+ let needs_to_rereg = {
+ let registration_lock = handle.registration().lock().unwrap();
+
+ registration_lock
+ .as_ref()
+ .and_then(|r| r.rereg_signals())
+ .is_some()
+ };
+
+ if needs_to_rereg {
+ let mut tokens_to_rereg_lock = self.tokens_to_rereg.lock().unwrap();
+ tokens_to_rereg_lock.push(token);
+ // We use `Ordering::Release` to make sure that we see all `tokens_to_rereg`
+ // written before the store.
+ self.has_tokens_to_rereg.store(true, Ordering::Release);
+ }
+ }
+
+ evts.events.push(Event::new(epoll_event_to_ready(events), token));
+ Ok(false)
+ },
+ }
+ }
+
+ /// Register event interests for the given IO handle with the OS
+ pub fn register_fd(&self,
+ handle: &zircon::Handle,
+ fd: &EventedFd,
+ token: Token,
+ signals: zircon::Signals,
+ poll_opts: PollOpt) -> io::Result<()>
+ {
+ {
+ let mut token_to_fd = self.token_to_fd.lock().unwrap();
+ match token_to_fd.entry(token) {
+ hash_map::Entry::Occupied(_) =>
+ return Err(io::Error::new(io::ErrorKind::AlreadyExists,
+ "Attempted to register a filedescriptor on an existing token.")),
+ hash_map::Entry::Vacant(slot) => slot.insert(Arc::downgrade(&fd.inner)),
+ };
+ }
+
+ let wait_async_opts = poll_opts_to_wait_async(poll_opts);
+
+ let wait_res = handle.wait_async_handle(&self.port, token.0 as u64, signals, wait_async_opts);
+
+ if wait_res.is_err() {
+ self.token_to_fd.lock().unwrap().remove(&token);
+ }
+
+ Ok(wait_res?)
+ }
+
+ /// Deregister event interests for the given IO handle with the OS
+ pub fn deregister_fd(&self, handle: &zircon::Handle, token: Token) -> io::Result<()> {
+ self.token_to_fd.lock().unwrap().remove(&token);
+
+ // We ignore NotFound errors since oneshots are automatically deregistered,
+ // but mio will attempt to deregister them manually.
+ self.port.cancel(&*handle, token.0 as u64)
+ .map_err(io::Error::from)
+ .or_else(|e| if e.kind() == io::ErrorKind::NotFound {
+ Ok(())
+ } else {
+ Err(e)
+ })
+ }
+
+ pub fn register_handle(&self,
+ handle: zx_handle_t,
+ token: Token,
+ interests: Ready,
+ poll_opts: PollOpt) -> io::Result<()>
+ {
+ if poll_opts.is_level() && !poll_opts.is_oneshot() {
+ return Err(io::Error::new(io::ErrorKind::InvalidInput,
+ "Repeated level-triggered events are not supported on Fuchsia handles."));
+ }
+
+ let temp_handle = unsafe { zircon::Handle::from_raw(handle) };
+
+ let res = temp_handle.wait_async_handle(
+ &self.port,
+ key_from_token_and_type(token, RegType::Handle)?,
+ FuchsiaReady::from(interests).into_zx_signals(),
+ poll_opts_to_wait_async(poll_opts));
+
+ mem::forget(temp_handle);
+
+ Ok(res?)
+ }
+
+
+ pub fn deregister_handle(&self, handle: zx_handle_t, token: Token) -> io::Result<()>
+ {
+ let temp_handle = unsafe { zircon::Handle::from_raw(handle) };
+ let res = self.port.cancel(&temp_handle, key_from_token_and_type(token, RegType::Handle)?);
+
+ mem::forget(temp_handle);
+
+ Ok(res?)
+ }
+}
+
+pub struct Events {
+ events: Vec<Event>
+}
+
+impl Events {
+ pub fn with_capacity(_u: usize) -> Events {
+ // The Fuchsia selector only handles one event at a time,
+ // so we ignore the default capacity and set it to one.
+ Events { events: Vec::with_capacity(1) }
+ }
+ pub fn len(&self) -> usize {
+ self.events.len()
+ }
+ pub fn capacity(&self) -> usize {
+ self.events.capacity()
+ }
+ pub fn is_empty(&self) -> bool {
+ self.events.is_empty()
+ }
+ pub fn get(&self, idx: usize) -> Option<Event> {
+ self.events.get(idx).map(|e| *e)
+ }
+ pub fn push_event(&mut self, event: Event) {
+ self.events.push(event)
+ }
+ pub fn clear(&mut self) {
+ self.events.events.drain(0..);
+ }
+}
+
+impl fmt::Debug for Events {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Events")
+ .field("len", &self.len())
+ .finish()
+ }
+}
diff --git a/third_party/rust/mio/src/sys/mod.rs b/third_party/rust/mio/src/sys/mod.rs
new file mode 100644
index 0000000000..8a1705db6c
--- /dev/null
+++ b/third_party/rust/mio/src/sys/mod.rs
@@ -0,0 +1,56 @@
+#[cfg(all(unix, not(target_os = "fuchsia")))]
+pub use self::unix::{
+ Awakener,
+ EventedFd,
+ Events,
+ Io,
+ Selector,
+ TcpStream,
+ TcpListener,
+ UdpSocket,
+ pipe,
+ set_nonblock,
+};
+
+#[cfg(all(unix, not(target_os = "fuchsia")))]
+pub use self::unix::READY_ALL;
+
+#[cfg(all(unix, not(target_os = "fuchsia")))]
+#[cfg(feature = "with-deprecated")]
+pub use self::unix::UnixSocket;
+
+#[cfg(all(unix, not(target_os = "fuchsia")))]
+pub mod unix;
+
+#[cfg(windows)]
+pub use self::windows::{
+ Awakener,
+ Events,
+ Selector,
+ TcpStream,
+ TcpListener,
+ UdpSocket,
+ Overlapped,
+ Binding,
+};
+
+#[cfg(windows)]
+mod windows;
+
+#[cfg(target_os = "fuchsia")]
+pub use self::fuchsia::{
+ Awakener,
+ Events,
+ EventedHandle,
+ Selector,
+ TcpStream,
+ TcpListener,
+ UdpSocket,
+ set_nonblock,
+};
+
+#[cfg(target_os = "fuchsia")]
+pub mod fuchsia;
+
+#[cfg(not(all(unix, not(target_os = "fuchsia"))))]
+pub const READY_ALL: usize = 0;
diff --git a/third_party/rust/mio/src/sys/unix/awakener.rs b/third_party/rust/mio/src/sys/unix/awakener.rs
new file mode 100644
index 0000000000..9cc367a78c
--- /dev/null
+++ b/third_party/rust/mio/src/sys/unix/awakener.rs
@@ -0,0 +1,74 @@
+pub use self::pipe::Awakener;
+
+/// Default awakener backed by a pipe
+mod pipe {
+ use sys::unix;
+ use {io, Ready, Poll, PollOpt, Token};
+ use event::Evented;
+ use std::io::{Read, Write};
+
+ /*
+ *
+ * ===== Awakener =====
+ *
+ */
+
+ pub struct Awakener {
+ reader: unix::Io,
+ writer: unix::Io,
+ }
+
+ impl Awakener {
+ pub fn new() -> io::Result<Awakener> {
+ let (rd, wr) = unix::pipe()?;
+
+ Ok(Awakener {
+ reader: rd,
+ writer: wr,
+ })
+ }
+
+ pub fn wakeup(&self) -> io::Result<()> {
+ match (&self.writer).write(&[1]) {
+ Ok(_) => Ok(()),
+ Err(e) => {
+ if e.kind() == io::ErrorKind::WouldBlock {
+ Ok(())
+ } else {
+ Err(e)
+ }
+ }
+ }
+ }
+
+ pub fn cleanup(&self) {
+ let mut buf = [0; 128];
+
+ loop {
+ // Consume data until all bytes are purged
+ match (&self.reader).read(&mut buf) {
+ Ok(i) if i > 0 => {},
+ _ => return,
+ }
+ }
+ }
+
+ fn reader(&self) -> &unix::Io {
+ &self.reader
+ }
+ }
+
+ impl Evented for Awakener {
+ fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ self.reader().register(poll, token, interest, opts)
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ self.reader().reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ self.reader().deregister(poll)
+ }
+ }
+}
diff --git a/third_party/rust/mio/src/sys/unix/dlsym.rs b/third_party/rust/mio/src/sys/unix/dlsym.rs
new file mode 100644
index 0000000000..e88c595fc9
--- /dev/null
+++ b/third_party/rust/mio/src/sys/unix/dlsym.rs
@@ -0,0 +1,47 @@
+use std::marker;
+use std::mem;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+use libc;
+
+macro_rules! dlsym {
+ (fn $name:ident($($t:ty),*) -> $ret:ty) => (
+ #[allow(bad_style)]
+ static $name: ::sys::unix::dlsym::DlSym<unsafe extern fn($($t),*) -> $ret> =
+ ::sys::unix::dlsym::DlSym {
+ name: concat!(stringify!($name), "\0"),
+ addr: ::std::sync::atomic::ATOMIC_USIZE_INIT,
+ _marker: ::std::marker::PhantomData,
+ };
+ )
+}
+
+pub struct DlSym<F> {
+ pub name: &'static str,
+ pub addr: AtomicUsize,
+ pub _marker: marker::PhantomData<F>,
+}
+
+impl<F> DlSym<F> {
+ pub fn get(&self) -> Option<&F> {
+ assert_eq!(mem::size_of::<F>(), mem::size_of::<usize>());
+ unsafe {
+ if self.addr.load(Ordering::SeqCst) == 0 {
+ self.addr.store(fetch(self.name), Ordering::SeqCst);
+ }
+ if self.addr.load(Ordering::SeqCst) == 1 {
+ None
+ } else {
+ mem::transmute::<&AtomicUsize, Option<&F>>(&self.addr)
+ }
+ }
+ }
+}
+
+unsafe fn fetch(name: &str) -> usize {
+ assert_eq!(name.as_bytes()[name.len() - 1], 0);
+ match libc::dlsym(libc::RTLD_DEFAULT, name.as_ptr() as *const _) as usize {
+ 0 => 1,
+ n => n,
+ }
+}
diff --git a/third_party/rust/mio/src/sys/unix/epoll.rs b/third_party/rust/mio/src/sys/unix/epoll.rs
new file mode 100644
index 0000000000..03b0ebd5b3
--- /dev/null
+++ b/third_party/rust/mio/src/sys/unix/epoll.rs
@@ -0,0 +1,260 @@
+#![allow(deprecated)]
+use std::os::unix::io::AsRawFd;
+use std::os::unix::io::RawFd;
+use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
+use std::time::Duration;
+use std::{cmp, i32};
+
+use libc::{self, c_int};
+use libc::{EPOLLERR, EPOLLHUP, EPOLLONESHOT};
+use libc::{EPOLLET, EPOLLOUT, EPOLLIN, EPOLLPRI};
+
+use {io, Ready, PollOpt, Token};
+use event_imp::Event;
+use sys::unix::{cvt, UnixReady};
+use sys::unix::io::set_cloexec;
+
+/// Each Selector has a globally unique(ish) ID associated with it. This ID
+/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
+/// registered with the `Selector`. If a type that is previously associated with
+/// a `Selector` attempts to register itself with a different `Selector`, the
+/// operation will return with an error. This matches windows behavior.
+static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
+
+#[derive(Debug)]
+pub struct Selector {
+ id: usize,
+ epfd: RawFd,
+}
+
+impl Selector {
+ pub fn new() -> io::Result<Selector> {
+ let epfd = unsafe {
+ // Emulate `epoll_create` by using `epoll_create1` if it's available
+ // and otherwise falling back to `epoll_create` followed by a call to
+ // set the CLOEXEC flag.
+ dlsym!(fn epoll_create1(c_int) -> c_int);
+
+ match epoll_create1.get() {
+ Some(epoll_create1_fn) => {
+ cvt(epoll_create1_fn(libc::EPOLL_CLOEXEC))?
+ }
+ None => {
+ let fd = cvt(libc::epoll_create(1024))?;
+ drop(set_cloexec(fd));
+ fd
+ }
+ }
+ };
+
+ // offset by 1 to avoid choosing 0 as the id of a selector
+ let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
+
+ Ok(Selector {
+ id: id,
+ epfd: epfd,
+ })
+ }
+
+ pub fn id(&self) -> usize {
+ self.id
+ }
+
+ /// Wait for events from the OS
+ pub fn select(&self, evts: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool> {
+ let timeout_ms = timeout
+ .map(|to| cmp::min(millis(to), i32::MAX as u64) as i32)
+ .unwrap_or(-1);
+
+ // Wait for epoll events for at most timeout_ms milliseconds
+ evts.clear();
+ unsafe {
+ let cnt = cvt(libc::epoll_wait(self.epfd,
+ evts.events.as_mut_ptr(),
+ evts.events.capacity() as i32,
+ timeout_ms))?;
+ let cnt = cnt as usize;
+ evts.events.set_len(cnt);
+
+ for i in 0..cnt {
+ if evts.events[i].u64 as usize == awakener.into() {
+ evts.events.remove(i);
+ return Ok(true);
+ }
+ }
+ }
+
+ Ok(false)
+ }
+
+ /// Register event interests for the given IO handle with the OS
+ pub fn register(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
+ let mut info = libc::epoll_event {
+ events: ioevent_to_epoll(interests, opts),
+ u64: usize::from(token) as u64
+ };
+
+ unsafe {
+ cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, fd, &mut info))?;
+ Ok(())
+ }
+ }
+
+ /// Register event interests for the given IO handle with the OS
+ pub fn reregister(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
+ let mut info = libc::epoll_event {
+ events: ioevent_to_epoll(interests, opts),
+ u64: usize::from(token) as u64
+ };
+
+ unsafe {
+ cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_MOD, fd, &mut info))?;
+ Ok(())
+ }
+ }
+
+ /// Deregister event interests for the given IO handle with the OS
+ pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
+ // The &info argument should be ignored by the system,
+ // but linux < 2.6.9 required it to be not null.
+ // For compatibility, we provide a dummy EpollEvent.
+ let mut info = libc::epoll_event {
+ events: 0,
+ u64: 0,
+ };
+
+ unsafe {
+ cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_DEL, fd, &mut info))?;
+ Ok(())
+ }
+ }
+}
+
+fn ioevent_to_epoll(interest: Ready, opts: PollOpt) -> u32 {
+ let mut kind = 0;
+
+ if interest.is_readable() {
+ kind |= EPOLLIN;
+ }
+
+ if interest.is_writable() {
+ kind |= EPOLLOUT;
+ }
+
+ if UnixReady::from(interest).is_priority() {
+ kind |= EPOLLPRI;
+ }
+
+ if opts.is_edge() {
+ kind |= EPOLLET;
+ }
+
+ if opts.is_oneshot() {
+ kind |= EPOLLONESHOT;
+ }
+
+ if opts.is_level() {
+ kind &= !EPOLLET;
+ }
+
+ kind as u32
+}
+
+impl AsRawFd for Selector {
+ fn as_raw_fd(&self) -> RawFd {
+ self.epfd
+ }
+}
+
+impl Drop for Selector {
+ fn drop(&mut self) {
+ unsafe {
+ let _ = libc::close(self.epfd);
+ }
+ }
+}
+
+pub struct Events {
+ events: Vec<libc::epoll_event>,
+}
+
+impl Events {
+ pub fn with_capacity(u: usize) -> Events {
+ Events {
+ events: Vec::with_capacity(u)
+ }
+ }
+
+ #[inline]
+ pub fn len(&self) -> usize {
+ self.events.len()
+ }
+
+ #[inline]
+ pub fn capacity(&self) -> usize {
+ self.events.capacity()
+ }
+
+ #[inline]
+ pub fn is_empty(&self) -> bool {
+ self.events.is_empty()
+ }
+
+ #[inline]
+ pub fn get(&self, idx: usize) -> Option<Event> {
+ self.events.get(idx).map(|event| {
+ let epoll = event.events as c_int;
+ let mut kind = Ready::empty();
+
+ if (epoll & EPOLLIN) != 0 {
+ kind = kind | Ready::readable();
+ }
+
+ if (epoll & EPOLLPRI) != 0 {
+ kind = kind | Ready::readable() | UnixReady::priority();
+ }
+
+ if (epoll & EPOLLOUT) != 0 {
+ kind = kind | Ready::writable();
+ }
+
+ // EPOLLHUP - Usually means a socket error happened
+ if (epoll & EPOLLERR) != 0 {
+ kind = kind | UnixReady::error();
+ }
+
+ if (epoll & EPOLLHUP) != 0 {
+ kind = kind | UnixReady::hup();
+ }
+
+ let token = self.events[idx].u64;
+
+ Event::new(kind, Token(token as usize))
+ })
+ }
+
+ pub fn push_event(&mut self, event: Event) {
+ self.events.push(libc::epoll_event {
+ events: ioevent_to_epoll(event.readiness(), PollOpt::empty()),
+ u64: usize::from(event.token()) as u64
+ });
+ }
+
+ pub fn clear(&mut self) {
+ unsafe { self.events.set_len(0); }
+ }
+}
+
+const NANOS_PER_MILLI: u32 = 1_000_000;
+const MILLIS_PER_SEC: u64 = 1_000;
+
+/// Convert a `Duration` to milliseconds, rounding up and saturating at
+/// `u64::MAX`.
+///
+/// The saturating is fine because `u64::MAX` milliseconds are still many
+/// million years.
+pub fn millis(duration: Duration) -> u64 {
+ // Round up.
+ let millis = (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI;
+ duration.as_secs().saturating_mul(MILLIS_PER_SEC).saturating_add(millis as u64)
+}
diff --git a/third_party/rust/mio/src/sys/unix/eventedfd.rs b/third_party/rust/mio/src/sys/unix/eventedfd.rs
new file mode 100644
index 0000000000..72586f6652
--- /dev/null
+++ b/third_party/rust/mio/src/sys/unix/eventedfd.rs
@@ -0,0 +1,107 @@
+use {io, poll, Ready, Poll, PollOpt, Token};
+use event::Evented;
+use std::os::unix::io::RawFd;
+
+/*
+ *
+ * ===== EventedFd =====
+ *
+ */
+
+#[derive(Debug)]
+
+/// Adapter for [`RawFd`] providing an [`Evented`] implementation.
+///
+/// `EventedFd` enables registering any type with an FD with [`Poll`].
+///
+/// While only implementations for TCP and UDP are provided, Mio supports
+/// registering any FD that can be registered with the underlying OS selector.
+/// `EventedFd` provides the necessary bridge.
+///
+/// Note that `EventedFd` takes a `&RawFd`. This is because `EventedFd` **does
+/// not** take ownership of the FD. Specifically, it will not manage any
+/// lifecycle related operations, such as closing the FD on drop. It is expected
+/// that the `EventedFd` is constructed right before a call to
+/// [`Poll::register`]. See the examples for more detail.
+///
+/// # Examples
+///
+/// Basic usage
+///
+/// ```
+/// # use std::error::Error;
+/// # fn try_main() -> Result<(), Box<Error>> {
+/// use mio::{Ready, Poll, PollOpt, Token};
+/// use mio::unix::EventedFd;
+///
+/// use std::os::unix::io::AsRawFd;
+/// use std::net::TcpListener;
+///
+/// // Bind a std listener
+/// let listener = TcpListener::bind("127.0.0.1:0")?;
+///
+/// let poll = Poll::new()?;
+///
+/// // Register the listener
+/// poll.register(&EventedFd(&listener.as_raw_fd()),
+/// Token(0), Ready::readable(), PollOpt::edge())?;
+/// # Ok(())
+/// # }
+/// #
+/// # fn main() {
+/// # try_main().unwrap();
+/// # }
+/// ```
+///
+/// Implementing [`Evented`] for a custom type backed by a [`RawFd`].
+///
+/// ```
+/// use mio::{Ready, Poll, PollOpt, Token};
+/// use mio::event::Evented;
+/// use mio::unix::EventedFd;
+///
+/// use std::os::unix::io::RawFd;
+/// use std::io;
+///
+/// pub struct MyIo {
+/// fd: RawFd,
+/// }
+///
+/// impl Evented for MyIo {
+/// fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
+/// -> io::Result<()>
+/// {
+/// EventedFd(&self.fd).register(poll, token, interest, opts)
+/// }
+///
+/// fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
+/// -> io::Result<()>
+/// {
+/// EventedFd(&self.fd).reregister(poll, token, interest, opts)
+/// }
+///
+/// fn deregister(&self, poll: &Poll) -> io::Result<()> {
+/// EventedFd(&self.fd).deregister(poll)
+/// }
+/// }
+/// ```
+///
+/// [`RawFd`]: https://doc.rust-lang.org/std/os/unix/io/type.RawFd.html
+/// [`Evented`]: ../event/trait.Evented.html
+/// [`Poll`]: ../struct.Poll.html
+/// [`Poll::register`]: ../struct.Poll.html#method.register
+pub struct EventedFd<'a>(pub &'a RawFd);
+
+impl<'a> Evented for EventedFd<'a> {
+ fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ poll::selector(poll).register(*self.0, token, interest, opts)
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ poll::selector(poll).reregister(*self.0, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ poll::selector(poll).deregister(*self.0)
+ }
+}
diff --git a/third_party/rust/mio/src/sys/unix/io.rs b/third_party/rust/mio/src/sys/unix/io.rs
new file mode 100644
index 0000000000..47a3a70d1f
--- /dev/null
+++ b/third_party/rust/mio/src/sys/unix/io.rs
@@ -0,0 +1,107 @@
+use std::fs::File;
+use std::io::{Read, Write};
+use std::os::unix::io::{IntoRawFd, AsRawFd, FromRawFd, RawFd};
+
+use libc;
+
+use {io, Ready, Poll, PollOpt, Token};
+use event::Evented;
+use unix::EventedFd;
+use sys::unix::cvt;
+
+pub fn set_nonblock(fd: libc::c_int) -> io::Result<()> {
+ unsafe {
+ let flags = libc::fcntl(fd, libc::F_GETFL);
+ cvt(libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK)).map(|_|())
+ }
+}
+
+pub fn set_cloexec(fd: libc::c_int) -> io::Result<()> {
+ unsafe {
+ let flags = libc::fcntl(fd, libc::F_GETFD);
+ cvt(libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC)).map(|_| ())
+ }
+}
+
+/*
+ *
+ * ===== Basic IO type =====
+ *
+ */
+
+/// Manages a FD
+#[derive(Debug)]
+pub struct Io {
+ fd: File,
+}
+
+impl Io {
+ /// Try to clone the FD
+ pub fn try_clone(&self) -> io::Result<Io> {
+ Ok(Io { fd: self.fd.try_clone()? })
+ }
+}
+
+impl FromRawFd for Io {
+ unsafe fn from_raw_fd(fd: RawFd) -> Io {
+ Io { fd: File::from_raw_fd(fd) }
+ }
+}
+
+impl IntoRawFd for Io {
+ fn into_raw_fd(self) -> RawFd {
+ self.fd.into_raw_fd()
+ }
+}
+
+impl AsRawFd for Io {
+ fn as_raw_fd(&self) -> RawFd {
+ self.fd.as_raw_fd()
+ }
+}
+
+impl Evented for Io {
+ fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ EventedFd(&self.as_raw_fd()).register(poll, token, interest, opts)
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ EventedFd(&self.as_raw_fd()).reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ EventedFd(&self.as_raw_fd()).deregister(poll)
+ }
+}
+
+impl Read for Io {
+ fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+ (&self.fd).read(dst)
+ }
+}
+
+impl<'a> Read for &'a Io {
+ fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+ (&self.fd).read(dst)
+ }
+}
+
+impl Write for Io {
+ fn write(&mut self, src: &[u8]) -> io::Result<usize> {
+ (&self.fd).write(src)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ (&self.fd).flush()
+ }
+}
+
+impl<'a> Write for &'a Io {
+ fn write(&mut self, src: &[u8]) -> io::Result<usize> {
+ (&self.fd).write(src)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ (&self.fd).flush()
+ }
+}
diff --git a/third_party/rust/mio/src/sys/unix/kqueue.rs b/third_party/rust/mio/src/sys/unix/kqueue.rs
new file mode 100644
index 0000000000..59c70e1e18
--- /dev/null
+++ b/third_party/rust/mio/src/sys/unix/kqueue.rs
@@ -0,0 +1,360 @@
+use std::{cmp, fmt, ptr};
+#[cfg(not(target_os = "netbsd"))]
+use std::os::raw::{c_int, c_short};
+use std::os::unix::io::AsRawFd;
+use std::os::unix::io::RawFd;
+use std::collections::HashMap;
+use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
+use std::time::Duration;
+
+use libc::{self, time_t};
+
+use {io, Ready, PollOpt, Token};
+use event_imp::{self as event, Event};
+use sys::unix::{cvt, UnixReady};
+use sys::unix::io::set_cloexec;
+
+/// Each Selector has a globally unique(ish) ID associated with it. This ID
+/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
+/// registered with the `Selector`. If a type that is previously associated with
+/// a `Selector` attempts to register itself with a different `Selector`, the
+/// operation will return with an error. This matches windows behavior.
+static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
+
+#[cfg(not(target_os = "netbsd"))]
+type Filter = c_short;
+#[cfg(not(target_os = "netbsd"))]
+type UData = *mut ::libc::c_void;
+#[cfg(not(target_os = "netbsd"))]
+type Count = c_int;
+
+#[cfg(target_os = "netbsd")]
+type Filter = u32;
+#[cfg(target_os = "netbsd")]
+type UData = ::libc::intptr_t;
+#[cfg(target_os = "netbsd")]
+type Count = usize;
+
+macro_rules! kevent {
+ ($id: expr, $filter: expr, $flags: expr, $data: expr) => {
+ libc::kevent {
+ ident: $id as ::libc::uintptr_t,
+ filter: $filter as Filter,
+ flags: $flags,
+ fflags: 0,
+ data: 0,
+ udata: $data as UData,
+ }
+ }
+}
+
+pub struct Selector {
+ id: usize,
+ kq: RawFd,
+}
+
+impl Selector {
+ pub fn new() -> io::Result<Selector> {
+ // offset by 1 to avoid choosing 0 as the id of a selector
+ let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
+ let kq = unsafe { cvt(libc::kqueue())? };
+ drop(set_cloexec(kq));
+
+ Ok(Selector {
+ id,
+ kq,
+ })
+ }
+
+ pub fn id(&self) -> usize {
+ self.id
+ }
+
+ pub fn select(&self, evts: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool> {
+ let timeout = timeout.map(|to| {
+ libc::timespec {
+ tv_sec: cmp::min(to.as_secs(), time_t::max_value() as u64) as time_t,
+ // `Duration::subsec_nanos` is guaranteed to be less than one
+ // billion (the number of nanoseconds in a second), making the
+ // cast to i32 safe. The cast itself is needed for platforms
+ // where C's long is only 32 bits.
+ tv_nsec: libc::c_long::from(to.subsec_nanos() as i32),
+ }
+ });
+ let timeout = timeout.as_ref().map(|s| s as *const _).unwrap_or(ptr::null_mut());
+
+ evts.clear();
+ unsafe {
+ let cnt = cvt(libc::kevent(self.kq,
+ ptr::null(),
+ 0,
+ evts.sys_events.0.as_mut_ptr(),
+ evts.sys_events.0.capacity() as Count,
+ timeout))?;
+ evts.sys_events.0.set_len(cnt as usize);
+ Ok(evts.coalesce(awakener))
+ }
+ }
+
+ pub fn register(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
+ trace!("registering; token={:?}; interests={:?}", token, interests);
+
+ let flags = if opts.contains(PollOpt::edge()) { libc::EV_CLEAR } else { 0 } |
+ if opts.contains(PollOpt::oneshot()) { libc::EV_ONESHOT } else { 0 } |
+ libc::EV_RECEIPT;
+
+ unsafe {
+ let r = if interests.contains(Ready::readable()) { libc::EV_ADD } else { libc::EV_DELETE };
+ let w = if interests.contains(Ready::writable()) { libc::EV_ADD } else { libc::EV_DELETE };
+ let mut changes = [
+ kevent!(fd, libc::EVFILT_READ, flags | r, usize::from(token)),
+ kevent!(fd, libc::EVFILT_WRITE, flags | w, usize::from(token)),
+ ];
+
+ cvt(libc::kevent(self.kq,
+ changes.as_ptr(),
+ changes.len() as Count,
+ changes.as_mut_ptr(),
+ changes.len() as Count,
+ ::std::ptr::null()))?;
+
+ for change in changes.iter() {
+ debug_assert_eq!(change.flags & libc::EV_ERROR, libc::EV_ERROR);
+
+ // Test to see if an error happened
+ if change.data == 0 {
+ continue
+ }
+
+ // Older versions of OSX (10.11 and 10.10 have been witnessed)
+ // can return EPIPE when registering a pipe file descriptor
+ // where the other end has already disappeared. For example code
+ // that creates a pipe, closes a file descriptor, and then
+ // registers the other end will see an EPIPE returned from
+ // `register`.
+ //
+ // It also turns out that kevent will still report events on the
+ // file descriptor, telling us that it's readable/hup at least
+ // after we've done this registration. As a result we just
+ // ignore `EPIPE` here instead of propagating it.
+ //
+ // More info can be found at carllerche/mio#582
+ if change.data as i32 == libc::EPIPE &&
+ change.filter == libc::EVFILT_WRITE as Filter {
+ continue
+ }
+
+ // ignore ENOENT error for EV_DELETE
+ let orig_flags = if change.filter == libc::EVFILT_READ as Filter { r } else { w };
+ if change.data as i32 == libc::ENOENT && orig_flags & libc::EV_DELETE != 0 {
+ continue
+ }
+
+ return Err(::std::io::Error::from_raw_os_error(change.data as i32));
+ }
+ Ok(())
+ }
+ }
+
+ pub fn reregister(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
+ // Just need to call register here since EV_ADD is a mod if already
+ // registered
+ self.register(fd, token, interests, opts)
+ }
+
+ pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
+ unsafe {
+ // EV_RECEIPT is a nice way to apply changes and get back per-event results while not
+ // draining the actual changes.
+ let filter = libc::EV_DELETE | libc::EV_RECEIPT;
+#[cfg(not(target_os = "netbsd"))]
+ let mut changes = [
+ kevent!(fd, libc::EVFILT_READ, filter, ptr::null_mut()),
+ kevent!(fd, libc::EVFILT_WRITE, filter, ptr::null_mut()),
+ ];
+
+#[cfg(target_os = "netbsd")]
+ let mut changes = [
+ kevent!(fd, libc::EVFILT_READ, filter, 0),
+ kevent!(fd, libc::EVFILT_WRITE, filter, 0),
+ ];
+
+ cvt(libc::kevent(self.kq,
+ changes.as_ptr(),
+ changes.len() as Count,
+ changes.as_mut_ptr(),
+ changes.len() as Count,
+ ::std::ptr::null())).map(|_| ())?;
+
+ if changes[0].data as i32 == libc::ENOENT && changes[1].data as i32 == libc::ENOENT {
+ return Err(::std::io::Error::from_raw_os_error(changes[0].data as i32));
+ }
+ for change in changes.iter() {
+ debug_assert_eq!(libc::EV_ERROR & change.flags, libc::EV_ERROR);
+ if change.data != 0 && change.data as i32 != libc::ENOENT {
+ return Err(::std::io::Error::from_raw_os_error(changes[0].data as i32));
+ }
+ }
+ Ok(())
+ }
+ }
+}
+
+impl fmt::Debug for Selector {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Selector")
+ .field("id", &self.id)
+ .field("kq", &self.kq)
+ .finish()
+ }
+}
+
+impl AsRawFd for Selector {
+ fn as_raw_fd(&self) -> RawFd {
+ self.kq
+ }
+}
+
+impl Drop for Selector {
+ fn drop(&mut self) {
+ unsafe {
+ let _ = libc::close(self.kq);
+ }
+ }
+}
+
+pub struct Events {
+ sys_events: KeventList,
+ events: Vec<Event>,
+ event_map: HashMap<Token, usize>,
+}
+
+struct KeventList(Vec<libc::kevent>);
+
+unsafe impl Send for KeventList {}
+unsafe impl Sync for KeventList {}
+
+impl Events {
+ pub fn with_capacity(cap: usize) -> Events {
+ Events {
+ sys_events: KeventList(Vec::with_capacity(cap)),
+ events: Vec::with_capacity(cap),
+ event_map: HashMap::with_capacity(cap)
+ }
+ }
+
+ #[inline]
+ pub fn len(&self) -> usize {
+ self.events.len()
+ }
+
+ #[inline]
+ pub fn capacity(&self) -> usize {
+ self.events.capacity()
+ }
+
+ #[inline]
+ pub fn is_empty(&self) -> bool {
+ self.events.is_empty()
+ }
+
+ pub fn get(&self, idx: usize) -> Option<Event> {
+ self.events.get(idx).cloned()
+ }
+
+ fn coalesce(&mut self, awakener: Token) -> bool {
+ let mut ret = false;
+ self.events.clear();
+ self.event_map.clear();
+
+ for e in self.sys_events.0.iter() {
+ let token = Token(e.udata as usize);
+ let len = self.events.len();
+
+ if token == awakener {
+ // TODO: Should this return an error if event is an error. It
+ // is not critical as spurious wakeups are permitted.
+ ret = true;
+ continue;
+ }
+
+ let idx = *self.event_map.entry(token)
+ .or_insert(len);
+
+ if idx == len {
+ // New entry, insert the default
+ self.events.push(Event::new(Ready::empty(), token));
+
+ }
+
+ if e.flags & libc::EV_ERROR != 0 {
+ event::kind_mut(&mut self.events[idx]).insert(*UnixReady::error());
+ }
+
+ if e.filter == libc::EVFILT_READ as Filter {
+ event::kind_mut(&mut self.events[idx]).insert(Ready::readable());
+ } else if e.filter == libc::EVFILT_WRITE as Filter {
+ event::kind_mut(&mut self.events[idx]).insert(Ready::writable());
+ }
+#[cfg(any(target_os = "dragonfly",
+ target_os = "freebsd", target_os = "ios", target_os = "macos"))]
+ {
+ if e.filter == libc::EVFILT_AIO {
+ event::kind_mut(&mut self.events[idx]).insert(UnixReady::aio());
+ }
+ }
+#[cfg(any(target_os = "freebsd"))]
+ {
+ if e.filter == libc::EVFILT_LIO {
+ event::kind_mut(&mut self.events[idx]).insert(UnixReady::lio());
+ }
+ }
+ }
+
+ ret
+ }
+
+ pub fn push_event(&mut self, event: Event) {
+ self.events.push(event);
+ }
+
+ pub fn clear(&mut self) {
+ self.sys_events.0.truncate(0);
+ self.events.truncate(0);
+ self.event_map.clear();
+ }
+}
+
+impl fmt::Debug for Events {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Events")
+ .field("len", &self.sys_events.0.len())
+ .finish()
+ }
+}
+
+#[test]
+fn does_not_register_rw() {
+ use {Poll, Ready, PollOpt, Token};
+ use unix::EventedFd;
+
+ let kq = unsafe { libc::kqueue() };
+ let kqf = EventedFd(&kq);
+ let poll = Poll::new().unwrap();
+
+ // registering kqueue fd will fail if write is requested (On anything but some versions of OS
+ // X)
+ poll.register(&kqf, Token(1234), Ready::readable(),
+ PollOpt::edge() | PollOpt::oneshot()).unwrap();
+}
+
+#[cfg(any(target_os = "dragonfly",
+ target_os = "freebsd", target_os = "ios", target_os = "macos"))]
+#[test]
+fn test_coalesce_aio() {
+ let mut events = Events::with_capacity(1);
+ events.sys_events.0.push(kevent!(0x1234, libc::EVFILT_AIO, 0, 42));
+ events.coalesce(Token(0));
+ assert!(events.events[0].readiness() == UnixReady::aio().into());
+ assert!(events.events[0].token() == Token(42));
+}
diff --git a/third_party/rust/mio/src/sys/unix/mod.rs b/third_party/rust/mio/src/sys/unix/mod.rs
new file mode 100644
index 0000000000..5bb83070d2
--- /dev/null
+++ b/third_party/rust/mio/src/sys/unix/mod.rs
@@ -0,0 +1,95 @@
+use libc::{self, c_int};
+
+#[macro_use]
+pub mod dlsym;
+
+#[cfg(any(target_os = "linux", target_os = "android", target_os = "solaris"))]
+mod epoll;
+
+#[cfg(any(target_os = "linux", target_os = "android", target_os = "solaris"))]
+pub use self::epoll::{Events, Selector};
+
+#[cfg(any(target_os = "bitrig", target_os = "dragonfly",
+ target_os = "freebsd", target_os = "ios", target_os = "macos",
+ target_os = "netbsd", target_os = "openbsd"))]
+mod kqueue;
+
+#[cfg(any(target_os = "bitrig", target_os = "dragonfly",
+ target_os = "freebsd", target_os = "ios", target_os = "macos",
+ target_os = "netbsd", target_os = "openbsd"))]
+pub use self::kqueue::{Events, Selector};
+
+mod awakener;
+mod eventedfd;
+mod io;
+mod ready;
+mod tcp;
+mod udp;
+mod uio;
+
+#[cfg(feature = "with-deprecated")]
+mod uds;
+
+pub use self::awakener::Awakener;
+pub use self::eventedfd::EventedFd;
+pub use self::io::{Io, set_nonblock};
+pub use self::ready::{UnixReady, READY_ALL};
+pub use self::tcp::{TcpStream, TcpListener};
+pub use self::udp::UdpSocket;
+
+#[cfg(feature = "with-deprecated")]
+pub use self::uds::UnixSocket;
+
+pub use iovec::IoVec;
+
+use std::os::unix::io::FromRawFd;
+
+pub fn pipe() -> ::io::Result<(Io, Io)> {
+ // Use pipe2 for atomically setting O_CLOEXEC if we can, but otherwise
+ // just fall back to using `pipe`.
+ dlsym!(fn pipe2(*mut c_int, c_int) -> c_int);
+
+ let mut pipes = [0; 2];
+ unsafe {
+ match pipe2.get() {
+ Some(pipe2_fn) => {
+ let flags = libc::O_NONBLOCK | libc::O_CLOEXEC;
+ cvt(pipe2_fn(pipes.as_mut_ptr(), flags))?;
+ Ok((Io::from_raw_fd(pipes[0]), Io::from_raw_fd(pipes[1])))
+ }
+ None => {
+ cvt(libc::pipe(pipes.as_mut_ptr()))?;
+ // Ensure the pipe are closed if any of the system calls below
+ // fail.
+ let r = Io::from_raw_fd(pipes[0]);
+ let w = Io::from_raw_fd(pipes[1]);
+ cvt(libc::fcntl(pipes[0], libc::F_SETFD, libc::FD_CLOEXEC))?;
+ cvt(libc::fcntl(pipes[1], libc::F_SETFD, libc::FD_CLOEXEC))?;
+ cvt(libc::fcntl(pipes[0], libc::F_SETFL, libc::O_NONBLOCK))?;
+ cvt(libc::fcntl(pipes[1], libc::F_SETFL, libc::O_NONBLOCK))?;
+ Ok((r, w))
+ }
+ }
+ }
+}
+
+trait IsMinusOne {
+ fn is_minus_one(&self) -> bool;
+}
+
+impl IsMinusOne for i32 {
+ fn is_minus_one(&self) -> bool { *self == -1 }
+}
+impl IsMinusOne for isize {
+ fn is_minus_one(&self) -> bool { *self == -1 }
+}
+
+fn cvt<T: IsMinusOne>(t: T) -> ::io::Result<T> {
+ use std::io;
+
+ if t.is_minus_one() {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(t)
+ }
+}
diff --git a/third_party/rust/mio/src/sys/unix/ready.rs b/third_party/rust/mio/src/sys/unix/ready.rs
new file mode 100644
index 0000000000..1780ceae9f
--- /dev/null
+++ b/third_party/rust/mio/src/sys/unix/ready.rs
@@ -0,0 +1,499 @@
+use event_imp::{Ready, ready_as_usize, ready_from_usize};
+
+use std::ops;
+use std::fmt;
+
+/// Unix specific extensions to `Ready`
+///
+/// Provides additional readiness event kinds that are available on unix
+/// platforms. Unix platforms are able to provide readiness events for
+/// additional socket events, such as HUP and error.
+///
+/// HUP events occur when the remote end of a socket hangs up. In the TCP case,
+/// this occurs when the remote end of a TCP socket shuts down writes.
+///
+/// Error events occur when the socket enters an error state. In this case, the
+/// socket will also receive a readable or writable event. Reading or writing to
+/// the socket will result in an error.
+///
+/// Conversion traits are implemented between `Ready` and `UnixReady`. See the
+/// examples.
+///
+/// For high level documentation on polling and readiness, see [`Poll`].
+///
+/// # Examples
+///
+/// Most of the time, all that is needed is using bit operations
+///
+/// ```
+/// use mio::Ready;
+/// use mio::unix::UnixReady;
+///
+/// let ready = Ready::readable() | UnixReady::hup();
+///
+/// assert!(ready.is_readable());
+/// assert!(UnixReady::from(ready).is_hup());
+/// ```
+///
+/// Basic conversion between ready types.
+///
+/// ```
+/// use mio::Ready;
+/// use mio::unix::UnixReady;
+///
+/// // Start with a portable ready
+/// let ready = Ready::readable();
+///
+/// // Convert to a unix ready, adding HUP
+/// let mut unix_ready = UnixReady::from(ready) | UnixReady::hup();
+///
+/// unix_ready.insert(UnixReady::error());
+///
+/// // `unix_ready` maintains readable interest
+/// assert!(unix_ready.is_readable());
+/// assert!(unix_ready.is_hup());
+/// assert!(unix_ready.is_error());
+///
+/// // Convert back to `Ready`
+/// let ready = Ready::from(unix_ready);
+///
+/// // Readable is maintained
+/// assert!(ready.is_readable());
+/// ```
+///
+/// Registering readable and error interest on a socket
+///
+/// ```
+/// # use std::error::Error;
+/// # fn try_main() -> Result<(), Box<Error>> {
+/// use mio::{Ready, Poll, PollOpt, Token};
+/// use mio::net::TcpStream;
+/// use mio::unix::UnixReady;
+///
+/// let addr = "216.58.193.68:80".parse()?;
+/// let socket = TcpStream::connect(&addr)?;
+///
+/// let poll = Poll::new()?;
+///
+/// poll.register(&socket,
+/// Token(0),
+/// Ready::readable() | UnixReady::error(),
+/// PollOpt::edge())?;
+/// # Ok(())
+/// # }
+/// #
+/// # fn main() {
+/// # try_main().unwrap();
+/// # }
+/// ```
+///
+/// [`Poll`]: ../struct.Poll.html
+/// [readiness]: struct.Poll.html#readiness-operations
+#[derive(Copy, PartialEq, Eq, Clone, PartialOrd, Ord)]
+pub struct UnixReady(Ready);
+
+const ERROR: usize = 0b00_0100;
+const HUP: usize = 0b00_1000;
+
+#[cfg(any(target_os = "dragonfly",
+ target_os = "freebsd", target_os = "ios", target_os = "macos"))]
+const AIO: usize = 0b01_0000;
+
+#[cfg(not(any(target_os = "dragonfly",
+ target_os = "freebsd", target_os = "ios", target_os = "macos")))]
+const AIO: usize = 0b00_0000;
+
+#[cfg(any(target_os = "freebsd"))]
+const LIO: usize = 0b10_0000;
+
+#[cfg(not(any(target_os = "freebsd")))]
+const LIO: usize = 0b00_0000;
+
+#[cfg(any(target_os = "linux", target_os = "android", target_os = "solaris"))]
+const PRI: usize = 0b100_0000;
+
+#[cfg(not(any(target_os = "linux", target_os = "android", target_os = "solaris")))]
+const PRI: usize = 0;
+
+// Export to support `Ready::all`
+pub const READY_ALL: usize = ERROR | HUP | AIO | LIO | PRI;
+
+#[test]
+fn test_ready_all() {
+ let readable = Ready::readable().as_usize();
+ let writable = Ready::writable().as_usize();
+
+ assert_eq!(
+ READY_ALL | readable | writable,
+ ERROR + HUP + AIO + LIO + PRI + readable + writable
+ );
+
+ // Issue #896.
+ #[cfg(any(target_os = "linux", target_os = "android", target_os = "solaris"))]
+ assert!(!Ready::from(UnixReady::priority()).is_writable());
+}
+
+impl UnixReady {
+ /// Returns a `Ready` representing AIO completion readiness
+ ///
+ /// See [`Poll`] for more documentation on polling.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use mio::unix::UnixReady;
+ ///
+ /// let ready = UnixReady::aio();
+ ///
+ /// assert!(ready.is_aio());
+ /// ```
+ ///
+ /// [`Poll`]: ../struct.Poll.html
+ #[inline]
+ #[cfg(any(target_os = "dragonfly",
+ target_os = "freebsd", target_os = "ios", target_os = "macos"))]
+ pub fn aio() -> UnixReady {
+ UnixReady(ready_from_usize(AIO))
+ }
+
+ #[cfg(not(any(target_os = "dragonfly",
+ target_os = "freebsd", target_os = "ios", target_os = "macos")))]
+ #[deprecated(since = "0.6.12", note = "this function is now platform specific")]
+ #[doc(hidden)]
+ pub fn aio() -> UnixReady {
+ UnixReady(Ready::empty())
+ }
+
+ /// Returns a `Ready` representing error readiness.
+ ///
+ /// **Note that only readable and writable readiness is guaranteed to be
+ /// supported on all platforms**. This means that `error` readiness
+ /// should be treated as a hint. For more details, see [readiness] in the
+ /// poll documentation.
+ ///
+ /// See [`Poll`] for more documentation on polling.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use mio::unix::UnixReady;
+ ///
+ /// let ready = UnixReady::error();
+ ///
+ /// assert!(ready.is_error());
+ /// ```
+ ///
+ /// [`Poll`]: ../struct.Poll.html
+ /// [readiness]: ../struct.Poll.html#readiness-operations
+ #[inline]
+ pub fn error() -> UnixReady {
+ UnixReady(ready_from_usize(ERROR))
+ }
+
+ /// Returns a `Ready` representing HUP readiness.
+ ///
+ /// A HUP (or hang-up) signifies that a stream socket **peer** closed the
+ /// connection, or shut down the writing half of the connection.
+ ///
+ /// **Note that only readable and writable readiness is guaranteed to be
+ /// supported on all platforms**. This means that `hup` readiness
+ /// should be treated as a hint. For more details, see [readiness] in the
+ /// poll documentation. It is also unclear if HUP readiness will remain in 0.7. See
+ /// [here][issue-941].
+ ///
+ /// See [`Poll`] for more documentation on polling.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use mio::unix::UnixReady;
+ ///
+ /// let ready = UnixReady::hup();
+ ///
+ /// assert!(ready.is_hup());
+ /// ```
+ ///
+ /// [`Poll`]: ../struct.Poll.html
+ /// [readiness]: ../struct.Poll.html#readiness-operations
+ /// [issue-941]: https://github.com/tokio-rs/mio/issues/941
+ #[inline]
+ pub fn hup() -> UnixReady {
+ UnixReady(ready_from_usize(HUP))
+ }
+
+ /// Returns a `Ready` representing LIO completion readiness
+ ///
+ /// See [`Poll`] for more documentation on polling.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use mio::unix::UnixReady;
+ ///
+ /// let ready = UnixReady::lio();
+ ///
+ /// assert!(ready.is_lio());
+ /// ```
+ ///
+ /// [`Poll`]: struct.Poll.html
+ #[inline]
+ #[cfg(any(target_os = "freebsd"))]
+ pub fn lio() -> UnixReady {
+ UnixReady(ready_from_usize(LIO))
+ }
+
+ /// Returns a `Ready` representing priority (`EPOLLPRI`) readiness
+ ///
+ /// See [`Poll`] for more documentation on polling.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use mio::unix::UnixReady;
+ ///
+ /// let ready = UnixReady::priority();
+ ///
+ /// assert!(ready.is_priority());
+ /// ```
+ ///
+ /// [`Poll`]: struct.Poll.html
+ #[inline]
+ #[cfg(any(target_os = "linux",
+ target_os = "android", target_os = "solaris"))]
+ pub fn priority() -> UnixReady {
+ UnixReady(ready_from_usize(PRI))
+ }
+
+ /// Returns true if `Ready` contains AIO readiness
+ ///
+ /// See [`Poll`] for more documentation on polling.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use mio::unix::UnixReady;
+ ///
+ /// let ready = UnixReady::aio();
+ ///
+ /// assert!(ready.is_aio());
+ /// ```
+ ///
+ /// [`Poll`]: ../struct.Poll.html
+ #[inline]
+ #[cfg(any(target_os = "dragonfly",
+ target_os = "freebsd", target_os = "ios", target_os = "macos"))]
+ pub fn is_aio(&self) -> bool {
+ self.contains(ready_from_usize(AIO))
+ }
+
+ #[deprecated(since = "0.6.12", note = "this function is now platform specific")]
+ #[cfg(feature = "with-deprecated")]
+ #[cfg(not(any(target_os = "dragonfly",
+ target_os = "freebsd", target_os = "ios", target_os = "macos")))]
+ #[doc(hidden)]
+ pub fn is_aio(&self) -> bool {
+ false
+ }
+
+ /// Returns true if the value includes error readiness
+ ///
+ /// **Note that only readable and writable readiness is guaranteed to be
+ /// supported on all platforms**. This means that `error` readiness should
+ /// be treated as a hint. For more details, see [readiness] in the poll
+ /// documentation.
+ ///
+ /// See [`Poll`] for more documentation on polling.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use mio::unix::UnixReady;
+ ///
+ /// let ready = UnixReady::error();
+ ///
+ /// assert!(ready.is_error());
+ /// ```
+ ///
+ /// [`Poll`]: ../struct.Poll.html
+ /// [readiness]: ../struct.Poll.html#readiness-operations
+ #[inline]
+ pub fn is_error(&self) -> bool {
+ self.contains(ready_from_usize(ERROR))
+ }
+
+ /// Returns true if the value includes HUP readiness
+ ///
+ /// A HUP (or hang-up) signifies that a stream socket **peer** closed the
+ /// connection, or shut down the writing half of the connection.
+ ///
+ /// **Note that only readable and writable readiness is guaranteed to be
+ /// supported on all platforms**. This means that `hup` readiness
+ /// should be treated as a hint. For more details, see [readiness] in the
+ /// poll documentation.
+ ///
+ /// See [`Poll`] for more documentation on polling.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use mio::unix::UnixReady;
+ ///
+ /// let ready = UnixReady::hup();
+ ///
+ /// assert!(ready.is_hup());
+ /// ```
+ ///
+ /// [`Poll`]: ../struct.Poll.html
+ /// [readiness]: ../struct.Poll.html#readiness-operations
+ #[inline]
+ pub fn is_hup(&self) -> bool {
+ self.contains(ready_from_usize(HUP))
+ }
+
+ /// Returns true if `Ready` contains LIO readiness
+ ///
+ /// See [`Poll`] for more documentation on polling.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use mio::unix::UnixReady;
+ ///
+ /// let ready = UnixReady::lio();
+ ///
+ /// assert!(ready.is_lio());
+ /// ```
+ #[inline]
+ #[cfg(any(target_os = "freebsd"))]
+ pub fn is_lio(&self) -> bool {
+ self.contains(ready_from_usize(LIO))
+ }
+
+ /// Returns true if `Ready` contains priority (`EPOLLPRI`) readiness
+ ///
+ /// See [`Poll`] for more documentation on polling.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use mio::unix::UnixReady;
+ ///
+ /// let ready = UnixReady::priority();
+ ///
+ /// assert!(ready.is_priority());
+ /// ```
+ ///
+ /// [`Poll`]: struct.Poll.html
+ #[inline]
+ #[cfg(any(target_os = "linux",
+ target_os = "android", target_os = "solaris"))]
+ pub fn is_priority(&self) -> bool {
+ self.contains(ready_from_usize(PRI))
+ }
+}
+
+impl From<Ready> for UnixReady {
+ fn from(src: Ready) -> UnixReady {
+ UnixReady(src)
+ }
+}
+
+impl From<UnixReady> for Ready {
+ fn from(src: UnixReady) -> Ready {
+ src.0
+ }
+}
+
+impl ops::Deref for UnixReady {
+ type Target = Ready;
+
+ fn deref(&self) -> &Ready {
+ &self.0
+ }
+}
+
+impl ops::DerefMut for UnixReady {
+ fn deref_mut(&mut self) -> &mut Ready {
+ &mut self.0
+ }
+}
+
+impl ops::BitOr for UnixReady {
+ type Output = UnixReady;
+
+ #[inline]
+ fn bitor(self, other: UnixReady) -> UnixReady {
+ (self.0 | other.0).into()
+ }
+}
+
+impl ops::BitXor for UnixReady {
+ type Output = UnixReady;
+
+ #[inline]
+ fn bitxor(self, other: UnixReady) -> UnixReady {
+ (self.0 ^ other.0).into()
+ }
+}
+
+impl ops::BitAnd for UnixReady {
+ type Output = UnixReady;
+
+ #[inline]
+ fn bitand(self, other: UnixReady) -> UnixReady {
+ (self.0 & other.0).into()
+ }
+}
+
+impl ops::Sub for UnixReady {
+ type Output = UnixReady;
+
+ #[inline]
+ fn sub(self, other: UnixReady) -> UnixReady {
+ ready_from_usize(ready_as_usize(self.0) & !ready_as_usize(other.0)).into()
+ }
+}
+
+#[deprecated(since = "0.6.10", note = "removed")]
+#[cfg(feature = "with-deprecated")]
+#[doc(hidden)]
+impl ops::Not for UnixReady {
+ type Output = UnixReady;
+
+ #[inline]
+ fn not(self) -> UnixReady {
+ (!self.0).into()
+ }
+}
+
+impl fmt::Debug for UnixReady {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ let mut one = false;
+ let flags = [
+ (UnixReady(Ready::readable()), "Readable"),
+ (UnixReady(Ready::writable()), "Writable"),
+ (UnixReady::error(), "Error"),
+ (UnixReady::hup(), "Hup"),
+ #[allow(deprecated)]
+ (UnixReady::aio(), "Aio"),
+ #[cfg(any(target_os = "linux",
+ target_os = "android", target_os = "solaris"))]
+ (UnixReady::priority(), "Priority"),
+ ];
+
+ for &(flag, msg) in &flags {
+ if self.contains(flag) {
+ if one { write!(fmt, " | ")? }
+ write!(fmt, "{}", msg)?;
+
+ one = true
+ }
+ }
+
+ if !one {
+ fmt.write_str("(empty)")?;
+ }
+
+ Ok(())
+ }
+}
diff --git a/third_party/rust/mio/src/sys/unix/tcp.rs b/third_party/rust/mio/src/sys/unix/tcp.rs
new file mode 100644
index 0000000000..79c18c74fd
--- /dev/null
+++ b/third_party/rust/mio/src/sys/unix/tcp.rs
@@ -0,0 +1,286 @@
+use std::fmt;
+use std::io::{Read, Write};
+use std::net::{self, SocketAddr};
+use std::os::unix::io::{RawFd, FromRawFd, IntoRawFd, AsRawFd};
+use std::time::Duration;
+
+use libc;
+use net2::TcpStreamExt;
+use iovec::IoVec;
+
+use {io, Ready, Poll, PollOpt, Token};
+use event::Evented;
+
+use sys::unix::eventedfd::EventedFd;
+use sys::unix::io::set_nonblock;
+use sys::unix::uio::VecIo;
+
+pub struct TcpStream {
+ inner: net::TcpStream,
+}
+
+pub struct TcpListener {
+ inner: net::TcpListener,
+}
+
+impl TcpStream {
+ pub fn connect(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> {
+ set_nonblock(stream.as_raw_fd())?;
+
+ match stream.connect(addr) {
+ Ok(..) => {}
+ Err(ref e) if e.raw_os_error() == Some(libc::EINPROGRESS) => {}
+ Err(e) => return Err(e),
+ }
+
+ Ok(TcpStream {
+ inner: stream,
+ })
+ }
+
+ pub fn from_stream(stream: net::TcpStream) -> TcpStream {
+ TcpStream {
+ inner: stream,
+ }
+ }
+
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.peer_addr()
+ }
+
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.local_addr()
+ }
+
+ pub fn try_clone(&self) -> io::Result<TcpStream> {
+ self.inner.try_clone().map(|s| {
+ TcpStream {
+ inner: s,
+ }
+ })
+ }
+
+ pub fn shutdown(&self, how: net::Shutdown) -> io::Result<()> {
+ self.inner.shutdown(how)
+ }
+
+ pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
+ self.inner.set_nodelay(nodelay)
+ }
+
+ pub fn nodelay(&self) -> io::Result<bool> {
+ self.inner.nodelay()
+ }
+
+ pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> {
+ self.inner.set_recv_buffer_size(size)
+ }
+
+ pub fn recv_buffer_size(&self) -> io::Result<usize> {
+ self.inner.recv_buffer_size()
+ }
+
+ pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> {
+ self.inner.set_send_buffer_size(size)
+ }
+
+ pub fn send_buffer_size(&self) -> io::Result<usize> {
+ self.inner.send_buffer_size()
+ }
+
+ pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> {
+ self.inner.set_keepalive(keepalive)
+ }
+
+ pub fn keepalive(&self) -> io::Result<Option<Duration>> {
+ self.inner.keepalive()
+ }
+
+ pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
+ self.inner.set_ttl(ttl)
+ }
+
+ pub fn ttl(&self) -> io::Result<u32> {
+ self.inner.ttl()
+ }
+
+ pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
+ self.inner.set_only_v6(only_v6)
+ }
+
+ pub fn only_v6(&self) -> io::Result<bool> {
+ self.inner.only_v6()
+ }
+
+ pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
+ self.inner.set_linger(dur)
+ }
+
+ pub fn linger(&self) -> io::Result<Option<Duration>> {
+ self.inner.linger()
+ }
+
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.inner.take_error()
+ }
+
+ pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.inner.peek(buf)
+ }
+
+ pub fn readv(&self, bufs: &mut [&mut IoVec]) -> io::Result<usize> {
+ self.inner.readv(bufs)
+ }
+
+ pub fn writev(&self, bufs: &[&IoVec]) -> io::Result<usize> {
+ self.inner.writev(bufs)
+ }
+}
+
+impl<'a> Read for &'a TcpStream {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ (&self.inner).read(buf)
+ }
+}
+
+impl<'a> Write for &'a TcpStream {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ (&self.inner).write(buf)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ (&self.inner).flush()
+ }
+}
+
+impl Evented for TcpStream {
+ fn register(&self, poll: &Poll, token: Token,
+ interest: Ready, opts: PollOpt) -> io::Result<()> {
+ EventedFd(&self.as_raw_fd()).register(poll, token, interest, opts)
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token,
+ interest: Ready, opts: PollOpt) -> io::Result<()> {
+ EventedFd(&self.as_raw_fd()).reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ EventedFd(&self.as_raw_fd()).deregister(poll)
+ }
+}
+
+impl fmt::Debug for TcpStream {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ fmt::Debug::fmt(&self.inner, f)
+ }
+}
+
+impl FromRawFd for TcpStream {
+ unsafe fn from_raw_fd(fd: RawFd) -> TcpStream {
+ TcpStream {
+ inner: net::TcpStream::from_raw_fd(fd),
+ }
+ }
+}
+
+impl IntoRawFd for TcpStream {
+ fn into_raw_fd(self) -> RawFd {
+ self.inner.into_raw_fd()
+ }
+}
+
+impl AsRawFd for TcpStream {
+ fn as_raw_fd(&self) -> RawFd {
+ self.inner.as_raw_fd()
+ }
+}
+
+impl TcpListener {
+ pub fn new(inner: net::TcpListener) -> io::Result<TcpListener> {
+ set_nonblock(inner.as_raw_fd())?;
+ Ok(TcpListener {
+ inner,
+ })
+ }
+
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.local_addr()
+ }
+
+ pub fn try_clone(&self) -> io::Result<TcpListener> {
+ self.inner.try_clone().map(|s| {
+ TcpListener {
+ inner: s,
+ }
+ })
+ }
+
+ pub fn accept(&self) -> io::Result<(net::TcpStream, SocketAddr)> {
+ self.inner.accept()
+ }
+
+ #[allow(deprecated)]
+ pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
+ self.inner.set_only_v6(only_v6)
+ }
+
+ #[allow(deprecated)]
+ pub fn only_v6(&self) -> io::Result<bool> {
+ self.inner.only_v6()
+ }
+
+ pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
+ self.inner.set_ttl(ttl)
+ }
+
+ pub fn ttl(&self) -> io::Result<u32> {
+ self.inner.ttl()
+ }
+
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.inner.take_error()
+ }
+}
+
+impl Evented for TcpListener {
+ fn register(&self, poll: &Poll, token: Token,
+ interest: Ready, opts: PollOpt) -> io::Result<()> {
+ EventedFd(&self.as_raw_fd()).register(poll, token, interest, opts)
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token,
+ interest: Ready, opts: PollOpt) -> io::Result<()> {
+ EventedFd(&self.as_raw_fd()).reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ EventedFd(&self.as_raw_fd()).deregister(poll)
+ }
+}
+
+impl fmt::Debug for TcpListener {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ fmt::Debug::fmt(&self.inner, f)
+ }
+}
+
+impl FromRawFd for TcpListener {
+ unsafe fn from_raw_fd(fd: RawFd) -> TcpListener {
+ TcpListener {
+ inner: net::TcpListener::from_raw_fd(fd),
+ }
+ }
+}
+
+impl IntoRawFd for TcpListener {
+ fn into_raw_fd(self) -> RawFd {
+ self.inner.into_raw_fd()
+ }
+}
+
+impl AsRawFd for TcpListener {
+ fn as_raw_fd(&self) -> RawFd {
+ self.inner.as_raw_fd()
+ }
+}
+
diff --git a/third_party/rust/mio/src/sys/unix/udp.rs b/third_party/rust/mio/src/sys/unix/udp.rs
new file mode 100644
index 0000000000..c77a9d6380
--- /dev/null
+++ b/third_party/rust/mio/src/sys/unix/udp.rs
@@ -0,0 +1,181 @@
+use {io, Ready, Poll, PollOpt, Token};
+use event::Evented;
+use unix::EventedFd;
+use sys::unix::uio::VecIo;
+use std::fmt;
+use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
+use std::os::unix::io::{RawFd, IntoRawFd, AsRawFd, FromRawFd};
+
+#[allow(unused_imports)] // only here for Rust 1.8
+use net2::UdpSocketExt;
+use iovec::IoVec;
+
+pub struct UdpSocket {
+ io: net::UdpSocket,
+}
+
+impl UdpSocket {
+ pub fn new(socket: net::UdpSocket) -> io::Result<UdpSocket> {
+ socket.set_nonblocking(true)?;
+ Ok(UdpSocket {
+ io: socket,
+ })
+ }
+
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.io.local_addr()
+ }
+
+ pub fn try_clone(&self) -> io::Result<UdpSocket> {
+ self.io.try_clone().map(|io| {
+ UdpSocket {
+ io,
+ }
+ })
+ }
+
+ pub fn send_to(&self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
+ self.io.send_to(buf, target)
+ }
+
+ pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ self.io.recv_from(buf)
+ }
+
+ pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
+ self.io.send(buf)
+ }
+
+ pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io.recv(buf)
+ }
+
+ pub fn connect(&self, addr: SocketAddr)
+ -> io::Result<()> {
+ self.io.connect(addr)
+ }
+
+ pub fn broadcast(&self) -> io::Result<bool> {
+ self.io.broadcast()
+ }
+
+ pub fn set_broadcast(&self, on: bool) -> io::Result<()> {
+ self.io.set_broadcast(on)
+ }
+
+ pub fn multicast_loop_v4(&self) -> io::Result<bool> {
+ self.io.multicast_loop_v4()
+ }
+
+ pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> {
+ self.io.set_multicast_loop_v4(on)
+ }
+
+ pub fn multicast_ttl_v4(&self) -> io::Result<u32> {
+ self.io.multicast_ttl_v4()
+ }
+
+ pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> {
+ self.io.set_multicast_ttl_v4(ttl)
+ }
+
+ pub fn multicast_loop_v6(&self) -> io::Result<bool> {
+ self.io.multicast_loop_v6()
+ }
+
+ pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> {
+ self.io.set_multicast_loop_v6(on)
+ }
+
+ pub fn ttl(&self) -> io::Result<u32> {
+ self.io.ttl()
+ }
+
+ pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
+ self.io.set_ttl(ttl)
+ }
+
+ pub fn join_multicast_v4(&self,
+ multiaddr: &Ipv4Addr,
+ interface: &Ipv4Addr) -> io::Result<()> {
+ self.io.join_multicast_v4(multiaddr, interface)
+ }
+
+ pub fn join_multicast_v6(&self,
+ multiaddr: &Ipv6Addr,
+ interface: u32) -> io::Result<()> {
+ self.io.join_multicast_v6(multiaddr, interface)
+ }
+
+ pub fn leave_multicast_v4(&self,
+ multiaddr: &Ipv4Addr,
+ interface: &Ipv4Addr) -> io::Result<()> {
+ self.io.leave_multicast_v4(multiaddr, interface)
+ }
+
+ pub fn leave_multicast_v6(&self,
+ multiaddr: &Ipv6Addr,
+ interface: u32) -> io::Result<()> {
+ self.io.leave_multicast_v6(multiaddr, interface)
+ }
+
+ pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
+ self.io.set_only_v6(only_v6)
+ }
+
+ pub fn only_v6(&self) -> io::Result<bool> {
+ self.io.only_v6()
+ }
+
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.io.take_error()
+ }
+
+ pub fn readv(&self, bufs: &mut [&mut IoVec]) -> io::Result<usize> {
+ self.io.readv(bufs)
+ }
+
+ pub fn writev(&self, bufs: &[&IoVec]) -> io::Result<usize> {
+ self.io.writev(bufs)
+ }
+}
+
+impl Evented for UdpSocket {
+ fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ EventedFd(&self.as_raw_fd()).register(poll, token, interest, opts)
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ EventedFd(&self.as_raw_fd()).reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ EventedFd(&self.as_raw_fd()).deregister(poll)
+ }
+}
+
+impl fmt::Debug for UdpSocket {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ fmt::Debug::fmt(&self.io, f)
+ }
+}
+
+impl FromRawFd for UdpSocket {
+ unsafe fn from_raw_fd(fd: RawFd) -> UdpSocket {
+ UdpSocket {
+ io: net::UdpSocket::from_raw_fd(fd),
+ }
+ }
+}
+
+impl IntoRawFd for UdpSocket {
+ fn into_raw_fd(self) -> RawFd {
+ self.io.into_raw_fd()
+ }
+}
+
+impl AsRawFd for UdpSocket {
+ fn as_raw_fd(&self) -> RawFd {
+ self.io.as_raw_fd()
+ }
+}
diff --git a/third_party/rust/mio/src/sys/unix/uds.rs b/third_party/rust/mio/src/sys/unix/uds.rs
new file mode 100644
index 0000000000..1bf8c5d260
--- /dev/null
+++ b/third_party/rust/mio/src/sys/unix/uds.rs
@@ -0,0 +1,262 @@
+use std::io::{Read, Write};
+use std::mem;
+use std::net::Shutdown;
+use std::os::unix::prelude::*;
+use std::path::Path;
+use std::ptr;
+
+use libc;
+
+use {io, Ready, Poll, PollOpt, Token};
+use event::Evented;
+use sys::unix::{cvt, Io};
+use sys::unix::io::{set_nonblock, set_cloexec};
+
+trait MyInto<T> {
+ fn my_into(self) -> T;
+}
+
+impl MyInto<u32> for usize {
+ fn my_into(self) -> u32 { self as u32 }
+}
+
+impl MyInto<usize> for usize {
+ fn my_into(self) -> usize { self }
+}
+
+unsafe fn sockaddr_un(path: &Path)
+ -> io::Result<(libc::sockaddr_un, libc::socklen_t)> {
+ let mut addr: libc::sockaddr_un = mem::zeroed();
+ addr.sun_family = libc::AF_UNIX as libc::sa_family_t;
+
+ let bytes = path.as_os_str().as_bytes();
+
+ if bytes.len() >= addr.sun_path.len() {
+ return Err(io::Error::new(io::ErrorKind::InvalidInput,
+ "path must be shorter than SUN_LEN"))
+ }
+ for (dst, src) in addr.sun_path.iter_mut().zip(bytes.iter()) {
+ *dst = *src as libc::c_char;
+ }
+ // null byte for pathname addresses is already there because we zeroed the
+ // struct
+
+ let mut len = sun_path_offset() + bytes.len();
+ match bytes.get(0) {
+ Some(&0) | None => {}
+ Some(_) => len += 1,
+ }
+ Ok((addr, len as libc::socklen_t))
+}
+
+fn sun_path_offset() -> usize {
+ unsafe {
+ // Work with an actual instance of the type since using a null pointer is UB
+ let addr: libc::sockaddr_un = mem::uninitialized();
+ let base = &addr as *const _ as usize;
+ let path = &addr.sun_path as *const _ as usize;
+ path - base
+ }
+}
+
+#[derive(Debug)]
+pub struct UnixSocket {
+ io: Io,
+}
+
+impl UnixSocket {
+ /// Returns a new, unbound, non-blocking Unix domain socket
+ pub fn stream() -> io::Result<UnixSocket> {
+ #[cfg(target_os = "linux")]
+ use libc::{SOCK_CLOEXEC, SOCK_NONBLOCK};
+ #[cfg(not(target_os = "linux"))]
+ const SOCK_CLOEXEC: libc::c_int = 0;
+ #[cfg(not(target_os = "linux"))]
+ const SOCK_NONBLOCK: libc::c_int = 0;
+
+ unsafe {
+ if cfg!(target_os = "linux") {
+ let flags = libc::SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK;
+ match cvt(libc::socket(libc::AF_UNIX, flags, 0)) {
+ Ok(fd) => return Ok(UnixSocket::from_raw_fd(fd)),
+ Err(ref e) if e.raw_os_error() == Some(libc::EINVAL) => {}
+ Err(e) => return Err(e),
+ }
+ }
+
+ let fd = cvt(libc::socket(libc::AF_UNIX, libc::SOCK_STREAM, 0))?;
+ let fd = UnixSocket::from_raw_fd(fd);
+ set_cloexec(fd.as_raw_fd())?;
+ set_nonblock(fd.as_raw_fd())?;
+ Ok(fd)
+ }
+ }
+
+ /// Connect the socket to the specified address
+ pub fn connect<P: AsRef<Path> + ?Sized>(&self, addr: &P) -> io::Result<()> {
+ unsafe {
+ let (addr, len) = sockaddr_un(addr.as_ref())?;
+ cvt(libc::connect(self.as_raw_fd(),
+ &addr as *const _ as *const _,
+ len))?;
+ Ok(())
+ }
+ }
+
+ /// Listen for incoming requests
+ pub fn listen(&self, backlog: usize) -> io::Result<()> {
+ unsafe {
+ cvt(libc::listen(self.as_raw_fd(), backlog as i32))?;
+ Ok(())
+ }
+ }
+
+ pub fn accept(&self) -> io::Result<UnixSocket> {
+ unsafe {
+ let fd = cvt(libc::accept(self.as_raw_fd(),
+ ptr::null_mut(),
+ ptr::null_mut()))?;
+ let fd = Io::from_raw_fd(fd);
+ set_cloexec(fd.as_raw_fd())?;
+ set_nonblock(fd.as_raw_fd())?;
+ Ok(UnixSocket { io: fd })
+ }
+ }
+
+ /// Bind the socket to the specified address
+ pub fn bind<P: AsRef<Path> + ?Sized>(&self, addr: &P) -> io::Result<()> {
+ unsafe {
+ let (addr, len) = sockaddr_un(addr.as_ref())?;
+ cvt(libc::bind(self.as_raw_fd(),
+ &addr as *const _ as *const _,
+ len))?;
+ Ok(())
+ }
+ }
+
+ pub fn try_clone(&self) -> io::Result<UnixSocket> {
+ Ok(UnixSocket { io: self.io.try_clone()? })
+ }
+
+ pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
+ let how = match how {
+ Shutdown::Read => libc::SHUT_RD,
+ Shutdown::Write => libc::SHUT_WR,
+ Shutdown::Both => libc::SHUT_RDWR,
+ };
+ unsafe {
+ cvt(libc::shutdown(self.as_raw_fd(), how))?;
+ Ok(())
+ }
+ }
+
+ pub fn read_recv_fd(&mut self, buf: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
+ unsafe {
+ let mut iov = libc::iovec {
+ iov_base: buf.as_mut_ptr() as *mut _,
+ iov_len: buf.len(),
+ };
+ struct Cmsg {
+ hdr: libc::cmsghdr,
+ data: [libc::c_int; 1],
+ }
+ let mut cmsg: Cmsg = mem::zeroed();
+ let mut msg: libc::msghdr = mem::zeroed();
+ msg.msg_iov = &mut iov;
+ msg.msg_iovlen = 1;
+ msg.msg_control = &mut cmsg as *mut _ as *mut _;
+ msg.msg_controllen = mem::size_of_val(&cmsg).my_into();
+ let bytes = cvt(libc::recvmsg(self.as_raw_fd(), &mut msg, 0))?;
+
+ const SCM_RIGHTS: libc::c_int = 1;
+
+ let fd = if cmsg.hdr.cmsg_level == libc::SOL_SOCKET &&
+ cmsg.hdr.cmsg_type == SCM_RIGHTS {
+ Some(cmsg.data[0])
+ } else {
+ None
+ };
+ Ok((bytes as usize, fd))
+ }
+ }
+
+ pub fn write_send_fd(&mut self, buf: &[u8], fd: RawFd) -> io::Result<usize> {
+ unsafe {
+ let mut iov = libc::iovec {
+ iov_base: buf.as_ptr() as *mut _,
+ iov_len: buf.len(),
+ };
+ struct Cmsg {
+ hdr: libc::cmsghdr,
+ data: [libc::c_int; 1],
+ }
+ let mut cmsg: Cmsg = mem::zeroed();
+ cmsg.hdr.cmsg_len = mem::size_of_val(&cmsg).my_into();
+ cmsg.hdr.cmsg_level = libc::SOL_SOCKET;
+ cmsg.hdr.cmsg_type = 1; // SCM_RIGHTS
+ cmsg.data[0] = fd;
+ let mut msg: libc::msghdr = mem::zeroed();
+ msg.msg_iov = &mut iov;
+ msg.msg_iovlen = 1;
+ msg.msg_control = &mut cmsg as *mut _ as *mut _;
+ msg.msg_controllen = mem::size_of_val(&cmsg).my_into();
+ let bytes = cvt(libc::sendmsg(self.as_raw_fd(), &msg, 0))?;
+ Ok(bytes as usize)
+ }
+ }
+}
+
+impl Read for UnixSocket {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io.read(buf)
+ }
+}
+
+impl Write for UnixSocket {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.io.write(buf)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.io.flush()
+ }
+}
+
+impl Evented for UnixSocket {
+ fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ self.io.register(poll, token, interest, opts)
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ self.io.reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ self.io.deregister(poll)
+ }
+}
+
+
+impl From<Io> for UnixSocket {
+ fn from(io: Io) -> UnixSocket {
+ UnixSocket { io }
+ }
+}
+
+impl FromRawFd for UnixSocket {
+ unsafe fn from_raw_fd(fd: RawFd) -> UnixSocket {
+ UnixSocket { io: Io::from_raw_fd(fd) }
+ }
+}
+
+impl IntoRawFd for UnixSocket {
+ fn into_raw_fd(self) -> RawFd {
+ self.io.into_raw_fd()
+ }
+}
+
+impl AsRawFd for UnixSocket {
+ fn as_raw_fd(&self) -> RawFd {
+ self.io.as_raw_fd()
+ }
+}
diff --git a/third_party/rust/mio/src/sys/unix/uio.rs b/third_party/rust/mio/src/sys/unix/uio.rs
new file mode 100644
index 0000000000..e38cd4983b
--- /dev/null
+++ b/third_party/rust/mio/src/sys/unix/uio.rs
@@ -0,0 +1,44 @@
+use std::cmp;
+use std::io;
+use std::os::unix::io::AsRawFd;
+use libc;
+use iovec::IoVec;
+use iovec::unix as iovec;
+
+pub trait VecIo {
+ fn readv(&self, bufs: &mut [&mut IoVec]) -> io::Result<usize>;
+
+ fn writev(&self, bufs: &[&IoVec]) -> io::Result<usize>;
+}
+
+impl<T: AsRawFd> VecIo for T {
+ fn readv(&self, bufs: &mut [&mut IoVec]) -> io::Result<usize> {
+ unsafe {
+ let slice = iovec::as_os_slice_mut(bufs);
+ let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
+ let rc = libc::readv(self.as_raw_fd(),
+ slice.as_ptr(),
+ len as libc::c_int);
+ if rc < 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(rc as usize)
+ }
+ }
+ }
+
+ fn writev(&self, bufs: &[&IoVec]) -> io::Result<usize> {
+ unsafe {
+ let slice = iovec::as_os_slice(bufs);
+ let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
+ let rc = libc::writev(self.as_raw_fd(),
+ slice.as_ptr(),
+ len as libc::c_int);
+ if rc < 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(rc as usize)
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/third_party/rust/mio/src/sys/windows/awakener.rs b/third_party/rust/mio/src/sys/windows/awakener.rs
new file mode 100644
index 0000000000..c913bc93f8
--- /dev/null
+++ b/third_party/rust/mio/src/sys/windows/awakener.rs
@@ -0,0 +1,66 @@
+use std::sync::Mutex;
+
+use miow::iocp::CompletionStatus;
+use {io, poll, Ready, Poll, PollOpt, Token};
+use event::Evented;
+use sys::windows::Selector;
+
+pub struct Awakener {
+ inner: Mutex<Option<AwakenerInner>>,
+}
+
+struct AwakenerInner {
+ token: Token,
+ selector: Selector,
+}
+
+impl Awakener {
+ pub fn new() -> io::Result<Awakener> {
+ Ok(Awakener {
+ inner: Mutex::new(None),
+ })
+ }
+
+ pub fn wakeup(&self) -> io::Result<()> {
+ // Each wakeup notification has NULL as its `OVERLAPPED` pointer to
+ // indicate that it's from this awakener and not part of an I/O
+ // operation. This is specially recognized by the selector.
+ //
+ // If we haven't been registered with an event loop yet just silently
+ // succeed.
+ if let Some(inner) = self.inner.lock().unwrap().as_ref() {
+ let status = CompletionStatus::new(0,
+ usize::from(inner.token),
+ 0 as *mut _);
+ inner.selector.port().post(status)?;
+ }
+ Ok(())
+ }
+
+ pub fn cleanup(&self) {
+ // noop
+ }
+}
+
+impl Evented for Awakener {
+ fn register(&self, poll: &Poll, token: Token, events: Ready,
+ opts: PollOpt) -> io::Result<()> {
+ assert_eq!(opts, PollOpt::edge());
+ assert_eq!(events, Ready::readable());
+ *self.inner.lock().unwrap() = Some(AwakenerInner {
+ selector: poll::selector(poll).clone_ref(),
+ token: token,
+ });
+ Ok(())
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token, events: Ready,
+ opts: PollOpt) -> io::Result<()> {
+ self.register(poll, token, events, opts)
+ }
+
+ fn deregister(&self, _poll: &Poll) -> io::Result<()> {
+ *self.inner.lock().unwrap() = None;
+ Ok(())
+ }
+}
diff --git a/third_party/rust/mio/src/sys/windows/buffer_pool.rs b/third_party/rust/mio/src/sys/windows/buffer_pool.rs
new file mode 100644
index 0000000000..86754593fd
--- /dev/null
+++ b/third_party/rust/mio/src/sys/windows/buffer_pool.rs
@@ -0,0 +1,20 @@
+pub struct BufferPool {
+ pool: Vec<Vec<u8>>,
+}
+
+impl BufferPool {
+ pub fn new(cap: usize) -> BufferPool {
+ BufferPool { pool: Vec::with_capacity(cap) }
+ }
+
+ pub fn get(&mut self, default_cap: usize) -> Vec<u8> {
+ self.pool.pop().unwrap_or_else(|| Vec::with_capacity(default_cap))
+ }
+
+ pub fn put(&mut self, mut buf: Vec<u8>) {
+ if self.pool.len() < self.pool.capacity(){
+ unsafe { buf.set_len(0); }
+ self.pool.push(buf);
+ }
+ }
+}
diff --git a/third_party/rust/mio/src/sys/windows/from_raw_arc.rs b/third_party/rust/mio/src/sys/windows/from_raw_arc.rs
new file mode 100644
index 0000000000..b6d38b2408
--- /dev/null
+++ b/third_party/rust/mio/src/sys/windows/from_raw_arc.rs
@@ -0,0 +1,116 @@
+//! A "Manual Arc" which allows manually frobbing the reference count
+//!
+//! This module contains a copy of the `Arc` found in the standard library,
+//! stripped down to the bare bones of what we actually need. The reason this is
+//! done is for the ability to concretely know the memory layout of the `Inner`
+//! structure of the arc pointer itself (e.g. `ArcInner` in the standard
+//! library).
+//!
+//! We do some unsafe casting from `*mut OVERLAPPED` to a `FromRawArc<T>` to
+//! ensure that data lives for the length of an I/O operation, but this means
+//! that we have to know the layouts of the structures involved. This
+//! representation primarily guarantees that the data, `T` is at the front of
+//! the inner pointer always.
+//!
+//! Note that we're missing out on some various optimizations implemented in the
+//! standard library:
+//!
+//! * The size of `FromRawArc` is actually two words because of the drop flag
+//! * The compiler doesn't understand that the pointer in `FromRawArc` is never
+//! null, so Option<FromRawArc<T>> is not a nullable pointer.
+
+use std::ops::Deref;
+use std::mem;
+use std::sync::atomic::{self, AtomicUsize, Ordering};
+
+pub struct FromRawArc<T> {
+ _inner: *mut Inner<T>,
+}
+
+unsafe impl<T: Sync + Send> Send for FromRawArc<T> { }
+unsafe impl<T: Sync + Send> Sync for FromRawArc<T> { }
+
+#[repr(C)]
+struct Inner<T> {
+ data: T,
+ cnt: AtomicUsize,
+}
+
+impl<T> FromRawArc<T> {
+ pub fn new(data: T) -> FromRawArc<T> {
+ let x = Box::new(Inner {
+ data: data,
+ cnt: AtomicUsize::new(1),
+ });
+ FromRawArc { _inner: unsafe { mem::transmute(x) } }
+ }
+
+ pub unsafe fn from_raw(ptr: *mut T) -> FromRawArc<T> {
+ // Note that if we could use `mem::transmute` here to get a libstd Arc
+ // (guaranteed) then we could just use std::sync::Arc, but this is the
+ // crucial reason this currently exists.
+ FromRawArc { _inner: ptr as *mut Inner<T> }
+ }
+}
+
+impl<T> Clone for FromRawArc<T> {
+ fn clone(&self) -> FromRawArc<T> {
+ // Atomic ordering of Relaxed lifted from libstd, but the general idea
+ // is that you need synchronization to communicate this increment to
+ // another thread, so this itself doesn't need to be synchronized.
+ unsafe {
+ (*self._inner).cnt.fetch_add(1, Ordering::Relaxed);
+ }
+ FromRawArc { _inner: self._inner }
+ }
+}
+
+impl<T> Deref for FromRawArc<T> {
+ type Target = T;
+
+ fn deref(&self) -> &T {
+ unsafe { &(*self._inner).data }
+ }
+}
+
+impl<T> Drop for FromRawArc<T> {
+ fn drop(&mut self) {
+ unsafe {
+ // Atomic orderings lifted from the standard library
+ if (*self._inner).cnt.fetch_sub(1, Ordering::Release) != 1 {
+ return
+ }
+ atomic::fence(Ordering::Acquire);
+ drop(mem::transmute::<_, Box<T>>(self._inner));
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::FromRawArc;
+
+ #[test]
+ fn smoke() {
+ let a = FromRawArc::new(1);
+ assert_eq!(*a, 1);
+ assert_eq!(*a.clone(), 1);
+ }
+
+ #[test]
+ fn drops() {
+ struct A<'a>(&'a mut bool);
+ impl<'a> Drop for A<'a> {
+ fn drop(&mut self) {
+ *self.0 = true;
+ }
+ }
+ let mut a = false;
+ {
+ let a = FromRawArc::new(A(&mut a));
+ let _ = a.clone();
+ assert!(!*a.0);
+ }
+ assert!(a);
+ }
+}
diff --git a/third_party/rust/mio/src/sys/windows/mod.rs b/third_party/rust/mio/src/sys/windows/mod.rs
new file mode 100644
index 0000000000..dad70b0c15
--- /dev/null
+++ b/third_party/rust/mio/src/sys/windows/mod.rs
@@ -0,0 +1,187 @@
+//! Implementation of mio for Windows using IOCP
+//!
+//! This module uses I/O Completion Ports (IOCP) on Windows to implement mio's
+//! Unix epoll-like interface. Unfortunately these two I/O models are
+//! fundamentally incompatible:
+//!
+//! * IOCP is a completion-based model where work is submitted to the kernel and
+//! a program is notified later when the work finished.
+//! * epoll is a readiness-based model where the kernel is queried as to what
+//! work can be done, and afterwards the work is done.
+//!
+//! As a result, this implementation for Windows is much less "low level" than
+//! the Unix implementation of mio. This design decision was intentional,
+//! however.
+//!
+//! ## What is IOCP?
+//!
+//! The [official docs][docs] have a comprehensive explanation of what IOCP is,
+//! but at a high level it requires the following operations to be executed to
+//! perform some I/O:
+//!
+//! 1. A completion port is created
+//! 2. An I/O handle and a token is registered with this completion port
+//! 3. Some I/O is issued on the handle. This generally means that an API was
+//! invoked with a zeroed `OVERLAPPED` structure. The API will immediately
+//! return.
+//! 4. After some time, the application queries the I/O port for completed
+//! events. The port will returned a pointer to the `OVERLAPPED` along with
+//! the token presented at registration time.
+//!
+//! Many I/O operations can be fired off before waiting on a port, and the port
+//! will block execution of the calling thread until an I/O event has completed
+//! (or a timeout has elapsed).
+//!
+//! Currently all of these low-level operations are housed in a separate `miow`
+//! crate to provide a 0-cost abstraction over IOCP. This crate uses that to
+//! implement all fiddly bits so there's very few actual Windows API calls or
+//! `unsafe` blocks as a result.
+//!
+//! [docs]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198%28v=vs.85%29.aspx
+//!
+//! ## Safety of IOCP
+//!
+//! Unfortunately for us, IOCP is pretty unsafe in terms of Rust lifetimes and
+//! such. When an I/O operation is submitted to the kernel, it involves handing
+//! the kernel a few pointers like a buffer to read/write, an `OVERLAPPED`
+//! structure pointer, and perhaps some other buffers such as for socket
+//! addresses. These pointers all have to remain valid **for the entire I/O
+//! operation's duration**.
+//!
+//! There's no way to define a safe lifetime for these pointers/buffers over
+//! the span of an I/O operation, so we're forced to add a layer of abstraction
+//! (not 0-cost) to make these APIs safe. Currently this implementation
+//! basically just boxes everything up on the heap to give it a stable address
+//! and then keys off that most of the time.
+//!
+//! ## From completion to readiness
+//!
+//! Translating a completion-based model to a readiness-based model is also no
+//! easy task, and a significant portion of this implementation is managing this
+//! translation. The basic idea behind this implementation is to issue I/O
+//! operations preemptively and then translate their completions to a "I'm
+//! ready" event.
+//!
+//! For example, in the case of reading a `TcpSocket`, as soon as a socket is
+//! connected (or registered after an accept) a read operation is executed.
+//! While the read is in progress calls to `read` will return `WouldBlock`, and
+//! once the read is completed we translate the completion notification into a
+//! `readable` event. Once the internal buffer is drained (e.g. all data from it
+//! has been read) a read operation is re-issued.
+//!
+//! Write operations are a little different from reads, and the current
+//! implementation is to just schedule a write as soon as `write` is first
+//! called. While that write operation is in progress all future calls to
+//! `write` will return `WouldBlock`. Completion of the write then translates to
+//! a `writable` event. Note that this will probably want to add some layer of
+//! internal buffering in the future.
+//!
+//! ## Buffer Management
+//!
+//! As there's lots of I/O operations in flight at any one point in time,
+//! there's lots of live buffers that need to be juggled around (e.g. this
+//! implementation's own internal buffers).
+//!
+//! Currently all buffers are created for the I/O operation at hand and are then
+//! discarded when it completes (this is listed as future work below).
+//!
+//! ## Callback Management
+//!
+//! When the main event loop receives a notification that an I/O operation has
+//! completed, some work needs to be done to translate that to a set of events
+//! or perhaps some more I/O needs to be scheduled. For example after a
+//! `TcpStream` is connected it generates a writable event and also schedules a
+//! read.
+//!
+//! To manage all this the `Selector` uses the `OVERLAPPED` pointer from the
+//! completion status. The selector assumes that all `OVERLAPPED` pointers are
+//! actually pointers to the interior of a `selector::Overlapped` which means
+//! that right after the `OVERLAPPED` itself there's a function pointer. This
+//! function pointer is given the completion status as well as another callback
+//! to push events onto the selector.
+//!
+//! The callback for each I/O operation doesn't have any environment, so it
+//! relies on memory layout and unsafe casting to translate an `OVERLAPPED`
+//! pointer (or in this case a `selector::Overlapped` pointer) to a type of
+//! `FromRawArc<T>` (see module docs for why this type exists).
+//!
+//! ## Thread Safety
+//!
+//! Currently all of the I/O primitives make liberal use of `Arc` and `Mutex`
+//! as an implementation detail. The main reason for this is to ensure that the
+//! types are `Send` and `Sync`, but the implementations have not been stressed
+//! in multithreaded situations yet. As a result, there are bound to be
+//! functional surprises in using these concurrently.
+//!
+//! ## Future Work
+//!
+//! First up, let's take a look at unimplemented portions of this module:
+//!
+//! * The `PollOpt::level()` option is currently entirely unimplemented.
+//! * Each `EventLoop` currently owns its completion port, but this prevents an
+//! I/O handle from being added to multiple event loops (something that can be
+//! done on Unix). Additionally, it hinders event loops moving across threads.
+//! This should be solved by likely having a global `Selector` which all
+//! others then communicate with.
+//! * Although Unix sockets don't exist on Windows, there are named pipes and
+//! those should likely be bound here in a similar fashion to `TcpStream`.
+//!
+//! Next up, there are a few performance improvements and optimizations that can
+//! still be implemented
+//!
+//! * Buffer management right now is pretty bad, they're all just allocated
+//! right before an I/O operation and discarded right after. There should at
+//! least be some form of buffering buffers.
+//! * No calls to `write` are internally buffered before being scheduled, which
+//! means that writing performance is abysmal compared to Unix. There should
+//! be some level of buffering of writes probably.
+
+use std::io;
+use std::os::windows::prelude::*;
+
+use kernel32;
+use winapi;
+
+mod awakener;
+#[macro_use]
+mod selector;
+mod tcp;
+mod udp;
+mod from_raw_arc;
+mod buffer_pool;
+
+pub use self::awakener::Awakener;
+pub use self::selector::{Events, Selector, Overlapped, Binding};
+pub use self::tcp::{TcpStream, TcpListener};
+pub use self::udp::UdpSocket;
+
+#[derive(Copy, Clone)]
+enum Family {
+ V4, V6,
+}
+
+unsafe fn cancel(socket: &AsRawSocket,
+ overlapped: &Overlapped) -> io::Result<()> {
+ let handle = socket.as_raw_socket() as winapi::HANDLE;
+ let ret = kernel32::CancelIoEx(handle, overlapped.as_mut_ptr());
+ if ret == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(())
+ }
+}
+
+unsafe fn no_notify_on_instant_completion(handle: winapi::HANDLE) -> io::Result<()> {
+ // TODO: move those to winapi
+ const FILE_SKIP_COMPLETION_PORT_ON_SUCCESS: winapi::UCHAR = 1;
+ const FILE_SKIP_SET_EVENT_ON_HANDLE: winapi::UCHAR = 2;
+
+ let flags = FILE_SKIP_COMPLETION_PORT_ON_SUCCESS | FILE_SKIP_SET_EVENT_ON_HANDLE;
+
+ let r = kernel32::SetFileCompletionNotificationModes(handle, flags);
+ if r == winapi::TRUE {
+ Ok(())
+ } else {
+ Err(io::Error::last_os_error())
+ }
+}
diff --git a/third_party/rust/mio/src/sys/windows/selector.rs b/third_party/rust/mio/src/sys/windows/selector.rs
new file mode 100644
index 0000000000..5692084c7a
--- /dev/null
+++ b/third_party/rust/mio/src/sys/windows/selector.rs
@@ -0,0 +1,534 @@
+#![allow(deprecated)]
+
+use std::{fmt, io};
+use std::cell::UnsafeCell;
+use std::os::windows::prelude::*;
+use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
+use std::time::Duration;
+
+use lazycell::AtomicLazyCell;
+
+use winapi::*;
+use miow;
+use miow::iocp::{CompletionPort, CompletionStatus};
+
+use event_imp::{Event, Evented, Ready};
+use poll::{self, Poll};
+use sys::windows::buffer_pool::BufferPool;
+use {Token, PollOpt};
+
+/// Each Selector has a globally unique(ish) ID associated with it. This ID
+/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
+/// registered with the `Selector`. If a type that is previously associated with
+/// a `Selector` attempts to register itself with a different `Selector`, the
+/// operation will return with an error. This matches windows behavior.
+static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
+
+/// The guts of the Windows event loop, this is the struct which actually owns
+/// a completion port.
+///
+/// Internally this is just an `Arc`, and this allows handing out references to
+/// the internals to I/O handles registered on this selector. This is
+/// required to schedule I/O operations independently of being inside the event
+/// loop (e.g. when a call to `write` is seen we're not "in the event loop").
+pub struct Selector {
+ inner: Arc<SelectorInner>,
+}
+
+struct SelectorInner {
+ /// Unique identifier of the `Selector`
+ id: usize,
+
+ /// The actual completion port that's used to manage all I/O
+ port: CompletionPort,
+
+ /// A pool of buffers usable by this selector.
+ ///
+ /// Primitives will take buffers from this pool to perform I/O operations,
+ /// and once complete they'll be put back in.
+ buffers: Mutex<BufferPool>,
+}
+
+impl Selector {
+ pub fn new() -> io::Result<Selector> {
+ // offset by 1 to avoid choosing 0 as the id of a selector
+ let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
+
+ CompletionPort::new(0).map(|cp| {
+ Selector {
+ inner: Arc::new(SelectorInner {
+ id: id,
+ port: cp,
+ buffers: Mutex::new(BufferPool::new(256)),
+ }),
+ }
+ })
+ }
+
+ pub fn select(&self,
+ events: &mut Events,
+ awakener: Token,
+ timeout: Option<Duration>) -> io::Result<bool> {
+ trace!("select; timeout={:?}", timeout);
+
+ // Clear out the previous list of I/O events and get some more!
+ events.clear();
+
+ trace!("polling IOCP");
+ let n = match self.inner.port.get_many(&mut events.statuses, timeout) {
+ Ok(statuses) => statuses.len(),
+ Err(ref e) if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) => 0,
+ Err(e) => return Err(e),
+ };
+
+ let mut ret = false;
+ for status in events.statuses[..n].iter() {
+ // This should only ever happen from the awakener, and we should
+ // only ever have one awakener right now, so assert as such.
+ if status.overlapped() as usize == 0 {
+ assert_eq!(status.token(), usize::from(awakener));
+ ret = true;
+ continue;
+ }
+
+ let callback = unsafe {
+ (*(status.overlapped() as *mut Overlapped)).callback
+ };
+
+ trace!("select; -> got overlapped");
+ callback(status.entry());
+ }
+
+ trace!("returning");
+ Ok(ret)
+ }
+
+ /// Gets a reference to the underlying `CompletionPort` structure.
+ pub fn port(&self) -> &CompletionPort {
+ &self.inner.port
+ }
+
+ /// Gets a new reference to this selector, although all underlying data
+ /// structures will refer to the same completion port.
+ pub fn clone_ref(&self) -> Selector {
+ Selector { inner: self.inner.clone() }
+ }
+
+ /// Return the `Selector`'s identifier
+ pub fn id(&self) -> usize {
+ self.inner.id
+ }
+}
+
+impl SelectorInner {
+ fn identical(&self, other: &SelectorInner) -> bool {
+ (self as *const SelectorInner) == (other as *const SelectorInner)
+ }
+}
+
+// A registration is stored in each I/O object which keeps track of how it is
+// associated with a `Selector` above.
+//
+// Once associated with a `Selector`, a registration can never be un-associated
+// (due to IOCP requirements). This is actually implemented through the
+// `poll::Registration` and `poll::SetReadiness` APIs to keep track of all the
+// level/edge/filtering business.
+/// A `Binding` is embedded in all I/O objects associated with a `Poll`
+/// object.
+///
+/// Each registration keeps track of which selector the I/O object is
+/// associated with, ensuring that implementations of `Evented` can be
+/// conformant for the various methods on Windows.
+///
+/// If you're working with custom IOCP-enabled objects then you'll want to
+/// ensure that one of these instances is stored in your object and used in the
+/// implementation of `Evented`.
+///
+/// For more information about how to use this see the `windows` module
+/// documentation in this crate.
+pub struct Binding {
+ selector: AtomicLazyCell<Arc<SelectorInner>>,
+}
+
+impl Binding {
+ /// Creates a new blank binding ready to be inserted into an I/O
+ /// object.
+ ///
+ /// Won't actually do anything until associated with a `Poll` loop.
+ pub fn new() -> Binding {
+ Binding { selector: AtomicLazyCell::new() }
+ }
+
+ /// Registers a new handle with the `Poll` specified, also assigning the
+ /// `token` specified.
+ ///
+ /// This function is intended to be used as part of `Evented::register` for
+ /// custom IOCP objects. It will add the specified handle to the internal
+ /// IOCP object with the provided `token`. All future events generated by
+ /// the handled provided will be received by the `Poll`'s internal IOCP
+ /// object.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe as the `Poll` instance has assumptions about
+ /// what the `OVERLAPPED` pointer used for each I/O operation looks like.
+ /// Specifically they must all be instances of the `Overlapped` type in
+ /// this crate. More information about this can be found on the
+ /// `windows` module in this crate.
+ pub unsafe fn register_handle(&self,
+ handle: &AsRawHandle,
+ token: Token,
+ poll: &Poll) -> io::Result<()> {
+ let selector = poll::selector(poll);
+
+ // Ignore errors, we'll see them on the next line.
+ drop(self.selector.fill(selector.inner.clone()));
+ self.check_same_selector(poll)?;
+
+ selector.inner.port.add_handle(usize::from(token), handle)
+ }
+
+ /// Same as `register_handle` but for sockets.
+ pub unsafe fn register_socket(&self,
+ handle: &AsRawSocket,
+ token: Token,
+ poll: &Poll) -> io::Result<()> {
+ let selector = poll::selector(poll);
+ drop(self.selector.fill(selector.inner.clone()));
+ self.check_same_selector(poll)?;
+ selector.inner.port.add_socket(usize::from(token), handle)
+ }
+
+ /// Reregisters the handle provided from the `Poll` provided.
+ ///
+ /// This is intended to be used as part of `Evented::reregister` but note
+ /// that this function does not currently reregister the provided handle
+ /// with the `poll` specified. IOCP has a special binding for changing the
+ /// token which has not yet been implemented. Instead this function should
+ /// be used to assert that the call to `reregister` happened on the same
+ /// `Poll` that was passed into to `register`.
+ ///
+ /// Eventually, though, the provided `handle` will be re-assigned to have
+ /// the token `token` on the given `poll` assuming that it's been
+ /// previously registered with it.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe for similar reasons to `register`. That is,
+ /// there may be pending I/O events and such which aren't handled correctly.
+ pub unsafe fn reregister_handle(&self,
+ _handle: &AsRawHandle,
+ _token: Token,
+ poll: &Poll) -> io::Result<()> {
+ self.check_same_selector(poll)
+ }
+
+ /// Same as `reregister_handle`, but for sockets.
+ pub unsafe fn reregister_socket(&self,
+ _socket: &AsRawSocket,
+ _token: Token,
+ poll: &Poll) -> io::Result<()> {
+ self.check_same_selector(poll)
+ }
+
+ /// Deregisters the handle provided from the `Poll` provided.
+ ///
+ /// This is intended to be used as part of `Evented::deregister` but note
+ /// that this function does not currently deregister the provided handle
+ /// from the `poll` specified. IOCP has a special binding for that which has
+ /// not yet been implemented. Instead this function should be used to assert
+ /// that the call to `deregister` happened on the same `Poll` that was
+ /// passed into to `register`.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe for similar reasons to `register`. That is,
+ /// there may be pending I/O events and such which aren't handled correctly.
+ pub unsafe fn deregister_handle(&self,
+ _handle: &AsRawHandle,
+ poll: &Poll) -> io::Result<()> {
+ self.check_same_selector(poll)
+ }
+
+ /// Same as `deregister_handle`, but for sockets.
+ pub unsafe fn deregister_socket(&self,
+ _socket: &AsRawSocket,
+ poll: &Poll) -> io::Result<()> {
+ self.check_same_selector(poll)
+ }
+
+ fn check_same_selector(&self, poll: &Poll) -> io::Result<()> {
+ let selector = poll::selector(poll);
+ match self.selector.borrow() {
+ Some(prev) if prev.identical(&selector.inner) => Ok(()),
+ Some(_) |
+ None => Err(other("socket already registered")),
+ }
+ }
+}
+
+impl fmt::Debug for Binding {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Binding")
+ .finish()
+ }
+}
+
+/// Helper struct used for TCP and UDP which bundles a `binding` with a
+/// `SetReadiness` handle.
+pub struct ReadyBinding {
+ binding: Binding,
+ readiness: Option<poll::SetReadiness>,
+}
+
+impl ReadyBinding {
+ /// Creates a new blank binding ready to be inserted into an I/O object.
+ ///
+ /// Won't actually do anything until associated with an `Selector` loop.
+ pub fn new() -> ReadyBinding {
+ ReadyBinding {
+ binding: Binding::new(),
+ readiness: None,
+ }
+ }
+
+ /// Returns whether this binding has been associated with a selector
+ /// yet.
+ pub fn registered(&self) -> bool {
+ self.readiness.is_some()
+ }
+
+ /// Acquires a buffer with at least `size` capacity.
+ ///
+ /// If associated with a selector, this will attempt to pull a buffer from
+ /// that buffer pool. If not associated with a selector, this will allocate
+ /// a fresh buffer.
+ pub fn get_buffer(&self, size: usize) -> Vec<u8> {
+ match self.binding.selector.borrow() {
+ Some(i) => i.buffers.lock().unwrap().get(size),
+ None => Vec::with_capacity(size),
+ }
+ }
+
+ /// Returns a buffer to this binding.
+ ///
+ /// If associated with a selector, this will push the buffer back into the
+ /// selector's pool of buffers. Otherwise this will just drop the buffer.
+ pub fn put_buffer(&self, buf: Vec<u8>) {
+ if let Some(i) = self.binding.selector.borrow() {
+ i.buffers.lock().unwrap().put(buf);
+ }
+ }
+
+ /// Sets the readiness of this I/O object to a particular `set`.
+ ///
+ /// This is later used to fill out and respond to requests to `poll`. Note
+ /// that this is all implemented through the `SetReadiness` structure in the
+ /// `poll` module.
+ pub fn set_readiness(&self, set: Ready) {
+ if let Some(ref i) = self.readiness {
+ trace!("set readiness to {:?}", set);
+ i.set_readiness(set).expect("event loop disappeared?");
+ }
+ }
+
+ /// Queries what the current readiness of this I/O object is.
+ ///
+ /// This is what's being used to generate events returned by `poll`.
+ pub fn readiness(&self) -> Ready {
+ match self.readiness {
+ Some(ref i) => i.readiness(),
+ None => Ready::empty(),
+ }
+ }
+
+ /// Implementation of the `Evented::register` function essentially.
+ ///
+ /// Returns an error if we're already registered with another event loop,
+ /// and otherwise just reassociates ourselves with the event loop to
+ /// possible change tokens.
+ pub fn register_socket(&mut self,
+ socket: &AsRawSocket,
+ poll: &Poll,
+ token: Token,
+ events: Ready,
+ opts: PollOpt,
+ registration: &Mutex<Option<poll::Registration>>)
+ -> io::Result<()> {
+ trace!("register {:?} {:?}", token, events);
+ unsafe {
+ self.binding.register_socket(socket, token, poll)?;
+ }
+
+ let (r, s) = poll::new_registration(poll, token, events, opts);
+ self.readiness = Some(s);
+ *registration.lock().unwrap() = Some(r);
+ Ok(())
+ }
+
+ /// Implementation of `Evented::reregister` function.
+ pub fn reregister_socket(&mut self,
+ socket: &AsRawSocket,
+ poll: &Poll,
+ token: Token,
+ events: Ready,
+ opts: PollOpt,
+ registration: &Mutex<Option<poll::Registration>>)
+ -> io::Result<()> {
+ trace!("reregister {:?} {:?}", token, events);
+ unsafe {
+ self.binding.reregister_socket(socket, token, poll)?;
+ }
+
+ registration.lock().unwrap()
+ .as_mut().unwrap()
+ .reregister(poll, token, events, opts)
+ }
+
+ /// Implementation of the `Evented::deregister` function.
+ ///
+ /// Doesn't allow registration with another event loop, just shuts down
+ /// readiness notifications and such.
+ pub fn deregister(&mut self,
+ socket: &AsRawSocket,
+ poll: &Poll,
+ registration: &Mutex<Option<poll::Registration>>)
+ -> io::Result<()> {
+ trace!("deregistering");
+ unsafe {
+ self.binding.deregister_socket(socket, poll)?;
+ }
+
+ registration.lock().unwrap()
+ .as_ref().unwrap()
+ .deregister(poll)
+ }
+}
+
+fn other(s: &str) -> io::Error {
+ io::Error::new(io::ErrorKind::Other, s)
+}
+
+#[derive(Debug)]
+pub struct Events {
+ /// Raw I/O event completions are filled in here by the call to `get_many`
+ /// on the completion port above. These are then processed to run callbacks
+ /// which figure out what to do after the event is done.
+ statuses: Box<[CompletionStatus]>,
+
+ /// Literal events returned by `get` to the upwards `EventLoop`. This file
+ /// doesn't really modify this (except for the awakener), instead almost all
+ /// events are filled in by the `ReadinessQueue` from the `poll` module.
+ events: Vec<Event>,
+}
+
+impl Events {
+ pub fn with_capacity(cap: usize) -> Events {
+ // Note that it's possible for the output `events` to grow beyond the
+ // capacity as it can also include deferred events, but that's certainly
+ // not the end of the world!
+ Events {
+ statuses: vec![CompletionStatus::zero(); cap].into_boxed_slice(),
+ events: Vec::with_capacity(cap),
+ }
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.events.is_empty()
+ }
+
+ pub fn len(&self) -> usize {
+ self.events.len()
+ }
+
+ pub fn capacity(&self) -> usize {
+ self.events.capacity()
+ }
+
+ pub fn get(&self, idx: usize) -> Option<Event> {
+ self.events.get(idx).map(|e| *e)
+ }
+
+ pub fn push_event(&mut self, event: Event) {
+ self.events.push(event);
+ }
+
+ pub fn clear(&mut self) {
+ self.events.truncate(0);
+ }
+}
+
+macro_rules! overlapped2arc {
+ ($e:expr, $t:ty, $($field:ident).+) => ({
+ let offset = offset_of!($t, $($field).+);
+ debug_assert!(offset < mem::size_of::<$t>());
+ FromRawArc::from_raw(($e as usize - offset) as *mut $t)
+ })
+}
+
+macro_rules! offset_of {
+ ($t:ty, $($field:ident).+) => (
+ &(*(0 as *const $t)).$($field).+ as *const _ as usize
+ )
+}
+
+// See sys::windows module docs for why this exists.
+//
+// The gist of it is that `Selector` assumes that all `OVERLAPPED` pointers are
+// actually inside one of these structures so it can use the `Callback` stored
+// right after it.
+//
+// We use repr(C) here to ensure that we can assume the overlapped pointer is
+// at the start of the structure so we can just do a cast.
+/// A wrapper around an internal instance over `miow::Overlapped` which is in
+/// turn a wrapper around the Windows type `OVERLAPPED`.
+///
+/// This type is required to be used for all IOCP operations on handles that are
+/// registered with an event loop. The event loop will receive notifications
+/// over `OVERLAPPED` pointers that have completed, and it will cast that
+/// pointer to a pointer to this structure and invoke the associated callback.
+#[repr(C)]
+pub struct Overlapped {
+ inner: UnsafeCell<miow::Overlapped>,
+ callback: fn(&OVERLAPPED_ENTRY),
+}
+
+impl Overlapped {
+ /// Creates a new `Overlapped` which will invoke the provided `cb` callback
+ /// whenever it's triggered.
+ ///
+ /// The returned `Overlapped` must be used as the `OVERLAPPED` passed to all
+ /// I/O operations that are registered with mio's event loop. When the I/O
+ /// operation associated with an `OVERLAPPED` pointer completes the event
+ /// loop will invoke the function pointer provided by `cb`.
+ pub fn new(cb: fn(&OVERLAPPED_ENTRY)) -> Overlapped {
+ Overlapped {
+ inner: UnsafeCell::new(miow::Overlapped::zero()),
+ callback: cb,
+ }
+ }
+
+ /// Get the underlying `Overlapped` instance as a raw pointer.
+ ///
+ /// This can be useful when only a shared borrow is held and the overlapped
+ /// pointer needs to be passed down to winapi.
+ pub fn as_mut_ptr(&self) -> *mut OVERLAPPED {
+ unsafe {
+ (*self.inner.get()).raw()
+ }
+ }
+}
+
+impl fmt::Debug for Overlapped {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Overlapped")
+ .finish()
+ }
+}
+
+// Overlapped's APIs are marked as unsafe Overlapped's APIs are marked as
+// unsafe as they must be used with caution to ensure thread safety. The
+// structure itself is safe to send across threads.
+unsafe impl Send for Overlapped {}
+unsafe impl Sync for Overlapped {}
diff --git a/third_party/rust/mio/src/sys/windows/tcp.rs b/third_party/rust/mio/src/sys/windows/tcp.rs
new file mode 100644
index 0000000000..f2ab39f5cf
--- /dev/null
+++ b/third_party/rust/mio/src/sys/windows/tcp.rs
@@ -0,0 +1,851 @@
+use std::fmt;
+use std::io::{self, Read, ErrorKind};
+use std::mem;
+use std::net::{self, SocketAddr, Shutdown};
+use std::os::windows::prelude::*;
+use std::sync::{Mutex, MutexGuard};
+use std::time::Duration;
+
+use miow::iocp::CompletionStatus;
+use miow::net::*;
+use net2::{TcpBuilder, TcpStreamExt as Net2TcpExt};
+use winapi::*;
+use iovec::IoVec;
+
+use {poll, Ready, Poll, PollOpt, Token};
+use event::Evented;
+use sys::windows::from_raw_arc::FromRawArc;
+use sys::windows::selector::{Overlapped, ReadyBinding};
+use sys::windows::Family;
+
+pub struct TcpStream {
+ /// Separately stored implementation to ensure that the `Drop`
+ /// implementation on this type is only executed when it's actually dropped
+ /// (many clones of this `imp` are made).
+ imp: StreamImp,
+ registration: Mutex<Option<poll::Registration>>,
+}
+
+pub struct TcpListener {
+ imp: ListenerImp,
+ registration: Mutex<Option<poll::Registration>>,
+}
+
+#[derive(Clone)]
+struct StreamImp {
+ /// A stable address and synchronized access for all internals. This serves
+ /// to ensure that all `Overlapped` pointers are valid for a long period of
+ /// time as well as allowing completion callbacks to have access to the
+ /// internals without having ownership.
+ ///
+ /// Note that the reference count also allows us "loan out" copies to
+ /// completion ports while I/O is running to guarantee that this stays alive
+ /// until the I/O completes. You'll notice a number of calls to
+ /// `mem::forget` below, and these only happen on successful scheduling of
+ /// I/O and are paired with `overlapped2arc!` macro invocations in the
+ /// completion callbacks (to have a decrement match the increment).
+ inner: FromRawArc<StreamIo>,
+}
+
+#[derive(Clone)]
+struct ListenerImp {
+ inner: FromRawArc<ListenerIo>,
+}
+
+struct StreamIo {
+ inner: Mutex<StreamInner>,
+ read: Overlapped, // also used for connect
+ write: Overlapped,
+ socket: net::TcpStream,
+}
+
+struct ListenerIo {
+ inner: Mutex<ListenerInner>,
+ accept: Overlapped,
+ family: Family,
+ socket: net::TcpListener,
+}
+
+struct StreamInner {
+ iocp: ReadyBinding,
+ deferred_connect: Option<SocketAddr>,
+ read: State<(), ()>,
+ write: State<(Vec<u8>, usize), (Vec<u8>, usize)>,
+ /// whether we are instantly notified of success
+ /// (FILE_SKIP_COMPLETION_PORT_ON_SUCCESS,
+ /// without a roundtrip through the event loop)
+ instant_notify: bool,
+}
+
+struct ListenerInner {
+ iocp: ReadyBinding,
+ accept: State<net::TcpStream, (net::TcpStream, SocketAddr)>,
+ accept_buf: AcceptAddrsBuf,
+ instant_notify: bool,
+}
+
+enum State<T, U> {
+ Empty, // no I/O operation in progress
+ Pending(T), // an I/O operation is in progress
+ Ready(U), // I/O has finished with this value
+ Error(io::Error), // there was an I/O error
+}
+
+impl TcpStream {
+ fn new(socket: net::TcpStream,
+ deferred_connect: Option<SocketAddr>) -> TcpStream {
+ TcpStream {
+ registration: Mutex::new(None),
+ imp: StreamImp {
+ inner: FromRawArc::new(StreamIo {
+ read: Overlapped::new(read_done),
+ write: Overlapped::new(write_done),
+ socket: socket,
+ inner: Mutex::new(StreamInner {
+ iocp: ReadyBinding::new(),
+ deferred_connect: deferred_connect,
+ read: State::Empty,
+ write: State::Empty,
+ instant_notify: false,
+ }),
+ }),
+ },
+ }
+ }
+
+ pub fn connect(socket: net::TcpStream, addr: &SocketAddr)
+ -> io::Result<TcpStream> {
+ socket.set_nonblocking(true)?;
+ Ok(TcpStream::new(socket, Some(*addr)))
+ }
+
+ pub fn from_stream(stream: net::TcpStream) -> TcpStream {
+ TcpStream::new(stream, None)
+ }
+
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.imp.inner.socket.peer_addr()
+ }
+
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.imp.inner.socket.local_addr()
+ }
+
+ pub fn try_clone(&self) -> io::Result<TcpStream> {
+ self.imp.inner.socket.try_clone().map(|s| TcpStream::new(s, None))
+ }
+
+ pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
+ self.imp.inner.socket.shutdown(how)
+ }
+
+ pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
+ self.imp.inner.socket.set_nodelay(nodelay)
+ }
+
+ pub fn nodelay(&self) -> io::Result<bool> {
+ self.imp.inner.socket.nodelay()
+ }
+
+ pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> {
+ self.imp.inner.socket.set_recv_buffer_size(size)
+ }
+
+ pub fn recv_buffer_size(&self) -> io::Result<usize> {
+ self.imp.inner.socket.recv_buffer_size()
+ }
+
+ pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> {
+ self.imp.inner.socket.set_send_buffer_size(size)
+ }
+
+ pub fn send_buffer_size(&self) -> io::Result<usize> {
+ self.imp.inner.socket.send_buffer_size()
+ }
+
+ pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> {
+ self.imp.inner.socket.set_keepalive(keepalive)
+ }
+
+ pub fn keepalive(&self) -> io::Result<Option<Duration>> {
+ self.imp.inner.socket.keepalive()
+ }
+
+ pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
+ self.imp.inner.socket.set_ttl(ttl)
+ }
+
+ pub fn ttl(&self) -> io::Result<u32> {
+ self.imp.inner.socket.ttl()
+ }
+
+ pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
+ self.imp.inner.socket.set_only_v6(only_v6)
+ }
+
+ pub fn only_v6(&self) -> io::Result<bool> {
+ self.imp.inner.socket.only_v6()
+ }
+
+ pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
+ self.imp.inner.socket.set_linger(dur)
+ }
+
+ pub fn linger(&self) -> io::Result<Option<Duration>> {
+ self.imp.inner.socket.linger()
+ }
+
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ if let Some(e) = self.imp.inner.socket.take_error()? {
+ return Ok(Some(e))
+ }
+
+ // If the syscall didn't return anything then also check to see if we've
+ // squirreled away an error elsewhere for example as part of a connect
+ // operation.
+ //
+ // Typically this is used like so:
+ //
+ // 1. A `connect` is issued
+ // 2. Wait for the socket to be writable
+ // 3. Call `take_error` to see if the connect succeeded.
+ //
+ // Right now the `connect` operation finishes in `read_done` below and
+ // fill will in `State::Error` in the `read` slot if it fails, so we
+ // extract that here.
+ let mut me = self.inner();
+ match mem::replace(&mut me.read, State::Empty) {
+ State::Error(e) => {
+ self.imp.schedule_read(&mut me);
+ Ok(Some(e))
+ }
+ other => {
+ me.read = other;
+ Ok(None)
+ }
+ }
+ }
+
+ fn inner(&self) -> MutexGuard<StreamInner> {
+ self.imp.inner()
+ }
+
+ fn before_read(&self) -> io::Result<MutexGuard<StreamInner>> {
+ let mut me = self.inner();
+
+ match me.read {
+ // Empty == we're not associated yet, and if we're pending then
+ // these are both cases where we return "would block"
+ State::Empty |
+ State::Pending(()) => return Err(io::ErrorKind::WouldBlock.into()),
+
+ // If we got a delayed error as part of a `read_overlapped` below,
+ // return that here. Also schedule another read in case it was
+ // transient.
+ State::Error(_) => {
+ let e = match mem::replace(&mut me.read, State::Empty) {
+ State::Error(e) => e,
+ _ => panic!(),
+ };
+ self.imp.schedule_read(&mut me);
+ return Err(e)
+ }
+
+ // If we're ready for a read then some previous 0-byte read has
+ // completed. In that case the OS's socket buffer has something for
+ // us, so we just keep pulling out bytes while we can in the loop
+ // below.
+ State::Ready(()) => {}
+ }
+
+ Ok(me)
+ }
+
+ fn post_register(&self, interest: Ready, me: &mut StreamInner) {
+ if interest.is_readable() {
+ self.imp.schedule_read(me);
+ }
+
+ // At least with epoll, if a socket is registered with an interest in
+ // writing and it's immediately writable then a writable event is
+ // generated immediately, so do so here.
+ if interest.is_writable() {
+ if let State::Empty = me.write {
+ self.imp.add_readiness(me, Ready::writable());
+ }
+ }
+ }
+
+ pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
+ match IoVec::from_bytes_mut(buf) {
+ Some(vec) => self.readv(&mut [vec]),
+ None => Ok(0),
+ }
+ }
+
+ pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
+ let mut me = self.before_read()?;
+
+ match (&self.imp.inner.socket).peek(buf) {
+ Ok(n) => Ok(n),
+ Err(e) => {
+ me.read = State::Empty;
+ self.imp.schedule_read(&mut me);
+ Err(e)
+ }
+ }
+ }
+
+ pub fn readv(&self, bufs: &mut [&mut IoVec]) -> io::Result<usize> {
+ let mut me = self.before_read()?;
+
+ // TODO: Does WSARecv work on a nonblocking sockets? We ideally want to
+ // call that instead of looping over all the buffers and calling
+ // `recv` on each buffer. I'm not sure though if an overlapped
+ // socket in nonblocking mode would work with that use case,
+ // however, so for now we just call `recv`.
+
+ let mut amt = 0;
+ for buf in bufs {
+ match (&self.imp.inner.socket).read(buf) {
+ // If we did a partial read, then return what we've read so far
+ Ok(n) if n < buf.len() => return Ok(amt + n),
+
+ // Otherwise filled this buffer entirely, so try to fill the
+ // next one as well.
+ Ok(n) => amt += n,
+
+ // If we hit an error then things get tricky if we've already
+ // read some data. If the error is "would block" then we just
+ // return the data we've read so far while scheduling another
+ // 0-byte read.
+ //
+ // If we've read data and the error kind is not "would block",
+ // then we stash away the error to get returned later and return
+ // the data that we've read.
+ //
+ // Finally if we haven't actually read any data we just
+ // reschedule a 0-byte read to happen again and then return the
+ // error upwards.
+ Err(e) => {
+ if amt > 0 && e.kind() == io::ErrorKind::WouldBlock {
+ me.read = State::Empty;
+ self.imp.schedule_read(&mut me);
+ return Ok(amt)
+ } else if amt > 0 {
+ me.read = State::Error(e);
+ return Ok(amt)
+ } else {
+ me.read = State::Empty;
+ self.imp.schedule_read(&mut me);
+ return Err(e)
+ }
+ }
+ }
+ }
+
+ Ok(amt)
+ }
+
+ pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
+ match IoVec::from_bytes(buf) {
+ Some(vec) => self.writev(&[vec]),
+ None => Ok(0),
+ }
+ }
+
+ pub fn writev(&self, bufs: &[&IoVec]) -> io::Result<usize> {
+ let mut me = self.inner();
+ let me = &mut *me;
+
+ match mem::replace(&mut me.write, State::Empty) {
+ State::Empty => {}
+ State::Error(e) => return Err(e),
+ other => {
+ me.write = other;
+ return Err(io::ErrorKind::WouldBlock.into())
+ }
+ }
+
+ if !me.iocp.registered() {
+ return Err(io::ErrorKind::WouldBlock.into())
+ }
+
+ if bufs.is_empty() {
+ return Ok(0)
+ }
+
+ let len = bufs.iter().map(|b| b.len()).fold(0, |a, b| a + b);
+ let mut intermediate = me.iocp.get_buffer(len);
+ for buf in bufs {
+ intermediate.extend_from_slice(buf);
+ }
+ self.imp.schedule_write(intermediate, 0, me);
+ Ok(len)
+ }
+
+ pub fn flush(&self) -> io::Result<()> {
+ Ok(())
+ }
+}
+
+impl StreamImp {
+ fn inner(&self) -> MutexGuard<StreamInner> {
+ self.inner.inner.lock().unwrap()
+ }
+
+ fn schedule_connect(&self, addr: &SocketAddr) -> io::Result<()> {
+ unsafe {
+ trace!("scheduling a connect");
+ self.inner.socket.connect_overlapped(addr, &[], self.inner.read.as_mut_ptr())?;
+ }
+ // see docs above on StreamImp.inner for rationale on forget
+ mem::forget(self.clone());
+ Ok(())
+ }
+
+ /// Schedule a read to happen on this socket, enqueuing us to receive a
+ /// notification when a read is ready.
+ ///
+ /// Note that this does *not* work with a buffer. When reading a TCP stream
+ /// we actually read into a 0-byte buffer so Windows will send us a
+ /// notification when the socket is otherwise ready for reading. This allows
+ /// us to avoid buffer allocations for in-flight reads.
+ fn schedule_read(&self, me: &mut StreamInner) {
+ match me.read {
+ State::Empty => {}
+ State::Ready(_) | State::Error(_) => {
+ self.add_readiness(me, Ready::readable());
+ return;
+ }
+ _ => return,
+ }
+
+ me.iocp.set_readiness(me.iocp.readiness() - Ready::readable());
+
+ trace!("scheduling a read");
+ let res = unsafe {
+ self.inner.socket.read_overlapped(&mut [], self.inner.read.as_mut_ptr())
+ };
+ match res {
+ // Note that `Ok(true)` means that this completed immediately and
+ // our socket is readable. This typically means that the caller of
+ // this function (likely `read` above) can try again as an
+ // optimization and return bytes quickly.
+ //
+ // Normally, though, although the read completed immediately
+ // there's still an IOCP completion packet enqueued that we're going
+ // to receive.
+ //
+ // You can configure this behavior (miow) with
+ // SetFileCompletionNotificationModes to indicate that `Ok(true)`
+ // does **not** enqueue a completion packet. (This is the case
+ // for me.instant_notify)
+ //
+ // Note that apparently libuv has scary code to work around bugs in
+ // `WSARecv` for UDP sockets apparently for handles which have had
+ // the `SetFileCompletionNotificationModes` function called on them,
+ // worth looking into!
+ Ok(Some(_)) if me.instant_notify => {
+ me.read = State::Ready(());
+ self.add_readiness(me, Ready::readable());
+ }
+ Ok(_) => {
+ // see docs above on StreamImp.inner for rationale on forget
+ me.read = State::Pending(());
+ mem::forget(self.clone());
+ }
+ Err(e) => {
+ me.read = State::Error(e);
+ self.add_readiness(me, Ready::readable());
+ }
+ }
+ }
+
+ /// Similar to `schedule_read`, except that this issues, well, writes.
+ ///
+ /// This function will continually attempt to write the entire contents of
+ /// the buffer `buf` until they have all been written. The `pos` argument is
+ /// the current offset within the buffer up to which the contents have
+ /// already been written.
+ ///
+ /// A new writable event (e.g. allowing another write) will only happen once
+ /// the buffer has been written completely (or hit an error).
+ fn schedule_write(&self,
+ buf: Vec<u8>,
+ mut pos: usize,
+ me: &mut StreamInner) {
+
+ // About to write, clear any pending level triggered events
+ me.iocp.set_readiness(me.iocp.readiness() - Ready::writable());
+
+ loop {
+ trace!("scheduling a write of {} bytes", buf[pos..].len());
+ let ret = unsafe {
+ self.inner.socket.write_overlapped(&buf[pos..], self.inner.write.as_mut_ptr())
+ };
+ match ret {
+ Ok(Some(transferred_bytes)) if me.instant_notify => {
+ trace!("done immediately with {} bytes", transferred_bytes);
+ if transferred_bytes == buf.len() - pos {
+ self.add_readiness(me, Ready::writable());
+ me.write = State::Empty;
+ break;
+ }
+ pos += transferred_bytes;
+ }
+ Ok(_) => {
+ trace!("scheduled for later");
+ // see docs above on StreamImp.inner for rationale on forget
+ me.write = State::Pending((buf, pos));
+ mem::forget(self.clone());
+ break;
+ }
+ Err(e) => {
+ trace!("write error: {}", e);
+ me.write = State::Error(e);
+ self.add_readiness(me, Ready::writable());
+ me.iocp.put_buffer(buf);
+ break;
+ }
+ }
+ }
+ }
+
+ /// Pushes an event for this socket onto the selector its registered for.
+ ///
+ /// When an event is generated on this socket, if it happened after the
+ /// socket was closed then we don't want to actually push the event onto our
+ /// selector as otherwise it's just a spurious notification.
+ fn add_readiness(&self, me: &mut StreamInner, set: Ready) {
+ me.iocp.set_readiness(set | me.iocp.readiness());
+ }
+}
+
+fn read_done(status: &OVERLAPPED_ENTRY) {
+ let status = CompletionStatus::from_entry(status);
+ let me2 = StreamImp {
+ inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, read) },
+ };
+
+ let mut me = me2.inner();
+ match mem::replace(&mut me.read, State::Empty) {
+ State::Pending(()) => {
+ trace!("finished a read: {}", status.bytes_transferred());
+ assert_eq!(status.bytes_transferred(), 0);
+ me.read = State::Ready(());
+ return me2.add_readiness(&mut me, Ready::readable())
+ }
+ s => me.read = s,
+ }
+
+ // If a read didn't complete, then the connect must have just finished.
+ trace!("finished a connect");
+
+ // By guarding with socket.result(), we ensure that a connection
+ // was successfully made before performing operations requiring a
+ // connected socket.
+ match unsafe { me2.inner.socket.result(status.overlapped()) }
+ .and_then(|_| me2.inner.socket.connect_complete())
+ {
+ Ok(()) => {
+ me2.add_readiness(&mut me, Ready::writable());
+ me2.schedule_read(&mut me);
+ }
+ Err(e) => {
+ me2.add_readiness(&mut me, Ready::readable() | Ready::writable());
+ me.read = State::Error(e);
+ }
+ }
+}
+
+fn write_done(status: &OVERLAPPED_ENTRY) {
+ let status = CompletionStatus::from_entry(status);
+ trace!("finished a write {}", status.bytes_transferred());
+ let me2 = StreamImp {
+ inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, write) },
+ };
+ let mut me = me2.inner();
+ let (buf, pos) = match mem::replace(&mut me.write, State::Empty) {
+ State::Pending(pair) => pair,
+ _ => unreachable!(),
+ };
+ let new_pos = pos + (status.bytes_transferred() as usize);
+ if new_pos == buf.len() {
+ me2.add_readiness(&mut me, Ready::writable());
+ } else {
+ me2.schedule_write(buf, new_pos, &mut me);
+ }
+}
+
+impl Evented for TcpStream {
+ fn register(&self, poll: &Poll, token: Token,
+ interest: Ready, opts: PollOpt) -> io::Result<()> {
+ let mut me = self.inner();
+ me.iocp.register_socket(&self.imp.inner.socket, poll, token,
+ interest, opts, &self.registration)?;
+
+ unsafe {
+ super::no_notify_on_instant_completion(self.imp.inner.socket.as_raw_socket() as HANDLE)?;
+ me.instant_notify = true;
+ }
+
+ // If we were connected before being registered process that request
+ // here and go along our merry ways. Note that the callback for a
+ // successful connect will worry about generating writable/readable
+ // events and scheduling a new read.
+ if let Some(addr) = me.deferred_connect.take() {
+ return self.imp.schedule_connect(&addr).map(|_| ())
+ }
+ self.post_register(interest, &mut me);
+ Ok(())
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token,
+ interest: Ready, opts: PollOpt) -> io::Result<()> {
+ let mut me = self.inner();
+ me.iocp.reregister_socket(&self.imp.inner.socket, poll, token,
+ interest, opts, &self.registration)?;
+ self.post_register(interest, &mut me);
+ Ok(())
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ self.inner().iocp.deregister(&self.imp.inner.socket,
+ poll, &self.registration)
+ }
+}
+
+impl fmt::Debug for TcpStream {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("TcpStream")
+ .finish()
+ }
+}
+
+impl Drop for TcpStream {
+ fn drop(&mut self) {
+ // If we're still internally reading, we're no longer interested. Note
+ // though that we don't cancel any writes which may have been issued to
+ // preserve the same semantics as Unix.
+ //
+ // Note that "Empty" here may mean that a connect is pending, so we
+ // cancel even if that happens as well.
+ unsafe {
+ match self.inner().read {
+ State::Pending(_) | State::Empty => {
+ trace!("cancelling active TCP read");
+ drop(super::cancel(&self.imp.inner.socket,
+ &self.imp.inner.read));
+ }
+ State::Ready(_) | State::Error(_) => {}
+ }
+ }
+ }
+}
+
+impl TcpListener {
+ pub fn new(socket: net::TcpListener)
+ -> io::Result<TcpListener> {
+ let addr = socket.local_addr()?;
+ Ok(TcpListener::new_family(socket, match addr {
+ SocketAddr::V4(..) => Family::V4,
+ SocketAddr::V6(..) => Family::V6,
+ }))
+ }
+
+ fn new_family(socket: net::TcpListener, family: Family) -> TcpListener {
+ TcpListener {
+ registration: Mutex::new(None),
+ imp: ListenerImp {
+ inner: FromRawArc::new(ListenerIo {
+ accept: Overlapped::new(accept_done),
+ family: family,
+ socket: socket,
+ inner: Mutex::new(ListenerInner {
+ iocp: ReadyBinding::new(),
+ accept: State::Empty,
+ accept_buf: AcceptAddrsBuf::new(),
+ instant_notify: false,
+ }),
+ }),
+ },
+ }
+ }
+
+ pub fn accept(&self) -> io::Result<(net::TcpStream, SocketAddr)> {
+ let mut me = self.inner();
+
+ let ret = match mem::replace(&mut me.accept, State::Empty) {
+ State::Empty => return Err(io::ErrorKind::WouldBlock.into()),
+ State::Pending(t) => {
+ me.accept = State::Pending(t);
+ return Err(io::ErrorKind::WouldBlock.into());
+ }
+ State::Ready((s, a)) => Ok((s, a)),
+ State::Error(e) => Err(e),
+ };
+
+ self.imp.schedule_accept(&mut me);
+
+ return ret
+ }
+
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.imp.inner.socket.local_addr()
+ }
+
+ pub fn try_clone(&self) -> io::Result<TcpListener> {
+ self.imp.inner.socket.try_clone().map(|s| {
+ TcpListener::new_family(s, self.imp.inner.family)
+ })
+ }
+
+ #[allow(deprecated)]
+ pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
+ self.imp.inner.socket.set_only_v6(only_v6)
+ }
+
+ #[allow(deprecated)]
+ pub fn only_v6(&self) -> io::Result<bool> {
+ self.imp.inner.socket.only_v6()
+ }
+
+ pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
+ self.imp.inner.socket.set_ttl(ttl)
+ }
+
+ pub fn ttl(&self) -> io::Result<u32> {
+ self.imp.inner.socket.ttl()
+ }
+
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.imp.inner.socket.take_error()
+ }
+
+ fn inner(&self) -> MutexGuard<ListenerInner> {
+ self.imp.inner()
+ }
+}
+
+impl ListenerImp {
+ fn inner(&self) -> MutexGuard<ListenerInner> {
+ self.inner.inner.lock().unwrap()
+ }
+
+ fn schedule_accept(&self, me: &mut ListenerInner) {
+ match me.accept {
+ State::Empty => {}
+ _ => return
+ }
+
+ me.iocp.set_readiness(me.iocp.readiness() - Ready::readable());
+
+ let res = match self.inner.family {
+ Family::V4 => TcpBuilder::new_v4(),
+ Family::V6 => TcpBuilder::new_v6(),
+ }.and_then(|builder| unsafe {
+ trace!("scheduling an accept");
+ self.inner.socket.accept_overlapped(&builder, &mut me.accept_buf,
+ self.inner.accept.as_mut_ptr())
+ });
+ match res {
+ Ok((socket, _)) => {
+ // see docs above on StreamImp.inner for rationale on forget
+ me.accept = State::Pending(socket);
+ mem::forget(self.clone());
+ }
+ Err(e) => {
+ me.accept = State::Error(e);
+ self.add_readiness(me, Ready::readable());
+ }
+ }
+ }
+
+ // See comments in StreamImp::push
+ fn add_readiness(&self, me: &mut ListenerInner, set: Ready) {
+ me.iocp.set_readiness(set | me.iocp.readiness());
+ }
+}
+
+fn accept_done(status: &OVERLAPPED_ENTRY) {
+ let status = CompletionStatus::from_entry(status);
+ let me2 = ListenerImp {
+ inner: unsafe { overlapped2arc!(status.overlapped(), ListenerIo, accept) },
+ };
+
+ let mut me = me2.inner();
+ let socket = match mem::replace(&mut me.accept, State::Empty) {
+ State::Pending(s) => s,
+ _ => unreachable!(),
+ };
+ trace!("finished an accept");
+ let result = me2.inner.socket.accept_complete(&socket).and_then(|()| {
+ me.accept_buf.parse(&me2.inner.socket)
+ }).and_then(|buf| {
+ buf.remote().ok_or_else(|| {
+ io::Error::new(ErrorKind::Other, "could not obtain remote address")
+ })
+ });
+ me.accept = match result {
+ Ok(remote_addr) => State::Ready((socket, remote_addr)),
+ Err(e) => State::Error(e),
+ };
+ me2.add_readiness(&mut me, Ready::readable());
+}
+
+impl Evented for TcpListener {
+ fn register(&self, poll: &Poll, token: Token,
+ interest: Ready, opts: PollOpt) -> io::Result<()> {
+ let mut me = self.inner();
+ me.iocp.register_socket(&self.imp.inner.socket, poll, token,
+ interest, opts, &self.registration)?;
+
+ unsafe {
+ super::no_notify_on_instant_completion(self.imp.inner.socket.as_raw_socket() as HANDLE)?;
+ me.instant_notify = true;
+ }
+
+ self.imp.schedule_accept(&mut me);
+ Ok(())
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token,
+ interest: Ready, opts: PollOpt) -> io::Result<()> {
+ let mut me = self.inner();
+ me.iocp.reregister_socket(&self.imp.inner.socket, poll, token,
+ interest, opts, &self.registration)?;
+ self.imp.schedule_accept(&mut me);
+ Ok(())
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ self.inner().iocp.deregister(&self.imp.inner.socket,
+ poll, &self.registration)
+ }
+}
+
+impl fmt::Debug for TcpListener {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("TcpListener")
+ .finish()
+ }
+}
+
+impl Drop for TcpListener {
+ fn drop(&mut self) {
+ // If we're still internally reading, we're no longer interested.
+ unsafe {
+ match self.inner().accept {
+ State::Pending(_) => {
+ trace!("cancelling active TCP accept");
+ drop(super::cancel(&self.imp.inner.socket,
+ &self.imp.inner.accept));
+ }
+ State::Empty |
+ State::Ready(_) |
+ State::Error(_) => {}
+ }
+ }
+ }
+}
diff --git a/third_party/rust/mio/src/sys/windows/udp.rs b/third_party/rust/mio/src/sys/windows/udp.rs
new file mode 100644
index 0000000000..4d3fc040fd
--- /dev/null
+++ b/third_party/rust/mio/src/sys/windows/udp.rs
@@ -0,0 +1,413 @@
+//! UDP for IOCP
+//!
+//! Note that most of this module is quite similar to the TCP module, so if
+//! something seems odd you may also want to try the docs over there.
+
+use std::fmt;
+use std::io::prelude::*;
+use std::io;
+use std::mem;
+use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
+use std::sync::{Mutex, MutexGuard};
+
+#[allow(unused_imports)]
+use net2::{UdpBuilder, UdpSocketExt};
+use winapi::*;
+use miow::iocp::CompletionStatus;
+use miow::net::SocketAddrBuf;
+use miow::net::UdpSocketExt as MiowUdpSocketExt;
+
+use {poll, Ready, Poll, PollOpt, Token};
+use event::Evented;
+use sys::windows::from_raw_arc::FromRawArc;
+use sys::windows::selector::{Overlapped, ReadyBinding};
+
+pub struct UdpSocket {
+ imp: Imp,
+ registration: Mutex<Option<poll::Registration>>,
+}
+
+#[derive(Clone)]
+struct Imp {
+ inner: FromRawArc<Io>,
+}
+
+struct Io {
+ read: Overlapped,
+ write: Overlapped,
+ socket: net::UdpSocket,
+ inner: Mutex<Inner>,
+}
+
+struct Inner {
+ iocp: ReadyBinding,
+ read: State<Vec<u8>, Vec<u8>>,
+ write: State<Vec<u8>, (Vec<u8>, usize)>,
+ read_buf: SocketAddrBuf,
+}
+
+enum State<T, U> {
+ Empty,
+ Pending(T),
+ Ready(U),
+ Error(io::Error),
+}
+
+impl UdpSocket {
+ pub fn new(socket: net::UdpSocket) -> io::Result<UdpSocket> {
+ Ok(UdpSocket {
+ registration: Mutex::new(None),
+ imp: Imp {
+ inner: FromRawArc::new(Io {
+ read: Overlapped::new(recv_done),
+ write: Overlapped::new(send_done),
+ socket: socket,
+ inner: Mutex::new(Inner {
+ iocp: ReadyBinding::new(),
+ read: State::Empty,
+ write: State::Empty,
+ read_buf: SocketAddrBuf::new(),
+ }),
+ }),
+ },
+ })
+ }
+
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.imp.inner.socket.local_addr()
+ }
+
+ pub fn try_clone(&self) -> io::Result<UdpSocket> {
+ self.imp.inner.socket.try_clone().and_then(UdpSocket::new)
+ }
+
+ /// Note that unlike `TcpStream::write` this function will not attempt to
+ /// continue writing `buf` until its entirely written.
+ ///
+ /// TODO: This... may be wrong in the long run. We're reporting that we
+ /// successfully wrote all of the bytes in `buf` but it's possible
+ /// that we don't actually end up writing all of them!
+ pub fn send_to(&self, buf: &[u8], target: &SocketAddr)
+ -> io::Result<usize> {
+ let mut me = self.inner();
+ let me = &mut *me;
+
+ match me.write {
+ State::Empty => {}
+ _ => return Err(io::ErrorKind::WouldBlock.into()),
+ }
+
+ if !me.iocp.registered() {
+ return Err(io::ErrorKind::WouldBlock.into())
+ }
+
+ let interest = me.iocp.readiness();
+ me.iocp.set_readiness(interest - Ready::writable());
+
+ let mut owned_buf = me.iocp.get_buffer(64 * 1024);
+ let amt = owned_buf.write(buf)?;
+ unsafe {
+ trace!("scheduling a send");
+ self.imp.inner.socket.send_to_overlapped(&owned_buf, target,
+ self.imp.inner.write.as_mut_ptr())
+ }?;
+ me.write = State::Pending(owned_buf);
+ mem::forget(self.imp.clone());
+ Ok(amt)
+ }
+
+ /// Note that unlike `TcpStream::write` this function will not attempt to
+ /// continue writing `buf` until its entirely written.
+ ///
+ /// TODO: This... may be wrong in the long run. We're reporting that we
+ /// successfully wrote all of the bytes in `buf` but it's possible
+ /// that we don't actually end up writing all of them!
+ pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
+ let mut me = self.inner();
+ let me = &mut *me;
+
+ match me.write {
+ State::Empty => {}
+ _ => return Err(io::ErrorKind::WouldBlock.into()),
+ }
+
+ if !me.iocp.registered() {
+ return Err(io::ErrorKind::WouldBlock.into())
+ }
+
+ let interest = me.iocp.readiness();
+ me.iocp.set_readiness(interest - Ready::writable());
+
+ let mut owned_buf = me.iocp.get_buffer(64 * 1024);
+ let amt = owned_buf.write(buf)?;
+ unsafe {
+ trace!("scheduling a send");
+ self.imp.inner.socket.send_overlapped(&owned_buf, self.imp.inner.write.as_mut_ptr())
+
+ }?;
+ me.write = State::Pending(owned_buf);
+ mem::forget(self.imp.clone());
+ Ok(amt)
+ }
+
+ pub fn recv_from(&self, mut buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ let mut me = self.inner();
+ match mem::replace(&mut me.read, State::Empty) {
+ State::Empty => Err(io::ErrorKind::WouldBlock.into()),
+ State::Pending(b) => { me.read = State::Pending(b); Err(io::ErrorKind::WouldBlock.into()) }
+ State::Ready(data) => {
+ // If we weren't provided enough space to receive the message
+ // then don't actually read any data, just return an error.
+ if buf.len() < data.len() {
+ me.read = State::Ready(data);
+ Err(io::Error::from_raw_os_error(WSAEMSGSIZE as i32))
+ } else {
+ let r = if let Some(addr) = me.read_buf.to_socket_addr() {
+ buf.write(&data).unwrap();
+ Ok((data.len(), addr))
+ } else {
+ Err(io::Error::new(io::ErrorKind::Other,
+ "failed to parse socket address"))
+ };
+ me.iocp.put_buffer(data);
+ self.imp.schedule_read_from(&mut me);
+ r
+ }
+ }
+ State::Error(e) => {
+ self.imp.schedule_read_from(&mut me);
+ Err(e)
+ }
+ }
+ }
+
+ pub fn recv(&self, buf: &mut [u8])
+ -> io::Result<usize> {
+ //Since recv_from can be used on connected sockets just call it and drop the address.
+ self.recv_from(buf).map(|(size,_)| size)
+ }
+
+ pub fn connect(&self, addr: SocketAddr) -> io::Result<()> {
+ self.imp.inner.socket.connect(addr)
+ }
+
+ pub fn broadcast(&self) -> io::Result<bool> {
+ self.imp.inner.socket.broadcast()
+ }
+
+ pub fn set_broadcast(&self, on: bool) -> io::Result<()> {
+ self.imp.inner.socket.set_broadcast(on)
+ }
+
+ pub fn multicast_loop_v4(&self) -> io::Result<bool> {
+ self.imp.inner.socket.multicast_loop_v4()
+ }
+
+ pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> {
+ self.imp.inner.socket.set_multicast_loop_v4(on)
+ }
+
+ pub fn multicast_ttl_v4(&self) -> io::Result<u32> {
+ self.imp.inner.socket.multicast_ttl_v4()
+ }
+
+ pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> {
+ self.imp.inner.socket.set_multicast_ttl_v4(ttl)
+ }
+
+ pub fn multicast_loop_v6(&self) -> io::Result<bool> {
+ self.imp.inner.socket.multicast_loop_v6()
+ }
+
+ pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> {
+ self.imp.inner.socket.set_multicast_loop_v6(on)
+ }
+
+ pub fn ttl(&self) -> io::Result<u32> {
+ self.imp.inner.socket.ttl()
+ }
+
+ pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
+ self.imp.inner.socket.set_ttl(ttl)
+ }
+
+ pub fn join_multicast_v4(&self,
+ multiaddr: &Ipv4Addr,
+ interface: &Ipv4Addr) -> io::Result<()> {
+ self.imp.inner.socket.join_multicast_v4(multiaddr, interface)
+ }
+
+ pub fn join_multicast_v6(&self,
+ multiaddr: &Ipv6Addr,
+ interface: u32) -> io::Result<()> {
+ self.imp.inner.socket.join_multicast_v6(multiaddr, interface)
+ }
+
+ pub fn leave_multicast_v4(&self,
+ multiaddr: &Ipv4Addr,
+ interface: &Ipv4Addr) -> io::Result<()> {
+ self.imp.inner.socket.leave_multicast_v4(multiaddr, interface)
+ }
+
+ pub fn leave_multicast_v6(&self,
+ multiaddr: &Ipv6Addr,
+ interface: u32) -> io::Result<()> {
+ self.imp.inner.socket.leave_multicast_v6(multiaddr, interface)
+ }
+
+ pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
+ self.imp.inner.socket.set_only_v6(only_v6)
+ }
+
+ pub fn only_v6(&self) -> io::Result<bool> {
+ self.imp.inner.socket.only_v6()
+ }
+
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.imp.inner.socket.take_error()
+ }
+
+ fn inner(&self) -> MutexGuard<Inner> {
+ self.imp.inner()
+ }
+
+ fn post_register(&self, interest: Ready, me: &mut Inner) {
+ if interest.is_readable() {
+ //We use recv_from here since it is well specified for both
+ //connected and non-connected sockets and we can discard the address
+ //when calling recv().
+ self.imp.schedule_read_from(me);
+ }
+ // See comments in TcpSocket::post_register for what's going on here
+ if interest.is_writable() {
+ if let State::Empty = me.write {
+ self.imp.add_readiness(me, Ready::writable());
+ }
+ }
+ }
+}
+
+impl Imp {
+ fn inner(&self) -> MutexGuard<Inner> {
+ self.inner.inner.lock().unwrap()
+ }
+
+ fn schedule_read_from(&self, me: &mut Inner) {
+ match me.read {
+ State::Empty => {}
+ _ => return,
+ }
+
+ let interest = me.iocp.readiness();
+ me.iocp.set_readiness(interest - Ready::readable());
+
+ let mut buf = me.iocp.get_buffer(64 * 1024);
+ let res = unsafe {
+ trace!("scheduling a read");
+ let cap = buf.capacity();
+ buf.set_len(cap);
+ self.inner.socket.recv_from_overlapped(&mut buf, &mut me.read_buf,
+ self.inner.read.as_mut_ptr())
+ };
+ match res {
+ Ok(_) => {
+ me.read = State::Pending(buf);
+ mem::forget(self.clone());
+ }
+ Err(e) => {
+ me.read = State::Error(e);
+ self.add_readiness(me, Ready::readable());
+ me.iocp.put_buffer(buf);
+ }
+ }
+ }
+
+ // See comments in tcp::StreamImp::push
+ fn add_readiness(&self, me: &Inner, set: Ready) {
+ me.iocp.set_readiness(set | me.iocp.readiness());
+ }
+}
+
+impl Evented for UdpSocket {
+ fn register(&self, poll: &Poll, token: Token,
+ interest: Ready, opts: PollOpt) -> io::Result<()> {
+ let mut me = self.inner();
+ me.iocp.register_socket(&self.imp.inner.socket,
+ poll, token, interest, opts,
+ &self.registration)?;
+ self.post_register(interest, &mut me);
+ Ok(())
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token,
+ interest: Ready, opts: PollOpt) -> io::Result<()> {
+ let mut me = self.inner();
+ me.iocp.reregister_socket(&self.imp.inner.socket,
+ poll, token, interest,
+ opts, &self.registration)?;
+ self.post_register(interest, &mut me);
+ Ok(())
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ self.inner().iocp.deregister(&self.imp.inner.socket,
+ poll, &self.registration)
+ }
+}
+
+impl fmt::Debug for UdpSocket {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("UdpSocket")
+ .finish()
+ }
+}
+
+impl Drop for UdpSocket {
+ fn drop(&mut self) {
+ let inner = self.inner();
+
+ // If we're still internally reading, we're no longer interested. Note
+ // though that we don't cancel any writes which may have been issued to
+ // preserve the same semantics as Unix.
+ unsafe {
+ match inner.read {
+ State::Pending(_) => {
+ drop(super::cancel(&self.imp.inner.socket,
+ &self.imp.inner.read));
+ }
+ State::Empty |
+ State::Ready(_) |
+ State::Error(_) => {}
+ }
+ }
+ }
+}
+
+fn send_done(status: &OVERLAPPED_ENTRY) {
+ let status = CompletionStatus::from_entry(status);
+ trace!("finished a send {}", status.bytes_transferred());
+ let me2 = Imp {
+ inner: unsafe { overlapped2arc!(status.overlapped(), Io, write) },
+ };
+ let mut me = me2.inner();
+ me.write = State::Empty;
+ me2.add_readiness(&mut me, Ready::writable());
+}
+
+fn recv_done(status: &OVERLAPPED_ENTRY) {
+ let status = CompletionStatus::from_entry(status);
+ trace!("finished a recv {}", status.bytes_transferred());
+ let me2 = Imp {
+ inner: unsafe { overlapped2arc!(status.overlapped(), Io, read) },
+ };
+ let mut me = me2.inner();
+ let mut buf = match mem::replace(&mut me.read, State::Empty) {
+ State::Pending(buf) => buf,
+ _ => unreachable!(),
+ };
+ unsafe {
+ buf.set_len(status.bytes_transferred() as usize);
+ }
+ me.read = State::Ready(buf);
+ me2.add_readiness(&mut me, Ready::readable());
+}