Diffstat (limited to 'third_party/rust/mio-0.6.23/src/sys/windows')
-rw-r--r--  third_party/rust/mio-0.6.23/src/sys/windows/awakener.rs       66
-rw-r--r--  third_party/rust/mio-0.6.23/src/sys/windows/buffer_pool.rs    20
-rw-r--r--  third_party/rust/mio-0.6.23/src/sys/windows/from_raw_arc.rs  116
-rw-r--r--  third_party/rust/mio-0.6.23/src/sys/windows/mod.rs           193
-rw-r--r--  third_party/rust/mio-0.6.23/src/sys/windows/selector.rs      538
-rw-r--r--  third_party/rust/mio-0.6.23/src/sys/windows/tcp.rs           853
-rw-r--r--  third_party/rust/mio-0.6.23/src/sys/windows/udp.rs           414
7 files changed, 2200 insertions(+), 0 deletions(-)
diff --git a/third_party/rust/mio-0.6.23/src/sys/windows/awakener.rs b/third_party/rust/mio-0.6.23/src/sys/windows/awakener.rs
new file mode 100644
index 0000000000..c913bc93f8
--- /dev/null
+++ b/third_party/rust/mio-0.6.23/src/sys/windows/awakener.rs
@@ -0,0 +1,66 @@
+use std::sync::Mutex;
+
+use miow::iocp::CompletionStatus;
+use {io, poll, Ready, Poll, PollOpt, Token};
+use event::Evented;
+use sys::windows::Selector;
+
+pub struct Awakener {
+ inner: Mutex<Option<AwakenerInner>>,
+}
+
+struct AwakenerInner {
+ token: Token,
+ selector: Selector,
+}
+
+impl Awakener {
+ pub fn new() -> io::Result<Awakener> {
+ Ok(Awakener {
+ inner: Mutex::new(None),
+ })
+ }
+
+ pub fn wakeup(&self) -> io::Result<()> {
+ // Each wakeup notification has NULL as its `OVERLAPPED` pointer to
+ // indicate that it's from this awakener and not part of an I/O
+ // operation. This is specially recognized by the selector.
+ //
+ // If we haven't been registered with an event loop yet just silently
+ // succeed.
+ if let Some(inner) = self.inner.lock().unwrap().as_ref() {
+ let status = CompletionStatus::new(0,
+ usize::from(inner.token),
+ 0 as *mut _);
+ inner.selector.port().post(status)?;
+ }
+ Ok(())
+ }
+
+ pub fn cleanup(&self) {
+ // noop
+ }
+}
+
+impl Evented for Awakener {
+ fn register(&self, poll: &Poll, token: Token, events: Ready,
+ opts: PollOpt) -> io::Result<()> {
+ assert_eq!(opts, PollOpt::edge());
+ assert_eq!(events, Ready::readable());
+ *self.inner.lock().unwrap() = Some(AwakenerInner {
+ selector: poll::selector(poll).clone_ref(),
+ token: token,
+ });
+ Ok(())
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token, events: Ready,
+ opts: PollOpt) -> io::Result<()> {
+ self.register(poll, token, events, opts)
+ }
+
+ fn deregister(&self, _poll: &Poll) -> io::Result<()> {
+ *self.inner.lock().unwrap() = None;
+ Ok(())
+ }
+}
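The wakeup above is nothing more than posting a completion packet by hand. A minimal standalone sketch of that mechanism, assuming only the `miow` completion-port API already used in this module (the token value 42 is arbitrary):

use std::ptr;
use std::time::Duration;

use miow::iocp::{CompletionPort, CompletionStatus};

fn main() -> std::io::Result<()> {
    // One completion port, as Selector::new creates.
    let port = CompletionPort::new(1)?;

    // Post a wakeup-style packet: zero bytes transferred, a token, and a
    // null OVERLAPPED pointer, mirroring Awakener::wakeup above.
    port.post(CompletionStatus::new(0, 42, ptr::null_mut()))?;

    // A selector loop sees overlapped() == null and treats the packet as a
    // wakeup rather than the completion of a real I/O operation.
    let status = port.get(Some(Duration::from_secs(1)))?;
    assert_eq!(status.token(), 42);
    assert!(status.overlapped().is_null());
    Ok(())
}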
diff --git a/third_party/rust/mio-0.6.23/src/sys/windows/buffer_pool.rs b/third_party/rust/mio-0.6.23/src/sys/windows/buffer_pool.rs
new file mode 100644
index 0000000000..86754593fd
--- /dev/null
+++ b/third_party/rust/mio-0.6.23/src/sys/windows/buffer_pool.rs
@@ -0,0 +1,20 @@
+pub struct BufferPool {
+ pool: Vec<Vec<u8>>,
+}
+
+impl BufferPool {
+ pub fn new(cap: usize) -> BufferPool {
+ BufferPool { pool: Vec::with_capacity(cap) }
+ }
+
+ pub fn get(&mut self, default_cap: usize) -> Vec<u8> {
+ self.pool.pop().unwrap_or_else(|| Vec::with_capacity(default_cap))
+ }
+
+ pub fn put(&mut self, mut buf: Vec<u8>) {
+ if self.pool.len() < self.pool.capacity() {
+ unsafe { buf.set_len(0); }
+ self.pool.push(buf);
+ }
+ }
+}
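A hypothetical usage sketch of the `BufferPool` above (the `demo` function is illustrative only): a returned buffer is truncated to length zero but keeps its capacity, so the next `get` reuses the allocation instead of allocating again.

fn demo() {
    let mut pool = BufferPool::new(2);

    // The pool starts empty, so the first request allocates a fresh buffer.
    let mut buf = pool.get(4096);
    assert_eq!(buf.capacity(), 4096);
    buf.extend_from_slice(b"hello");

    // `put` clears the length but keeps the capacity; the buffer is only
    // retained while fewer than `cap` (here 2) buffers are stored.
    pool.put(buf);

    // The next `get` pops the recycled buffer instead of allocating.
    let reused = pool.get(64);
    assert_eq!(reused.len(), 0);
    assert!(reused.capacity() >= 4096);
}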
diff --git a/third_party/rust/mio-0.6.23/src/sys/windows/from_raw_arc.rs b/third_party/rust/mio-0.6.23/src/sys/windows/from_raw_arc.rs
new file mode 100644
index 0000000000..b6d38b2408
--- /dev/null
+++ b/third_party/rust/mio-0.6.23/src/sys/windows/from_raw_arc.rs
@@ -0,0 +1,116 @@
+//! A "Manual Arc" which allows manually frobbing the reference count
+//!
+//! This module contains a copy of the `Arc` found in the standard library,
+//! stripped down to the bare bones of what we actually need. The reason this is
+//! done is for the ability to concretely know the memory layout of the `Inner`
+//! structure of the arc pointer itself (e.g. `ArcInner` in the standard
+//! library).
+//!
+//! We do some unsafe casting from `*mut OVERLAPPED` to a `FromRawArc<T>` to
+//! ensure that data lives for the length of an I/O operation, but this means
+//! that we have to know the layouts of the structures involved. This
+//! representation primarily guarantees that the data `T` is always at the front
+//! of the inner allocation.
+//!
+//! Note that we're missing out on various optimizations implemented in the
+//! standard library:
+//!
+//! * The size of `FromRawArc` is actually two words because of the drop flag
+//! * The compiler doesn't understand that the pointer in `FromRawArc` is never
+//! null, so Option<FromRawArc<T>> is not a nullable pointer.
+
+use std::ops::Deref;
+use std::mem;
+use std::sync::atomic::{self, AtomicUsize, Ordering};
+
+pub struct FromRawArc<T> {
+ _inner: *mut Inner<T>,
+}
+
+unsafe impl<T: Sync + Send> Send for FromRawArc<T> { }
+unsafe impl<T: Sync + Send> Sync for FromRawArc<T> { }
+
+#[repr(C)]
+struct Inner<T> {
+ data: T,
+ cnt: AtomicUsize,
+}
+
+impl<T> FromRawArc<T> {
+ pub fn new(data: T) -> FromRawArc<T> {
+ let x = Box::new(Inner {
+ data: data,
+ cnt: AtomicUsize::new(1),
+ });
+ FromRawArc { _inner: unsafe { mem::transmute(x) } }
+ }
+
+ pub unsafe fn from_raw(ptr: *mut T) -> FromRawArc<T> {
+ // Note that if we could use `mem::transmute` here to get a libstd Arc
+ // (guaranteed) then we could just use std::sync::Arc, but this is the
+ // crucial reason this currently exists.
+ FromRawArc { _inner: ptr as *mut Inner<T> }
+ }
+}
+
+impl<T> Clone for FromRawArc<T> {
+ fn clone(&self) -> FromRawArc<T> {
+ // Atomic ordering of Relaxed lifted from libstd, but the general idea
+ // is that you need synchronization to communicate this increment to
+ // another thread, so this itself doesn't need to be synchronized.
+ unsafe {
+ (*self._inner).cnt.fetch_add(1, Ordering::Relaxed);
+ }
+ FromRawArc { _inner: self._inner }
+ }
+}
+
+impl<T> Deref for FromRawArc<T> {
+ type Target = T;
+
+ fn deref(&self) -> &T {
+ unsafe { &(*self._inner).data }
+ }
+}
+
+impl<T> Drop for FromRawArc<T> {
+ fn drop(&mut self) {
+ unsafe {
+ // Atomic orderings lifted from the standard library
+ if (*self._inner).cnt.fetch_sub(1, Ordering::Release) != 1 {
+ return
+ }
+ atomic::fence(Ordering::Acquire);
+ // Free the whole `Inner<T>` allocation (not just `T`) so the layout
+ // handed back to the allocator matches the one used by `new` above.
+ drop(mem::transmute::<_, Box<Inner<T>>>(self._inner));
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::FromRawArc;
+
+ #[test]
+ fn smoke() {
+ let a = FromRawArc::new(1);
+ assert_eq!(*a, 1);
+ assert_eq!(*a.clone(), 1);
+ }
+
+ #[test]
+ fn drops() {
+ struct A<'a>(&'a mut bool);
+ impl<'a> Drop for A<'a> {
+ fn drop(&mut self) {
+ *self.0 = true;
+ }
+ }
+ let mut a = false;
+ {
+ let a = FromRawArc::new(A(&mut a));
+ let _ = a.clone();
+ assert!(!*a.0);
+ }
+ assert!(a);
+ }
+}
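A sketch of the loan/reclaim pattern this type exists for, following the module docs above (illustrative only, not part of the crate): a clone is forgotten while a raw pointer to the data is handed out, and `from_raw` later rebuilds an owning handle whose drop balances the earlier clone. This is only sound because `Inner<T>` is `#[repr(C)]` with `data` as its first field.

use std::mem;

fn demo() {
    let arc = FromRawArc::new(5u32);

    // "Loan" one reference count out, as the I/O code does before handing an
    // OVERLAPPED pointer to the kernel: clone bumps the count and forget
    // skips the matching decrement.
    let loaned = arc.clone();
    let raw = &*loaned as *const u32 as *mut u32;
    mem::forget(loaned);

    // Later (e.g. in a completion callback) the raw data pointer is turned
    // back into an owning handle; dropping it pays back the earlier clone.
    let reclaimed = unsafe { FromRawArc::from_raw(raw) };
    assert_eq!(*reclaimed, 5);
    drop(reclaimed);
    drop(arc); // final handle drop frees the allocation
}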
diff --git a/third_party/rust/mio-0.6.23/src/sys/windows/mod.rs b/third_party/rust/mio-0.6.23/src/sys/windows/mod.rs
new file mode 100644
index 0000000000..9b9f054495
--- /dev/null
+++ b/third_party/rust/mio-0.6.23/src/sys/windows/mod.rs
@@ -0,0 +1,193 @@
+//! Implementation of mio for Windows using IOCP
+//!
+//! This module uses I/O Completion Ports (IOCP) on Windows to implement mio's
+//! Unix epoll-like interface. Unfortunately these two I/O models are
+//! fundamentally incompatible:
+//!
+//! * IOCP is a completion-based model where work is submitted to the kernel and
+//! a program is notified later when the work has finished.
+//! * epoll is a readiness-based model where the kernel is queried as to what
+//! work can be done, and afterwards the work is done.
+//!
+//! As a result, this implementation for Windows is much less "low level" than
+//! the Unix implementation of mio. This design decision was intentional,
+//! however.
+//!
+//! ## What is IOCP?
+//!
+//! The [official docs][docs] have a comprehensive explanation of what IOCP is,
+//! but at a high level it requires the following operations to be executed to
+//! perform some I/O:
+//!
+//! 1. A completion port is created
+//! 2. An I/O handle and a token are registered with this completion port
+//! 3. Some I/O is issued on the handle. This generally means that an API was
+//! invoked with a zeroed `OVERLAPPED` structure. The API will immediately
+//! return.
+//! 4. After some time, the application queries the I/O port for completed
+//! events. The port will return a pointer to the `OVERLAPPED` along with
+//! the token presented at registration time.
+//!
+//! Many I/O operations can be fired off before waiting on a port, and the port
+//! will block execution of the calling thread until an I/O event has completed
+//! (or a timeout has elapsed).
+//!
+//! Currently all of these low-level operations are housed in a separate `miow`
+//! crate to provide a 0-cost abstraction over IOCP. This crate uses that to
+//! implement all the fiddly bits, so there are very few actual Windows API calls
+//! `unsafe` blocks as a result.
+//!
+//! [docs]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198%28v=vs.85%29.aspx
+//!
+//! ## Safety of IOCP
+//!
+//! Unfortunately for us, IOCP is pretty unsafe in terms of Rust lifetimes and
+//! such. When an I/O operation is submitted to the kernel, it involves handing
+//! the kernel a few pointers like a buffer to read/write, an `OVERLAPPED`
+//! structure pointer, and perhaps some other buffers such as for socket
+//! addresses. These pointers all have to remain valid **for the entire I/O
+//! operation's duration**.
+//!
+//! There's no way to define a safe lifetime for these pointers/buffers over
+//! the span of an I/O operation, so we're forced to add a layer of abstraction
+//! (not 0-cost) to make these APIs safe. Currently this implementation
+//! basically just boxes everything up on the heap to give it a stable address
+//! and then keys off that most of the time.
+//!
+//! ## From completion to readiness
+//!
+//! Translating a completion-based model to a readiness-based model is also no
+//! easy task, and a significant portion of this implementation is managing this
+//! translation. The basic idea behind this implementation is to issue I/O
+//! operations preemptively and then translate their completions to an "I'm
+//! ready" event.
+//!
+//! For example, in the case of reading a `TcpSocket`, as soon as a socket is
+//! connected (or registered after an accept) a read operation is executed.
+//! While the read is in progress calls to `read` will return `WouldBlock`, and
+//! once the read is completed we translate the completion notification into a
+//! `readable` event. Once the internal buffer is drained (e.g. all data from it
+//! has been read) a read operation is re-issued.
+//!
+//! Write operations are a little different from reads, and the current
+//! implementation is to just schedule a write as soon as `write` is first
+//! called. While that write operation is in progress all future calls to
+//! `write` will return `WouldBlock`. Completion of the write then translates to
+//! a `writable` event. Note that some layer of internal buffering will probably
+//! need to be added in the future.
+//!
+//! ## Buffer Management
+//!
+//! As there are lots of I/O operations in flight at any one point in time,
+//! there are lots of live buffers that need to be juggled around (e.g. this
+//! implementation's own internal buffers).
+//!
+//! Currently all buffers are created for the I/O operation at hand and are then
+//! discarded when it completes (this is listed as future work below).
+//!
+//! ## Callback Management
+//!
+//! When the main event loop receives a notification that an I/O operation has
+//! completed, some work needs to be done to translate that to a set of events
+//! or perhaps some more I/O needs to be scheduled. For example after a
+//! `TcpStream` is connected it generates a writable event and also schedules a
+//! read.
+//!
+//! To manage all this the `Selector` uses the `OVERLAPPED` pointer from the
+//! completion status. The selector assumes that all `OVERLAPPED` pointers are
+//! actually pointers to the interior of a `selector::Overlapped` which means
+//! that right after the `OVERLAPPED` itself there's a function pointer. This
+//! function pointer is given the completion status as well as another callback
+//! to push events onto the selector.
+//!
+//! The callback for each I/O operation doesn't have any environment, so it
+//! relies on memory layout and unsafe casting to translate an `OVERLAPPED`
+//! pointer (or in this case a `selector::Overlapped` pointer) to a type of
+//! `FromRawArc<T>` (see module docs for why this type exists).
+//!
+//! ## Thread Safety
+//!
+//! Currently all of the I/O primitives make liberal use of `Arc` and `Mutex`
+//! as an implementation detail. The main reason for this is to ensure that the
+//! types are `Send` and `Sync`, but the implementations have not been stressed
+//! in multithreaded situations yet. As a result, there are bound to be
+//! functional surprises in using these concurrently.
+//!
+//! ## Future Work
+//!
+//! First up, let's take a look at unimplemented portions of this module:
+//!
+//! * The `PollOpt::level()` option is currently entirely unimplemented.
+//! * Each `EventLoop` currently owns its completion port, but this prevents an
+//! I/O handle from being added to multiple event loops (something that can be
+//! done on Unix). Additionally, it hinders event loops moving across threads.
+//! This should likely be solved by having a global `Selector` which all
+//! others then communicate with.
+//! * Although Unix sockets don't exist on Windows, there are named pipes and
+//! those should likely be bound here in a similar fashion to `TcpStream`.
+//!
+//! Next up, there are a few performance improvements and optimizations that can
+//! still be implemented:
+//!
+//! * Buffer management right now is pretty bad: buffers are all just allocated
+//! right before an I/O operation and discarded right after. There should at
+//! least be some form of buffer reuse (pooling).
+//! * No calls to `write` are internally buffered before being scheduled, which
+//! means that writing performance is abysmal compared to Unix. There should
+//! probably be some level of buffering of writes.
+
+use std::io;
+use std::os::windows::prelude::*;
+
+mod kernel32 {
+ pub use ::winapi::um::ioapiset::CancelIoEx;
+ pub use ::winapi::um::winbase::SetFileCompletionNotificationModes;
+}
+mod winapi {
+ pub use ::winapi::shared::minwindef::{TRUE, UCHAR};
+ pub use ::winapi::um::winnt::HANDLE;
+}
+
+mod awakener;
+#[macro_use]
+mod selector;
+mod tcp;
+mod udp;
+mod from_raw_arc;
+mod buffer_pool;
+
+pub use self::awakener::Awakener;
+pub use self::selector::{Events, Selector, Overlapped, Binding};
+pub use self::tcp::{TcpStream, TcpListener};
+pub use self::udp::UdpSocket;
+
+#[derive(Copy, Clone)]
+enum Family {
+ V4, V6,
+}
+
+unsafe fn cancel(socket: &AsRawSocket,
+ overlapped: &Overlapped) -> io::Result<()> {
+ let handle = socket.as_raw_socket() as winapi::HANDLE;
+ let ret = kernel32::CancelIoEx(handle, overlapped.as_mut_ptr());
+ if ret == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(())
+ }
+}
+
+unsafe fn no_notify_on_instant_completion(handle: winapi::HANDLE) -> io::Result<()> {
+ // TODO: move those to winapi
+ const FILE_SKIP_COMPLETION_PORT_ON_SUCCESS: winapi::UCHAR = 1;
+ const FILE_SKIP_SET_EVENT_ON_HANDLE: winapi::UCHAR = 2;
+
+ let flags = FILE_SKIP_COMPLETION_PORT_ON_SUCCESS | FILE_SKIP_SET_EVENT_ON_HANDLE;
+
+ let r = kernel32::SetFileCompletionNotificationModes(handle, flags);
+ if r == winapi::TRUE {
+ Ok(())
+ } else {
+ Err(io::Error::last_os_error())
+ }
+}
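The "From completion to readiness" section above is the heart of this port. A toy, self-contained sketch of that read-side state machine, with hypothetical names that only loosely mirror the `State` enum used by the TCP code below: a zero-byte overlapped read is kept in flight, its completion is surfaced as readiness, and draining the socket re-arms the read.

/// Simplified, hypothetical read states for illustration.
#[derive(Debug, PartialEq)]
enum ReadState {
    Empty,   // nothing scheduled yet (not registered)
    Pending, // a 0-byte overlapped read is in flight; reads return WouldBlock
    Ready,   // the completion arrived; the OS socket buffer has data
}

/// Called from the completion callback: surface readiness to the poller.
fn on_completion(state: &mut ReadState) {
    *state = ReadState::Ready;
}

/// Called once the user has drained the socket: re-issue the 0-byte read.
fn on_drained(state: &mut ReadState) {
    *state = ReadState::Pending;
}

fn main() {
    // Registration schedules the first 0-byte read immediately.
    let mut state = ReadState::Pending;
    on_completion(&mut state);
    assert_eq!(state, ReadState::Ready);
    on_drained(&mut state);
    assert_eq!(state, ReadState::Pending);
    assert_ne!(state, ReadState::Empty);
}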
diff --git a/third_party/rust/mio-0.6.23/src/sys/windows/selector.rs b/third_party/rust/mio-0.6.23/src/sys/windows/selector.rs
new file mode 100644
index 0000000000..23b145acdd
--- /dev/null
+++ b/third_party/rust/mio-0.6.23/src/sys/windows/selector.rs
@@ -0,0 +1,538 @@
+#![allow(deprecated)]
+
+use std::{fmt, io};
+use std::cell::UnsafeCell;
+use std::os::windows::prelude::*;
+use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
+use std::time::Duration;
+
+use lazycell::AtomicLazyCell;
+
+use winapi::shared::winerror::WAIT_TIMEOUT;
+use winapi::um::minwinbase::{OVERLAPPED, OVERLAPPED_ENTRY};
+use miow;
+use miow::iocp::{CompletionPort, CompletionStatus};
+
+use event_imp::{Event, Evented, Ready};
+use poll::{self, Poll};
+use sys::windows::buffer_pool::BufferPool;
+use {Token, PollOpt};
+
+/// Each Selector has a globally unique(ish) ID associated with it. This ID
+/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
+/// registered with the `Selector`. If a type that was previously associated with
+/// a `Selector` attempts to register itself with a different `Selector`, the
+/// operation will return with an error. This matches Windows behavior.
+static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
+
+/// The guts of the Windows event loop, this is the struct which actually owns
+/// a completion port.
+///
+/// Internally this is just an `Arc`, and this allows handing out references to
+/// the internals to I/O handles registered on this selector. This is
+/// required to schedule I/O operations independently of being inside the event
+/// loop (e.g. when a call to `write` is seen we're not "in the event loop").
+pub struct Selector {
+ inner: Arc<SelectorInner>,
+}
+
+struct SelectorInner {
+ /// Unique identifier of the `Selector`
+ id: usize,
+
+ /// The actual completion port that's used to manage all I/O
+ port: CompletionPort,
+
+ /// A pool of buffers usable by this selector.
+ ///
+ /// Primitives will take buffers from this pool to perform I/O operations,
+ /// and once complete they'll be put back in.
+ buffers: Mutex<BufferPool>,
+}
+
+impl Selector {
+ pub fn new() -> io::Result<Selector> {
+ // offset by 1 to avoid choosing 0 as the id of a selector
+ let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
+
+ CompletionPort::new(0).map(|cp| {
+ Selector {
+ inner: Arc::new(SelectorInner {
+ id: id,
+ port: cp,
+ buffers: Mutex::new(BufferPool::new(256)),
+ }),
+ }
+ })
+ }
+
+ pub fn select(&self,
+ events: &mut Events,
+ awakener: Token,
+ timeout: Option<Duration>) -> io::Result<bool> {
+ trace!("select; timeout={:?}", timeout);
+
+ // Clear out the previous list of I/O events and get some more!
+ events.clear();
+
+ trace!("polling IOCP");
+ let n = match self.inner.port.get_many(&mut events.statuses, timeout) {
+ Ok(statuses) => statuses.len(),
+ Err(ref e) if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) => 0,
+ Err(e) => return Err(e),
+ };
+
+ let mut ret = false;
+ for status in events.statuses[..n].iter() {
+ // This should only ever happen from the awakener, and we should
+ // only ever have one awakener right now, so assert as such.
+ if status.overlapped() as usize == 0 {
+ assert_eq!(status.token(), usize::from(awakener));
+ ret = true;
+ continue;
+ }
+
+ let callback = unsafe {
+ (*(status.overlapped() as *mut Overlapped)).callback
+ };
+
+ trace!("select; -> got overlapped");
+ callback(status.entry());
+ }
+
+ trace!("returning");
+ Ok(ret)
+ }
+
+ /// Gets a reference to the underlying `CompletionPort` structure.
+ pub fn port(&self) -> &CompletionPort {
+ &self.inner.port
+ }
+
+ /// Gets a new reference to this selector, although all underlying data
+ /// structures will refer to the same completion port.
+ pub fn clone_ref(&self) -> Selector {
+ Selector { inner: self.inner.clone() }
+ }
+
+ /// Return the `Selector`'s identifier
+ pub fn id(&self) -> usize {
+ self.inner.id
+ }
+}
+
+impl SelectorInner {
+ fn identical(&self, other: &SelectorInner) -> bool {
+ (self as *const SelectorInner) == (other as *const SelectorInner)
+ }
+}
+
+// A registration is stored in each I/O object which keeps track of how it is
+// associated with a `Selector` above.
+//
+// Once associated with a `Selector`, a registration can never be un-associated
+// (due to IOCP requirements). This is actually implemented through the
+// `poll::Registration` and `poll::SetReadiness` APIs to keep track of all the
+// level/edge/filtering business.
+/// A `Binding` is embedded in all I/O objects associated with a `Poll`
+/// object.
+///
+/// Each registration keeps track of which selector the I/O object is
+/// associated with, ensuring that implementations of `Evented` can be
+/// conformant for the various methods on Windows.
+///
+/// If you're working with custom IOCP-enabled objects then you'll want to
+/// ensure that one of these instances is stored in your object and used in the
+/// implementation of `Evented`.
+///
+/// For more information about how to use this see the `windows` module
+/// documentation in this crate.
+pub struct Binding {
+ selector: AtomicLazyCell<Arc<SelectorInner>>,
+}
+
+impl Binding {
+ /// Creates a new blank binding ready to be inserted into an I/O
+ /// object.
+ ///
+ /// Won't actually do anything until associated with a `Poll` loop.
+ pub fn new() -> Binding {
+ Binding { selector: AtomicLazyCell::new() }
+ }
+
+ /// Registers a new handle with the `Poll` specified, also assigning the
+ /// `token` specified.
+ ///
+ /// This function is intended to be used as part of `Evented::register` for
+ /// custom IOCP objects. It will add the specified handle to the internal
+ /// IOCP object with the provided `token`. All future events generated by
+ /// the handle provided will be received by the `Poll`'s internal IOCP
+ /// object.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe as the `Poll` instance has assumptions about
+ /// what the `OVERLAPPED` pointer used for each I/O operation looks like.
+ /// Specifically they must all be instances of the `Overlapped` type in
+ /// this crate. More information about this can be found on the
+ /// `windows` module in this crate.
+ pub unsafe fn register_handle(&self,
+ handle: &AsRawHandle,
+ token: Token,
+ poll: &Poll) -> io::Result<()> {
+ let selector = poll::selector(poll);
+
+ // Ignore errors, we'll see them on the next line.
+ drop(self.selector.fill(selector.inner.clone()));
+ self.check_same_selector(poll)?;
+
+ selector.inner.port.add_handle(usize::from(token), handle)
+ }
+
+ /// Same as `register_handle` but for sockets.
+ pub unsafe fn register_socket(&self,
+ handle: &AsRawSocket,
+ token: Token,
+ poll: &Poll) -> io::Result<()> {
+ let selector = poll::selector(poll);
+ drop(self.selector.fill(selector.inner.clone()));
+ self.check_same_selector(poll)?;
+ selector.inner.port.add_socket(usize::from(token), handle)
+ }
+
+ /// Reregisters the handle provided from the `Poll` provided.
+ ///
+ /// This is intended to be used as part of `Evented::reregister` but note
+ /// that this function does not currently reregister the provided handle
+ /// with the `poll` specified. IOCP has a special binding for changing the
+ /// token which has not yet been implemented. Instead this function should
+ /// be used to assert that the call to `reregister` happened on the same
+ /// `Poll` that was passed to `register`.
+ ///
+ /// Eventually, though, the provided `handle` will be re-assigned to have
+ /// the token `token` on the given `poll` assuming that it's been
+ /// previously registered with it.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe for similar reasons to `register`. That is,
+ /// there may be pending I/O events and such which aren't handled correctly.
+ pub unsafe fn reregister_handle(&self,
+ _handle: &AsRawHandle,
+ _token: Token,
+ poll: &Poll) -> io::Result<()> {
+ self.check_same_selector(poll)
+ }
+
+ /// Same as `reregister_handle`, but for sockets.
+ pub unsafe fn reregister_socket(&self,
+ _socket: &AsRawSocket,
+ _token: Token,
+ poll: &Poll) -> io::Result<()> {
+ self.check_same_selector(poll)
+ }
+
+ /// Deregisters the handle provided from the `Poll` provided.
+ ///
+ /// This is intended to be used as part of `Evented::deregister` but note
+ /// that this function does not currently deregister the provided handle
+ /// from the `poll` specified. IOCP has a special binding for that which has
+ /// not yet been implemented. Instead this function should be used to assert
+ /// that the call to `deregister` happened on the same `Poll` that was
+ /// passed to `register`.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe for similar reasons to `register`. That is,
+ /// there may be pending I/O events and such which aren't handled correctly.
+ pub unsafe fn deregister_handle(&self,
+ _handle: &AsRawHandle,
+ poll: &Poll) -> io::Result<()> {
+ self.check_same_selector(poll)
+ }
+
+ /// Same as `deregister_handle`, but for sockets.
+ pub unsafe fn deregister_socket(&self,
+ _socket: &AsRawSocket,
+ poll: &Poll) -> io::Result<()> {
+ self.check_same_selector(poll)
+ }
+
+ fn check_same_selector(&self, poll: &Poll) -> io::Result<()> {
+ let selector = poll::selector(poll);
+ match self.selector.borrow() {
+ Some(prev) if prev.identical(&selector.inner) => Ok(()),
+ Some(_) |
+ None => Err(other("socket already registered")),
+ }
+ }
+}
+
+impl fmt::Debug for Binding {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Binding")
+ .finish()
+ }
+}
+
+/// Helper struct used for TCP and UDP which bundles a `binding` with a
+/// `SetReadiness` handle.
+pub struct ReadyBinding {
+ binding: Binding,
+ readiness: Option<poll::SetReadiness>,
+}
+
+impl ReadyBinding {
+ /// Creates a new blank binding ready to be inserted into an I/O object.
+ ///
+ /// Won't actually do anything until associated with a `Selector` loop.
+ pub fn new() -> ReadyBinding {
+ ReadyBinding {
+ binding: Binding::new(),
+ readiness: None,
+ }
+ }
+
+ /// Returns whether this binding has been associated with a selector
+ /// yet.
+ pub fn registered(&self) -> bool {
+ self.readiness.is_some()
+ }
+
+ /// Acquires a buffer with at least `size` capacity.
+ ///
+ /// If associated with a selector, this will attempt to pull a buffer from
+ /// that buffer pool. If not associated with a selector, this will allocate
+ /// a fresh buffer.
+ pub fn get_buffer(&self, size: usize) -> Vec<u8> {
+ match self.binding.selector.borrow() {
+ Some(i) => i.buffers.lock().unwrap().get(size),
+ None => Vec::with_capacity(size),
+ }
+ }
+
+ /// Returns a buffer to this binding.
+ ///
+ /// If associated with a selector, this will push the buffer back into the
+ /// selector's pool of buffers. Otherwise this will just drop the buffer.
+ pub fn put_buffer(&self, buf: Vec<u8>) {
+ if let Some(i) = self.binding.selector.borrow() {
+ i.buffers.lock().unwrap().put(buf);
+ }
+ }
+
+ /// Sets the readiness of this I/O object to a particular `set`.
+ ///
+ /// This is later used to fill out and respond to requests to `poll`. Note
+ /// that this is all implemented through the `SetReadiness` structure in the
+ /// `poll` module.
+ pub fn set_readiness(&self, set: Ready) {
+ if let Some(ref i) = self.readiness {
+ trace!("set readiness to {:?}", set);
+ i.set_readiness(set).expect("event loop disappeared?");
+ }
+ }
+
+ /// Queries what the current readiness of this I/O object is.
+ ///
+ /// This is what's being used to generate events returned by `poll`.
+ pub fn readiness(&self) -> Ready {
+ match self.readiness {
+ Some(ref i) => i.readiness(),
+ None => Ready::empty(),
+ }
+ }
+
+ /// Implementation of the `Evented::register` function essentially.
+ ///
+ /// Returns an error if we're already registered with another event loop,
+ /// and otherwise just reassociates ourselves with the event loop to
+ /// possibly change tokens.
+ pub fn register_socket(&mut self,
+ socket: &AsRawSocket,
+ poll: &Poll,
+ token: Token,
+ events: Ready,
+ opts: PollOpt,
+ registration: &Mutex<Option<poll::Registration>>)
+ -> io::Result<()> {
+ trace!("register {:?} {:?}", token, events);
+ unsafe {
+ self.binding.register_socket(socket, token, poll)?;
+ }
+
+ let (r, s) = poll::new_registration(poll, token, events, opts);
+ self.readiness = Some(s);
+ *registration.lock().unwrap() = Some(r);
+ Ok(())
+ }
+
+ /// Implementation of `Evented::reregister` function.
+ pub fn reregister_socket(&mut self,
+ socket: &AsRawSocket,
+ poll: &Poll,
+ token: Token,
+ events: Ready,
+ opts: PollOpt,
+ registration: &Mutex<Option<poll::Registration>>)
+ -> io::Result<()> {
+ trace!("reregister {:?} {:?}", token, events);
+ unsafe {
+ self.binding.reregister_socket(socket, token, poll)?;
+ }
+
+ registration.lock().unwrap()
+ .as_mut().unwrap()
+ .reregister(poll, token, events, opts)
+ }
+
+ /// Implementation of the `Evented::deregister` function.
+ ///
+ /// Doesn't allow registration with another event loop, just shuts down
+ /// readiness notifications and such.
+ pub fn deregister(&mut self,
+ socket: &AsRawSocket,
+ poll: &Poll,
+ registration: &Mutex<Option<poll::Registration>>)
+ -> io::Result<()> {
+ trace!("deregistering");
+ unsafe {
+ self.binding.deregister_socket(socket, poll)?;
+ }
+
+ registration.lock().unwrap()
+ .as_ref().unwrap()
+ .deregister(poll)
+ }
+}
+
+fn other(s: &str) -> io::Error {
+ io::Error::new(io::ErrorKind::Other, s)
+}
+
+#[derive(Debug)]
+pub struct Events {
+ /// Raw I/O event completions are filled in here by the call to `get_many`
+ /// on the completion port above. These are then processed to run callbacks
+ /// which figure out what to do after the event is done.
+ statuses: Box<[CompletionStatus]>,
+
+ /// Literal events returned by `get` to the `EventLoop` above. This file
+ /// doesn't really modify this (except for the awakener); instead almost all
+ /// events are filled in by the `ReadinessQueue` from the `poll` module.
+ events: Vec<Event>,
+}
+
+impl Events {
+ pub fn with_capacity(cap: usize) -> Events {
+ // Note that it's possible for the output `events` to grow beyond the
+ // capacity as it can also include deferred events, but that's certainly
+ // not the end of the world!
+ Events {
+ statuses: vec![CompletionStatus::zero(); cap].into_boxed_slice(),
+ events: Vec::with_capacity(cap),
+ }
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.events.is_empty()
+ }
+
+ pub fn len(&self) -> usize {
+ self.events.len()
+ }
+
+ pub fn capacity(&self) -> usize {
+ self.events.capacity()
+ }
+
+ pub fn get(&self, idx: usize) -> Option<Event> {
+ self.events.get(idx).map(|e| *e)
+ }
+
+ pub fn push_event(&mut self, event: Event) {
+ self.events.push(event);
+ }
+
+ pub fn clear(&mut self) {
+ self.events.truncate(0);
+ }
+}
+
+macro_rules! overlapped2arc {
+ ($e:expr, $t:ty, $($field:ident).+) => (
+ #[allow(deref_nullptr)]
+ {
+ let offset = offset_of!($t, $($field).+);
+ debug_assert!(offset < mem::size_of::<$t>());
+ FromRawArc::from_raw(($e as usize - offset) as *mut $t)
+ }
+ )
+}
+
+macro_rules! offset_of {
+ ($t:ty, $($field:ident).+) => (
+ &(*(0 as *const $t)).$($field).+ as *const _ as usize
+ )
+}
+
+// See sys::windows module docs for why this exists.
+//
+// The gist of it is that `Selector` assumes that all `OVERLAPPED` pointers are
+// actually inside one of these structures so it can use the `Callback` stored
+// right after it.
+//
+// We use repr(C) here to ensure that we can assume the overlapped pointer is
+// at the start of the structure so we can just do a cast.
+/// A wrapper around an internal instance of `miow::Overlapped`, which is in
+/// turn a wrapper around the Windows type `OVERLAPPED`.
+///
+/// This type is required to be used for all IOCP operations on handles that are
+/// registered with an event loop. The event loop will receive notifications
+/// over `OVERLAPPED` pointers that have completed, and it will cast that
+/// pointer to a pointer to this structure and invoke the associated callback.
+#[repr(C)]
+pub struct Overlapped {
+ inner: UnsafeCell<miow::Overlapped>,
+ callback: fn(&OVERLAPPED_ENTRY),
+}
+
+impl Overlapped {
+ /// Creates a new `Overlapped` which will invoke the provided `cb` callback
+ /// whenever it's triggered.
+ ///
+ /// The returned `Overlapped` must be used as the `OVERLAPPED` passed to all
+ /// I/O operations that are registered with mio's event loop. When the I/O
+ /// operation associated with an `OVERLAPPED` pointer completes the event
+ /// loop will invoke the function pointer provided by `cb`.
+ pub fn new(cb: fn(&OVERLAPPED_ENTRY)) -> Overlapped {
+ Overlapped {
+ inner: UnsafeCell::new(miow::Overlapped::zero()),
+ callback: cb,
+ }
+ }
+
+ /// Get the underlying `Overlapped` instance as a raw pointer.
+ ///
+ /// This can be useful when only a shared borrow is held and the overlapped
+ /// pointer needs to be passed down to winapi.
+ pub fn as_mut_ptr(&self) -> *mut OVERLAPPED {
+ unsafe {
+ (*self.inner.get()).raw()
+ }
+ }
+}
+
+impl fmt::Debug for Overlapped {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Overlapped")
+ .finish()
+ }
+}
+
+// Overlapped's APIs are marked as unsafe, as they must be used with caution to
+// ensure thread safety. The structure itself is safe to send across threads.
+unsafe impl Send for Overlapped {}
+unsafe impl Sync for Overlapped {}
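The `offset_of!`/`overlapped2arc!` pair above is the classic "container_of" trick: subtract a field's offset from that field's address to recover the owning struct. A standalone sketch with hypothetical types, using the `std::mem::offset_of!` macro (stable since Rust 1.77) in place of the deref-null version above:

use std::mem;

#[repr(C)]
struct Container {
    header: u64,
    payload: u32, // stand-in for the embedded `Overlapped` field
}

fn main() {
    let boxed = Box::new(Container { header: 7, payload: 99 });

    // The completion port hands back only this field's address (the
    // OVERLAPPED pointer in the real code).
    let payload_ptr: *const u32 = &boxed.payload;

    // field address - field offset = address of the containing struct.
    let offset = mem::offset_of!(Container, payload);
    let container_ptr = (payload_ptr as usize - offset) as *const Container;

    unsafe {
        assert_eq!((*container_ptr).header, 7);
        assert_eq!((*container_ptr).payload, 99);
    }
}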
diff --git a/third_party/rust/mio-0.6.23/src/sys/windows/tcp.rs b/third_party/rust/mio-0.6.23/src/sys/windows/tcp.rs
new file mode 100644
index 0000000000..236e7866a6
--- /dev/null
+++ b/third_party/rust/mio-0.6.23/src/sys/windows/tcp.rs
@@ -0,0 +1,853 @@
+use std::fmt;
+use std::io::{self, Read, ErrorKind};
+use std::mem;
+use std::net::{self, SocketAddr, Shutdown};
+use std::os::windows::prelude::*;
+use std::sync::{Mutex, MutexGuard};
+use std::time::Duration;
+
+use miow::iocp::CompletionStatus;
+use miow::net::*;
+use net2::{TcpBuilder, TcpStreamExt as Net2TcpExt};
+use winapi::um::minwinbase::OVERLAPPED_ENTRY;
+use winapi::um::winnt::HANDLE;
+use iovec::IoVec;
+
+use {poll, Ready, Poll, PollOpt, Token};
+use event::Evented;
+use sys::windows::from_raw_arc::FromRawArc;
+use sys::windows::selector::{Overlapped, ReadyBinding};
+use sys::windows::Family;
+
+pub struct TcpStream {
+ /// Separately stored implementation to ensure that the `Drop`
+ /// implementation on this type is only executed when it's actually dropped
+ /// (many clones of this `imp` are made).
+ imp: StreamImp,
+ registration: Mutex<Option<poll::Registration>>,
+}
+
+pub struct TcpListener {
+ imp: ListenerImp,
+ registration: Mutex<Option<poll::Registration>>,
+}
+
+#[derive(Clone)]
+struct StreamImp {
+ /// A stable address and synchronized access for all internals. This serves
+ /// to ensure that all `Overlapped` pointers are valid for a long period of
+ /// time as well as allowing completion callbacks to have access to the
+ /// internals without having ownership.
+ ///
+ /// Note that the reference count also allows us to "loan out" copies to
+ /// completion ports while I/O is running to guarantee that this stays alive
+ /// until the I/O completes. You'll notice a number of calls to
+ /// `mem::forget` below, and these only happen on successful scheduling of
+ /// I/O and are paired with `overlapped2arc!` macro invocations in the
+ /// completion callbacks (to have a decrement match the increment).
+ inner: FromRawArc<StreamIo>,
+}
+
+#[derive(Clone)]
+struct ListenerImp {
+ inner: FromRawArc<ListenerIo>,
+}
+
+struct StreamIo {
+ inner: Mutex<StreamInner>,
+ read: Overlapped, // also used for connect
+ write: Overlapped,
+ socket: net::TcpStream,
+}
+
+struct ListenerIo {
+ inner: Mutex<ListenerInner>,
+ accept: Overlapped,
+ family: Family,
+ socket: net::TcpListener,
+}
+
+struct StreamInner {
+ iocp: ReadyBinding,
+ deferred_connect: Option<SocketAddr>,
+ read: State<(), ()>,
+ write: State<(Vec<u8>, usize), (Vec<u8>, usize)>,
+ /// whether we are instantly notified of success
+ /// (FILE_SKIP_COMPLETION_PORT_ON_SUCCESS,
+ /// without a roundtrip through the event loop)
+ instant_notify: bool,
+}
+
+struct ListenerInner {
+ iocp: ReadyBinding,
+ accept: State<net::TcpStream, (net::TcpStream, SocketAddr)>,
+ accept_buf: AcceptAddrsBuf,
+}
+
+enum State<T, U> {
+ Empty, // no I/O operation in progress
+ Pending(T), // an I/O operation is in progress
+ Ready(U), // I/O has finished with this value
+ Error(io::Error), // there was an I/O error
+}
+
+impl TcpStream {
+ fn new(socket: net::TcpStream,
+ deferred_connect: Option<SocketAddr>) -> TcpStream {
+ TcpStream {
+ registration: Mutex::new(None),
+ imp: StreamImp {
+ inner: FromRawArc::new(StreamIo {
+ read: Overlapped::new(read_done),
+ write: Overlapped::new(write_done),
+ socket: socket,
+ inner: Mutex::new(StreamInner {
+ iocp: ReadyBinding::new(),
+ deferred_connect: deferred_connect,
+ read: State::Empty,
+ write: State::Empty,
+ instant_notify: false,
+ }),
+ }),
+ },
+ }
+ }
+
+ pub fn connect(socket: net::TcpStream, addr: &SocketAddr)
+ -> io::Result<TcpStream> {
+ socket.set_nonblocking(true)?;
+ Ok(TcpStream::new(socket, Some(*addr)))
+ }
+
+ pub fn from_stream(stream: net::TcpStream) -> TcpStream {
+ TcpStream::new(stream, None)
+ }
+
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.imp.inner.socket.peer_addr()
+ }
+
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.imp.inner.socket.local_addr()
+ }
+
+ pub fn try_clone(&self) -> io::Result<TcpStream> {
+ self.imp.inner.socket.try_clone().map(|s| TcpStream::new(s, None))
+ }
+
+ pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
+ self.imp.inner.socket.shutdown(how)
+ }
+
+ pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
+ self.imp.inner.socket.set_nodelay(nodelay)
+ }
+
+ pub fn nodelay(&self) -> io::Result<bool> {
+ self.imp.inner.socket.nodelay()
+ }
+
+ pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> {
+ self.imp.inner.socket.set_recv_buffer_size(size)
+ }
+
+ pub fn recv_buffer_size(&self) -> io::Result<usize> {
+ self.imp.inner.socket.recv_buffer_size()
+ }
+
+ pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> {
+ self.imp.inner.socket.set_send_buffer_size(size)
+ }
+
+ pub fn send_buffer_size(&self) -> io::Result<usize> {
+ self.imp.inner.socket.send_buffer_size()
+ }
+
+ pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> {
+ self.imp.inner.socket.set_keepalive(keepalive)
+ }
+
+ pub fn keepalive(&self) -> io::Result<Option<Duration>> {
+ self.imp.inner.socket.keepalive()
+ }
+
+ pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
+ self.imp.inner.socket.set_ttl(ttl)
+ }
+
+ pub fn ttl(&self) -> io::Result<u32> {
+ self.imp.inner.socket.ttl()
+ }
+
+ pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
+ self.imp.inner.socket.set_only_v6(only_v6)
+ }
+
+ pub fn only_v6(&self) -> io::Result<bool> {
+ self.imp.inner.socket.only_v6()
+ }
+
+ pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
+ Net2TcpExt::set_linger(&self.imp.inner.socket, dur)
+ }
+
+ pub fn linger(&self) -> io::Result<Option<Duration>> {
+ Net2TcpExt::linger(&self.imp.inner.socket)
+ }
+
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ if let Some(e) = self.imp.inner.socket.take_error()? {
+ return Ok(Some(e))
+ }
+
+ // If the syscall didn't return anything then also check to see if we've
+ // squirreled away an error elsewhere for example as part of a connect
+ // operation.
+ //
+ // Typically this is used like so:
+ //
+ // 1. A `connect` is issued
+ // 2. Wait for the socket to be writable
+ // 3. Call `take_error` to see if the connect succeeded.
+ //
+ // Right now the `connect` operation finishes in `read_done` below and
+ // will fill in `State::Error` in the `read` slot if it fails, so we
+ // extract that here.
+ let mut me = self.inner();
+ match mem::replace(&mut me.read, State::Empty) {
+ State::Error(e) => {
+ self.imp.schedule_read(&mut me);
+ Ok(Some(e))
+ }
+ other => {
+ me.read = other;
+ Ok(None)
+ }
+ }
+ }
+
+ fn inner(&self) -> MutexGuard<StreamInner> {
+ self.imp.inner()
+ }
+
+ fn before_read(&self) -> io::Result<MutexGuard<StreamInner>> {
+ let mut me = self.inner();
+
+ match me.read {
+ // Empty == we're not associated yet, and if we're pending then
+ // these are both cases where we return "would block"
+ State::Empty |
+ State::Pending(()) => return Err(io::ErrorKind::WouldBlock.into()),
+
+ // If we got a delayed error as part of a `read_overlapped` below,
+ // return that here. Also schedule another read in case it was
+ // transient.
+ State::Error(_) => {
+ let e = match mem::replace(&mut me.read, State::Empty) {
+ State::Error(e) => e,
+ _ => panic!(),
+ };
+ self.imp.schedule_read(&mut me);
+ return Err(e)
+ }
+
+ // If we're ready for a read then some previous 0-byte read has
+ // completed. In that case the OS's socket buffer has something for
+ // us, so we just keep pulling out bytes while we can in the loop
+ // below.
+ State::Ready(()) => {}
+ }
+
+ Ok(me)
+ }
+
+ fn post_register(&self, interest: Ready, me: &mut StreamInner) {
+ if interest.is_readable() {
+ self.imp.schedule_read(me);
+ }
+
+ // At least with epoll, if a socket is registered with an interest in
+ // writing and it's immediately writable then a writable event is
+ // generated immediately, so do so here.
+ if interest.is_writable() {
+ if let State::Empty = me.write {
+ self.imp.add_readiness(me, Ready::writable());
+ }
+ }
+ }
+
+ pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
+ match IoVec::from_bytes_mut(buf) {
+ Some(vec) => self.readv(&mut [vec]),
+ None => Ok(0),
+ }
+ }
+
+ pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
+ let mut me = self.before_read()?;
+
+ match (&self.imp.inner.socket).peek(buf) {
+ Ok(n) => Ok(n),
+ Err(e) => {
+ me.read = State::Empty;
+ self.imp.schedule_read(&mut me);
+ Err(e)
+ }
+ }
+ }
+
+ pub fn readv(&self, bufs: &mut [&mut IoVec]) -> io::Result<usize> {
+ let mut me = self.before_read()?;
+
+ // TODO: Does WSARecv work on nonblocking sockets? We ideally want to
+ // call that instead of looping over all the buffers and calling
+ // `recv` on each buffer. I'm not sure though if an overlapped
+ // socket in nonblocking mode would work with that use case,
+ // however, so for now we just call `recv`.
+
+ let mut amt = 0;
+ for buf in bufs {
+ match (&self.imp.inner.socket).read(buf) {
+ // If we did a partial read, then return what we've read so far
+ Ok(n) if n < buf.len() => return Ok(amt + n),
+
+ // Otherwise filled this buffer entirely, so try to fill the
+ // next one as well.
+ Ok(n) => amt += n,
+
+ // If we hit an error then things get tricky if we've already
+ // read some data. If the error is "would block" then we just
+ // return the data we've read so far while scheduling another
+ // 0-byte read.
+ //
+ // If we've read data and the error kind is not "would block",
+ // then we stash away the error to get returned later and return
+ // the data that we've read.
+ //
+ // Finally if we haven't actually read any data we just
+ // reschedule a 0-byte read to happen again and then return the
+ // error upwards.
+ Err(e) => {
+ if amt > 0 && e.kind() == io::ErrorKind::WouldBlock {
+ me.read = State::Empty;
+ self.imp.schedule_read(&mut me);
+ return Ok(amt)
+ } else if amt > 0 {
+ me.read = State::Error(e);
+ return Ok(amt)
+ } else {
+ me.read = State::Empty;
+ self.imp.schedule_read(&mut me);
+ return Err(e)
+ }
+ }
+ }
+ }
+
+ Ok(amt)
+ }
+
+ pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
+ match IoVec::from_bytes(buf) {
+ Some(vec) => self.writev(&[vec]),
+ None => Ok(0),
+ }
+ }
+
+ pub fn writev(&self, bufs: &[&IoVec]) -> io::Result<usize> {
+ let mut me = self.inner();
+ let me = &mut *me;
+
+ match mem::replace(&mut me.write, State::Empty) {
+ State::Empty => {}
+ State::Error(e) => return Err(e),
+ other => {
+ me.write = other;
+ return Err(io::ErrorKind::WouldBlock.into())
+ }
+ }
+
+ if !me.iocp.registered() {
+ return Err(io::ErrorKind::WouldBlock.into())
+ }
+
+ if bufs.is_empty() {
+ return Ok(0)
+ }
+
+ let len = bufs.iter().map(|b| b.len()).fold(0, |a, b| a + b);
+ let mut intermediate = me.iocp.get_buffer(len);
+ for buf in bufs {
+ intermediate.extend_from_slice(buf);
+ }
+ self.imp.schedule_write(intermediate, 0, me);
+ Ok(len)
+ }
+
+ pub fn flush(&self) -> io::Result<()> {
+ Ok(())
+ }
+}
+
+impl StreamImp {
+ fn inner(&self) -> MutexGuard<StreamInner> {
+ self.inner.inner.lock().unwrap()
+ }
+
+ fn schedule_connect(&self, addr: &SocketAddr) -> io::Result<()> {
+ unsafe {
+ trace!("scheduling a connect");
+ self.inner.socket.connect_overlapped(addr, &[], self.inner.read.as_mut_ptr())?;
+ }
+ // see docs above on StreamImp.inner for rationale on forget
+ mem::forget(self.clone());
+ Ok(())
+ }
+
+ /// Schedule a read to happen on this socket, enqueuing us to receive a
+ /// notification when a read is ready.
+ ///
+ /// Note that this does *not* work with a buffer. When reading a TCP stream
+ /// we actually read into a 0-byte buffer so Windows will send us a
+ /// notification when the socket is otherwise ready for reading. This allows
+ /// us to avoid buffer allocations for in-flight reads.
+ fn schedule_read(&self, me: &mut StreamInner) {
+ match me.read {
+ State::Empty => {}
+ State::Ready(_) | State::Error(_) => {
+ self.add_readiness(me, Ready::readable());
+ return;
+ }
+ _ => return,
+ }
+
+ me.iocp.set_readiness(me.iocp.readiness() - Ready::readable());
+
+ trace!("scheduling a read");
+ let res = unsafe {
+ self.inner.socket.read_overlapped(&mut [], self.inner.read.as_mut_ptr())
+ };
+ match res {
+ // Note that `Ok(true)` means that this completed immediately and
+ // our socket is readable. This typically means that the caller of
+ // this function (likely `read` above) can try again as an
+ // optimization and return bytes quickly.
+ //
+ // Normally, though, although the read completed immediately
+ // there's still an IOCP completion packet enqueued that we're going
+ // to receive.
+ //
+ // You can configure this behavior (miow) with
+ // SetFileCompletionNotificationModes to indicate that `Ok(true)`
+ // does **not** enqueue a completion packet. (This is the case
+ // for me.instant_notify)
+ //
+ // Note that libuv apparently has scary code to work around bugs in
+ // `WSARecv` for UDP sockets on handles which have had the
+ // `SetFileCompletionNotificationModes` function called on them;
+ // worth looking into!
+ Ok(Some(_)) if me.instant_notify => {
+ me.read = State::Ready(());
+ self.add_readiness(me, Ready::readable());
+ }
+ Ok(_) => {
+ // see docs above on StreamImp.inner for rationale on forget
+ me.read = State::Pending(());
+ mem::forget(self.clone());
+ }
+ Err(e) => {
+ me.read = State::Error(e);
+ self.add_readiness(me, Ready::readable());
+ }
+ }
+ }
+
+ /// Similar to `schedule_read`, except that this issues, well, writes.
+ ///
+ /// This function will continually attempt to write the entire contents of
+ /// the buffer `buf` until they have all been written. The `pos` argument is
+ /// the current offset within the buffer up to which the contents have
+ /// already been written.
+ ///
+ /// A new writable event (e.g. allowing another write) will only happen once
+ /// the buffer has been written completely (or hit an error).
+ fn schedule_write(&self,
+ buf: Vec<u8>,
+ mut pos: usize,
+ me: &mut StreamInner) {
+
+ // About to write, clear any pending level triggered events
+ me.iocp.set_readiness(me.iocp.readiness() - Ready::writable());
+
+ loop {
+ trace!("scheduling a write of {} bytes", buf[pos..].len());
+ let ret = unsafe {
+ self.inner.socket.write_overlapped(&buf[pos..], self.inner.write.as_mut_ptr())
+ };
+ match ret {
+ Ok(Some(transferred_bytes)) if me.instant_notify => {
+ trace!("done immediately with {} bytes", transferred_bytes);
+ if transferred_bytes == buf.len() - pos {
+ self.add_readiness(me, Ready::writable());
+ me.write = State::Empty;
+ break;
+ }
+ pos += transferred_bytes;
+ }
+ Ok(_) => {
+ trace!("scheduled for later");
+ // see docs above on StreamImp.inner for rationale on forget
+ me.write = State::Pending((buf, pos));
+ mem::forget(self.clone());
+ break;
+ }
+ Err(e) => {
+ trace!("write error: {}", e);
+ me.write = State::Error(e);
+ self.add_readiness(me, Ready::writable());
+ me.iocp.put_buffer(buf);
+ break;
+ }
+ }
+ }
+ }
+
+ /// Pushes an event for this socket onto the selector it's registered for.
+ ///
+ /// When an event is generated on this socket, if it happened after the
+ /// socket was closed then we don't want to actually push the event onto our
+ /// selector as otherwise it's just a spurious notification.
+ fn add_readiness(&self, me: &mut StreamInner, set: Ready) {
+ me.iocp.set_readiness(set | me.iocp.readiness());
+ }
+}
+
+fn read_done(status: &OVERLAPPED_ENTRY) {
+ let status = CompletionStatus::from_entry(status);
+ let me2 = StreamImp {
+ inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, read) },
+ };
+
+ let mut me = me2.inner();
+ match mem::replace(&mut me.read, State::Empty) {
+ State::Pending(()) => {
+ trace!("finished a read: {}", status.bytes_transferred());
+ assert_eq!(status.bytes_transferred(), 0);
+ me.read = State::Ready(());
+ return me2.add_readiness(&mut me, Ready::readable())
+ }
+ s => me.read = s,
+ }
+
+ // If a read didn't complete, then the connect must have just finished.
+ trace!("finished a connect");
+
+ // By guarding with socket.result(), we ensure that a connection
+ // was successfully made before performing operations requiring a
+ // connected socket.
+ match unsafe { me2.inner.socket.result(status.overlapped()) }
+ .and_then(|_| me2.inner.socket.connect_complete())
+ {
+ Ok(()) => {
+ me2.add_readiness(&mut me, Ready::writable());
+ me2.schedule_read(&mut me);
+ }
+ Err(e) => {
+ me2.add_readiness(&mut me, Ready::readable() | Ready::writable());
+ me.read = State::Error(e);
+ }
+ }
+}
+
+fn write_done(status: &OVERLAPPED_ENTRY) {
+ let status = CompletionStatus::from_entry(status);
+ trace!("finished a write {}", status.bytes_transferred());
+ let me2 = StreamImp {
+ inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, write) },
+ };
+ let mut me = me2.inner();
+ let (buf, pos) = match mem::replace(&mut me.write, State::Empty) {
+ State::Pending(pair) => pair,
+ _ => unreachable!(),
+ };
+ let new_pos = pos + (status.bytes_transferred() as usize);
+ if new_pos == buf.len() {
+ me2.add_readiness(&mut me, Ready::writable());
+ } else {
+ me2.schedule_write(buf, new_pos, &mut me);
+ }
+}
+
+impl Evented for TcpStream {
+ fn register(&self, poll: &Poll, token: Token,
+ interest: Ready, opts: PollOpt) -> io::Result<()> {
+ let mut me = self.inner();
+ me.iocp.register_socket(&self.imp.inner.socket, poll, token,
+ interest, opts, &self.registration)?;
+
+ unsafe {
+ super::no_notify_on_instant_completion(self.imp.inner.socket.as_raw_socket() as HANDLE)?;
+ me.instant_notify = true;
+ }
+
+ // If we were connected before being registered, process that request
+ // here and go along our merry way. Note that the callback for a
+ // successful connect will worry about generating writable/readable
+ // events and scheduling a new read.
+ if let Some(addr) = me.deferred_connect.take() {
+ return self.imp.schedule_connect(&addr).map(|_| ())
+ }
+ self.post_register(interest, &mut me);
+ Ok(())
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token,
+ interest: Ready, opts: PollOpt) -> io::Result<()> {
+ let mut me = self.inner();
+ me.iocp.reregister_socket(&self.imp.inner.socket, poll, token,
+ interest, opts, &self.registration)?;
+ self.post_register(interest, &mut me);
+ Ok(())
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ self.inner().iocp.deregister(&self.imp.inner.socket,
+ poll, &self.registration)
+ }
+}
+
+impl fmt::Debug for TcpStream {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("TcpStream")
+ .finish()
+ }
+}
+
+impl Drop for TcpStream {
+ fn drop(&mut self) {
+ // If we're still internally reading, we're no longer interested. Note
+ // though that we don't cancel any writes which may have been issued to
+ // preserve the same semantics as Unix.
+ //
+ // Note that "Empty" here may mean that a connect is pending, so we
+ // cancel even if that happens as well.
+ unsafe {
+ match self.inner().read {
+ State::Pending(_) | State::Empty => {
+ trace!("cancelling active TCP read");
+ drop(super::cancel(&self.imp.inner.socket,
+ &self.imp.inner.read));
+ }
+ State::Ready(_) | State::Error(_) => {}
+ }
+ }
+ }
+}
+
+impl TcpListener {
+ pub fn new(socket: net::TcpListener)
+ -> io::Result<TcpListener> {
+ let addr = socket.local_addr()?;
+ Ok(TcpListener::new_family(socket, match addr {
+ SocketAddr::V4(..) => Family::V4,
+ SocketAddr::V6(..) => Family::V6,
+ }))
+ }
+
+ fn new_family(socket: net::TcpListener, family: Family) -> TcpListener {
+ TcpListener {
+ registration: Mutex::new(None),
+ imp: ListenerImp {
+ inner: FromRawArc::new(ListenerIo {
+ accept: Overlapped::new(accept_done),
+ family: family,
+ socket: socket,
+ inner: Mutex::new(ListenerInner {
+ iocp: ReadyBinding::new(),
+ accept: State::Empty,
+ accept_buf: AcceptAddrsBuf::new(),
+ }),
+ }),
+ },
+ }
+ }
+
+ pub fn accept(&self) -> io::Result<(net::TcpStream, SocketAddr)> {
+ let mut me = self.inner();
+
+ let ret = match mem::replace(&mut me.accept, State::Empty) {
+ State::Empty => return Err(io::ErrorKind::WouldBlock.into()),
+ State::Pending(t) => {
+ me.accept = State::Pending(t);
+ return Err(io::ErrorKind::WouldBlock.into());
+ }
+ State::Ready((s, a)) => Ok((s, a)),
+ State::Error(e) => Err(e),
+ };
+
+ self.imp.schedule_accept(&mut me);
+
+ return ret
+ }
+
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.imp.inner.socket.local_addr()
+ }
+
+ pub fn try_clone(&self) -> io::Result<TcpListener> {
+ self.imp.inner.socket.try_clone().map(|s| {
+ TcpListener::new_family(s, self.imp.inner.family)
+ })
+ }
+
+ #[allow(deprecated)]
+ pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
+ self.imp.inner.socket.set_only_v6(only_v6)
+ }
+
+ #[allow(deprecated)]
+ pub fn only_v6(&self) -> io::Result<bool> {
+ self.imp.inner.socket.only_v6()
+ }
+
+ pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
+ self.imp.inner.socket.set_ttl(ttl)
+ }
+
+ pub fn ttl(&self) -> io::Result<u32> {
+ self.imp.inner.socket.ttl()
+ }
+
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.imp.inner.socket.take_error()
+ }
+
+ fn inner(&self) -> MutexGuard<ListenerInner> {
+ self.imp.inner()
+ }
+}
+
+impl ListenerImp {
+ fn inner(&self) -> MutexGuard<ListenerInner> {
+ self.inner.inner.lock().unwrap()
+ }
+
+ fn schedule_accept(&self, me: &mut ListenerInner) {
+ match me.accept {
+ State::Empty => {}
+ _ => return
+ }
+
+ me.iocp.set_readiness(me.iocp.readiness() - Ready::readable());
+
+ let res = match self.inner.family {
+ Family::V4 => TcpBuilder::new_v4(),
+ Family::V6 => TcpBuilder::new_v6(),
+ }
+ .and_then(|builder| builder.to_tcp_stream())
+ .and_then(|stream| unsafe {
+ trace!("scheduling an accept");
+ self.inner
+ .socket
+ .accept_overlapped(&stream, &mut me.accept_buf, self.inner.accept.as_mut_ptr())
+ .map(|x| (stream, x))
+ });
+ match res {
+ Ok((socket, _)) => {
+ // see docs above on StreamImp.inner for rationale on forget
+ me.accept = State::Pending(socket);
+ mem::forget(self.clone());
+ }
+ Err(e) => {
+ me.accept = State::Error(e);
+ self.add_readiness(me, Ready::readable());
+ }
+ }
+ }
+
+ // See comments in StreamImp::add_readiness
+ fn add_readiness(&self, me: &mut ListenerInner, set: Ready) {
+ me.iocp.set_readiness(set | me.iocp.readiness());
+ }
+}
+
+fn accept_done(status: &OVERLAPPED_ENTRY) {
+ let status = CompletionStatus::from_entry(status);
+ let me2 = ListenerImp {
+ inner: unsafe { overlapped2arc!(status.overlapped(), ListenerIo, accept) },
+ };
+
+ let mut me = me2.inner();
+ let socket = match mem::replace(&mut me.accept, State::Empty) {
+ State::Pending(s) => s,
+ _ => unreachable!(),
+ };
+ trace!("finished an accept");
+ let result = me2.inner.socket.accept_complete(&socket).and_then(|()| {
+ me.accept_buf.parse(&me2.inner.socket)
+ }).and_then(|buf| {
+ buf.remote().ok_or_else(|| {
+ io::Error::new(ErrorKind::Other, "could not obtain remote address")
+ })
+ });
+ me.accept = match result {
+ Ok(remote_addr) => State::Ready((socket, remote_addr)),
+ Err(e) => State::Error(e),
+ };
+ me2.add_readiness(&mut me, Ready::readable());
+}
+
+impl Evented for TcpListener {
+ fn register(&self, poll: &Poll, token: Token,
+ interest: Ready, opts: PollOpt) -> io::Result<()> {
+ let mut me = self.inner();
+ me.iocp.register_socket(&self.imp.inner.socket, poll, token,
+ interest, opts, &self.registration)?;
+
+ unsafe {
+ super::no_notify_on_instant_completion(self.imp.inner.socket.as_raw_socket() as HANDLE)?;
+ }
+
+ self.imp.schedule_accept(&mut me);
+ Ok(())
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token,
+ interest: Ready, opts: PollOpt) -> io::Result<()> {
+ let mut me = self.inner();
+ me.iocp.reregister_socket(&self.imp.inner.socket, poll, token,
+ interest, opts, &self.registration)?;
+ self.imp.schedule_accept(&mut me);
+ Ok(())
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ self.inner().iocp.deregister(&self.imp.inner.socket,
+ poll, &self.registration)
+ }
+}
+
+impl fmt::Debug for TcpListener {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("TcpListener")
+ .finish()
+ }
+}
+
+impl Drop for TcpListener {
+ fn drop(&mut self) {
+ // If we're still internally reading, we're no longer interested.
+ unsafe {
+ match self.inner().accept {
+ State::Pending(_) => {
+ trace!("cancelling active TCP accept");
+ drop(super::cancel(&self.imp.inner.socket,
+ &self.imp.inner.accept));
+ }
+ State::Empty |
+ State::Ready(_) |
+ State::Error(_) => {}
+ }
+ }
+ }
+}
diff --git a/third_party/rust/mio-0.6.23/src/sys/windows/udp.rs b/third_party/rust/mio-0.6.23/src/sys/windows/udp.rs
new file mode 100644
index 0000000000..f5ea96c324
--- /dev/null
+++ b/third_party/rust/mio-0.6.23/src/sys/windows/udp.rs
@@ -0,0 +1,414 @@
+//! UDP for IOCP
+//!
+//! Note that most of this module is quite similar to the TCP module, so if
+//! something seems odd you may also want to try the docs over there.
+
+use std::fmt;
+use std::io::prelude::*;
+use std::io;
+use std::mem;
+use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
+use std::sync::{Mutex, MutexGuard};
+
+#[allow(unused_imports)]
+use net2::{UdpBuilder, UdpSocketExt};
+use winapi::shared::winerror::WSAEMSGSIZE;
+use winapi::um::minwinbase::OVERLAPPED_ENTRY;
+use miow::iocp::CompletionStatus;
+use miow::net::SocketAddrBuf;
+use miow::net::UdpSocketExt as MiowUdpSocketExt;
+
+use {poll, Ready, Poll, PollOpt, Token};
+use event::Evented;
+use sys::windows::from_raw_arc::FromRawArc;
+use sys::windows::selector::{Overlapped, ReadyBinding};
+
+pub struct UdpSocket {
+ imp: Imp,
+ registration: Mutex<Option<poll::Registration>>,
+}
+
+#[derive(Clone)]
+struct Imp {
+ inner: FromRawArc<Io>,
+}
+
+struct Io {
+ read: Overlapped,
+ write: Overlapped,
+ socket: net::UdpSocket,
+ inner: Mutex<Inner>,
+}
+
+struct Inner {
+ iocp: ReadyBinding,
+ read: State<Vec<u8>, Vec<u8>>,
+ write: State<Vec<u8>, (Vec<u8>, usize)>,
+ read_buf: SocketAddrBuf,
+}
+
+enum State<T, U> {
+ Empty,
+ Pending(T),
+ Ready(U),
+ Error(io::Error),
+}
+
+impl UdpSocket {
+ pub fn new(socket: net::UdpSocket) -> io::Result<UdpSocket> {
+ Ok(UdpSocket {
+ registration: Mutex::new(None),
+ imp: Imp {
+ inner: FromRawArc::new(Io {
+ read: Overlapped::new(recv_done),
+ write: Overlapped::new(send_done),
+ socket: socket,
+ inner: Mutex::new(Inner {
+ iocp: ReadyBinding::new(),
+ read: State::Empty,
+ write: State::Empty,
+ read_buf: SocketAddrBuf::new(),
+ }),
+ }),
+ },
+ })
+ }
+
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.imp.inner.socket.local_addr()
+ }
+
+ pub fn try_clone(&self) -> io::Result<UdpSocket> {
+ self.imp.inner.socket.try_clone().and_then(UdpSocket::new)
+ }
+
+ /// Note that unlike `TcpStream::write` this function will not attempt to
+    /// continue writing `buf` until it's entirely written.
+ ///
+ /// TODO: This... may be wrong in the long run. We're reporting that we
+ /// successfully wrote all of the bytes in `buf` but it's possible
+ /// that we don't actually end up writing all of them!
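+    ///
+    /// An illustrative caller-side sketch (not part of this module): the call
+    /// fails with `WouldBlock` until the socket has been registered with a
+    /// `Poll` and any previously scheduled send has completed, and on success
+    /// the full length of `buf` is reported even though the overlapped send
+    /// may still be in flight.
+    ///
+    /// ```ignore
+    /// match socket.send_to(payload, &addr) {
+    ///     Ok(n) => debug_assert_eq!(n, payload.len()),
+    ///     Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+    ///         // wait for the next writable event and retry
+    ///     }
+    ///     Err(e) => return Err(e),
+    /// }
+    /// ```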
+ pub fn send_to(&self, buf: &[u8], target: &SocketAddr)
+ -> io::Result<usize> {
+ let mut me = self.inner();
+ let me = &mut *me;
+
+ match me.write {
+ State::Empty => {}
+ _ => return Err(io::ErrorKind::WouldBlock.into()),
+ }
+
+ if !me.iocp.registered() {
+ return Err(io::ErrorKind::WouldBlock.into())
+ }
+
+ let interest = me.iocp.readiness();
+ me.iocp.set_readiness(interest - Ready::writable());
+
+ let mut owned_buf = me.iocp.get_buffer(64 * 1024);
+ let amt = owned_buf.write(buf)?;
+ unsafe {
+ trace!("scheduling a send");
+ self.imp.inner.socket.send_to_overlapped(&owned_buf, target,
+ self.imp.inner.write.as_mut_ptr())
+ }?;
+ me.write = State::Pending(owned_buf);
+ mem::forget(self.imp.clone());
+ Ok(amt)
+ }
+
+ /// Note that unlike `TcpStream::write` this function will not attempt to
+    /// continue writing `buf` until it's entirely written.
+ ///
+ /// TODO: This... may be wrong in the long run. We're reporting that we
+ /// successfully wrote all of the bytes in `buf` but it's possible
+ /// that we don't actually end up writing all of them!
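+    ///
+    /// Behaves like `send_to` above but targets the peer set with `connect`.
+    /// An illustrative sketch (names are placeholders, not part of this
+    /// module):
+    ///
+    /// ```ignore
+    /// socket.connect(peer)?;
+    /// match socket.send(payload) {
+    ///     Ok(n) => debug_assert_eq!(n, payload.len()),
+    ///     Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { /* retry later */ }
+    ///     Err(e) => return Err(e),
+    /// }
+    /// ```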
+ pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
+ let mut me = self.inner();
+ let me = &mut *me;
+
+ match me.write {
+ State::Empty => {}
+ _ => return Err(io::ErrorKind::WouldBlock.into()),
+ }
+
+ if !me.iocp.registered() {
+ return Err(io::ErrorKind::WouldBlock.into())
+ }
+
+ let interest = me.iocp.readiness();
+ me.iocp.set_readiness(interest - Ready::writable());
+
+ let mut owned_buf = me.iocp.get_buffer(64 * 1024);
+ let amt = owned_buf.write(buf)?;
+ unsafe {
+ trace!("scheduling a send");
+ self.imp.inner.socket.send_overlapped(&owned_buf, self.imp.inner.write.as_mut_ptr())
+        }?;
+ me.write = State::Pending(owned_buf);
+ mem::forget(self.imp.clone());
+ Ok(amt)
+ }
+
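+    /// Completes a previously scheduled overlapped receive. If `buf` is too
+    /// small for the pending datagram, no data is consumed and `WSAEMSGSIZE`
+    /// is returned, so the caller can retry with a larger buffer. A rough,
+    /// non-normative sketch of handling the possible outcomes (`handle` is a
+    /// placeholder):
+    ///
+    /// ```ignore
+    /// let mut buf = [0u8; 64 * 1024];
+    /// match socket.recv_from(&mut buf) {
+    ///     Ok((n, peer)) => handle(&buf[..n], peer),
+    ///     Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { /* wait for readable */ }
+    ///     Err(ref e) if e.raw_os_error() == Some(WSAEMSGSIZE as i32) => { /* grow buf, retry */ }
+    ///     Err(e) => return Err(e),
+    /// }
+    /// ```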
+ pub fn recv_from(&self, mut buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ let mut me = self.inner();
+ match mem::replace(&mut me.read, State::Empty) {
+ State::Empty => Err(io::ErrorKind::WouldBlock.into()),
+            State::Pending(b) => {
+                me.read = State::Pending(b);
+                Err(io::ErrorKind::WouldBlock.into())
+            }
+ State::Ready(data) => {
+ // If we weren't provided enough space to receive the message
+ // then don't actually read any data, just return an error.
+ if buf.len() < data.len() {
+ me.read = State::Ready(data);
+ Err(io::Error::from_raw_os_error(WSAEMSGSIZE as i32))
+ } else {
+ let r = if let Some(addr) = me.read_buf.to_socket_addr() {
+ buf.write(&data).unwrap();
+ Ok((data.len(), addr))
+ } else {
+ Err(io::Error::new(io::ErrorKind::Other,
+ "failed to parse socket address"))
+ };
+ me.iocp.put_buffer(data);
+ self.imp.schedule_read_from(&mut me);
+ r
+ }
+ }
+ State::Error(e) => {
+ self.imp.schedule_read_from(&mut me);
+ Err(e)
+ }
+ }
+ }
+
+    pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
+        // Since recv_from can be used on connected sockets, just call it and
+        // drop the address.
+        self.recv_from(buf).map(|(size, _)| size)
+ }
+
+ pub fn connect(&self, addr: SocketAddr) -> io::Result<()> {
+ self.imp.inner.socket.connect(addr)
+ }
+
+ pub fn broadcast(&self) -> io::Result<bool> {
+ self.imp.inner.socket.broadcast()
+ }
+
+ pub fn set_broadcast(&self, on: bool) -> io::Result<()> {
+ self.imp.inner.socket.set_broadcast(on)
+ }
+
+ pub fn multicast_loop_v4(&self) -> io::Result<bool> {
+ self.imp.inner.socket.multicast_loop_v4()
+ }
+
+ pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> {
+ self.imp.inner.socket.set_multicast_loop_v4(on)
+ }
+
+ pub fn multicast_ttl_v4(&self) -> io::Result<u32> {
+ self.imp.inner.socket.multicast_ttl_v4()
+ }
+
+ pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> {
+ self.imp.inner.socket.set_multicast_ttl_v4(ttl)
+ }
+
+ pub fn multicast_loop_v6(&self) -> io::Result<bool> {
+ self.imp.inner.socket.multicast_loop_v6()
+ }
+
+ pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> {
+ self.imp.inner.socket.set_multicast_loop_v6(on)
+ }
+
+ pub fn ttl(&self) -> io::Result<u32> {
+ self.imp.inner.socket.ttl()
+ }
+
+ pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
+ self.imp.inner.socket.set_ttl(ttl)
+ }
+
+ pub fn join_multicast_v4(&self,
+ multiaddr: &Ipv4Addr,
+ interface: &Ipv4Addr) -> io::Result<()> {
+ self.imp.inner.socket.join_multicast_v4(multiaddr, interface)
+ }
+
+ pub fn join_multicast_v6(&self,
+ multiaddr: &Ipv6Addr,
+ interface: u32) -> io::Result<()> {
+ self.imp.inner.socket.join_multicast_v6(multiaddr, interface)
+ }
+
+ pub fn leave_multicast_v4(&self,
+ multiaddr: &Ipv4Addr,
+ interface: &Ipv4Addr) -> io::Result<()> {
+ self.imp.inner.socket.leave_multicast_v4(multiaddr, interface)
+ }
+
+ pub fn leave_multicast_v6(&self,
+ multiaddr: &Ipv6Addr,
+ interface: u32) -> io::Result<()> {
+ self.imp.inner.socket.leave_multicast_v6(multiaddr, interface)
+ }
+
+ pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
+ self.imp.inner.socket.set_only_v6(only_v6)
+ }
+
+ pub fn only_v6(&self) -> io::Result<bool> {
+ self.imp.inner.socket.only_v6()
+ }
+
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.imp.inner.socket.take_error()
+ }
+
+ fn inner(&self) -> MutexGuard<Inner> {
+ self.imp.inner()
+ }
+
+ fn post_register(&self, interest: Ready, me: &mut Inner) {
+ if interest.is_readable() {
+            // We use recv_from here since it is well specified for both
+            // connected and non-connected sockets, and we can discard the
+            // address when calling recv().
+ self.imp.schedule_read_from(me);
+ }
+        // See comments in TcpStream::post_register for what's going on here.
+ if interest.is_writable() {
+ if let State::Empty = me.write {
+ self.imp.add_readiness(me, Ready::writable());
+ }
+ }
+ }
+}
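+
+// `post_register` above mirrors the TCP stream logic: a receive is scheduled
+// as soon as the socket is registered so a readable event can fire, and
+// writable readiness is synthesized immediately because a new overlapped send
+// can always be queued once nothing is pending. A rough sketch of the
+// caller-visible effect (names are illustrative, not part of this module):
+//
+//     poll.register(&socket, TOKEN, Ready::writable(), PollOpt::edge())?;
+//     // the next poll() may report TOKEN writable before any send has
+//     // completed, so the first send_to() can be issued without waiting
+//     // on the kernel.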
+
+impl Imp {
+ fn inner(&self) -> MutexGuard<Inner> {
+ self.inner.inner.lock().unwrap()
+ }
+
+ fn schedule_read_from(&self, me: &mut Inner) {
+ match me.read {
+ State::Empty => {}
+ _ => return,
+ }
+
+ let interest = me.iocp.readiness();
+ me.iocp.set_readiness(interest - Ready::readable());
+
+ let mut buf = me.iocp.get_buffer(64 * 1024);
+ let res = unsafe {
+ trace!("scheduling a read");
+ let cap = buf.capacity();
+ buf.set_len(cap);
+ self.inner.socket.recv_from_overlapped(&mut buf, &mut me.read_buf,
+ self.inner.read.as_mut_ptr())
+ };
+ match res {
+ Ok(_) => {
+ me.read = State::Pending(buf);
+ mem::forget(self.clone());
+ }
+ Err(e) => {
+ me.read = State::Error(e);
+ self.add_readiness(me, Ready::readable());
+ me.iocp.put_buffer(buf);
+ }
+ }
+ }
+
+ // See comments in tcp::StreamImp::push
+ fn add_readiness(&self, me: &Inner, set: Ready) {
+ me.iocp.set_readiness(set | me.iocp.readiness());
+ }
+}
+
+impl Evented for UdpSocket {
+ fn register(&self, poll: &Poll, token: Token,
+ interest: Ready, opts: PollOpt) -> io::Result<()> {
+ let mut me = self.inner();
+ me.iocp.register_socket(&self.imp.inner.socket,
+ poll, token, interest, opts,
+ &self.registration)?;
+ self.post_register(interest, &mut me);
+ Ok(())
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token,
+ interest: Ready, opts: PollOpt) -> io::Result<()> {
+ let mut me = self.inner();
+ me.iocp.reregister_socket(&self.imp.inner.socket,
+ poll, token, interest,
+ opts, &self.registration)?;
+ self.post_register(interest, &mut me);
+ Ok(())
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ self.inner().iocp.deregister(&self.imp.inner.socket,
+ poll, &self.registration)
+ }
+}
+
+impl fmt::Debug for UdpSocket {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("UdpSocket")
+ .finish()
+ }
+}
+
+impl Drop for UdpSocket {
+ fn drop(&mut self) {
+ let inner = self.inner();
+
+ // If we're still internally reading, we're no longer interested. Note
+ // though that we don't cancel any writes which may have been issued to
+ // preserve the same semantics as Unix.
+ unsafe {
+ match inner.read {
+ State::Pending(_) => {
+ drop(super::cancel(&self.imp.inner.socket,
+ &self.imp.inner.read));
+ }
+ State::Empty |
+ State::Ready(_) |
+ State::Error(_) => {}
+ }
+ }
+ }
+}
+
+fn send_done(status: &OVERLAPPED_ENTRY) {
+ let status = CompletionStatus::from_entry(status);
+ trace!("finished a send {}", status.bytes_transferred());
+ let me2 = Imp {
+ inner: unsafe { overlapped2arc!(status.overlapped(), Io, write) },
+ };
+ let mut me = me2.inner();
+ me.write = State::Empty;
+ me2.add_readiness(&mut me, Ready::writable());
+}
+
+fn recv_done(status: &OVERLAPPED_ENTRY) {
+ let status = CompletionStatus::from_entry(status);
+ trace!("finished a recv {}", status.bytes_transferred());
+ let me2 = Imp {
+ inner: unsafe { overlapped2arc!(status.overlapped(), Io, read) },
+ };
+ let mut me = me2.inner();
+ let mut buf = match mem::replace(&mut me.read, State::Empty) {
+ State::Pending(buf) => buf,
+ _ => unreachable!(),
+ };
+ unsafe {
+ buf.set_len(status.bytes_transferred() as usize);
+ }
+ me.read = State::Ready(buf);
+ me2.add_readiness(&mut me, Ready::readable());
+}