summaryrefslogtreecommitdiffstats
path: root/third_party/rust/miow/src
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
commit2aa4a82499d4becd2284cdb482213d541b8804dd (patch)
treeb80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/miow/src
parentInitial commit. (diff)
downloadfirefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz
firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/miow/src')
-rw-r--r--third_party/rust/miow/src/handle.rs164
-rw-r--r--third_party/rust/miow/src/iocp.rs324
-rw-r--r--third_party/rust/miow/src/lib.rs57
-rw-r--r--third_party/rust/miow/src/net.rs1140
-rw-r--r--third_party/rust/miow/src/overlapped.rs95
-rw-r--r--third_party/rust/miow/src/pipe.rs716
6 files changed, 2496 insertions, 0 deletions
diff --git a/third_party/rust/miow/src/handle.rs b/third_party/rust/miow/src/handle.rs
new file mode 100644
index 0000000000..809c1c6780
--- /dev/null
+++ b/third_party/rust/miow/src/handle.rs
@@ -0,0 +1,164 @@
+use std::io;
+use std::cmp;
+use std::ptr;
+
+use winapi::shared::minwindef::*;
+use winapi::shared::ntdef::{
+ BOOLEAN,
+ FALSE,
+ HANDLE,
+ TRUE,
+};
+use winapi::shared::winerror::*;
+use winapi::um::fileapi::*;
+use winapi::um::handleapi::*;
+use winapi::um::ioapiset::*;
+use winapi::um::minwinbase::*;
+
+#[derive(Debug)]
+pub struct Handle(HANDLE);
+
+unsafe impl Send for Handle {}
+unsafe impl Sync for Handle {}
+
+impl Handle {
+ pub fn new(handle: HANDLE) -> Handle {
+ Handle(handle)
+ }
+
+ pub fn raw(&self) -> HANDLE { self.0 }
+
+ pub fn into_raw(self) -> HANDLE {
+ use std::mem;
+
+ let ret = self.0;
+ mem::forget(self);
+ ret
+ }
+
+ pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
+ let mut bytes = 0;
+ let len = cmp::min(buf.len(), <DWORD>::max_value() as usize) as DWORD;
+ try!(::cvt(unsafe {
+ WriteFile(self.0, buf.as_ptr() as *const _, len, &mut bytes,
+ 0 as *mut _)
+ }));
+ Ok(bytes as usize)
+ }
+
+ pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
+ let mut bytes = 0;
+ let len = cmp::min(buf.len(), <DWORD>::max_value() as usize) as DWORD;
+ try!(::cvt(unsafe {
+ ReadFile(self.0, buf.as_mut_ptr() as *mut _, len, &mut bytes,
+ 0 as *mut _)
+ }));
+ Ok(bytes as usize)
+ }
+
+ pub unsafe fn read_overlapped(&self, buf: &mut [u8],
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<Option<usize>> {
+ self.read_overlapped_helper(buf, overlapped, FALSE)
+ }
+
+ pub unsafe fn read_overlapped_wait(&self, buf: &mut [u8],
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<usize> {
+ match self.read_overlapped_helper(buf, overlapped, TRUE) {
+ Ok(Some(bytes)) => Ok(bytes),
+ Ok(None) => panic!("logic error"),
+ Err(e) => Err(e),
+ }
+ }
+
+ pub unsafe fn read_overlapped_helper(&self, buf: &mut [u8],
+ overlapped: *mut OVERLAPPED,
+ wait: BOOLEAN)
+ -> io::Result<Option<usize>> {
+ let len = cmp::min(buf.len(), <DWORD>::max_value() as usize) as DWORD;
+ let res = ::cvt({
+ ReadFile(self.0,
+ buf.as_mut_ptr() as *mut _,
+ len,
+ ptr::null_mut(),
+ overlapped)
+ });
+ match res {
+ Ok(_) => (),
+ Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32)
+ => (),
+ Err(e) => return Err(e),
+ }
+
+ let mut bytes = 0;
+ let res = ::cvt({
+ GetOverlappedResult(self.0,
+ overlapped,
+ &mut bytes,
+ wait as BOOL)
+ });
+ match res {
+ Ok(_) => Ok(Some(bytes as usize)),
+ Err(ref e) if e.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) && wait == FALSE
+ => Ok(None),
+ Err(e) => Err(e),
+ }
+ }
+
+ pub unsafe fn write_overlapped(&self, buf: &[u8],
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<Option<usize>> {
+ self.write_overlapped_helper(buf, overlapped, FALSE)
+ }
+
+ pub unsafe fn write_overlapped_wait(&self, buf: &[u8],
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<usize> {
+ match self.write_overlapped_helper(buf, overlapped, TRUE) {
+ Ok(Some(bytes)) => Ok(bytes),
+ Ok(None) => panic!("logic error"),
+ Err(e) => Err(e),
+ }
+ }
+
+ unsafe fn write_overlapped_helper(&self, buf: &[u8],
+ overlapped: *mut OVERLAPPED,
+ wait: BOOLEAN)
+ -> io::Result<Option<usize>> {
+ let len = cmp::min(buf.len(), <DWORD>::max_value() as usize) as DWORD;
+ let res = ::cvt({
+ WriteFile(self.0,
+ buf.as_ptr() as *const _,
+ len,
+ ptr::null_mut(),
+ overlapped)
+ });
+ match res {
+ Ok(_) => (),
+ Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32)
+ => (),
+ Err(e) => return Err(e),
+ }
+
+ let mut bytes = 0;
+ let res = ::cvt({
+ GetOverlappedResult(self.0,
+ overlapped,
+ &mut bytes,
+ wait as BOOL)
+ });
+ match res {
+ Ok(_) => Ok(Some(bytes as usize)),
+ Err(ref e) if e.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) && wait == FALSE
+ => Ok(None),
+ Err(e) => Err(e),
+ }
+ }
+}
+
+impl Drop for Handle {
+ fn drop(&mut self) {
+ unsafe { CloseHandle(self.0) };
+ }
+}
diff --git a/third_party/rust/miow/src/iocp.rs b/third_party/rust/miow/src/iocp.rs
new file mode 100644
index 0000000000..1404be9aab
--- /dev/null
+++ b/third_party/rust/miow/src/iocp.rs
@@ -0,0 +1,324 @@
+//! Bindings to IOCP, I/O Completion Ports
+
+use std::cmp;
+use std::fmt;
+use std::io;
+use std::mem;
+use std::os::windows::io::*;
+use std::time::Duration;
+
+use handle::Handle;
+use winapi::shared::basetsd::*;
+use winapi::shared::ntdef::*;
+use winapi::um::minwinbase::*;
+use winapi::um::handleapi::*;
+use winapi::um::ioapiset::*;
+use Overlapped;
+
+/// A handle to an Windows I/O Completion Port.
+#[derive(Debug)]
+pub struct CompletionPort {
+ handle: Handle,
+}
+
+/// A status message received from an I/O completion port.
+///
+/// These statuses can be created via the `new` or `empty` constructors and then
+/// provided to a completion port, or they are read out of a completion port.
+/// The fields of each status are read through its accessor methods.
+#[derive(Clone, Copy)]
+pub struct CompletionStatus(OVERLAPPED_ENTRY);
+
+impl fmt::Debug for CompletionStatus {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "CompletionStatus(OVERLAPPED_ENTRY)")
+ }
+}
+
+unsafe impl Send for CompletionStatus {}
+unsafe impl Sync for CompletionStatus {}
+
+impl CompletionPort {
+ /// Creates a new I/O completion port with the specified concurrency value.
+ ///
+ /// The number of threads given corresponds to the level of concurrency
+ /// allowed for threads associated with this port. Consult the Windows
+ /// documentation for more information about this value.
+ pub fn new(threads: u32) -> io::Result<CompletionPort> {
+ let ret = unsafe {
+ CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0 as *mut _,
+ 0, threads)
+ };
+ if ret.is_null() {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(CompletionPort { handle: Handle::new(ret) })
+ }
+ }
+
+ /// Associates a new `HANDLE` to this I/O completion port.
+ ///
+ /// This function will associate the given handle to this port with the
+ /// given `token` to be returned in status messages whenever it receives a
+ /// notification.
+ ///
+ /// Any object which is convertible to a `HANDLE` via the `AsRawHandle`
+ /// trait can be provided to this function, such as `std::fs::File` and
+ /// friends.
+ pub fn add_handle<T: AsRawHandle + ?Sized>(&self, token: usize,
+ t: &T) -> io::Result<()> {
+ self._add(token, t.as_raw_handle())
+ }
+
+ /// Associates a new `SOCKET` to this I/O completion port.
+ ///
+ /// This function will associate the given socket to this port with the
+ /// given `token` to be returned in status messages whenever it receives a
+ /// notification.
+ ///
+ /// Any object which is convertible to a `SOCKET` via the `AsRawSocket`
+ /// trait can be provided to this function, such as `std::net::TcpStream`
+ /// and friends.
+ pub fn add_socket<T: AsRawSocket + ?Sized>(&self, token: usize,
+ t: &T) -> io::Result<()> {
+ self._add(token, t.as_raw_socket() as HANDLE)
+ }
+
+ fn _add(&self, token: usize, handle: HANDLE) -> io::Result<()> {
+ assert_eq!(mem::size_of_val(&token), mem::size_of::<ULONG_PTR>());
+ let ret = unsafe {
+ CreateIoCompletionPort(handle, self.handle.raw(),
+ token as ULONG_PTR, 0)
+ };
+ if ret.is_null() {
+ Err(io::Error::last_os_error())
+ } else {
+ debug_assert_eq!(ret, self.handle.raw());
+ Ok(())
+ }
+ }
+
+ /// Dequeue a completion status from this I/O completion port.
+ ///
+ /// This function will associate the calling thread with this completion
+ /// port and then wait for a status message to become available. The precise
+ /// semantics on when this function returns depends on the concurrency value
+ /// specified when the port was created.
+ ///
+ /// A timeout can optionally be specified to this function. If `None` is
+ /// provided this function will not time out, and otherwise it will time out
+ /// after the specified duration has passed.
+ ///
+ /// On success this will return the status message which was dequeued from
+ /// this completion port.
+ pub fn get(&self, timeout: Option<Duration>) -> io::Result<CompletionStatus> {
+ let mut bytes = 0;
+ let mut token = 0;
+ let mut overlapped = 0 as *mut _;
+ let timeout = ::dur2ms(timeout);
+ let ret = unsafe {
+ GetQueuedCompletionStatus(self.handle.raw(),
+ &mut bytes,
+ &mut token,
+ &mut overlapped,
+ timeout)
+ };
+ ::cvt(ret).map(|_| {
+ CompletionStatus(OVERLAPPED_ENTRY {
+ dwNumberOfBytesTransferred: bytes,
+ lpCompletionKey: token,
+ lpOverlapped: overlapped,
+ Internal: 0,
+ })
+ })
+ }
+
+ /// Dequeues a number of completion statuses from this I/O completion port.
+ ///
+ /// This function is the same as `get` except that it may return more than
+ /// one status. A buffer of "zero" statuses is provided (the contents are
+ /// not read) and then on success this function will return a sub-slice of
+ /// statuses which represent those which were dequeued from this port. This
+ /// function does not wait to fill up the entire list of statuses provided.
+ ///
+ /// Like with `get`, a timeout may be specified for this operation.
+ pub fn get_many<'a>(&self,
+ list: &'a mut [CompletionStatus],
+ timeout: Option<Duration>)
+ -> io::Result<&'a mut [CompletionStatus]>
+ {
+ debug_assert_eq!(mem::size_of::<CompletionStatus>(),
+ mem::size_of::<OVERLAPPED_ENTRY>());
+ let mut removed = 0;
+ let timeout = ::dur2ms(timeout);
+ let len = cmp::min(list.len(), <ULONG>::max_value() as usize) as ULONG;
+ let ret = unsafe {
+ GetQueuedCompletionStatusEx(self.handle.raw(),
+ list.as_ptr() as *mut _,
+ len,
+ &mut removed,
+ timeout,
+ FALSE as i32)
+ };
+ match ::cvt(ret) {
+ Ok(_) => Ok(&mut list[..removed as usize]),
+ Err(e) => Err(e),
+ }
+ }
+
+ /// Posts a new completion status onto this I/O completion port.
+ ///
+ /// This function will post the given status, with custom parameters, to the
+ /// port. Threads blocked in `get` or `get_many` will eventually receive
+ /// this status.
+ pub fn post(&self, status: CompletionStatus) -> io::Result<()> {
+ let ret = unsafe {
+ PostQueuedCompletionStatus(self.handle.raw(),
+ status.0.dwNumberOfBytesTransferred,
+ status.0.lpCompletionKey,
+ status.0.lpOverlapped)
+ };
+ ::cvt(ret).map(|_| ())
+ }
+}
+
+impl AsRawHandle for CompletionPort {
+ fn as_raw_handle(&self) -> HANDLE {
+ self.handle.raw()
+ }
+}
+
+impl FromRawHandle for CompletionPort {
+ unsafe fn from_raw_handle(handle: HANDLE) -> CompletionPort {
+ CompletionPort { handle: Handle::new(handle) }
+ }
+}
+
+impl IntoRawHandle for CompletionPort {
+ fn into_raw_handle(self) -> HANDLE {
+ self.handle.into_raw()
+ }
+}
+
+impl CompletionStatus {
+ /// Creates a new completion status with the provided parameters.
+ ///
+ /// This function is useful when creating a status to send to a port with
+ /// the `post` method. The parameters are opaquely passed through and not
+ /// interpreted by the system at all.
+ pub fn new(bytes: u32, token: usize, overlapped: *mut Overlapped)
+ -> CompletionStatus {
+ assert_eq!(mem::size_of_val(&token), mem::size_of::<ULONG_PTR>());
+ CompletionStatus(OVERLAPPED_ENTRY {
+ dwNumberOfBytesTransferred: bytes,
+ lpCompletionKey: token as ULONG_PTR,
+ lpOverlapped: overlapped as *mut _,
+ Internal: 0,
+ })
+ }
+
+ /// Creates a new borrowed completion status from the borrowed
+ /// `OVERLAPPED_ENTRY` argument provided.
+ ///
+ /// This method will wrap the `OVERLAPPED_ENTRY` in a `CompletionStatus`,
+ /// returning the wrapped structure.
+ pub fn from_entry(entry: &OVERLAPPED_ENTRY) -> &CompletionStatus {
+ unsafe { &*(entry as *const _ as *const _) }
+ }
+
+ /// Creates a new "zero" completion status.
+ ///
+ /// This function is useful when creating a stack buffer or vector of
+ /// completion statuses to be passed to the `get_many` function.
+ pub fn zero() -> CompletionStatus {
+ CompletionStatus::new(0, 0, 0 as *mut _)
+ }
+
+ /// Returns the number of bytes that were transferred for the I/O operation
+ /// associated with this completion status.
+ pub fn bytes_transferred(&self) -> u32 {
+ self.0.dwNumberOfBytesTransferred
+ }
+
+ /// Returns the completion key value associated with the file handle whose
+ /// I/O operation has completed.
+ ///
+ /// A completion key is a per-handle key that is specified when it is added
+ /// to an I/O completion port via `add_handle` or `add_socket`.
+ pub fn token(&self) -> usize {
+ self.0.lpCompletionKey as usize
+ }
+
+ /// Returns a pointer to the `Overlapped` structure that was specified when
+ /// the I/O operation was started.
+ pub fn overlapped(&self) -> *mut OVERLAPPED {
+ self.0.lpOverlapped
+ }
+
+ /// Returns a pointer to the internal `OVERLAPPED_ENTRY` object.
+ pub fn entry(&self) -> &OVERLAPPED_ENTRY {
+ &self.0
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::mem;
+ use std::time::Duration;
+
+ use winapi::shared::basetsd::*;
+ use winapi::shared::winerror::*;
+
+ use iocp::{CompletionPort, CompletionStatus};
+
+ #[test]
+ fn is_send_sync() {
+ fn is_send_sync<T: Send + Sync>() {}
+ is_send_sync::<CompletionPort>();
+ }
+
+ #[test]
+ fn token_right_size() {
+ assert_eq!(mem::size_of::<usize>(), mem::size_of::<ULONG_PTR>());
+ }
+
+ #[test]
+ fn timeout() {
+ let c = CompletionPort::new(1).unwrap();
+ let err = c.get(Some(Duration::from_millis(1))).unwrap_err();
+ assert_eq!(err.raw_os_error(), Some(WAIT_TIMEOUT as i32));
+ }
+
+ #[test]
+ fn get() {
+ let c = CompletionPort::new(1).unwrap();
+ c.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap();
+ let s = c.get(None).unwrap();
+ assert_eq!(s.bytes_transferred(), 1);
+ assert_eq!(s.token(), 2);
+ assert_eq!(s.overlapped(), 3 as *mut _);
+ }
+
+ #[test]
+ fn get_many() {
+ let c = CompletionPort::new(1).unwrap();
+
+ c.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap();
+ c.post(CompletionStatus::new(4, 5, 6 as *mut _)).unwrap();
+
+ let mut s = vec![CompletionStatus::zero(); 4];
+ {
+ let s = c.get_many(&mut s, None).unwrap();
+ assert_eq!(s.len(), 2);
+ assert_eq!(s[0].bytes_transferred(), 1);
+ assert_eq!(s[0].token(), 2);
+ assert_eq!(s[0].overlapped(), 3 as *mut _);
+ assert_eq!(s[1].bytes_transferred(), 4);
+ assert_eq!(s[1].token(), 5);
+ assert_eq!(s[1].overlapped(), 6 as *mut _);
+ }
+ assert_eq!(s[2].bytes_transferred(), 0);
+ assert_eq!(s[2].token(), 0);
+ assert_eq!(s[2].overlapped(), 0 as *mut _);
+ }
+}
diff --git a/third_party/rust/miow/src/lib.rs b/third_party/rust/miow/src/lib.rs
new file mode 100644
index 0000000000..a0d53113c5
--- /dev/null
+++ b/third_party/rust/miow/src/lib.rs
@@ -0,0 +1,57 @@
+//! A zero overhead Windows I/O library
+
+#![cfg(windows)]
+#![deny(missing_docs)]
+#![allow(bad_style)]
+#![doc(html_root_url = "https://docs.rs/miow/0.3/x86_64-pc-windows-msvc/")]
+
+extern crate socket2;
+extern crate winapi;
+
+#[cfg(test)] extern crate rand;
+
+use std::cmp;
+use std::io;
+use std::time::Duration;
+
+use winapi::shared::minwindef::*;
+use winapi::um::winbase::*;
+
+#[cfg(test)]
+macro_rules! t {
+ ($e:expr) => (match $e {
+ Ok(e) => e,
+ Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
+ })
+}
+
+mod handle;
+mod overlapped;
+
+pub mod iocp;
+pub mod net;
+pub mod pipe;
+
+pub use overlapped::Overlapped;
+
+fn cvt(i: BOOL) -> io::Result<BOOL> {
+ if i == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(i)
+ }
+}
+
+fn dur2ms(dur: Option<Duration>) -> u32 {
+ let dur = match dur {
+ Some(dur) => dur,
+ None => return INFINITE,
+ };
+ let ms = dur.as_secs().checked_mul(1_000);
+ let ms_extra = dur.subsec_nanos() / 1_000_000;
+ ms.and_then(|ms| {
+ ms.checked_add(ms_extra as u64)
+ }).map(|ms| {
+ cmp::min(u32::max_value() as u64, ms) as u32
+ }).unwrap_or(INFINITE - 1)
+}
diff --git a/third_party/rust/miow/src/net.rs b/third_party/rust/miow/src/net.rs
new file mode 100644
index 0000000000..37ca51bead
--- /dev/null
+++ b/third_party/rust/miow/src/net.rs
@@ -0,0 +1,1140 @@
+//! Extensions and types for the standard networking primitives.
+//!
+//! This module contains a number of extension traits for the types in
+//! `std::net` for Windows-specific functionality.
+
+use std::cmp;
+use std::io;
+use std::mem;
+use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
+use std::net::{TcpStream, UdpSocket, SocketAddr, TcpListener};
+use std::net::{SocketAddrV4, Ipv4Addr, SocketAddrV6, Ipv6Addr};
+use std::os::windows::prelude::*;
+
+use winapi::ctypes::*;
+use winapi::shared::guiddef::*;
+use winapi::shared::minwindef::*;
+use winapi::shared::minwindef::{FALSE, TRUE};
+use winapi::shared::ntdef::*;
+use winapi::shared::ws2def::*;
+use winapi::shared::ws2def::SOL_SOCKET;
+use winapi::shared::ws2ipdef::*;
+use winapi::um::minwinbase::*;
+use winapi::um::winsock2::*;
+
+/// A type to represent a buffer in which a socket address will be stored.
+///
+/// This type is used with the `recv_from_overlapped` function on the
+/// `UdpSocketExt` trait to provide space for the overlapped I/O operation to
+/// fill in the address upon completion.
+#[derive(Clone, Copy)]
+pub struct SocketAddrBuf {
+ buf: SOCKADDR_STORAGE,
+ len: c_int,
+}
+
+/// A type to represent a buffer in which an accepted socket's address will be
+/// stored.
+///
+/// This type is used with the `accept_overlapped` method on the
+/// `TcpListenerExt` trait to provide space for the overlapped I/O operation to
+/// fill in the socket addresses upon completion.
+#[repr(C)]
+pub struct AcceptAddrsBuf {
+ // For AcceptEx we've got the restriction that the addresses passed in that
+ // buffer need to be at least 16 bytes more than the maximum address length
+ // for the protocol in question, so add some extra here and there
+ local: SOCKADDR_STORAGE,
+ _pad1: [u8; 16],
+ remote: SOCKADDR_STORAGE,
+ _pad2: [u8; 16],
+}
+
+/// The parsed return value of `AcceptAddrsBuf`.
+pub struct AcceptAddrs<'a> {
+ local: LPSOCKADDR,
+ local_len: c_int,
+ remote: LPSOCKADDR,
+ remote_len: c_int,
+ _data: &'a AcceptAddrsBuf,
+}
+
+struct WsaExtension {
+ guid: GUID,
+ val: AtomicUsize,
+}
+
+/// Additional methods for the `TcpStream` type in the standard library.
+pub trait TcpStreamExt {
+ /// Execute an overlapped read I/O operation on this TCP stream.
+ ///
+ /// This function will issue an overlapped I/O read (via `WSARecv`) on this
+ /// socket. The provided buffer will be filled in when the operation
+ /// completes and the given `OVERLAPPED` instance is used to track the
+ /// overlapped operation.
+ ///
+ /// If the operation succeeds, `Ok(Some(n))` is returned indicating how
+ /// many bytes were read. If the operation returns an error indicating that
+ /// the I/O is currently pending, `Ok(None)` is returned. Otherwise, the
+ /// error associated with the operation is returned and no overlapped
+ /// operation is enqueued.
+ ///
+ /// The number of bytes read will be returned as part of the completion
+ /// notification when the I/O finishes.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the `buf` and
+ /// `overlapped` pointers are valid until the end of the I/O operation. The
+ /// kernel also requires that `overlapped` is unique for this I/O operation
+ /// and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that these two input
+ /// pointers are valid until the I/O operation is completed, typically via
+ /// completion ports and waiting to receive the completion notification on
+ /// the port.
+ unsafe fn read_overlapped(&self,
+ buf: &mut [u8],
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<Option<usize>>;
+
+ /// Execute an overlapped write I/O operation on this TCP stream.
+ ///
+ /// This function will issue an overlapped I/O write (via `WSASend`) on this
+ /// socket. The provided buffer will be written when the operation completes
+ /// and the given `OVERLAPPED` instance is used to track the overlapped
+ /// operation.
+ ///
+ /// If the operation succeeds, `Ok(Some(n))` is returned where `n` is the
+ /// number of bytes that were written. If the operation returns an error
+ /// indicating that the I/O is currently pending, `Ok(None)` is returned.
+ /// Otherwise, the error associated with the operation is returned and no
+ /// overlapped operation is enqueued.
+ ///
+ /// The number of bytes written will be returned as part of the completion
+ /// notification when the I/O finishes.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the `buf` and
+ /// `overlapped` pointers are valid until the end of the I/O operation. The
+ /// kernel also requires that `overlapped` is unique for this I/O operation
+ /// and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that these two input
+ /// pointers are valid until the I/O operation is completed, typically via
+ /// completion ports and waiting to receive the completion notification on
+ /// the port.
+ unsafe fn write_overlapped(&self,
+ buf: &[u8],
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<Option<usize>>;
+
+ /// Attempt to consume the internal socket in this builder by executing an
+ /// overlapped connect operation.
+ ///
+ /// This function will issue a connect operation to the address specified on
+ /// the underlying socket, flagging it as an overlapped operation which will
+ /// complete asynchronously. If successful this function will return the
+ /// corresponding TCP stream.
+ ///
+ /// The `buf` argument provided is an initial buffer of data that should be
+ /// sent after the connection is initiated. It's acceptable to
+ /// pass an empty slice here.
+ ///
+ /// This function will also return whether the connect immediately
+ /// succeeded or not. If `None` is returned then the I/O operation is still
+ /// pending and will complete at a later date, and if `Some(bytes)` is
+ /// returned then that many bytes were transferred.
+ ///
+ /// Note that to succeed this requires that the underlying socket has
+ /// previously been bound via a call to `bind` to a local address.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the
+ /// `overlapped` and `buf` pointers to be valid until the end of the I/O
+ /// operation. The kernel also requires that `overlapped` is unique for
+ /// this I/O operation and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that this pointer is
+ /// valid until the I/O operation is completed, typically via completion
+ /// ports and waiting to receive the completion notification on the port.
+ unsafe fn connect_overlapped(&self,
+ addr: &SocketAddr,
+ buf: &[u8],
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<Option<usize>>;
+
+ /// Once a `connect_overlapped` has finished, this function needs to be
+ /// called to finish the connect operation.
+ ///
+ /// Currently this just calls `setsockopt` with `SO_UPDATE_CONNECT_CONTEXT`
+ /// to ensure that further functions like `getpeername` and `getsockname`
+ /// work correctly.
+ fn connect_complete(&self) -> io::Result<()>;
+
+ /// Calls the `GetOverlappedResult` function to get the result of an
+ /// overlapped operation for this handle.
+ ///
+ /// This function takes the `OVERLAPPED` argument which must have been used
+ /// to initiate an overlapped I/O operation, and returns either the
+ /// successful number of bytes transferred during the operation or an error
+ /// if one occurred, along with the results of the `lpFlags` parameter of
+ /// the relevant operation, if applicable.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe as `overlapped` must have previously been used
+ /// to execute an operation for this handle, and it must also be a valid
+ /// pointer to an `OVERLAPPED` instance.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic
+ unsafe fn result(&self, overlapped: *mut OVERLAPPED)
+ -> io::Result<(usize, u32)>;
+}
+
+/// Additional methods for the `UdpSocket` type in the standard library.
+pub trait UdpSocketExt {
+ /// Execute an overlapped receive I/O operation on this UDP socket.
+ ///
+ /// This function will issue an overlapped I/O read (via `WSARecvFrom`) on
+ /// this socket. The provided buffer will be filled in when the operation
+ /// completes, the source from where the data came from will be written to
+ /// `addr`, and the given `OVERLAPPED` instance is used to track the
+ /// overlapped operation.
+ ///
+ /// If the operation succeeds, `Ok(Some(n))` is returned where `n` is the
+ /// number of bytes that were read. If the operation returns an error
+ /// indicating that the I/O is currently pending, `Ok(None)` is returned.
+ /// Otherwise, the error associated with the operation is returned and no
+ /// overlapped operation is enqueued.
+ ///
+ /// The number of bytes read will be returned as part of the completion
+ /// notification when the I/O finishes.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the `buf`,
+ /// `addr`, and `overlapped` pointers are valid until the end of the I/O
+ /// operation. The kernel also requires that `overlapped` is unique for this
+ /// I/O operation and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that these two input
+ /// pointers are valid until the I/O operation is completed, typically via
+ /// completion ports and waiting to receive the completion notification on
+ /// the port.
+ unsafe fn recv_from_overlapped(&self,
+ buf: &mut [u8],
+ addr: *mut SocketAddrBuf,
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<Option<usize>>;
+
+ /// Execute an overlapped receive I/O operation on this UDP socket.
+ ///
+ /// This function will issue an overlapped I/O read (via `WSARecv`) on
+ /// this socket. The provided buffer will be filled in when the operation
+ /// completes, the source from where the data came from will be written to
+ /// `addr`, and the given `OVERLAPPED` instance is used to track the
+ /// overlapped operation.
+ ///
+ /// If the operation succeeds, `Ok(Some(n))` is returned where `n` is the
+ /// number of bytes that were read. If the operation returns an error
+ /// indicating that the I/O is currently pending, `Ok(None)` is returned.
+ /// Otherwise, the error associated with the operation is returned and no
+ /// overlapped operation is enqueued.
+ ///
+ /// The number of bytes read will be returned as part of the completion
+ /// notification when the I/O finishes.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the `buf`,
+ /// and `overlapped` pointers are valid until the end of the I/O
+ /// operation. The kernel also requires that `overlapped` is unique for this
+ /// I/O operation and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that these two input
+ /// pointers are valid until the I/O operation is completed, typically via
+ /// completion ports and waiting to receive the completion notification on
+ /// the port.
+ unsafe fn recv_overlapped(&self,
+ buf: &mut [u8],
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<Option<usize>>;
+
+ /// Execute an overlapped send I/O operation on this UDP socket.
+ ///
+ /// This function will issue an overlapped I/O write (via `WSASendTo`) on
+ /// this socket to the address specified by `addr`. The provided buffer will
+ /// be written when the operation completes and the given `OVERLAPPED`
+ /// instance is used to track the overlapped operation.
+ ///
+ /// If the operation succeeds, `Ok(Some(n0)` is returned where `n` byte
+ /// were written. If the operation returns an error indicating that the I/O
+ /// is currently pending, `Ok(None)` is returned. Otherwise, the error
+ /// associated with the operation is returned and no overlapped operation
+ /// is enqueued.
+ ///
+ /// The number of bytes written will be returned as part of the completion
+ /// notification when the I/O finishes.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the `buf` and
+ /// `overlapped` pointers are valid until the end of the I/O operation. The
+ /// kernel also requires that `overlapped` is unique for this I/O operation
+ /// and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that these two input
+ /// pointers are valid until the I/O operation is completed, typically via
+ /// completion ports and waiting to receive the completion notification on
+ /// the port.
+ unsafe fn send_to_overlapped(&self,
+ buf: &[u8],
+ addr: &SocketAddr,
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<Option<usize>>;
+
+ /// Execute an overlapped send I/O operation on this UDP socket.
+ ///
+ /// This function will issue an overlapped I/O write (via `WSASend`) on
+ /// this socket to the address it was previously connected to. The provided
+ /// buffer will be written when the operation completes and the given `OVERLAPPED`
+ /// instance is used to track the overlapped operation.
+ ///
+ /// If the operation succeeds, `Ok(Some(n0)` is returned where `n` byte
+ /// were written. If the operation returns an error indicating that the I/O
+ /// is currently pending, `Ok(None)` is returned. Otherwise, the error
+ /// associated with the operation is returned and no overlapped operation
+ /// is enqueued.
+ ///
+ /// The number of bytes written will be returned as part of the completion
+ /// notification when the I/O finishes.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the `buf` and
+ /// `overlapped` pointers are valid until the end of the I/O operation. The
+ /// kernel also requires that `overlapped` is unique for this I/O operation
+ /// and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that these two input
+ /// pointers are valid until the I/O operation is completed, typically via
+ /// completion ports and waiting to receive the completion notification on
+ /// the port.
+ unsafe fn send_overlapped(&self,
+ buf: &[u8],
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<Option<usize>>;
+
+ /// Calls the `GetOverlappedResult` function to get the result of an
+ /// overlapped operation for this handle.
+ ///
+ /// This function takes the `OVERLAPPED` argument which must have been used
+ /// to initiate an overlapped I/O operation, and returns either the
+ /// successful number of bytes transferred during the operation or an error
+ /// if one occurred, along with the results of the `lpFlags` parameter of
+ /// the relevant operation, if applicable.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe as `overlapped` must have previously been used
+ /// to execute an operation for this handle, and it must also be a valid
+ /// pointer to an `OVERLAPPED` instance.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic
+ unsafe fn result(&self, overlapped: *mut OVERLAPPED)
+ -> io::Result<(usize, u32)>;
+}
+
+/// Additional methods for the `TcpListener` type in the standard library.
+pub trait TcpListenerExt {
+ /// Perform an accept operation on this listener, accepting a connection in
+ /// an overlapped fashion.
+ ///
+ /// This function will issue an I/O request to accept an incoming connection
+ /// with the specified overlapped instance. The `socket` provided must be a
+ /// configured but not bound or connected socket, and if successful this
+ /// will consume the internal socket of the builder to return a TCP stream.
+ ///
+ /// The `addrs` buffer provided will be filled in with the local and remote
+ /// addresses of the connection upon completion.
+ ///
+ /// If the accept succeeds immediately, `Ok(true)` is returned. If
+ /// the connect indicates that the I/O is currently pending, `Ok(false)` is
+ /// returned. Otherwise, the error associated with the operation is
+ /// returned and no overlapped operation is enqueued.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the
+ /// `addrs` and `overlapped` pointers are valid until the end of the I/O
+ /// operation. The kernel also requires that `overlapped` is unique for this
+ /// I/O operation and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that the pointers are
+ /// valid until the I/O operation is completed, typically via completion
+ /// ports and waiting to receive the completion notification on the port.
+ unsafe fn accept_overlapped(&self,
+ socket: &TcpStream,
+ addrs: &mut AcceptAddrsBuf,
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<bool>;
+
+ /// Once an `accept_overlapped` has finished, this function needs to be
+ /// called to finish the accept operation.
+ ///
+ /// Currently this just calls `setsockopt` with `SO_UPDATE_ACCEPT_CONTEXT`
+ /// to ensure that further functions like `getpeername` and `getsockname`
+ /// work correctly.
+ fn accept_complete(&self, socket: &TcpStream) -> io::Result<()>;
+
+ /// Calls the `GetOverlappedResult` function to get the result of an
+ /// overlapped operation for this handle.
+ ///
+ /// This function takes the `OVERLAPPED` argument which must have been used
+ /// to initiate an overlapped I/O operation, and returns either the
+ /// successful number of bytes transferred during the operation or an error
+ /// if one occurred, along with the results of the `lpFlags` parameter of
+ /// the relevant operation, if applicable.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe as `overlapped` must have previously been used
+ /// to execute an operation for this handle, and it must also be a valid
+ /// pointer to an `OVERLAPPED` instance.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic
+ unsafe fn result(&self, overlapped: *mut OVERLAPPED)
+ -> io::Result<(usize, u32)>;
+}
+
+#[doc(hidden)]
+trait NetInt {
+ fn from_be(i: Self) -> Self;
+ fn to_be(&self) -> Self;
+}
+macro_rules! doit {
+ ($($t:ident)*) => ($(impl NetInt for $t {
+ fn from_be(i: Self) -> Self { <$t>::from_be(i) }
+ fn to_be(&self) -> Self { <$t>::to_be(*self) }
+ })*)
+}
+doit! { i8 i16 i32 i64 isize u8 u16 u32 u64 usize }
+
+// fn hton<I: NetInt>(i: I) -> I { i.to_be() }
+fn ntoh<I: NetInt>(i: I) -> I { I::from_be(i) }
+
+fn last_err() -> io::Result<Option<usize>> {
+ let err = unsafe { WSAGetLastError() };
+ if err == WSA_IO_PENDING as i32 {
+ Ok(None)
+ } else {
+ Err(io::Error::from_raw_os_error(err))
+ }
+}
+
+fn cvt(i: c_int, size: DWORD) -> io::Result<Option<usize>> {
+ if i == SOCKET_ERROR {
+ last_err()
+ } else {
+ Ok(Some(size as usize))
+ }
+}
+
+fn socket_addr_to_ptrs(addr: &SocketAddr) -> (*const SOCKADDR, c_int) {
+ match *addr {
+ SocketAddr::V4(ref a) => {
+ (a as *const _ as *const _, mem::size_of::<SOCKADDR_IN>() as c_int)
+ }
+ SocketAddr::V6(ref a) => {
+ (a as *const _ as *const _, mem::size_of::<SOCKADDR_IN6_LH>() as c_int)
+ }
+ }
+}
+
+unsafe fn ptrs_to_socket_addr(ptr: *const SOCKADDR,
+ len: c_int) -> Option<SocketAddr> {
+ if (len as usize) < mem::size_of::<c_int>() {
+ return None
+ }
+ match (*ptr).sa_family as i32 {
+ AF_INET if len as usize >= mem::size_of::<SOCKADDR_IN>() => {
+ let b = &*(ptr as *const SOCKADDR_IN);
+ let ip = ntoh(*b.sin_addr.S_un.S_addr());
+ let ip = Ipv4Addr::new((ip >> 24) as u8,
+ (ip >> 16) as u8,
+ (ip >> 8) as u8,
+ (ip >> 0) as u8);
+ Some(SocketAddr::V4(SocketAddrV4::new(ip, ntoh(b.sin_port))))
+ }
+ AF_INET6 if len as usize >= mem::size_of::<SOCKADDR_IN6_LH>() => {
+ let b = &*(ptr as *const SOCKADDR_IN6_LH);
+ let arr = b.sin6_addr.u.Byte();
+ let ip = Ipv6Addr::new(
+ ((arr[0] as u16) << 8) | (arr[1] as u16),
+ ((arr[2] as u16) << 8) | (arr[3] as u16),
+ ((arr[4] as u16) << 8) | (arr[5] as u16),
+ ((arr[6] as u16) << 8) | (arr[7] as u16),
+ ((arr[8] as u16) << 8) | (arr[9] as u16),
+ ((arr[10] as u16) << 8) | (arr[11] as u16),
+ ((arr[12] as u16) << 8) | (arr[13] as u16),
+ ((arr[14] as u16) << 8) | (arr[15] as u16));
+ let addr = SocketAddrV6::new(ip, ntoh(b.sin6_port),
+ ntoh(b.sin6_flowinfo),
+ ntoh(*b.u.sin6_scope_id()));
+ Some(SocketAddr::V6(addr))
+ }
+ _ => None
+ }
+}
+
+unsafe fn slice2buf(slice: &[u8]) -> WSABUF {
+ WSABUF {
+ len: cmp::min(slice.len(), <u_long>::max_value() as usize) as u_long,
+ buf: slice.as_ptr() as *mut _,
+ }
+}
+
+unsafe fn result(socket: SOCKET, overlapped: *mut OVERLAPPED)
+ -> io::Result<(usize, u32)> {
+ let mut transferred = 0;
+ let mut flags = 0;
+ let r = WSAGetOverlappedResult(socket,
+ overlapped,
+ &mut transferred,
+ FALSE,
+ &mut flags);
+ if r == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok((transferred as usize, flags))
+ }
+}
+
+impl TcpStreamExt for TcpStream {
+ unsafe fn read_overlapped(&self,
+ buf: &mut [u8],
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<Option<usize>> {
+ let mut buf = slice2buf(buf);
+ let mut flags = 0;
+ let mut bytes_read: DWORD = 0;
+ let r = WSARecv(self.as_raw_socket() as SOCKET, &mut buf, 1,
+ &mut bytes_read, &mut flags, overlapped, None);
+ cvt(r, bytes_read)
+ }
+
+ unsafe fn write_overlapped(&self,
+ buf: &[u8],
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<Option<usize>> {
+ let mut buf = slice2buf(buf);
+ let mut bytes_written = 0;
+
+ // Note here that we capture the number of bytes written. The
+ // documentation on MSDN, however, states:
+ //
+ // > Use NULL for this parameter if the lpOverlapped parameter is not
+ // > NULL to avoid potentially erroneous results. This parameter can be
+ // > NULL only if the lpOverlapped parameter is not NULL.
+ //
+ // If we're not passing a null overlapped pointer here, then why are we
+ // then capturing the number of bytes! Well so it turns out that this is
+ // clearly faster to learn the bytes here rather than later calling
+ // `WSAGetOverlappedResult`, and in practice almost all implementations
+ // use this anyway [1].
+ //
+ // As a result we use this to and report back the result.
+ //
+ // [1]: https://github.com/carllerche/mio/pull/520#issuecomment-273983823
+ let r = WSASend(self.as_raw_socket() as SOCKET, &mut buf, 1,
+ &mut bytes_written, 0, overlapped, None);
+ cvt(r, bytes_written)
+ }
+
+ unsafe fn connect_overlapped(&self,
+ addr: &SocketAddr,
+ buf: &[u8],
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<Option<usize>> {
+ connect_overlapped(self.as_raw_socket() as SOCKET, addr, buf, overlapped)
+ }
+
+ fn connect_complete(&self) -> io::Result<()> {
+ const SO_UPDATE_CONNECT_CONTEXT: c_int = 0x7010;
+ let result = unsafe {
+ setsockopt(self.as_raw_socket() as SOCKET,
+ SOL_SOCKET,
+ SO_UPDATE_CONNECT_CONTEXT,
+ 0 as *const _,
+ 0)
+ };
+ if result == 0 {
+ Ok(())
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+
+ unsafe fn result(&self, overlapped: *mut OVERLAPPED)
+ -> io::Result<(usize, u32)> {
+ result(self.as_raw_socket() as SOCKET, overlapped)
+ }
+}
+
+unsafe fn connect_overlapped(socket: SOCKET,
+ addr: &SocketAddr,
+ buf: &[u8],
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<Option<usize>> {
+ static CONNECTEX: WsaExtension = WsaExtension {
+ guid: GUID {
+ Data1: 0x25a207b9,
+ Data2: 0xddf3,
+ Data3: 0x4660,
+ Data4: [0x8e, 0xe9, 0x76, 0xe5, 0x8c, 0x74, 0x06, 0x3e],
+ },
+ val: ATOMIC_USIZE_INIT,
+ };
+ type ConnectEx = unsafe extern "system" fn(SOCKET, *const SOCKADDR,
+ c_int, PVOID, DWORD, LPDWORD,
+ LPOVERLAPPED) -> BOOL;
+
+ let ptr = try!(CONNECTEX.get(socket));
+ assert!(ptr != 0);
+ let connect_ex = mem::transmute::<_, ConnectEx>(ptr);
+
+ let (addr_buf, addr_len) = socket_addr_to_ptrs(addr);
+ let mut bytes_sent: DWORD = 0;
+ let r = connect_ex(socket, addr_buf, addr_len,
+ buf.as_ptr() as *mut _,
+ buf.len() as u32,
+ &mut bytes_sent, overlapped);
+ if r == TRUE {
+ Ok(Some(bytes_sent as usize))
+ } else {
+ last_err()
+ }
+}
+
+impl UdpSocketExt for UdpSocket {
+ unsafe fn recv_from_overlapped(&self,
+ buf: &mut [u8],
+ addr: *mut SocketAddrBuf,
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<Option<usize>> {
+ let mut buf = slice2buf(buf);
+ let mut flags = 0;
+ let mut received_bytes: DWORD = 0;
+ let r = WSARecvFrom(self.as_raw_socket() as SOCKET, &mut buf, 1,
+ &mut received_bytes, &mut flags,
+ &mut (*addr).buf as *mut _ as *mut _,
+ &mut (*addr).len,
+ overlapped, None);
+ cvt(r, received_bytes)
+ }
+
+ unsafe fn recv_overlapped(&self,
+ buf: &mut [u8],
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<Option<usize>> {
+ let mut buf = slice2buf(buf);
+ let mut flags = 0;
+ let mut received_bytes: DWORD = 0;
+ let r = WSARecv(self.as_raw_socket() as SOCKET, &mut buf, 1,
+ &mut received_bytes, &mut flags,
+ overlapped, None);
+ cvt(r, received_bytes)
+ }
+
+ unsafe fn send_to_overlapped(&self,
+ buf: &[u8],
+ addr: &SocketAddr,
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<Option<usize>> {
+ let (addr_buf, addr_len) = socket_addr_to_ptrs(addr);
+ let mut buf = slice2buf(buf);
+ let mut sent_bytes = 0;
+ let r = WSASendTo(self.as_raw_socket() as SOCKET, &mut buf, 1,
+ &mut sent_bytes, 0,
+ addr_buf as *const _, addr_len,
+ overlapped, None);
+ cvt(r, sent_bytes)
+ }
+
+ unsafe fn send_overlapped(&self,
+ buf: &[u8],
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<Option<usize>> {
+ let mut buf = slice2buf(buf);
+ let mut sent_bytes = 0;
+ let r = WSASend(self.as_raw_socket() as SOCKET, &mut buf, 1,
+ &mut sent_bytes, 0,
+ overlapped, None);
+ cvt(r, sent_bytes)
+ }
+
+ unsafe fn result(&self, overlapped: *mut OVERLAPPED)
+ -> io::Result<(usize, u32)> {
+ result(self.as_raw_socket() as SOCKET, overlapped)
+ }
+}
+
+impl TcpListenerExt for TcpListener {
+ unsafe fn accept_overlapped(&self,
+ socket: &TcpStream,
+ addrs: &mut AcceptAddrsBuf,
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<bool> {
+ static ACCEPTEX: WsaExtension = WsaExtension {
+ guid: GUID {
+ Data1: 0xb5367df1,
+ Data2: 0xcbac,
+ Data3: 0x11cf,
+ Data4: [0x95, 0xca, 0x00, 0x80, 0x5f, 0x48, 0xa1, 0x92],
+ },
+ val: ATOMIC_USIZE_INIT,
+ };
+ type AcceptEx = unsafe extern "system" fn(SOCKET, SOCKET, PVOID,
+ DWORD, DWORD, DWORD, LPDWORD,
+ LPOVERLAPPED) -> BOOL;
+
+ let ptr = try!(ACCEPTEX.get(self.as_raw_socket() as SOCKET));
+ assert!(ptr != 0);
+ let accept_ex = mem::transmute::<_, AcceptEx>(ptr);
+
+ let mut bytes = 0;
+ let (a, b, c, d) = (*addrs).args();
+ let r = accept_ex(self.as_raw_socket() as SOCKET, socket.as_raw_socket() as SOCKET,
+ a, b, c, d, &mut bytes, overlapped);
+ let succeeded = if r == TRUE {
+ true
+ } else {
+ try!(last_err());
+ false
+ };
+ Ok(succeeded)
+ }
+
+ fn accept_complete(&self, socket: &TcpStream) -> io::Result<()> {
+ const SO_UPDATE_ACCEPT_CONTEXT: c_int = 0x700B;
+ let me = self.as_raw_socket();
+ let result = unsafe {
+ setsockopt(socket.as_raw_socket() as SOCKET,
+ SOL_SOCKET,
+ SO_UPDATE_ACCEPT_CONTEXT,
+ &me as *const _ as *const _,
+ mem::size_of_val(&me) as c_int)
+ };
+ if result == 0 {
+ Ok(())
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+
+ unsafe fn result(&self, overlapped: *mut OVERLAPPED)
+ -> io::Result<(usize, u32)> {
+ result(self.as_raw_socket() as SOCKET, overlapped)
+ }
+}
+
+impl SocketAddrBuf {
+ /// Creates a new blank socket address buffer.
+ ///
+ /// This should be used before a call to `recv_from_overlapped` overlapped
+ /// to create an instance to pass down.
+ pub fn new() -> SocketAddrBuf {
+ SocketAddrBuf {
+ buf: unsafe { mem::zeroed() },
+ len: mem::size_of::<SOCKADDR_STORAGE>() as c_int,
+ }
+ }
+
+ /// Parses this buffer to return a standard socket address.
+ ///
+ /// This function should be called after the buffer has been filled in with
+ /// a call to `recv_from_overlapped` being completed. It will interpret the
+ /// address filled in and return the standard socket address type.
+ ///
+ /// If an error is encountered then `None` is returned.
+ pub fn to_socket_addr(&self) -> Option<SocketAddr> {
+ unsafe {
+ ptrs_to_socket_addr(&self.buf as *const _ as *const _, self.len)
+ }
+ }
+}
+
+static GETACCEPTEXSOCKADDRS: WsaExtension = WsaExtension {
+ guid: GUID {
+ Data1: 0xb5367df2,
+ Data2: 0xcbac,
+ Data3: 0x11cf,
+ Data4: [0x95, 0xca, 0x00, 0x80, 0x5f, 0x48, 0xa1, 0x92],
+ },
+ val: ATOMIC_USIZE_INIT,
+};
+type GetAcceptExSockaddrs = unsafe extern "system" fn(PVOID, DWORD, DWORD, DWORD,
+ *mut LPSOCKADDR, LPINT,
+ *mut LPSOCKADDR, LPINT);
+
+impl AcceptAddrsBuf {
+ /// Creates a new blank buffer ready to be passed to a call to
+ /// `accept_overlapped`.
+ pub fn new() -> AcceptAddrsBuf {
+ unsafe { mem::zeroed() }
+ }
+
+ /// Parses the data contained in this address buffer, returning the parsed
+ /// result if successful.
+ ///
+ /// This function can be called after a call to `accept_overlapped` has
+ /// succeeded to parse out the data that was written in.
+ pub fn parse(&self, socket: &TcpListener) -> io::Result<AcceptAddrs> {
+ let mut ret = AcceptAddrs {
+ local: 0 as *mut _, local_len: 0,
+ remote: 0 as *mut _, remote_len: 0,
+ _data: self,
+ };
+ let ptr = try!(GETACCEPTEXSOCKADDRS.get(socket.as_raw_socket() as SOCKET));
+ assert!(ptr != 0);
+ unsafe {
+ let get_sockaddrs = mem::transmute::<_, GetAcceptExSockaddrs>(ptr);
+ let (a, b, c, d) = self.args();
+ get_sockaddrs(a, b, c, d,
+ &mut ret.local, &mut ret.local_len,
+ &mut ret.remote, &mut ret.remote_len);
+ Ok(ret)
+ }
+ }
+
+ fn args(&self) -> (PVOID, DWORD, DWORD, DWORD) {
+ let remote_offset = unsafe {
+ &(*(0 as *const AcceptAddrsBuf)).remote as *const _ as usize
+ };
+ (self as *const _ as *mut _, 0, remote_offset as DWORD,
+ (mem::size_of_val(self) - remote_offset) as DWORD)
+ }
+}
+
+impl<'a> AcceptAddrs<'a> {
+ /// Returns the local socket address contained in this buffer.
+ pub fn local(&self) -> Option<SocketAddr> {
+ unsafe { ptrs_to_socket_addr(self.local, self.local_len) }
+ }
+
+ /// Returns the remote socket address contained in this buffer.
+ pub fn remote(&self) -> Option<SocketAddr> {
+ unsafe { ptrs_to_socket_addr(self.remote, self.remote_len) }
+ }
+}
+
+impl WsaExtension {
+ fn get(&self, socket: SOCKET) -> io::Result<usize> {
+ let prev = self.val.load(Ordering::SeqCst);
+ if prev != 0 && !cfg!(debug_assertions) {
+ return Ok(prev)
+ }
+ let mut ret = 0 as usize;
+ let mut bytes = 0;
+ let r = unsafe {
+ WSAIoctl(socket, SIO_GET_EXTENSION_FUNCTION_POINTER,
+ &self.guid as *const _ as *mut _,
+ mem::size_of_val(&self.guid) as DWORD,
+ &mut ret as *mut _ as *mut _,
+ mem::size_of_val(&ret) as DWORD,
+ &mut bytes,
+ 0 as *mut _, None)
+ };
+ cvt(r, 0).map(|_| {
+ debug_assert_eq!(bytes as usize, mem::size_of_val(&ret));
+ debug_assert!(prev == 0 || prev == ret);
+ self.val.store(ret, Ordering::SeqCst);
+ ret
+ })
+
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::net::{TcpListener, UdpSocket, TcpStream, SocketAddr};
+ use std::thread;
+ use std::io::prelude::*;
+
+ use socket2::{Socket, Type, Domain};
+
+ use Overlapped;
+ use iocp::CompletionPort;
+ use net::{TcpStreamExt, UdpSocketExt, SocketAddrBuf};
+ use net::{TcpListenerExt, AcceptAddrsBuf};
+
+ fn each_ip(f: &mut FnMut(SocketAddr)) {
+ f(t!("127.0.0.1:0".parse()));
+ f(t!("[::1]:0".parse()));
+ }
+
+ #[test]
+ fn tcp_read() {
+ each_ip(&mut |addr| {
+ let l = t!(TcpListener::bind(addr));
+ let addr = t!(l.local_addr());
+ let t = thread::spawn(move || {
+ let mut a = t!(l.accept()).0;
+ t!(a.write_all(&[1, 2, 3]));
+ });
+
+ let cp = t!(CompletionPort::new(1));
+ let s = t!(TcpStream::connect(addr));
+ t!(cp.add_socket(1, &s));
+
+ let mut b = [0; 10];
+ let a = Overlapped::zero();
+ unsafe {
+ t!(s.read_overlapped(&mut b, a.raw()));
+ }
+ let status = t!(cp.get(None));
+ assert_eq!(status.bytes_transferred(), 3);
+ assert_eq!(status.token(), 1);
+ assert_eq!(status.overlapped(), a.raw());
+ assert_eq!(&b[0..3], &[1, 2, 3]);
+
+ t!(t.join());
+ })
+ }
+
+ #[test]
+ fn tcp_write() {
+ each_ip(&mut |addr| {
+ let l = t!(TcpListener::bind(addr));
+ let addr = t!(l.local_addr());
+ let t = thread::spawn(move || {
+ let mut a = t!(l.accept()).0;
+ let mut b = [0; 10];
+ let n = t!(a.read(&mut b));
+ assert_eq!(n, 3);
+ assert_eq!(&b[0..3], &[1, 2, 3]);
+ });
+
+ let cp = t!(CompletionPort::new(1));
+ let s = t!(TcpStream::connect(addr));
+ t!(cp.add_socket(1, &s));
+
+ let b = [1, 2, 3];
+ let a = Overlapped::zero();
+ unsafe {
+ t!(s.write_overlapped(&b, a.raw()));
+ }
+ let status = t!(cp.get(None));
+ assert_eq!(status.bytes_transferred(), 3);
+ assert_eq!(status.token(), 1);
+ assert_eq!(status.overlapped(), a.raw());
+
+ t!(t.join());
+ })
+ }
+
+ #[test]
+ fn tcp_connect() {
+ each_ip(&mut |addr_template| {
+ let l = t!(TcpListener::bind(addr_template));
+ let addr = t!(l.local_addr());
+ let t = thread::spawn(move || {
+ t!(l.accept());
+ });
+
+ let cp = t!(CompletionPort::new(1));
+ let domain = match addr {
+ SocketAddr::V4(..) => Domain::ipv4(),
+ SocketAddr::V6(..) => Domain::ipv6(),
+ };
+ let socket = t!(Socket::new(domain, Type::stream(), None));
+ t!(socket.bind(&addr_template.into()));
+ let socket = socket.into_tcp_stream();
+ t!(cp.add_socket(1, &socket));
+
+ let a = Overlapped::zero();
+ unsafe {
+ t!(socket.connect_overlapped(&addr, &[], a.raw()));
+ }
+ let status = t!(cp.get(None));
+ assert_eq!(status.bytes_transferred(), 0);
+ assert_eq!(status.token(), 1);
+ assert_eq!(status.overlapped(), a.raw());
+ t!(socket.connect_complete());
+
+ t!(t.join());
+ })
+ }
+
+ #[test]
+ fn udp_recv_from() {
+ each_ip(&mut |addr| {
+ let a = t!(UdpSocket::bind(addr));
+ let b = t!(UdpSocket::bind(addr));
+ let a_addr = t!(a.local_addr());
+ let b_addr = t!(b.local_addr());
+ let t = thread::spawn(move || {
+ t!(a.send_to(&[1, 2, 3], b_addr));
+ });
+
+ let cp = t!(CompletionPort::new(1));
+ t!(cp.add_socket(1, &b));
+
+ let mut buf = [0; 10];
+ let a = Overlapped::zero();
+ let mut addr = SocketAddrBuf::new();
+ unsafe {
+ t!(b.recv_from_overlapped(&mut buf, &mut addr, a.raw()));
+ }
+ let status = t!(cp.get(None));
+ assert_eq!(status.bytes_transferred(), 3);
+ assert_eq!(status.token(), 1);
+ assert_eq!(status.overlapped(), a.raw());
+ assert_eq!(&buf[..3], &[1, 2, 3]);
+ assert_eq!(addr.to_socket_addr(), Some(a_addr));
+
+ t!(t.join());
+ })
+ }
+
+ #[test]
+ fn udp_recv() {
+ each_ip(&mut |addr| {
+ let a = t!(UdpSocket::bind(addr));
+ let b = t!(UdpSocket::bind(addr));
+ let a_addr = t!(a.local_addr());
+ let b_addr = t!(b.local_addr());
+ assert!(b.connect(a_addr).is_ok());
+ assert!(a.connect(b_addr).is_ok());
+ let t = thread::spawn(move || {
+ t!(a.send_to(&[1, 2, 3], b_addr));
+ });
+
+ let cp = t!(CompletionPort::new(1));
+ t!(cp.add_socket(1, &b));
+
+ let mut buf = [0; 10];
+ let a = Overlapped::zero();
+ unsafe {
+ t!(b.recv_overlapped(&mut buf, a.raw()));
+ }
+ let status = t!(cp.get(None));
+ assert_eq!(status.bytes_transferred(), 3);
+ assert_eq!(status.token(), 1);
+ assert_eq!(status.overlapped(), a.raw());
+ assert_eq!(&buf[..3], &[1, 2, 3]);
+
+ t!(t.join());
+ })
+ }
+
+ #[test]
+ fn udp_send_to() {
+ each_ip(&mut |addr| {
+ let a = t!(UdpSocket::bind(addr));
+ let b = t!(UdpSocket::bind(addr));
+ let a_addr = t!(a.local_addr());
+ let b_addr = t!(b.local_addr());
+ let t = thread::spawn(move || {
+ let mut b = [0; 100];
+ let (n, addr) = t!(a.recv_from(&mut b));
+ assert_eq!(n, 3);
+ assert_eq!(addr, b_addr);
+ assert_eq!(&b[..3], &[1, 2, 3]);
+ });
+
+ let cp = t!(CompletionPort::new(1));
+ t!(cp.add_socket(1, &b));
+
+ let a = Overlapped::zero();
+ unsafe {
+ t!(b.send_to_overlapped(&[1, 2, 3], &a_addr, a.raw()));
+ }
+ let status = t!(cp.get(None));
+ assert_eq!(status.bytes_transferred(), 3);
+ assert_eq!(status.token(), 1);
+ assert_eq!(status.overlapped(), a.raw());
+
+ t!(t.join());
+ })
+ }
+
+ #[test]
+ fn udp_send() {
+ each_ip(&mut |addr| {
+ let a = t!(UdpSocket::bind(addr));
+ let b = t!(UdpSocket::bind(addr));
+ let a_addr = t!(a.local_addr());
+ let b_addr = t!(b.local_addr());
+ assert!(b.connect(a_addr).is_ok());
+ assert!(a.connect(b_addr).is_ok());
+ let t = thread::spawn(move || {
+ let mut b = [0; 100];
+ let (n, addr) = t!(a.recv_from(&mut b));
+ assert_eq!(n, 3);
+ assert_eq!(addr, b_addr);
+ assert_eq!(&b[..3], &[1, 2, 3]);
+ });
+
+ let cp = t!(CompletionPort::new(1));
+ t!(cp.add_socket(1, &b));
+
+ let a = Overlapped::zero();
+ unsafe {
+ t!(b.send_overlapped(&[1, 2, 3], a.raw()));
+ }
+ let status = t!(cp.get(None));
+ assert_eq!(status.bytes_transferred(), 3);
+ assert_eq!(status.token(), 1);
+ assert_eq!(status.overlapped(), a.raw());
+
+ t!(t.join());
+ })
+ }
+
+ #[test]
+ fn tcp_accept() {
+ each_ip(&mut |addr_template| {
+ let l = t!(TcpListener::bind(addr_template));
+ let addr = t!(l.local_addr());
+ let t = thread::spawn(move || {
+ let socket = t!(TcpStream::connect(addr));
+ (socket.local_addr().unwrap(), socket.peer_addr().unwrap())
+ });
+
+ let cp = t!(CompletionPort::new(1));
+ let domain = match addr {
+ SocketAddr::V4(..) => Domain::ipv4(),
+ SocketAddr::V6(..) => Domain::ipv6(),
+ };
+ let socket = t!(Socket::new(domain, Type::stream(), None))
+ .into_tcp_stream();
+ t!(cp.add_socket(1, &l));
+
+ let a = Overlapped::zero();
+ let mut addrs = AcceptAddrsBuf::new();
+ unsafe {
+ t!(l.accept_overlapped(&socket, &mut addrs, a.raw()));
+ }
+ let status = t!(cp.get(None));
+ assert_eq!(status.bytes_transferred(), 0);
+ assert_eq!(status.token(), 1);
+ assert_eq!(status.overlapped(), a.raw());
+ t!(l.accept_complete(&socket));
+
+ let (remote, local) = t!(t.join());
+ let addrs = addrs.parse(&l).unwrap();
+ assert_eq!(addrs.local(), Some(local));
+ assert_eq!(addrs.remote(), Some(remote));
+ })
+ }
+}
diff --git a/third_party/rust/miow/src/overlapped.rs b/third_party/rust/miow/src/overlapped.rs
new file mode 100644
index 0000000000..68c54df3b4
--- /dev/null
+++ b/third_party/rust/miow/src/overlapped.rs
@@ -0,0 +1,95 @@
+use std::fmt;
+use std::io;
+use std::mem;
+use std::ptr;
+
+use winapi::shared::ntdef::{
+ HANDLE,
+ NULL,
+};
+use winapi::um::minwinbase::*;
+use winapi::um::synchapi::*;
+
+/// A wrapper around `OVERLAPPED` to provide "rustic" accessors and
+/// initializers.
+pub struct Overlapped(OVERLAPPED);
+
+impl fmt::Debug for Overlapped {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "OVERLAPPED")
+ }
+}
+
+unsafe impl Send for Overlapped {}
+unsafe impl Sync for Overlapped {}
+
+impl Overlapped {
+ /// Creates a new zeroed out instance of an overlapped I/O tracking state.
+ ///
+ /// This is suitable for passing to methods which will then later get
+ /// notified via an I/O Completion Port.
+ pub fn zero() -> Overlapped {
+ Overlapped(unsafe { mem::zeroed() })
+ }
+
+ /// Creates a new `Overlapped` with an initialized non-null `hEvent`. The caller is
+ /// responsible for calling `CloseHandle` on the `hEvent` field of the returned
+ /// `Overlapped`. The event is created with `bManualReset` set to `FALSE`, meaning after a
+ /// single thread waits on the event, it will be reset.
+ pub fn initialize_with_autoreset_event() -> io::Result<Overlapped> {
+ let event = unsafe {CreateEventW(ptr::null_mut(), 0i32, 0i32, ptr::null())};
+ if event == NULL {
+ return Err(io::Error::last_os_error());
+ }
+ let mut overlapped = Self::zero();
+ overlapped.set_event(event);
+ Ok(overlapped)
+ }
+
+ /// Creates a new `Overlapped` function pointer from the underlying
+ /// `OVERLAPPED`, wrapping in the "rusty" wrapper for working with
+ /// accessors.
+ ///
+ /// # Unsafety
+ ///
+ /// This function doesn't validate `ptr` nor the lifetime of the returned
+ /// pointer at all, it's recommended to use this method with extreme
+ /// caution.
+ pub unsafe fn from_raw<'a>(ptr: *mut OVERLAPPED) -> &'a mut Overlapped {
+ &mut *(ptr as *mut Overlapped)
+ }
+
+ /// Gain access to the raw underlying data
+ pub fn raw(&self) -> *mut OVERLAPPED {
+ &self.0 as *const _ as *mut _
+ }
+
+ /// Sets the offset inside this overlapped structure.
+ ///
+ /// Note that for I/O operations in general this only has meaning for I/O
+ /// handles that are on a seeking device that supports the concept of an
+ /// offset.
+ pub fn set_offset(&mut self, offset: u64) {
+ let s = unsafe { self.0.u.s_mut() };
+ s.Offset = offset as u32;
+ s.OffsetHigh = (offset >> 32) as u32;
+ }
+
+ /// Reads the offset inside this overlapped structure.
+ pub fn offset(&self) -> u64 {
+ let s = unsafe { self.0.u.s() };
+ (s.Offset as u64) | ((s.OffsetHigh as u64) << 32)
+ }
+
+ /// Sets the `hEvent` field of this structure.
+ ///
+ /// The event specified can be null.
+ pub fn set_event(&mut self, event: HANDLE) {
+ self.0.hEvent = event;
+ }
+
+ /// Reads the `hEvent` field of this structure, may return null.
+ pub fn event(&self) -> HANDLE {
+ self.0.hEvent
+ }
+}
diff --git a/third_party/rust/miow/src/pipe.rs b/third_party/rust/miow/src/pipe.rs
new file mode 100644
index 0000000000..479789287a
--- /dev/null
+++ b/third_party/rust/miow/src/pipe.rs
@@ -0,0 +1,716 @@
+//! Named pipes
+
+use std::cell::RefCell;
+use std::ffi::OsStr;
+use std::fs::{OpenOptions, File};
+use std::io::prelude::*;
+use std::io;
+use std::os::windows::ffi::*;
+use std::os::windows::io::*;
+use std::time::Duration;
+
+use winapi::shared::ntdef::HANDLE;
+use winapi::shared::minwindef::*;
+use winapi::shared::winerror::*;
+use winapi::um::fileapi::*;
+use winapi::um::handleapi::*;
+use winapi::um::ioapiset::*;
+use winapi::um::minwinbase::*;
+use winapi::um::namedpipeapi::*;
+use winapi::um::winbase::*;
+use handle::Handle;
+use overlapped::Overlapped;
+
+/// Readable half of an anonymous pipe.
+#[derive(Debug)]
+pub struct AnonRead(Handle);
+
+/// Writable half of an anonymous pipe.
+#[derive(Debug)]
+pub struct AnonWrite(Handle);
+
+/// A named pipe that can accept connections.
+#[derive(Debug)]
+pub struct NamedPipe(Handle);
+
+/// A builder structure for creating a new named pipe.
+#[derive(Debug)]
+pub struct NamedPipeBuilder {
+ name: Vec<u16>,
+ dwOpenMode: DWORD,
+ dwPipeMode: DWORD,
+ nMaxInstances: DWORD,
+ nOutBufferSize: DWORD,
+ nInBufferSize: DWORD,
+ nDefaultTimeOut: DWORD,
+}
+
+/// Creates a new anonymous in-memory pipe, returning the read/write ends of the
+/// pipe.
+///
+/// The buffer size for this pipe may also be specified, but the system will
+/// normally use this as a suggestion and it's not guaranteed that the buffer
+/// will be precisely this size.
+pub fn anonymous(buffer_size: u32) -> io::Result<(AnonRead, AnonWrite)> {
+ let mut read = 0 as HANDLE;
+ let mut write = 0 as HANDLE;
+ try!(::cvt(unsafe {
+ CreatePipe(&mut read, &mut write, 0 as *mut _, buffer_size)
+ }));
+ Ok((AnonRead(Handle::new(read)), AnonWrite(Handle::new(write))))
+}
+
+impl Read for AnonRead {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
+}
+impl<'a> Read for &'a AnonRead {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
+}
+
+impl AsRawHandle for AnonRead {
+ fn as_raw_handle(&self) -> HANDLE { self.0.raw() }
+}
+impl FromRawHandle for AnonRead {
+ unsafe fn from_raw_handle(handle: HANDLE) -> AnonRead {
+ AnonRead(Handle::new(handle))
+ }
+}
+impl IntoRawHandle for AnonRead {
+ fn into_raw_handle(self) -> HANDLE { self.0.into_raw() }
+}
+
+impl Write for AnonWrite {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
+ fn flush(&mut self) -> io::Result<()> { Ok(()) }
+}
+impl<'a> Write for &'a AnonWrite {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
+ fn flush(&mut self) -> io::Result<()> { Ok(()) }
+}
+
+impl AsRawHandle for AnonWrite {
+ fn as_raw_handle(&self) -> HANDLE { self.0.raw() }
+}
+impl FromRawHandle for AnonWrite {
+ unsafe fn from_raw_handle(handle: HANDLE) -> AnonWrite {
+ AnonWrite(Handle::new(handle))
+ }
+}
+impl IntoRawHandle for AnonWrite {
+ fn into_raw_handle(self) -> HANDLE { self.0.into_raw() }
+}
+
+/// A convenience function to connect to a named pipe.
+///
+/// This function will block the calling process until it can connect to the
+/// pipe server specified by `addr`. This will use `NamedPipe::wait` internally
+/// to block until it can connect.
+pub fn connect<A: AsRef<OsStr>>(addr: A) -> io::Result<File> {
+ _connect(addr.as_ref())
+}
+
+fn _connect(addr: &OsStr) -> io::Result<File> {
+ let mut r = OpenOptions::new();
+ let mut w = OpenOptions::new();
+ let mut rw = OpenOptions::new();
+ r.read(true);
+ w.write(true);
+ rw.read(true).write(true);
+ loop {
+ let res = rw.open(addr).or_else(|_| r.open(addr))
+ .or_else(|_| w.open(addr));
+ match res {
+ Ok(f) => return Ok(f),
+ Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32)
+ => {}
+ Err(e) => return Err(e),
+ }
+
+ try!(NamedPipe::wait(addr, Some(Duration::new(20, 0))));
+ }
+}
+
+impl NamedPipe {
+ /// Creates a new initial named pipe.
+ ///
+ /// This function is equivalent to:
+ ///
+ /// ```
+ /// use miow::pipe::NamedPipeBuilder;
+ ///
+ /// # let addr = "foo";
+ /// NamedPipeBuilder::new(addr)
+ /// .first(true)
+ /// .inbound(true)
+ /// .outbound(true)
+ /// .out_buffer_size(65536)
+ /// .in_buffer_size(65536)
+ /// .create();
+ /// ```
+ pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
+ NamedPipeBuilder::new(addr).create()
+ }
+
+ /// Waits until either a time-out interval elapses or an instance of the
+ /// specified named pipe is available for connection.
+ ///
+ /// If this function succeeds the process can create a `File` to connect to
+ /// the named pipe.
+ pub fn wait<A: AsRef<OsStr>>(addr: A, timeout: Option<Duration>)
+ -> io::Result<()> {
+ NamedPipe::_wait(addr.as_ref(), timeout)
+ }
+
+ fn _wait(addr: &OsStr, timeout: Option<Duration>) -> io::Result<()> {
+ let addr = addr.encode_wide().chain(Some(0)).collect::<Vec<_>>();
+ let timeout = ::dur2ms(timeout);
+ ::cvt(unsafe {
+ WaitNamedPipeW(addr.as_ptr(), timeout)
+ }).map(|_| ())
+ }
+
+ /// Connects this named pipe to a client, blocking until one becomes
+ /// available.
+ ///
+ /// This function will call the `ConnectNamedPipe` function to await for a
+ /// client to connect. This can be called immediately after the pipe is
+ /// created, or after it has been disconnected from a previous client.
+ pub fn connect(&self) -> io::Result<()> {
+ match ::cvt(unsafe { ConnectNamedPipe(self.0.raw(), 0 as *mut _) }) {
+ Ok(_) => Ok(()),
+ Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32)
+ => Ok(()),
+ Err(e) => Err(e),
+ }
+ }
+
+ /// Issue a connection request with the specified overlapped operation.
+ ///
+ /// This function will issue a request to connect a client to this server,
+ /// returning immediately after starting the overlapped operation.
+ ///
+ /// If this function immediately succeeds then `Ok(true)` is returned. If
+ /// the overlapped operation is enqueued and pending, then `Ok(false)` is
+ /// returned. Otherwise an error is returned indicating what went wrong.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the
+ /// `overlapped` pointer is valid until the end of the I/O operation. The
+ /// kernel also requires that `overlapped` is unique for this I/O operation
+ /// and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that this pointer is
+ /// valid until the I/O operation is completed, typically via completion
+ /// ports and waiting to receive the completion notification on the port.
+ pub unsafe fn connect_overlapped(&self, overlapped: *mut OVERLAPPED)
+ -> io::Result<bool> {
+ match ::cvt(ConnectNamedPipe(self.0.raw(), overlapped)) {
+ Ok(_) => Ok(true),
+ Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32)
+ => Ok(true),
+ Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32)
+ => Ok(false),
+ Err(e) => Err(e),
+ }
+ }
+
+ /// Disconnects this named pipe from any connected client.
+ pub fn disconnect(&self) -> io::Result<()> {
+ ::cvt(unsafe {
+ DisconnectNamedPipe(self.0.raw())
+ }).map(|_| ())
+ }
+
+ /// Issues an overlapped read operation to occur on this pipe.
+ ///
+ /// This function will issue an asynchronous read to occur in an overlapped
+ /// fashion, returning immediately. The `buf` provided will be filled in
+ /// with data and the request is tracked by the `overlapped` function
+ /// provided.
+ ///
+ /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
+ /// `n` is the number of bytes read. If an asynchronous operation is
+ /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
+ /// it is returned.
+ ///
+ /// When this operation completes (or if it completes immediately), another
+ /// mechanism must be used to learn how many bytes were transferred (such as
+ /// looking at the filed in the IOCP status message).
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the `buf` and
+ /// `overlapped` pointers to be valid until the end of the I/O operation.
+ /// The kernel also requires that `overlapped` is unique for this I/O
+ /// operation and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that the pointers are
+ /// valid until the I/O operation is completed, typically via completion
+ /// ports and waiting to receive the completion notification on the port.
+ pub unsafe fn read_overlapped(&self,
+ buf: &mut [u8],
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<Option<usize>> {
+ self.0.read_overlapped(buf, overlapped)
+ }
+
+ /// Issues an overlapped write operation to occur on this pipe.
+ ///
+ /// This function will issue an asynchronous write to occur in an overlapped
+ /// fashion, returning immediately. The `buf` provided will be filled in
+ /// with data and the request is tracked by the `overlapped` function
+ /// provided.
+ ///
+ /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
+ /// `n` is the number of bytes written. If an asynchronous operation is
+ /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
+ /// it is returned.
+ ///
+ /// When this operation completes (or if it completes immediately), another
+ /// mechanism must be used to learn how many bytes were transferred (such as
+ /// looking at the filed in the IOCP status message).
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe because the kernel requires that the `buf` and
+ /// `overlapped` pointers to be valid until the end of the I/O operation.
+ /// The kernel also requires that `overlapped` is unique for this I/O
+ /// operation and is not in use for any other I/O.
+ ///
+ /// To safely use this function callers must ensure that the pointers are
+ /// valid until the I/O operation is completed, typically via completion
+ /// ports and waiting to receive the completion notification on the port.
+ pub unsafe fn write_overlapped(&self,
+ buf: &[u8],
+ overlapped: *mut OVERLAPPED)
+ -> io::Result<Option<usize>> {
+ self.0.write_overlapped(buf, overlapped)
+ }
+
+ /// Calls the `GetOverlappedResult` function to get the result of an
+ /// overlapped operation for this handle.
+ ///
+ /// This function takes the `OVERLAPPED` argument which must have been used
+ /// to initiate an overlapped I/O operation, and returns either the
+ /// successful number of bytes transferred during the operation or an error
+ /// if one occurred.
+ ///
+ /// # Unsafety
+ ///
+ /// This function is unsafe as `overlapped` must have previously been used
+ /// to execute an operation for this handle, and it must also be a valid
+ /// pointer to an `Overlapped` instance.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic
+ pub unsafe fn result(&self, overlapped: *mut OVERLAPPED)
+ -> io::Result<usize> {
+ let mut transferred = 0;
+ let r = GetOverlappedResult(self.0.raw(),
+ overlapped,
+ &mut transferred,
+ FALSE);
+ if r == 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(transferred as usize)
+ }
+ }
+}
+
+thread_local! {
+ static NAMED_PIPE_OVERLAPPED: RefCell<Option<Overlapped>> = RefCell::new(None);
+}
+
+/// Call a function with a threadlocal `Overlapped`. The function `f` should be
+/// sure that the event is reset, either manually or by a thread being released.
+fn with_threadlocal_overlapped<F>(f: F) -> io::Result<usize>
+ where F: FnOnce(&Overlapped) -> io::Result<usize>
+{
+ NAMED_PIPE_OVERLAPPED.with(|overlapped| {
+ let mut mborrow = overlapped.borrow_mut();
+ if let None = *mborrow {
+ let op = Overlapped::initialize_with_autoreset_event()?;
+ *mborrow = Some(op);
+ }
+ f(mborrow.as_ref().unwrap())
+ })
+}
+
+impl Read for NamedPipe {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
+ with_threadlocal_overlapped(|overlapped| unsafe {
+ self.0.read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
+ })
+ }
+}
+impl<'a> Read for &'a NamedPipe {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
+ with_threadlocal_overlapped(|overlapped| unsafe {
+ self.0.read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
+ })
+ }
+}
+
+impl Write for NamedPipe {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
+ with_threadlocal_overlapped(|overlapped| unsafe {
+ self.0.write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
+ })
+ }
+ fn flush(&mut self) -> io::Result<()> {
+ <&NamedPipe as Write>::flush(&mut &*self)
+ }
+}
+impl<'a> Write for &'a NamedPipe {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
+ with_threadlocal_overlapped(|overlapped| unsafe {
+ self.0.write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
+ })
+ }
+ fn flush(&mut self) -> io::Result<()> {
+ ::cvt(unsafe { FlushFileBuffers(self.0.raw()) }).map(|_| ())
+ }
+}
+
+impl AsRawHandle for NamedPipe {
+ fn as_raw_handle(&self) -> HANDLE { self.0.raw() }
+}
+impl FromRawHandle for NamedPipe {
+ unsafe fn from_raw_handle(handle: HANDLE) -> NamedPipe {
+ NamedPipe(Handle::new(handle))
+ }
+}
+impl IntoRawHandle for NamedPipe {
+ fn into_raw_handle(self) -> HANDLE { self.0.into_raw() }
+}
+
+fn flag(slot: &mut DWORD, on: bool, val: DWORD) {
+ if on {
+ *slot |= val;
+ } else {
+ *slot &= !val;
+ }
+}
+
+impl NamedPipeBuilder {
+ /// Creates a new named pipe builder with the default settings.
+ pub fn new<A: AsRef<OsStr>>(addr: A) -> NamedPipeBuilder {
+ NamedPipeBuilder {
+ name: addr.as_ref().encode_wide().chain(Some(0)).collect(),
+ dwOpenMode: PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE |
+ FILE_FLAG_OVERLAPPED,
+ dwPipeMode: PIPE_TYPE_BYTE,
+ nMaxInstances: PIPE_UNLIMITED_INSTANCES,
+ nOutBufferSize: 65536,
+ nInBufferSize: 65536,
+ nDefaultTimeOut: 0,
+ }
+ }
+
+ /// Indicates whether data is allowed to flow from the client to the server.
+ pub fn inbound(&mut self, allowed: bool) -> &mut Self {
+ flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_INBOUND);
+ self
+ }
+
+ /// Indicates whether data is allowed to flow from the server to the client.
+ pub fn outbound(&mut self, allowed: bool) -> &mut Self {
+ flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_OUTBOUND);
+ self
+ }
+
+ /// Indicates that this pipe must be the first instance.
+ ///
+ /// If set to true, then creation will fail if there's already an instance
+ /// elsewhere.
+ pub fn first(&mut self, first: bool) -> &mut Self {
+ flag(&mut self.dwOpenMode, first, FILE_FLAG_FIRST_PIPE_INSTANCE);
+ self
+ }
+
+ /// Indicates whether this server can accept remote clients or not.
+ pub fn accept_remote(&mut self, accept: bool) -> &mut Self {
+ flag(&mut self.dwPipeMode, !accept, PIPE_REJECT_REMOTE_CLIENTS);
+ self
+ }
+
+ /// Specifies the maximum number of instances of the server pipe that are
+ /// allowed.
+ ///
+ /// The first instance of a pipe can specify this value. A value of 255
+ /// indicates that there is no limit to the number of instances.
+ pub fn max_instances(&mut self, instances: u8) -> &mut Self {
+ self.nMaxInstances = instances as DWORD;
+ self
+ }
+
+ /// Specifies the number of bytes to reserver for the output buffer
+ pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
+ self.nOutBufferSize = buffer as DWORD;
+ self
+ }
+
+ /// Specifies the number of bytes to reserver for the input buffer
+ pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
+ self.nInBufferSize = buffer as DWORD;
+ self
+ }
+
+ /// Using the options in this builder, attempt to create a new named pipe.
+ ///
+ /// This function will call the `CreateNamedPipe` function and return the
+ /// result.
+ pub fn create(&mut self) -> io::Result<NamedPipe> {
+ unsafe { self.with_security_attributes(::std::ptr::null_mut()) }
+ }
+
+ /// Using the options in the builder and the provided security attributes, attempt to create a
+ /// new named pipe. This function has to be called with a valid pointer to a
+ /// `SECURITY_ATTRIBUTES` struct that will stay valid for the lifetime of this function or a
+ /// null pointer.
+ ///
+ /// This function will call the `CreateNamedPipe` function and return the
+ /// result.
+ pub unsafe fn with_security_attributes(&mut self, attrs: *mut SECURITY_ATTRIBUTES) -> io::Result<NamedPipe> {
+ let h = CreateNamedPipeW(self.name.as_ptr(),
+ self.dwOpenMode, self.dwPipeMode,
+ self.nMaxInstances, self.nOutBufferSize,
+ self.nInBufferSize, self.nDefaultTimeOut,
+ attrs);
+
+ if h == INVALID_HANDLE_VALUE {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(NamedPipe(Handle::new(h)))
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::fs::{File, OpenOptions};
+ use std::io::prelude::*;
+ use std::sync::mpsc::channel;
+ use std::thread;
+ use std::time::Duration;
+
+ use rand::{thread_rng, Rng};
+
+ use super::{anonymous, NamedPipe, NamedPipeBuilder};
+ use iocp::CompletionPort;
+ use Overlapped;
+
+ fn name() -> String {
+ let name = thread_rng().gen_ascii_chars().take(30).collect::<String>();
+ format!(r"\\.\pipe\{}", name)
+ }
+
+ #[test]
+ fn anon() {
+ let (mut read, mut write) = t!(anonymous(256));
+ assert_eq!(t!(write.write(&[1, 2, 3])), 3);
+ let mut b = [0; 10];
+ assert_eq!(t!(read.read(&mut b)), 3);
+ assert_eq!(&b[..3], &[1, 2, 3]);
+ }
+
+ #[test]
+ fn named_not_first() {
+ let name = name();
+ let _a = t!(NamedPipe::new(&name));
+ assert!(NamedPipe::new(&name).is_err());
+
+ t!(NamedPipeBuilder::new(&name).first(false).create());
+ }
+
+ #[test]
+ fn named_connect() {
+ let name = name();
+ let a = t!(NamedPipe::new(&name));
+
+ let t = thread::spawn(move || {
+ t!(File::open(name));
+ });
+
+ t!(a.connect());
+ t!(a.disconnect());
+ t!(t.join());
+ }
+
+ #[test]
+ fn named_wait() {
+ let name = name();
+ let a = t!(NamedPipe::new(&name));
+
+ let (tx, rx) = channel();
+ let t = thread::spawn(move || {
+ t!(NamedPipe::wait(&name, None));
+ t!(File::open(&name));
+ assert!(NamedPipe::wait(&name, Some(Duration::from_millis(1))).is_err());
+ t!(tx.send(()));
+ });
+
+ t!(a.connect());
+ t!(rx.recv());
+ t!(a.disconnect());
+ t!(t.join());
+ }
+
+ #[test]
+ fn named_connect_overlapped() {
+ let name = name();
+ let a = t!(NamedPipe::new(&name));
+
+ let t = thread::spawn(move || {
+ t!(File::open(name));
+ });
+
+ let cp = t!(CompletionPort::new(1));
+ t!(cp.add_handle(2, &a));
+
+ let over = Overlapped::zero();
+ unsafe {
+ t!(a.connect_overlapped(over.raw()));
+ }
+
+ let status = t!(cp.get(None));
+ assert_eq!(status.bytes_transferred(), 0);
+ assert_eq!(status.token(), 2);
+ assert_eq!(status.overlapped(), over.raw());
+ t!(t.join());
+ }
+
+ #[test]
+ fn named_read_write() {
+ let name = name();
+ let mut a = t!(NamedPipe::new(&name));
+
+ let t = thread::spawn(move || {
+ let mut f = t!(OpenOptions::new().read(true).write(true).open(name));
+ t!(f.write_all(&[1, 2, 3]));
+ let mut b = [0; 10];
+ assert_eq!(t!(f.read(&mut b)), 3);
+ assert_eq!(&b[..3], &[1, 2, 3]);
+ });
+
+ t!(a.connect());
+ let mut b = [0; 10];
+ assert_eq!(t!(a.read(&mut b)), 3);
+ assert_eq!(&b[..3], &[1, 2, 3]);
+ t!(a.write_all(&[1, 2, 3]));
+ t!(a.flush());
+ t!(a.disconnect());
+ t!(t.join());
+ }
+
+ #[test]
+ fn named_read_write_multi() {
+ for _ in 0..5 {
+ named_read_write()
+ }
+ }
+
+ #[test]
+ fn named_read_write_multi_same_thread() {
+ let name1 = name();
+ let mut a1 = t!(NamedPipe::new(&name1));
+ let name2 = name();
+ let mut a2 = t!(NamedPipe::new(&name2));
+
+ let t = thread::spawn(move || {
+ let mut f = t!(OpenOptions::new().read(true).write(true).open(name1));
+ t!(f.write_all(&[1, 2, 3]));
+ let mut b = [0; 10];
+ assert_eq!(t!(f.read(&mut b)), 3);
+ assert_eq!(&b[..3], &[1, 2, 3]);
+
+ let mut f = t!(OpenOptions::new().read(true).write(true).open(name2));
+ t!(f.write_all(&[1, 2, 3]));
+ let mut b = [0; 10];
+ assert_eq!(t!(f.read(&mut b)), 3);
+ assert_eq!(&b[..3], &[1, 2, 3]);
+ });
+
+ t!(a1.connect());
+ let mut b = [0; 10];
+ assert_eq!(t!(a1.read(&mut b)), 3);
+ assert_eq!(&b[..3], &[1, 2, 3]);
+ t!(a1.write_all(&[1, 2, 3]));
+ t!(a1.flush());
+ t!(a1.disconnect());
+
+ t!(a2.connect());
+ let mut b = [0; 10];
+ assert_eq!(t!(a2.read(&mut b)), 3);
+ assert_eq!(&b[..3], &[1, 2, 3]);
+ t!(a2.write_all(&[1, 2, 3]));
+ t!(a2.flush());
+ t!(a2.disconnect());
+
+ t!(t.join());
+ }
+
+ #[test]
+ fn named_read_overlapped() {
+ let name = name();
+ let a = t!(NamedPipe::new(&name));
+
+ let t = thread::spawn(move || {
+ let mut f = t!(File::create(name));
+ t!(f.write_all(&[1, 2, 3]));
+ });
+
+ let cp = t!(CompletionPort::new(1));
+ t!(cp.add_handle(3, &a));
+ t!(a.connect());
+
+ let mut b = [0; 10];
+ let over = Overlapped::zero();
+ unsafe {
+ t!(a.read_overlapped(&mut b, over.raw()));
+ }
+ let status = t!(cp.get(None));
+ assert_eq!(status.bytes_transferred(), 3);
+ assert_eq!(status.token(), 3);
+ assert_eq!(status.overlapped(), over.raw());
+ assert_eq!(&b[..3], &[1, 2, 3]);
+
+ t!(t.join());
+ }
+
+ #[test]
+ fn named_write_overlapped() {
+ let name = name();
+ let a = t!(NamedPipe::new(&name));
+
+ let t = thread::spawn(move || {
+ let mut f = t!(super::connect(name));
+ let mut b = [0; 10];
+ assert_eq!(t!(f.read(&mut b)), 3);
+ assert_eq!(&b[..3], &[1, 2, 3])
+ });
+
+ let cp = t!(CompletionPort::new(1));
+ t!(cp.add_handle(3, &a));
+ t!(a.connect());
+
+ let over = Overlapped::zero();
+ unsafe {
+ t!(a.write_overlapped(&[1, 2, 3], over.raw()));
+ }
+
+ let status = t!(cp.get(None));
+ assert_eq!(status.bytes_transferred(), 3);
+ assert_eq!(status.token(), 3);
+ assert_eq!(status.overlapped(), over.raw());
+
+ t!(t.join());
+ }
+}