diff options
Diffstat (limited to 'third_party/rust/mio-0.6.23/src/sys/windows')
-rw-r--r-- | third_party/rust/mio-0.6.23/src/sys/windows/awakener.rs | 66 | ||||
-rw-r--r-- | third_party/rust/mio-0.6.23/src/sys/windows/buffer_pool.rs | 20 | ||||
-rw-r--r-- | third_party/rust/mio-0.6.23/src/sys/windows/from_raw_arc.rs | 116 | ||||
-rw-r--r-- | third_party/rust/mio-0.6.23/src/sys/windows/mod.rs | 193 | ||||
-rw-r--r-- | third_party/rust/mio-0.6.23/src/sys/windows/selector.rs | 538 | ||||
-rw-r--r-- | third_party/rust/mio-0.6.23/src/sys/windows/tcp.rs | 853 | ||||
-rw-r--r-- | third_party/rust/mio-0.6.23/src/sys/windows/udp.rs | 414 |
7 files changed, 2200 insertions, 0 deletions
diff --git a/third_party/rust/mio-0.6.23/src/sys/windows/awakener.rs b/third_party/rust/mio-0.6.23/src/sys/windows/awakener.rs new file mode 100644 index 0000000000..c913bc93f8 --- /dev/null +++ b/third_party/rust/mio-0.6.23/src/sys/windows/awakener.rs @@ -0,0 +1,66 @@ +use std::sync::Mutex; + +use miow::iocp::CompletionStatus; +use {io, poll, Ready, Poll, PollOpt, Token}; +use event::Evented; +use sys::windows::Selector; + +pub struct Awakener { + inner: Mutex<Option<AwakenerInner>>, +} + +struct AwakenerInner { + token: Token, + selector: Selector, +} + +impl Awakener { + pub fn new() -> io::Result<Awakener> { + Ok(Awakener { + inner: Mutex::new(None), + }) + } + + pub fn wakeup(&self) -> io::Result<()> { + // Each wakeup notification has NULL as its `OVERLAPPED` pointer to + // indicate that it's from this awakener and not part of an I/O + // operation. This is specially recognized by the selector. + // + // If we haven't been registered with an event loop yet just silently + // succeed. + if let Some(inner) = self.inner.lock().unwrap().as_ref() { + let status = CompletionStatus::new(0, + usize::from(inner.token), + 0 as *mut _); + inner.selector.port().post(status)?; + } + Ok(()) + } + + pub fn cleanup(&self) { + // noop + } +} + +impl Evented for Awakener { + fn register(&self, poll: &Poll, token: Token, events: Ready, + opts: PollOpt) -> io::Result<()> { + assert_eq!(opts, PollOpt::edge()); + assert_eq!(events, Ready::readable()); + *self.inner.lock().unwrap() = Some(AwakenerInner { + selector: poll::selector(poll).clone_ref(), + token: token, + }); + Ok(()) + } + + fn reregister(&self, poll: &Poll, token: Token, events: Ready, + opts: PollOpt) -> io::Result<()> { + self.register(poll, token, events, opts) + } + + fn deregister(&self, _poll: &Poll) -> io::Result<()> { + *self.inner.lock().unwrap() = None; + Ok(()) + } +} diff --git a/third_party/rust/mio-0.6.23/src/sys/windows/buffer_pool.rs b/third_party/rust/mio-0.6.23/src/sys/windows/buffer_pool.rs new file mode 100644 index 0000000000..86754593fd --- /dev/null +++ b/third_party/rust/mio-0.6.23/src/sys/windows/buffer_pool.rs @@ -0,0 +1,20 @@ +pub struct BufferPool { + pool: Vec<Vec<u8>>, +} + +impl BufferPool { + pub fn new(cap: usize) -> BufferPool { + BufferPool { pool: Vec::with_capacity(cap) } + } + + pub fn get(&mut self, default_cap: usize) -> Vec<u8> { + self.pool.pop().unwrap_or_else(|| Vec::with_capacity(default_cap)) + } + + pub fn put(&mut self, mut buf: Vec<u8>) { + if self.pool.len() < self.pool.capacity(){ + unsafe { buf.set_len(0); } + self.pool.push(buf); + } + } +} diff --git a/third_party/rust/mio-0.6.23/src/sys/windows/from_raw_arc.rs b/third_party/rust/mio-0.6.23/src/sys/windows/from_raw_arc.rs new file mode 100644 index 0000000000..b6d38b2408 --- /dev/null +++ b/third_party/rust/mio-0.6.23/src/sys/windows/from_raw_arc.rs @@ -0,0 +1,116 @@ +//! A "Manual Arc" which allows manually frobbing the reference count +//! +//! This module contains a copy of the `Arc` found in the standard library, +//! stripped down to the bare bones of what we actually need. The reason this is +//! done is for the ability to concretely know the memory layout of the `Inner` +//! structure of the arc pointer itself (e.g. `ArcInner` in the standard +//! library). +//! +//! We do some unsafe casting from `*mut OVERLAPPED` to a `FromRawArc<T>` to +//! ensure that data lives for the length of an I/O operation, but this means +//! that we have to know the layouts of the structures involved. This +//! representation primarily guarantees that the data, `T` is at the front of +//! the inner pointer always. +//! +//! Note that we're missing out on some various optimizations implemented in the +//! standard library: +//! +//! * The size of `FromRawArc` is actually two words because of the drop flag +//! * The compiler doesn't understand that the pointer in `FromRawArc` is never +//! null, so Option<FromRawArc<T>> is not a nullable pointer. + +use std::ops::Deref; +use std::mem; +use std::sync::atomic::{self, AtomicUsize, Ordering}; + +pub struct FromRawArc<T> { + _inner: *mut Inner<T>, +} + +unsafe impl<T: Sync + Send> Send for FromRawArc<T> { } +unsafe impl<T: Sync + Send> Sync for FromRawArc<T> { } + +#[repr(C)] +struct Inner<T> { + data: T, + cnt: AtomicUsize, +} + +impl<T> FromRawArc<T> { + pub fn new(data: T) -> FromRawArc<T> { + let x = Box::new(Inner { + data: data, + cnt: AtomicUsize::new(1), + }); + FromRawArc { _inner: unsafe { mem::transmute(x) } } + } + + pub unsafe fn from_raw(ptr: *mut T) -> FromRawArc<T> { + // Note that if we could use `mem::transmute` here to get a libstd Arc + // (guaranteed) then we could just use std::sync::Arc, but this is the + // crucial reason this currently exists. + FromRawArc { _inner: ptr as *mut Inner<T> } + } +} + +impl<T> Clone for FromRawArc<T> { + fn clone(&self) -> FromRawArc<T> { + // Atomic ordering of Relaxed lifted from libstd, but the general idea + // is that you need synchronization to communicate this increment to + // another thread, so this itself doesn't need to be synchronized. + unsafe { + (*self._inner).cnt.fetch_add(1, Ordering::Relaxed); + } + FromRawArc { _inner: self._inner } + } +} + +impl<T> Deref for FromRawArc<T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &(*self._inner).data } + } +} + +impl<T> Drop for FromRawArc<T> { + fn drop(&mut self) { + unsafe { + // Atomic orderings lifted from the standard library + if (*self._inner).cnt.fetch_sub(1, Ordering::Release) != 1 { + return + } + atomic::fence(Ordering::Acquire); + drop(mem::transmute::<_, Box<T>>(self._inner)); + } + } +} + +#[cfg(test)] +mod tests { + use super::FromRawArc; + + #[test] + fn smoke() { + let a = FromRawArc::new(1); + assert_eq!(*a, 1); + assert_eq!(*a.clone(), 1); + } + + #[test] + fn drops() { + struct A<'a>(&'a mut bool); + impl<'a> Drop for A<'a> { + fn drop(&mut self) { + *self.0 = true; + } + } + let mut a = false; + { + let a = FromRawArc::new(A(&mut a)); + let _ = a.clone(); + assert!(!*a.0); + } + assert!(a); + } +} diff --git a/third_party/rust/mio-0.6.23/src/sys/windows/mod.rs b/third_party/rust/mio-0.6.23/src/sys/windows/mod.rs new file mode 100644 index 0000000000..9b9f054495 --- /dev/null +++ b/third_party/rust/mio-0.6.23/src/sys/windows/mod.rs @@ -0,0 +1,193 @@ +//! Implementation of mio for Windows using IOCP +//! +//! This module uses I/O Completion Ports (IOCP) on Windows to implement mio's +//! Unix epoll-like interface. Unfortunately these two I/O models are +//! fundamentally incompatible: +//! +//! * IOCP is a completion-based model where work is submitted to the kernel and +//! a program is notified later when the work finished. +//! * epoll is a readiness-based model where the kernel is queried as to what +//! work can be done, and afterwards the work is done. +//! +//! As a result, this implementation for Windows is much less "low level" than +//! the Unix implementation of mio. This design decision was intentional, +//! however. +//! +//! ## What is IOCP? +//! +//! The [official docs][docs] have a comprehensive explanation of what IOCP is, +//! but at a high level it requires the following operations to be executed to +//! perform some I/O: +//! +//! 1. A completion port is created +//! 2. An I/O handle and a token is registered with this completion port +//! 3. Some I/O is issued on the handle. This generally means that an API was +//! invoked with a zeroed `OVERLAPPED` structure. The API will immediately +//! return. +//! 4. After some time, the application queries the I/O port for completed +//! events. The port will returned a pointer to the `OVERLAPPED` along with +//! the token presented at registration time. +//! +//! Many I/O operations can be fired off before waiting on a port, and the port +//! will block execution of the calling thread until an I/O event has completed +//! (or a timeout has elapsed). +//! +//! Currently all of these low-level operations are housed in a separate `miow` +//! crate to provide a 0-cost abstraction over IOCP. This crate uses that to +//! implement all fiddly bits so there's very few actual Windows API calls or +//! `unsafe` blocks as a result. +//! +//! [docs]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198%28v=vs.85%29.aspx +//! +//! ## Safety of IOCP +//! +//! Unfortunately for us, IOCP is pretty unsafe in terms of Rust lifetimes and +//! such. When an I/O operation is submitted to the kernel, it involves handing +//! the kernel a few pointers like a buffer to read/write, an `OVERLAPPED` +//! structure pointer, and perhaps some other buffers such as for socket +//! addresses. These pointers all have to remain valid **for the entire I/O +//! operation's duration**. +//! +//! There's no way to define a safe lifetime for these pointers/buffers over +//! the span of an I/O operation, so we're forced to add a layer of abstraction +//! (not 0-cost) to make these APIs safe. Currently this implementation +//! basically just boxes everything up on the heap to give it a stable address +//! and then keys off that most of the time. +//! +//! ## From completion to readiness +//! +//! Translating a completion-based model to a readiness-based model is also no +//! easy task, and a significant portion of this implementation is managing this +//! translation. The basic idea behind this implementation is to issue I/O +//! operations preemptively and then translate their completions to a "I'm +//! ready" event. +//! +//! For example, in the case of reading a `TcpSocket`, as soon as a socket is +//! connected (or registered after an accept) a read operation is executed. +//! While the read is in progress calls to `read` will return `WouldBlock`, and +//! once the read is completed we translate the completion notification into a +//! `readable` event. Once the internal buffer is drained (e.g. all data from it +//! has been read) a read operation is re-issued. +//! +//! Write operations are a little different from reads, and the current +//! implementation is to just schedule a write as soon as `write` is first +//! called. While that write operation is in progress all future calls to +//! `write` will return `WouldBlock`. Completion of the write then translates to +//! a `writable` event. Note that this will probably want to add some layer of +//! internal buffering in the future. +//! +//! ## Buffer Management +//! +//! As there's lots of I/O operations in flight at any one point in time, +//! there's lots of live buffers that need to be juggled around (e.g. this +//! implementation's own internal buffers). +//! +//! Currently all buffers are created for the I/O operation at hand and are then +//! discarded when it completes (this is listed as future work below). +//! +//! ## Callback Management +//! +//! When the main event loop receives a notification that an I/O operation has +//! completed, some work needs to be done to translate that to a set of events +//! or perhaps some more I/O needs to be scheduled. For example after a +//! `TcpStream` is connected it generates a writable event and also schedules a +//! read. +//! +//! To manage all this the `Selector` uses the `OVERLAPPED` pointer from the +//! completion status. The selector assumes that all `OVERLAPPED` pointers are +//! actually pointers to the interior of a `selector::Overlapped` which means +//! that right after the `OVERLAPPED` itself there's a function pointer. This +//! function pointer is given the completion status as well as another callback +//! to push events onto the selector. +//! +//! The callback for each I/O operation doesn't have any environment, so it +//! relies on memory layout and unsafe casting to translate an `OVERLAPPED` +//! pointer (or in this case a `selector::Overlapped` pointer) to a type of +//! `FromRawArc<T>` (see module docs for why this type exists). +//! +//! ## Thread Safety +//! +//! Currently all of the I/O primitives make liberal use of `Arc` and `Mutex` +//! as an implementation detail. The main reason for this is to ensure that the +//! types are `Send` and `Sync`, but the implementations have not been stressed +//! in multithreaded situations yet. As a result, there are bound to be +//! functional surprises in using these concurrently. +//! +//! ## Future Work +//! +//! First up, let's take a look at unimplemented portions of this module: +//! +//! * The `PollOpt::level()` option is currently entirely unimplemented. +//! * Each `EventLoop` currently owns its completion port, but this prevents an +//! I/O handle from being added to multiple event loops (something that can be +//! done on Unix). Additionally, it hinders event loops moving across threads. +//! This should be solved by likely having a global `Selector` which all +//! others then communicate with. +//! * Although Unix sockets don't exist on Windows, there are named pipes and +//! those should likely be bound here in a similar fashion to `TcpStream`. +//! +//! Next up, there are a few performance improvements and optimizations that can +//! still be implemented +//! +//! * Buffer management right now is pretty bad, they're all just allocated +//! right before an I/O operation and discarded right after. There should at +//! least be some form of buffering buffers. +//! * No calls to `write` are internally buffered before being scheduled, which +//! means that writing performance is abysmal compared to Unix. There should +//! be some level of buffering of writes probably. + +use std::io; +use std::os::windows::prelude::*; + +mod kernel32 { + pub use ::winapi::um::ioapiset::CancelIoEx; + pub use ::winapi::um::winbase::SetFileCompletionNotificationModes; +} +mod winapi { + pub use ::winapi::shared::minwindef::{TRUE, UCHAR}; + pub use ::winapi::um::winnt::HANDLE; +} + +mod awakener; +#[macro_use] +mod selector; +mod tcp; +mod udp; +mod from_raw_arc; +mod buffer_pool; + +pub use self::awakener::Awakener; +pub use self::selector::{Events, Selector, Overlapped, Binding}; +pub use self::tcp::{TcpStream, TcpListener}; +pub use self::udp::UdpSocket; + +#[derive(Copy, Clone)] +enum Family { + V4, V6, +} + +unsafe fn cancel(socket: &AsRawSocket, + overlapped: &Overlapped) -> io::Result<()> { + let handle = socket.as_raw_socket() as winapi::HANDLE; + let ret = kernel32::CancelIoEx(handle, overlapped.as_mut_ptr()); + if ret == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } +} + +unsafe fn no_notify_on_instant_completion(handle: winapi::HANDLE) -> io::Result<()> { + // TODO: move those to winapi + const FILE_SKIP_COMPLETION_PORT_ON_SUCCESS: winapi::UCHAR = 1; + const FILE_SKIP_SET_EVENT_ON_HANDLE: winapi::UCHAR = 2; + + let flags = FILE_SKIP_COMPLETION_PORT_ON_SUCCESS | FILE_SKIP_SET_EVENT_ON_HANDLE; + + let r = kernel32::SetFileCompletionNotificationModes(handle, flags); + if r == winapi::TRUE { + Ok(()) + } else { + Err(io::Error::last_os_error()) + } +} diff --git a/third_party/rust/mio-0.6.23/src/sys/windows/selector.rs b/third_party/rust/mio-0.6.23/src/sys/windows/selector.rs new file mode 100644 index 0000000000..23b145acdd --- /dev/null +++ b/third_party/rust/mio-0.6.23/src/sys/windows/selector.rs @@ -0,0 +1,538 @@ +#![allow(deprecated)] + +use std::{fmt, io}; +use std::cell::UnsafeCell; +use std::os::windows::prelude::*; +use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; +use std::time::Duration; + +use lazycell::AtomicLazyCell; + +use winapi::shared::winerror::WAIT_TIMEOUT; +use winapi::um::minwinbase::{OVERLAPPED, OVERLAPPED_ENTRY}; +use miow; +use miow::iocp::{CompletionPort, CompletionStatus}; + +use event_imp::{Event, Evented, Ready}; +use poll::{self, Poll}; +use sys::windows::buffer_pool::BufferPool; +use {Token, PollOpt}; + +/// Each Selector has a globally unique(ish) ID associated with it. This ID +/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first +/// registered with the `Selector`. If a type that is previously associated with +/// a `Selector` attempts to register itself with a different `Selector`, the +/// operation will return with an error. This matches windows behavior. +static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT; + +/// The guts of the Windows event loop, this is the struct which actually owns +/// a completion port. +/// +/// Internally this is just an `Arc`, and this allows handing out references to +/// the internals to I/O handles registered on this selector. This is +/// required to schedule I/O operations independently of being inside the event +/// loop (e.g. when a call to `write` is seen we're not "in the event loop"). +pub struct Selector { + inner: Arc<SelectorInner>, +} + +struct SelectorInner { + /// Unique identifier of the `Selector` + id: usize, + + /// The actual completion port that's used to manage all I/O + port: CompletionPort, + + /// A pool of buffers usable by this selector. + /// + /// Primitives will take buffers from this pool to perform I/O operations, + /// and once complete they'll be put back in. + buffers: Mutex<BufferPool>, +} + +impl Selector { + pub fn new() -> io::Result<Selector> { + // offset by 1 to avoid choosing 0 as the id of a selector + let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1; + + CompletionPort::new(0).map(|cp| { + Selector { + inner: Arc::new(SelectorInner { + id: id, + port: cp, + buffers: Mutex::new(BufferPool::new(256)), + }), + } + }) + } + + pub fn select(&self, + events: &mut Events, + awakener: Token, + timeout: Option<Duration>) -> io::Result<bool> { + trace!("select; timeout={:?}", timeout); + + // Clear out the previous list of I/O events and get some more! + events.clear(); + + trace!("polling IOCP"); + let n = match self.inner.port.get_many(&mut events.statuses, timeout) { + Ok(statuses) => statuses.len(), + Err(ref e) if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) => 0, + Err(e) => return Err(e), + }; + + let mut ret = false; + for status in events.statuses[..n].iter() { + // This should only ever happen from the awakener, and we should + // only ever have one awakener right now, so assert as such. + if status.overlapped() as usize == 0 { + assert_eq!(status.token(), usize::from(awakener)); + ret = true; + continue; + } + + let callback = unsafe { + (*(status.overlapped() as *mut Overlapped)).callback + }; + + trace!("select; -> got overlapped"); + callback(status.entry()); + } + + trace!("returning"); + Ok(ret) + } + + /// Gets a reference to the underlying `CompletionPort` structure. + pub fn port(&self) -> &CompletionPort { + &self.inner.port + } + + /// Gets a new reference to this selector, although all underlying data + /// structures will refer to the same completion port. + pub fn clone_ref(&self) -> Selector { + Selector { inner: self.inner.clone() } + } + + /// Return the `Selector`'s identifier + pub fn id(&self) -> usize { + self.inner.id + } +} + +impl SelectorInner { + fn identical(&self, other: &SelectorInner) -> bool { + (self as *const SelectorInner) == (other as *const SelectorInner) + } +} + +// A registration is stored in each I/O object which keeps track of how it is +// associated with a `Selector` above. +// +// Once associated with a `Selector`, a registration can never be un-associated +// (due to IOCP requirements). This is actually implemented through the +// `poll::Registration` and `poll::SetReadiness` APIs to keep track of all the +// level/edge/filtering business. +/// A `Binding` is embedded in all I/O objects associated with a `Poll` +/// object. +/// +/// Each registration keeps track of which selector the I/O object is +/// associated with, ensuring that implementations of `Evented` can be +/// conformant for the various methods on Windows. +/// +/// If you're working with custom IOCP-enabled objects then you'll want to +/// ensure that one of these instances is stored in your object and used in the +/// implementation of `Evented`. +/// +/// For more information about how to use this see the `windows` module +/// documentation in this crate. +pub struct Binding { + selector: AtomicLazyCell<Arc<SelectorInner>>, +} + +impl Binding { + /// Creates a new blank binding ready to be inserted into an I/O + /// object. + /// + /// Won't actually do anything until associated with a `Poll` loop. + pub fn new() -> Binding { + Binding { selector: AtomicLazyCell::new() } + } + + /// Registers a new handle with the `Poll` specified, also assigning the + /// `token` specified. + /// + /// This function is intended to be used as part of `Evented::register` for + /// custom IOCP objects. It will add the specified handle to the internal + /// IOCP object with the provided `token`. All future events generated by + /// the handled provided will be received by the `Poll`'s internal IOCP + /// object. + /// + /// # Unsafety + /// + /// This function is unsafe as the `Poll` instance has assumptions about + /// what the `OVERLAPPED` pointer used for each I/O operation looks like. + /// Specifically they must all be instances of the `Overlapped` type in + /// this crate. More information about this can be found on the + /// `windows` module in this crate. + pub unsafe fn register_handle(&self, + handle: &AsRawHandle, + token: Token, + poll: &Poll) -> io::Result<()> { + let selector = poll::selector(poll); + + // Ignore errors, we'll see them on the next line. + drop(self.selector.fill(selector.inner.clone())); + self.check_same_selector(poll)?; + + selector.inner.port.add_handle(usize::from(token), handle) + } + + /// Same as `register_handle` but for sockets. + pub unsafe fn register_socket(&self, + handle: &AsRawSocket, + token: Token, + poll: &Poll) -> io::Result<()> { + let selector = poll::selector(poll); + drop(self.selector.fill(selector.inner.clone())); + self.check_same_selector(poll)?; + selector.inner.port.add_socket(usize::from(token), handle) + } + + /// Reregisters the handle provided from the `Poll` provided. + /// + /// This is intended to be used as part of `Evented::reregister` but note + /// that this function does not currently reregister the provided handle + /// with the `poll` specified. IOCP has a special binding for changing the + /// token which has not yet been implemented. Instead this function should + /// be used to assert that the call to `reregister` happened on the same + /// `Poll` that was passed into to `register`. + /// + /// Eventually, though, the provided `handle` will be re-assigned to have + /// the token `token` on the given `poll` assuming that it's been + /// previously registered with it. + /// + /// # Unsafety + /// + /// This function is unsafe for similar reasons to `register`. That is, + /// there may be pending I/O events and such which aren't handled correctly. + pub unsafe fn reregister_handle(&self, + _handle: &AsRawHandle, + _token: Token, + poll: &Poll) -> io::Result<()> { + self.check_same_selector(poll) + } + + /// Same as `reregister_handle`, but for sockets. + pub unsafe fn reregister_socket(&self, + _socket: &AsRawSocket, + _token: Token, + poll: &Poll) -> io::Result<()> { + self.check_same_selector(poll) + } + + /// Deregisters the handle provided from the `Poll` provided. + /// + /// This is intended to be used as part of `Evented::deregister` but note + /// that this function does not currently deregister the provided handle + /// from the `poll` specified. IOCP has a special binding for that which has + /// not yet been implemented. Instead this function should be used to assert + /// that the call to `deregister` happened on the same `Poll` that was + /// passed into to `register`. + /// + /// # Unsafety + /// + /// This function is unsafe for similar reasons to `register`. That is, + /// there may be pending I/O events and such which aren't handled correctly. + pub unsafe fn deregister_handle(&self, + _handle: &AsRawHandle, + poll: &Poll) -> io::Result<()> { + self.check_same_selector(poll) + } + + /// Same as `deregister_handle`, but for sockets. + pub unsafe fn deregister_socket(&self, + _socket: &AsRawSocket, + poll: &Poll) -> io::Result<()> { + self.check_same_selector(poll) + } + + fn check_same_selector(&self, poll: &Poll) -> io::Result<()> { + let selector = poll::selector(poll); + match self.selector.borrow() { + Some(prev) if prev.identical(&selector.inner) => Ok(()), + Some(_) | + None => Err(other("socket already registered")), + } + } +} + +impl fmt::Debug for Binding { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Binding") + .finish() + } +} + +/// Helper struct used for TCP and UDP which bundles a `binding` with a +/// `SetReadiness` handle. +pub struct ReadyBinding { + binding: Binding, + readiness: Option<poll::SetReadiness>, +} + +impl ReadyBinding { + /// Creates a new blank binding ready to be inserted into an I/O object. + /// + /// Won't actually do anything until associated with an `Selector` loop. + pub fn new() -> ReadyBinding { + ReadyBinding { + binding: Binding::new(), + readiness: None, + } + } + + /// Returns whether this binding has been associated with a selector + /// yet. + pub fn registered(&self) -> bool { + self.readiness.is_some() + } + + /// Acquires a buffer with at least `size` capacity. + /// + /// If associated with a selector, this will attempt to pull a buffer from + /// that buffer pool. If not associated with a selector, this will allocate + /// a fresh buffer. + pub fn get_buffer(&self, size: usize) -> Vec<u8> { + match self.binding.selector.borrow() { + Some(i) => i.buffers.lock().unwrap().get(size), + None => Vec::with_capacity(size), + } + } + + /// Returns a buffer to this binding. + /// + /// If associated with a selector, this will push the buffer back into the + /// selector's pool of buffers. Otherwise this will just drop the buffer. + pub fn put_buffer(&self, buf: Vec<u8>) { + if let Some(i) = self.binding.selector.borrow() { + i.buffers.lock().unwrap().put(buf); + } + } + + /// Sets the readiness of this I/O object to a particular `set`. + /// + /// This is later used to fill out and respond to requests to `poll`. Note + /// that this is all implemented through the `SetReadiness` structure in the + /// `poll` module. + pub fn set_readiness(&self, set: Ready) { + if let Some(ref i) = self.readiness { + trace!("set readiness to {:?}", set); + i.set_readiness(set).expect("event loop disappeared?"); + } + } + + /// Queries what the current readiness of this I/O object is. + /// + /// This is what's being used to generate events returned by `poll`. + pub fn readiness(&self) -> Ready { + match self.readiness { + Some(ref i) => i.readiness(), + None => Ready::empty(), + } + } + + /// Implementation of the `Evented::register` function essentially. + /// + /// Returns an error if we're already registered with another event loop, + /// and otherwise just reassociates ourselves with the event loop to + /// possible change tokens. + pub fn register_socket(&mut self, + socket: &AsRawSocket, + poll: &Poll, + token: Token, + events: Ready, + opts: PollOpt, + registration: &Mutex<Option<poll::Registration>>) + -> io::Result<()> { + trace!("register {:?} {:?}", token, events); + unsafe { + self.binding.register_socket(socket, token, poll)?; + } + + let (r, s) = poll::new_registration(poll, token, events, opts); + self.readiness = Some(s); + *registration.lock().unwrap() = Some(r); + Ok(()) + } + + /// Implementation of `Evented::reregister` function. + pub fn reregister_socket(&mut self, + socket: &AsRawSocket, + poll: &Poll, + token: Token, + events: Ready, + opts: PollOpt, + registration: &Mutex<Option<poll::Registration>>) + -> io::Result<()> { + trace!("reregister {:?} {:?}", token, events); + unsafe { + self.binding.reregister_socket(socket, token, poll)?; + } + + registration.lock().unwrap() + .as_mut().unwrap() + .reregister(poll, token, events, opts) + } + + /// Implementation of the `Evented::deregister` function. + /// + /// Doesn't allow registration with another event loop, just shuts down + /// readiness notifications and such. + pub fn deregister(&mut self, + socket: &AsRawSocket, + poll: &Poll, + registration: &Mutex<Option<poll::Registration>>) + -> io::Result<()> { + trace!("deregistering"); + unsafe { + self.binding.deregister_socket(socket, poll)?; + } + + registration.lock().unwrap() + .as_ref().unwrap() + .deregister(poll) + } +} + +fn other(s: &str) -> io::Error { + io::Error::new(io::ErrorKind::Other, s) +} + +#[derive(Debug)] +pub struct Events { + /// Raw I/O event completions are filled in here by the call to `get_many` + /// on the completion port above. These are then processed to run callbacks + /// which figure out what to do after the event is done. + statuses: Box<[CompletionStatus]>, + + /// Literal events returned by `get` to the upwards `EventLoop`. This file + /// doesn't really modify this (except for the awakener), instead almost all + /// events are filled in by the `ReadinessQueue` from the `poll` module. + events: Vec<Event>, +} + +impl Events { + pub fn with_capacity(cap: usize) -> Events { + // Note that it's possible for the output `events` to grow beyond the + // capacity as it can also include deferred events, but that's certainly + // not the end of the world! + Events { + statuses: vec![CompletionStatus::zero(); cap].into_boxed_slice(), + events: Vec::with_capacity(cap), + } + } + + pub fn is_empty(&self) -> bool { + self.events.is_empty() + } + + pub fn len(&self) -> usize { + self.events.len() + } + + pub fn capacity(&self) -> usize { + self.events.capacity() + } + + pub fn get(&self, idx: usize) -> Option<Event> { + self.events.get(idx).map(|e| *e) + } + + pub fn push_event(&mut self, event: Event) { + self.events.push(event); + } + + pub fn clear(&mut self) { + self.events.truncate(0); + } +} + +macro_rules! overlapped2arc { + ($e:expr, $t:ty, $($field:ident).+) => ( + #[allow(deref_nullptr)] + { + let offset = offset_of!($t, $($field).+); + debug_assert!(offset < mem::size_of::<$t>()); + FromRawArc::from_raw(($e as usize - offset) as *mut $t) + } + ) +} + +macro_rules! offset_of { + ($t:ty, $($field:ident).+) => ( + &(*(0 as *const $t)).$($field).+ as *const _ as usize + ) +} + +// See sys::windows module docs for why this exists. +// +// The gist of it is that `Selector` assumes that all `OVERLAPPED` pointers are +// actually inside one of these structures so it can use the `Callback` stored +// right after it. +// +// We use repr(C) here to ensure that we can assume the overlapped pointer is +// at the start of the structure so we can just do a cast. +/// A wrapper around an internal instance over `miow::Overlapped` which is in +/// turn a wrapper around the Windows type `OVERLAPPED`. +/// +/// This type is required to be used for all IOCP operations on handles that are +/// registered with an event loop. The event loop will receive notifications +/// over `OVERLAPPED` pointers that have completed, and it will cast that +/// pointer to a pointer to this structure and invoke the associated callback. +#[repr(C)] +pub struct Overlapped { + inner: UnsafeCell<miow::Overlapped>, + callback: fn(&OVERLAPPED_ENTRY), +} + +impl Overlapped { + /// Creates a new `Overlapped` which will invoke the provided `cb` callback + /// whenever it's triggered. + /// + /// The returned `Overlapped` must be used as the `OVERLAPPED` passed to all + /// I/O operations that are registered with mio's event loop. When the I/O + /// operation associated with an `OVERLAPPED` pointer completes the event + /// loop will invoke the function pointer provided by `cb`. + pub fn new(cb: fn(&OVERLAPPED_ENTRY)) -> Overlapped { + Overlapped { + inner: UnsafeCell::new(miow::Overlapped::zero()), + callback: cb, + } + } + + /// Get the underlying `Overlapped` instance as a raw pointer. + /// + /// This can be useful when only a shared borrow is held and the overlapped + /// pointer needs to be passed down to winapi. + pub fn as_mut_ptr(&self) -> *mut OVERLAPPED { + unsafe { + (*self.inner.get()).raw() + } + } +} + +impl fmt::Debug for Overlapped { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Overlapped") + .finish() + } +} + +// Overlapped's APIs are marked as unsafe Overlapped's APIs are marked as +// unsafe as they must be used with caution to ensure thread safety. The +// structure itself is safe to send across threads. +unsafe impl Send for Overlapped {} +unsafe impl Sync for Overlapped {} diff --git a/third_party/rust/mio-0.6.23/src/sys/windows/tcp.rs b/third_party/rust/mio-0.6.23/src/sys/windows/tcp.rs new file mode 100644 index 0000000000..236e7866a6 --- /dev/null +++ b/third_party/rust/mio-0.6.23/src/sys/windows/tcp.rs @@ -0,0 +1,853 @@ +use std::fmt; +use std::io::{self, Read, ErrorKind}; +use std::mem; +use std::net::{self, SocketAddr, Shutdown}; +use std::os::windows::prelude::*; +use std::sync::{Mutex, MutexGuard}; +use std::time::Duration; + +use miow::iocp::CompletionStatus; +use miow::net::*; +use net2::{TcpBuilder, TcpStreamExt as Net2TcpExt}; +use winapi::um::minwinbase::OVERLAPPED_ENTRY; +use winapi::um::winnt::HANDLE; +use iovec::IoVec; + +use {poll, Ready, Poll, PollOpt, Token}; +use event::Evented; +use sys::windows::from_raw_arc::FromRawArc; +use sys::windows::selector::{Overlapped, ReadyBinding}; +use sys::windows::Family; + +pub struct TcpStream { + /// Separately stored implementation to ensure that the `Drop` + /// implementation on this type is only executed when it's actually dropped + /// (many clones of this `imp` are made). + imp: StreamImp, + registration: Mutex<Option<poll::Registration>>, +} + +pub struct TcpListener { + imp: ListenerImp, + registration: Mutex<Option<poll::Registration>>, +} + +#[derive(Clone)] +struct StreamImp { + /// A stable address and synchronized access for all internals. This serves + /// to ensure that all `Overlapped` pointers are valid for a long period of + /// time as well as allowing completion callbacks to have access to the + /// internals without having ownership. + /// + /// Note that the reference count also allows us "loan out" copies to + /// completion ports while I/O is running to guarantee that this stays alive + /// until the I/O completes. You'll notice a number of calls to + /// `mem::forget` below, and these only happen on successful scheduling of + /// I/O and are paired with `overlapped2arc!` macro invocations in the + /// completion callbacks (to have a decrement match the increment). + inner: FromRawArc<StreamIo>, +} + +#[derive(Clone)] +struct ListenerImp { + inner: FromRawArc<ListenerIo>, +} + +struct StreamIo { + inner: Mutex<StreamInner>, + read: Overlapped, // also used for connect + write: Overlapped, + socket: net::TcpStream, +} + +struct ListenerIo { + inner: Mutex<ListenerInner>, + accept: Overlapped, + family: Family, + socket: net::TcpListener, +} + +struct StreamInner { + iocp: ReadyBinding, + deferred_connect: Option<SocketAddr>, + read: State<(), ()>, + write: State<(Vec<u8>, usize), (Vec<u8>, usize)>, + /// whether we are instantly notified of success + /// (FILE_SKIP_COMPLETION_PORT_ON_SUCCESS, + /// without a roundtrip through the event loop) + instant_notify: bool, +} + +struct ListenerInner { + iocp: ReadyBinding, + accept: State<net::TcpStream, (net::TcpStream, SocketAddr)>, + accept_buf: AcceptAddrsBuf, +} + +enum State<T, U> { + Empty, // no I/O operation in progress + Pending(T), // an I/O operation is in progress + Ready(U), // I/O has finished with this value + Error(io::Error), // there was an I/O error +} + +impl TcpStream { + fn new(socket: net::TcpStream, + deferred_connect: Option<SocketAddr>) -> TcpStream { + TcpStream { + registration: Mutex::new(None), + imp: StreamImp { + inner: FromRawArc::new(StreamIo { + read: Overlapped::new(read_done), + write: Overlapped::new(write_done), + socket: socket, + inner: Mutex::new(StreamInner { + iocp: ReadyBinding::new(), + deferred_connect: deferred_connect, + read: State::Empty, + write: State::Empty, + instant_notify: false, + }), + }), + }, + } + } + + pub fn connect(socket: net::TcpStream, addr: &SocketAddr) + -> io::Result<TcpStream> { + socket.set_nonblocking(true)?; + Ok(TcpStream::new(socket, Some(*addr))) + } + + pub fn from_stream(stream: net::TcpStream) -> TcpStream { + TcpStream::new(stream, None) + } + + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.imp.inner.socket.peer_addr() + } + + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.imp.inner.socket.local_addr() + } + + pub fn try_clone(&self) -> io::Result<TcpStream> { + self.imp.inner.socket.try_clone().map(|s| TcpStream::new(s, None)) + } + + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + self.imp.inner.socket.shutdown(how) + } + + pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { + self.imp.inner.socket.set_nodelay(nodelay) + } + + pub fn nodelay(&self) -> io::Result<bool> { + self.imp.inner.socket.nodelay() + } + + pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> { + self.imp.inner.socket.set_recv_buffer_size(size) + } + + pub fn recv_buffer_size(&self) -> io::Result<usize> { + self.imp.inner.socket.recv_buffer_size() + } + + pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> { + self.imp.inner.socket.set_send_buffer_size(size) + } + + pub fn send_buffer_size(&self) -> io::Result<usize> { + self.imp.inner.socket.send_buffer_size() + } + + pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> { + self.imp.inner.socket.set_keepalive(keepalive) + } + + pub fn keepalive(&self) -> io::Result<Option<Duration>> { + self.imp.inner.socket.keepalive() + } + + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.imp.inner.socket.set_ttl(ttl) + } + + pub fn ttl(&self) -> io::Result<u32> { + self.imp.inner.socket.ttl() + } + + pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> { + self.imp.inner.socket.set_only_v6(only_v6) + } + + pub fn only_v6(&self) -> io::Result<bool> { + self.imp.inner.socket.only_v6() + } + + pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> { + Net2TcpExt::set_linger(&self.imp.inner.socket, dur) + } + + pub fn linger(&self) -> io::Result<Option<Duration>> { + Net2TcpExt::linger(&self.imp.inner.socket) + } + + pub fn take_error(&self) -> io::Result<Option<io::Error>> { + if let Some(e) = self.imp.inner.socket.take_error()? { + return Ok(Some(e)) + } + + // If the syscall didn't return anything then also check to see if we've + // squirreled away an error elsewhere for example as part of a connect + // operation. + // + // Typically this is used like so: + // + // 1. A `connect` is issued + // 2. Wait for the socket to be writable + // 3. Call `take_error` to see if the connect succeeded. + // + // Right now the `connect` operation finishes in `read_done` below and + // fill will in `State::Error` in the `read` slot if it fails, so we + // extract that here. + let mut me = self.inner(); + match mem::replace(&mut me.read, State::Empty) { + State::Error(e) => { + self.imp.schedule_read(&mut me); + Ok(Some(e)) + } + other => { + me.read = other; + Ok(None) + } + } + } + + fn inner(&self) -> MutexGuard<StreamInner> { + self.imp.inner() + } + + fn before_read(&self) -> io::Result<MutexGuard<StreamInner>> { + let mut me = self.inner(); + + match me.read { + // Empty == we're not associated yet, and if we're pending then + // these are both cases where we return "would block" + State::Empty | + State::Pending(()) => return Err(io::ErrorKind::WouldBlock.into()), + + // If we got a delayed error as part of a `read_overlapped` below, + // return that here. Also schedule another read in case it was + // transient. + State::Error(_) => { + let e = match mem::replace(&mut me.read, State::Empty) { + State::Error(e) => e, + _ => panic!(), + }; + self.imp.schedule_read(&mut me); + return Err(e) + } + + // If we're ready for a read then some previous 0-byte read has + // completed. In that case the OS's socket buffer has something for + // us, so we just keep pulling out bytes while we can in the loop + // below. + State::Ready(()) => {} + } + + Ok(me) + } + + fn post_register(&self, interest: Ready, me: &mut StreamInner) { + if interest.is_readable() { + self.imp.schedule_read(me); + } + + // At least with epoll, if a socket is registered with an interest in + // writing and it's immediately writable then a writable event is + // generated immediately, so do so here. + if interest.is_writable() { + if let State::Empty = me.write { + self.imp.add_readiness(me, Ready::writable()); + } + } + } + + pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> { + match IoVec::from_bytes_mut(buf) { + Some(vec) => self.readv(&mut [vec]), + None => Ok(0), + } + } + + pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> { + let mut me = self.before_read()?; + + match (&self.imp.inner.socket).peek(buf) { + Ok(n) => Ok(n), + Err(e) => { + me.read = State::Empty; + self.imp.schedule_read(&mut me); + Err(e) + } + } + } + + pub fn readv(&self, bufs: &mut [&mut IoVec]) -> io::Result<usize> { + let mut me = self.before_read()?; + + // TODO: Does WSARecv work on a nonblocking sockets? We ideally want to + // call that instead of looping over all the buffers and calling + // `recv` on each buffer. I'm not sure though if an overlapped + // socket in nonblocking mode would work with that use case, + // however, so for now we just call `recv`. + + let mut amt = 0; + for buf in bufs { + match (&self.imp.inner.socket).read(buf) { + // If we did a partial read, then return what we've read so far + Ok(n) if n < buf.len() => return Ok(amt + n), + + // Otherwise filled this buffer entirely, so try to fill the + // next one as well. + Ok(n) => amt += n, + + // If we hit an error then things get tricky if we've already + // read some data. If the error is "would block" then we just + // return the data we've read so far while scheduling another + // 0-byte read. + // + // If we've read data and the error kind is not "would block", + // then we stash away the error to get returned later and return + // the data that we've read. + // + // Finally if we haven't actually read any data we just + // reschedule a 0-byte read to happen again and then return the + // error upwards. + Err(e) => { + if amt > 0 && e.kind() == io::ErrorKind::WouldBlock { + me.read = State::Empty; + self.imp.schedule_read(&mut me); + return Ok(amt) + } else if amt > 0 { + me.read = State::Error(e); + return Ok(amt) + } else { + me.read = State::Empty; + self.imp.schedule_read(&mut me); + return Err(e) + } + } + } + } + + Ok(amt) + } + + pub fn write(&self, buf: &[u8]) -> io::Result<usize> { + match IoVec::from_bytes(buf) { + Some(vec) => self.writev(&[vec]), + None => Ok(0), + } + } + + pub fn writev(&self, bufs: &[&IoVec]) -> io::Result<usize> { + let mut me = self.inner(); + let me = &mut *me; + + match mem::replace(&mut me.write, State::Empty) { + State::Empty => {} + State::Error(e) => return Err(e), + other => { + me.write = other; + return Err(io::ErrorKind::WouldBlock.into()) + } + } + + if !me.iocp.registered() { + return Err(io::ErrorKind::WouldBlock.into()) + } + + if bufs.is_empty() { + return Ok(0) + } + + let len = bufs.iter().map(|b| b.len()).fold(0, |a, b| a + b); + let mut intermediate = me.iocp.get_buffer(len); + for buf in bufs { + intermediate.extend_from_slice(buf); + } + self.imp.schedule_write(intermediate, 0, me); + Ok(len) + } + + pub fn flush(&self) -> io::Result<()> { + Ok(()) + } +} + +impl StreamImp { + fn inner(&self) -> MutexGuard<StreamInner> { + self.inner.inner.lock().unwrap() + } + + fn schedule_connect(&self, addr: &SocketAddr) -> io::Result<()> { + unsafe { + trace!("scheduling a connect"); + self.inner.socket.connect_overlapped(addr, &[], self.inner.read.as_mut_ptr())?; + } + // see docs above on StreamImp.inner for rationale on forget + mem::forget(self.clone()); + Ok(()) + } + + /// Schedule a read to happen on this socket, enqueuing us to receive a + /// notification when a read is ready. + /// + /// Note that this does *not* work with a buffer. When reading a TCP stream + /// we actually read into a 0-byte buffer so Windows will send us a + /// notification when the socket is otherwise ready for reading. This allows + /// us to avoid buffer allocations for in-flight reads. + fn schedule_read(&self, me: &mut StreamInner) { + match me.read { + State::Empty => {} + State::Ready(_) | State::Error(_) => { + self.add_readiness(me, Ready::readable()); + return; + } + _ => return, + } + + me.iocp.set_readiness(me.iocp.readiness() - Ready::readable()); + + trace!("scheduling a read"); + let res = unsafe { + self.inner.socket.read_overlapped(&mut [], self.inner.read.as_mut_ptr()) + }; + match res { + // Note that `Ok(true)` means that this completed immediately and + // our socket is readable. This typically means that the caller of + // this function (likely `read` above) can try again as an + // optimization and return bytes quickly. + // + // Normally, though, although the read completed immediately + // there's still an IOCP completion packet enqueued that we're going + // to receive. + // + // You can configure this behavior (miow) with + // SetFileCompletionNotificationModes to indicate that `Ok(true)` + // does **not** enqueue a completion packet. (This is the case + // for me.instant_notify) + // + // Note that apparently libuv has scary code to work around bugs in + // `WSARecv` for UDP sockets apparently for handles which have had + // the `SetFileCompletionNotificationModes` function called on them, + // worth looking into! + Ok(Some(_)) if me.instant_notify => { + me.read = State::Ready(()); + self.add_readiness(me, Ready::readable()); + } + Ok(_) => { + // see docs above on StreamImp.inner for rationale on forget + me.read = State::Pending(()); + mem::forget(self.clone()); + } + Err(e) => { + me.read = State::Error(e); + self.add_readiness(me, Ready::readable()); + } + } + } + + /// Similar to `schedule_read`, except that this issues, well, writes. + /// + /// This function will continually attempt to write the entire contents of + /// the buffer `buf` until they have all been written. The `pos` argument is + /// the current offset within the buffer up to which the contents have + /// already been written. + /// + /// A new writable event (e.g. allowing another write) will only happen once + /// the buffer has been written completely (or hit an error). + fn schedule_write(&self, + buf: Vec<u8>, + mut pos: usize, + me: &mut StreamInner) { + + // About to write, clear any pending level triggered events + me.iocp.set_readiness(me.iocp.readiness() - Ready::writable()); + + loop { + trace!("scheduling a write of {} bytes", buf[pos..].len()); + let ret = unsafe { + self.inner.socket.write_overlapped(&buf[pos..], self.inner.write.as_mut_ptr()) + }; + match ret { + Ok(Some(transferred_bytes)) if me.instant_notify => { + trace!("done immediately with {} bytes", transferred_bytes); + if transferred_bytes == buf.len() - pos { + self.add_readiness(me, Ready::writable()); + me.write = State::Empty; + break; + } + pos += transferred_bytes; + } + Ok(_) => { + trace!("scheduled for later"); + // see docs above on StreamImp.inner for rationale on forget + me.write = State::Pending((buf, pos)); + mem::forget(self.clone()); + break; + } + Err(e) => { + trace!("write error: {}", e); + me.write = State::Error(e); + self.add_readiness(me, Ready::writable()); + me.iocp.put_buffer(buf); + break; + } + } + } + } + + /// Pushes an event for this socket onto the selector its registered for. + /// + /// When an event is generated on this socket, if it happened after the + /// socket was closed then we don't want to actually push the event onto our + /// selector as otherwise it's just a spurious notification. + fn add_readiness(&self, me: &mut StreamInner, set: Ready) { + me.iocp.set_readiness(set | me.iocp.readiness()); + } +} + +fn read_done(status: &OVERLAPPED_ENTRY) { + let status = CompletionStatus::from_entry(status); + let me2 = StreamImp { + inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, read) }, + }; + + let mut me = me2.inner(); + match mem::replace(&mut me.read, State::Empty) { + State::Pending(()) => { + trace!("finished a read: {}", status.bytes_transferred()); + assert_eq!(status.bytes_transferred(), 0); + me.read = State::Ready(()); + return me2.add_readiness(&mut me, Ready::readable()) + } + s => me.read = s, + } + + // If a read didn't complete, then the connect must have just finished. + trace!("finished a connect"); + + // By guarding with socket.result(), we ensure that a connection + // was successfully made before performing operations requiring a + // connected socket. + match unsafe { me2.inner.socket.result(status.overlapped()) } + .and_then(|_| me2.inner.socket.connect_complete()) + { + Ok(()) => { + me2.add_readiness(&mut me, Ready::writable()); + me2.schedule_read(&mut me); + } + Err(e) => { + me2.add_readiness(&mut me, Ready::readable() | Ready::writable()); + me.read = State::Error(e); + } + } +} + +fn write_done(status: &OVERLAPPED_ENTRY) { + let status = CompletionStatus::from_entry(status); + trace!("finished a write {}", status.bytes_transferred()); + let me2 = StreamImp { + inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, write) }, + }; + let mut me = me2.inner(); + let (buf, pos) = match mem::replace(&mut me.write, State::Empty) { + State::Pending(pair) => pair, + _ => unreachable!(), + }; + let new_pos = pos + (status.bytes_transferred() as usize); + if new_pos == buf.len() { + me2.add_readiness(&mut me, Ready::writable()); + } else { + me2.schedule_write(buf, new_pos, &mut me); + } +} + +impl Evented for TcpStream { + fn register(&self, poll: &Poll, token: Token, + interest: Ready, opts: PollOpt) -> io::Result<()> { + let mut me = self.inner(); + me.iocp.register_socket(&self.imp.inner.socket, poll, token, + interest, opts, &self.registration)?; + + unsafe { + super::no_notify_on_instant_completion(self.imp.inner.socket.as_raw_socket() as HANDLE)?; + me.instant_notify = true; + } + + // If we were connected before being registered process that request + // here and go along our merry ways. Note that the callback for a + // successful connect will worry about generating writable/readable + // events and scheduling a new read. + if let Some(addr) = me.deferred_connect.take() { + return self.imp.schedule_connect(&addr).map(|_| ()) + } + self.post_register(interest, &mut me); + Ok(()) + } + + fn reregister(&self, poll: &Poll, token: Token, + interest: Ready, opts: PollOpt) -> io::Result<()> { + let mut me = self.inner(); + me.iocp.reregister_socket(&self.imp.inner.socket, poll, token, + interest, opts, &self.registration)?; + self.post_register(interest, &mut me); + Ok(()) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + self.inner().iocp.deregister(&self.imp.inner.socket, + poll, &self.registration) + } +} + +impl fmt::Debug for TcpStream { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("TcpStream") + .finish() + } +} + +impl Drop for TcpStream { + fn drop(&mut self) { + // If we're still internally reading, we're no longer interested. Note + // though that we don't cancel any writes which may have been issued to + // preserve the same semantics as Unix. + // + // Note that "Empty" here may mean that a connect is pending, so we + // cancel even if that happens as well. + unsafe { + match self.inner().read { + State::Pending(_) | State::Empty => { + trace!("cancelling active TCP read"); + drop(super::cancel(&self.imp.inner.socket, + &self.imp.inner.read)); + } + State::Ready(_) | State::Error(_) => {} + } + } + } +} + +impl TcpListener { + pub fn new(socket: net::TcpListener) + -> io::Result<TcpListener> { + let addr = socket.local_addr()?; + Ok(TcpListener::new_family(socket, match addr { + SocketAddr::V4(..) => Family::V4, + SocketAddr::V6(..) => Family::V6, + })) + } + + fn new_family(socket: net::TcpListener, family: Family) -> TcpListener { + TcpListener { + registration: Mutex::new(None), + imp: ListenerImp { + inner: FromRawArc::new(ListenerIo { + accept: Overlapped::new(accept_done), + family: family, + socket: socket, + inner: Mutex::new(ListenerInner { + iocp: ReadyBinding::new(), + accept: State::Empty, + accept_buf: AcceptAddrsBuf::new(), + }), + }), + }, + } + } + + pub fn accept(&self) -> io::Result<(net::TcpStream, SocketAddr)> { + let mut me = self.inner(); + + let ret = match mem::replace(&mut me.accept, State::Empty) { + State::Empty => return Err(io::ErrorKind::WouldBlock.into()), + State::Pending(t) => { + me.accept = State::Pending(t); + return Err(io::ErrorKind::WouldBlock.into()); + } + State::Ready((s, a)) => Ok((s, a)), + State::Error(e) => Err(e), + }; + + self.imp.schedule_accept(&mut me); + + return ret + } + + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.imp.inner.socket.local_addr() + } + + pub fn try_clone(&self) -> io::Result<TcpListener> { + self.imp.inner.socket.try_clone().map(|s| { + TcpListener::new_family(s, self.imp.inner.family) + }) + } + + #[allow(deprecated)] + pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> { + self.imp.inner.socket.set_only_v6(only_v6) + } + + #[allow(deprecated)] + pub fn only_v6(&self) -> io::Result<bool> { + self.imp.inner.socket.only_v6() + } + + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.imp.inner.socket.set_ttl(ttl) + } + + pub fn ttl(&self) -> io::Result<u32> { + self.imp.inner.socket.ttl() + } + + pub fn take_error(&self) -> io::Result<Option<io::Error>> { + self.imp.inner.socket.take_error() + } + + fn inner(&self) -> MutexGuard<ListenerInner> { + self.imp.inner() + } +} + +impl ListenerImp { + fn inner(&self) -> MutexGuard<ListenerInner> { + self.inner.inner.lock().unwrap() + } + + fn schedule_accept(&self, me: &mut ListenerInner) { + match me.accept { + State::Empty => {} + _ => return + } + + me.iocp.set_readiness(me.iocp.readiness() - Ready::readable()); + + let res = match self.inner.family { + Family::V4 => TcpBuilder::new_v4(), + Family::V6 => TcpBuilder::new_v6(), + } + .and_then(|builder| builder.to_tcp_stream()) + .and_then(|stream| unsafe { + trace!("scheduling an accept"); + self.inner + .socket + .accept_overlapped(&stream, &mut me.accept_buf, self.inner.accept.as_mut_ptr()) + .map(|x| (stream, x)) + }); + match res { + Ok((socket, _)) => { + // see docs above on StreamImp.inner for rationale on forget + me.accept = State::Pending(socket); + mem::forget(self.clone()); + } + Err(e) => { + me.accept = State::Error(e); + self.add_readiness(me, Ready::readable()); + } + } + } + + // See comments in StreamImp::push + fn add_readiness(&self, me: &mut ListenerInner, set: Ready) { + me.iocp.set_readiness(set | me.iocp.readiness()); + } +} + +fn accept_done(status: &OVERLAPPED_ENTRY) { + let status = CompletionStatus::from_entry(status); + let me2 = ListenerImp { + inner: unsafe { overlapped2arc!(status.overlapped(), ListenerIo, accept) }, + }; + + let mut me = me2.inner(); + let socket = match mem::replace(&mut me.accept, State::Empty) { + State::Pending(s) => s, + _ => unreachable!(), + }; + trace!("finished an accept"); + let result = me2.inner.socket.accept_complete(&socket).and_then(|()| { + me.accept_buf.parse(&me2.inner.socket) + }).and_then(|buf| { + buf.remote().ok_or_else(|| { + io::Error::new(ErrorKind::Other, "could not obtain remote address") + }) + }); + me.accept = match result { + Ok(remote_addr) => State::Ready((socket, remote_addr)), + Err(e) => State::Error(e), + }; + me2.add_readiness(&mut me, Ready::readable()); +} + +impl Evented for TcpListener { + fn register(&self, poll: &Poll, token: Token, + interest: Ready, opts: PollOpt) -> io::Result<()> { + let mut me = self.inner(); + me.iocp.register_socket(&self.imp.inner.socket, poll, token, + interest, opts, &self.registration)?; + + unsafe { + super::no_notify_on_instant_completion(self.imp.inner.socket.as_raw_socket() as HANDLE)?; + } + + self.imp.schedule_accept(&mut me); + Ok(()) + } + + fn reregister(&self, poll: &Poll, token: Token, + interest: Ready, opts: PollOpt) -> io::Result<()> { + let mut me = self.inner(); + me.iocp.reregister_socket(&self.imp.inner.socket, poll, token, + interest, opts, &self.registration)?; + self.imp.schedule_accept(&mut me); + Ok(()) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + self.inner().iocp.deregister(&self.imp.inner.socket, + poll, &self.registration) + } +} + +impl fmt::Debug for TcpListener { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("TcpListener") + .finish() + } +} + +impl Drop for TcpListener { + fn drop(&mut self) { + // If we're still internally reading, we're no longer interested. + unsafe { + match self.inner().accept { + State::Pending(_) => { + trace!("cancelling active TCP accept"); + drop(super::cancel(&self.imp.inner.socket, + &self.imp.inner.accept)); + } + State::Empty | + State::Ready(_) | + State::Error(_) => {} + } + } + } +} diff --git a/third_party/rust/mio-0.6.23/src/sys/windows/udp.rs b/third_party/rust/mio-0.6.23/src/sys/windows/udp.rs new file mode 100644 index 0000000000..f5ea96c324 --- /dev/null +++ b/third_party/rust/mio-0.6.23/src/sys/windows/udp.rs @@ -0,0 +1,414 @@ +//! UDP for IOCP +//! +//! Note that most of this module is quite similar to the TCP module, so if +//! something seems odd you may also want to try the docs over there. + +use std::fmt; +use std::io::prelude::*; +use std::io; +use std::mem; +use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::sync::{Mutex, MutexGuard}; + +#[allow(unused_imports)] +use net2::{UdpBuilder, UdpSocketExt}; +use winapi::shared::winerror::WSAEMSGSIZE; +use winapi::um::minwinbase::OVERLAPPED_ENTRY; +use miow::iocp::CompletionStatus; +use miow::net::SocketAddrBuf; +use miow::net::UdpSocketExt as MiowUdpSocketExt; + +use {poll, Ready, Poll, PollOpt, Token}; +use event::Evented; +use sys::windows::from_raw_arc::FromRawArc; +use sys::windows::selector::{Overlapped, ReadyBinding}; + +pub struct UdpSocket { + imp: Imp, + registration: Mutex<Option<poll::Registration>>, +} + +#[derive(Clone)] +struct Imp { + inner: FromRawArc<Io>, +} + +struct Io { + read: Overlapped, + write: Overlapped, + socket: net::UdpSocket, + inner: Mutex<Inner>, +} + +struct Inner { + iocp: ReadyBinding, + read: State<Vec<u8>, Vec<u8>>, + write: State<Vec<u8>, (Vec<u8>, usize)>, + read_buf: SocketAddrBuf, +} + +enum State<T, U> { + Empty, + Pending(T), + Ready(U), + Error(io::Error), +} + +impl UdpSocket { + pub fn new(socket: net::UdpSocket) -> io::Result<UdpSocket> { + Ok(UdpSocket { + registration: Mutex::new(None), + imp: Imp { + inner: FromRawArc::new(Io { + read: Overlapped::new(recv_done), + write: Overlapped::new(send_done), + socket: socket, + inner: Mutex::new(Inner { + iocp: ReadyBinding::new(), + read: State::Empty, + write: State::Empty, + read_buf: SocketAddrBuf::new(), + }), + }), + }, + }) + } + + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.imp.inner.socket.local_addr() + } + + pub fn try_clone(&self) -> io::Result<UdpSocket> { + self.imp.inner.socket.try_clone().and_then(UdpSocket::new) + } + + /// Note that unlike `TcpStream::write` this function will not attempt to + /// continue writing `buf` until its entirely written. + /// + /// TODO: This... may be wrong in the long run. We're reporting that we + /// successfully wrote all of the bytes in `buf` but it's possible + /// that we don't actually end up writing all of them! + pub fn send_to(&self, buf: &[u8], target: &SocketAddr) + -> io::Result<usize> { + let mut me = self.inner(); + let me = &mut *me; + + match me.write { + State::Empty => {} + _ => return Err(io::ErrorKind::WouldBlock.into()), + } + + if !me.iocp.registered() { + return Err(io::ErrorKind::WouldBlock.into()) + } + + let interest = me.iocp.readiness(); + me.iocp.set_readiness(interest - Ready::writable()); + + let mut owned_buf = me.iocp.get_buffer(64 * 1024); + let amt = owned_buf.write(buf)?; + unsafe { + trace!("scheduling a send"); + self.imp.inner.socket.send_to_overlapped(&owned_buf, target, + self.imp.inner.write.as_mut_ptr()) + }?; + me.write = State::Pending(owned_buf); + mem::forget(self.imp.clone()); + Ok(amt) + } + + /// Note that unlike `TcpStream::write` this function will not attempt to + /// continue writing `buf` until its entirely written. + /// + /// TODO: This... may be wrong in the long run. We're reporting that we + /// successfully wrote all of the bytes in `buf` but it's possible + /// that we don't actually end up writing all of them! + pub fn send(&self, buf: &[u8]) -> io::Result<usize> { + let mut me = self.inner(); + let me = &mut *me; + + match me.write { + State::Empty => {} + _ => return Err(io::ErrorKind::WouldBlock.into()), + } + + if !me.iocp.registered() { + return Err(io::ErrorKind::WouldBlock.into()) + } + + let interest = me.iocp.readiness(); + me.iocp.set_readiness(interest - Ready::writable()); + + let mut owned_buf = me.iocp.get_buffer(64 * 1024); + let amt = owned_buf.write(buf)?; + unsafe { + trace!("scheduling a send"); + self.imp.inner.socket.send_overlapped(&owned_buf, self.imp.inner.write.as_mut_ptr()) + + }?; + me.write = State::Pending(owned_buf); + mem::forget(self.imp.clone()); + Ok(amt) + } + + pub fn recv_from(&self, mut buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + let mut me = self.inner(); + match mem::replace(&mut me.read, State::Empty) { + State::Empty => Err(io::ErrorKind::WouldBlock.into()), + State::Pending(b) => { me.read = State::Pending(b); Err(io::ErrorKind::WouldBlock.into()) } + State::Ready(data) => { + // If we weren't provided enough space to receive the message + // then don't actually read any data, just return an error. + if buf.len() < data.len() { + me.read = State::Ready(data); + Err(io::Error::from_raw_os_error(WSAEMSGSIZE as i32)) + } else { + let r = if let Some(addr) = me.read_buf.to_socket_addr() { + buf.write(&data).unwrap(); + Ok((data.len(), addr)) + } else { + Err(io::Error::new(io::ErrorKind::Other, + "failed to parse socket address")) + }; + me.iocp.put_buffer(data); + self.imp.schedule_read_from(&mut me); + r + } + } + State::Error(e) => { + self.imp.schedule_read_from(&mut me); + Err(e) + } + } + } + + pub fn recv(&self, buf: &mut [u8]) + -> io::Result<usize> { + //Since recv_from can be used on connected sockets just call it and drop the address. + self.recv_from(buf).map(|(size,_)| size) + } + + pub fn connect(&self, addr: SocketAddr) -> io::Result<()> { + self.imp.inner.socket.connect(addr) + } + + pub fn broadcast(&self) -> io::Result<bool> { + self.imp.inner.socket.broadcast() + } + + pub fn set_broadcast(&self, on: bool) -> io::Result<()> { + self.imp.inner.socket.set_broadcast(on) + } + + pub fn multicast_loop_v4(&self) -> io::Result<bool> { + self.imp.inner.socket.multicast_loop_v4() + } + + pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> { + self.imp.inner.socket.set_multicast_loop_v4(on) + } + + pub fn multicast_ttl_v4(&self) -> io::Result<u32> { + self.imp.inner.socket.multicast_ttl_v4() + } + + pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { + self.imp.inner.socket.set_multicast_ttl_v4(ttl) + } + + pub fn multicast_loop_v6(&self) -> io::Result<bool> { + self.imp.inner.socket.multicast_loop_v6() + } + + pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> { + self.imp.inner.socket.set_multicast_loop_v6(on) + } + + pub fn ttl(&self) -> io::Result<u32> { + self.imp.inner.socket.ttl() + } + + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.imp.inner.socket.set_ttl(ttl) + } + + pub fn join_multicast_v4(&self, + multiaddr: &Ipv4Addr, + interface: &Ipv4Addr) -> io::Result<()> { + self.imp.inner.socket.join_multicast_v4(multiaddr, interface) + } + + pub fn join_multicast_v6(&self, + multiaddr: &Ipv6Addr, + interface: u32) -> io::Result<()> { + self.imp.inner.socket.join_multicast_v6(multiaddr, interface) + } + + pub fn leave_multicast_v4(&self, + multiaddr: &Ipv4Addr, + interface: &Ipv4Addr) -> io::Result<()> { + self.imp.inner.socket.leave_multicast_v4(multiaddr, interface) + } + + pub fn leave_multicast_v6(&self, + multiaddr: &Ipv6Addr, + interface: u32) -> io::Result<()> { + self.imp.inner.socket.leave_multicast_v6(multiaddr, interface) + } + + pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> { + self.imp.inner.socket.set_only_v6(only_v6) + } + + pub fn only_v6(&self) -> io::Result<bool> { + self.imp.inner.socket.only_v6() + } + + pub fn take_error(&self) -> io::Result<Option<io::Error>> { + self.imp.inner.socket.take_error() + } + + fn inner(&self) -> MutexGuard<Inner> { + self.imp.inner() + } + + fn post_register(&self, interest: Ready, me: &mut Inner) { + if interest.is_readable() { + //We use recv_from here since it is well specified for both + //connected and non-connected sockets and we can discard the address + //when calling recv(). + self.imp.schedule_read_from(me); + } + // See comments in TcpSocket::post_register for what's going on here + if interest.is_writable() { + if let State::Empty = me.write { + self.imp.add_readiness(me, Ready::writable()); + } + } + } +} + +impl Imp { + fn inner(&self) -> MutexGuard<Inner> { + self.inner.inner.lock().unwrap() + } + + fn schedule_read_from(&self, me: &mut Inner) { + match me.read { + State::Empty => {} + _ => return, + } + + let interest = me.iocp.readiness(); + me.iocp.set_readiness(interest - Ready::readable()); + + let mut buf = me.iocp.get_buffer(64 * 1024); + let res = unsafe { + trace!("scheduling a read"); + let cap = buf.capacity(); + buf.set_len(cap); + self.inner.socket.recv_from_overlapped(&mut buf, &mut me.read_buf, + self.inner.read.as_mut_ptr()) + }; + match res { + Ok(_) => { + me.read = State::Pending(buf); + mem::forget(self.clone()); + } + Err(e) => { + me.read = State::Error(e); + self.add_readiness(me, Ready::readable()); + me.iocp.put_buffer(buf); + } + } + } + + // See comments in tcp::StreamImp::push + fn add_readiness(&self, me: &Inner, set: Ready) { + me.iocp.set_readiness(set | me.iocp.readiness()); + } +} + +impl Evented for UdpSocket { + fn register(&self, poll: &Poll, token: Token, + interest: Ready, opts: PollOpt) -> io::Result<()> { + let mut me = self.inner(); + me.iocp.register_socket(&self.imp.inner.socket, + poll, token, interest, opts, + &self.registration)?; + self.post_register(interest, &mut me); + Ok(()) + } + + fn reregister(&self, poll: &Poll, token: Token, + interest: Ready, opts: PollOpt) -> io::Result<()> { + let mut me = self.inner(); + me.iocp.reregister_socket(&self.imp.inner.socket, + poll, token, interest, + opts, &self.registration)?; + self.post_register(interest, &mut me); + Ok(()) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + self.inner().iocp.deregister(&self.imp.inner.socket, + poll, &self.registration) + } +} + +impl fmt::Debug for UdpSocket { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("UdpSocket") + .finish() + } +} + +impl Drop for UdpSocket { + fn drop(&mut self) { + let inner = self.inner(); + + // If we're still internally reading, we're no longer interested. Note + // though that we don't cancel any writes which may have been issued to + // preserve the same semantics as Unix. + unsafe { + match inner.read { + State::Pending(_) => { + drop(super::cancel(&self.imp.inner.socket, + &self.imp.inner.read)); + } + State::Empty | + State::Ready(_) | + State::Error(_) => {} + } + } + } +} + +fn send_done(status: &OVERLAPPED_ENTRY) { + let status = CompletionStatus::from_entry(status); + trace!("finished a send {}", status.bytes_transferred()); + let me2 = Imp { + inner: unsafe { overlapped2arc!(status.overlapped(), Io, write) }, + }; + let mut me = me2.inner(); + me.write = State::Empty; + me2.add_readiness(&mut me, Ready::writable()); +} + +fn recv_done(status: &OVERLAPPED_ENTRY) { + let status = CompletionStatus::from_entry(status); + trace!("finished a recv {}", status.bytes_transferred()); + let me2 = Imp { + inner: unsafe { overlapped2arc!(status.overlapped(), Io, read) }, + }; + let mut me = me2.inner(); + let mut buf = match mem::replace(&mut me.read, State::Empty) { + State::Pending(buf) => buf, + _ => unreachable!(), + }; + unsafe { + buf.set_len(status.bytes_transferred() as usize); + } + me.read = State::Ready(buf); + me2.add_readiness(&mut me, Ready::readable()); +} |