diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
commit | 2aa4a82499d4becd2284cdb482213d541b8804dd (patch) | |
tree | b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/miow/src | |
parent | Initial commit. (diff) | |
download | firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip |
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/miow/src')
-rw-r--r-- | third_party/rust/miow/src/handle.rs | 164 | ||||
-rw-r--r-- | third_party/rust/miow/src/iocp.rs | 324 | ||||
-rw-r--r-- | third_party/rust/miow/src/lib.rs | 57 | ||||
-rw-r--r-- | third_party/rust/miow/src/net.rs | 1140 | ||||
-rw-r--r-- | third_party/rust/miow/src/overlapped.rs | 95 | ||||
-rw-r--r-- | third_party/rust/miow/src/pipe.rs | 716 |
6 files changed, 2496 insertions, 0 deletions
diff --git a/third_party/rust/miow/src/handle.rs b/third_party/rust/miow/src/handle.rs new file mode 100644 index 0000000000..809c1c6780 --- /dev/null +++ b/third_party/rust/miow/src/handle.rs @@ -0,0 +1,164 @@ +use std::io; +use std::cmp; +use std::ptr; + +use winapi::shared::minwindef::*; +use winapi::shared::ntdef::{ + BOOLEAN, + FALSE, + HANDLE, + TRUE, +}; +use winapi::shared::winerror::*; +use winapi::um::fileapi::*; +use winapi::um::handleapi::*; +use winapi::um::ioapiset::*; +use winapi::um::minwinbase::*; + +#[derive(Debug)] +pub struct Handle(HANDLE); + +unsafe impl Send for Handle {} +unsafe impl Sync for Handle {} + +impl Handle { + pub fn new(handle: HANDLE) -> Handle { + Handle(handle) + } + + pub fn raw(&self) -> HANDLE { self.0 } + + pub fn into_raw(self) -> HANDLE { + use std::mem; + + let ret = self.0; + mem::forget(self); + ret + } + + pub fn write(&self, buf: &[u8]) -> io::Result<usize> { + let mut bytes = 0; + let len = cmp::min(buf.len(), <DWORD>::max_value() as usize) as DWORD; + try!(::cvt(unsafe { + WriteFile(self.0, buf.as_ptr() as *const _, len, &mut bytes, + 0 as *mut _) + })); + Ok(bytes as usize) + } + + pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> { + let mut bytes = 0; + let len = cmp::min(buf.len(), <DWORD>::max_value() as usize) as DWORD; + try!(::cvt(unsafe { + ReadFile(self.0, buf.as_mut_ptr() as *mut _, len, &mut bytes, + 0 as *mut _) + })); + Ok(bytes as usize) + } + + pub unsafe fn read_overlapped(&self, buf: &mut [u8], + overlapped: *mut OVERLAPPED) + -> io::Result<Option<usize>> { + self.read_overlapped_helper(buf, overlapped, FALSE) + } + + pub unsafe fn read_overlapped_wait(&self, buf: &mut [u8], + overlapped: *mut OVERLAPPED) + -> io::Result<usize> { + match self.read_overlapped_helper(buf, overlapped, TRUE) { + Ok(Some(bytes)) => Ok(bytes), + Ok(None) => panic!("logic error"), + Err(e) => Err(e), + } + } + + pub unsafe fn read_overlapped_helper(&self, buf: &mut [u8], + overlapped: *mut OVERLAPPED, + wait: BOOLEAN) + -> io::Result<Option<usize>> { + let len = cmp::min(buf.len(), <DWORD>::max_value() as usize) as DWORD; + let res = ::cvt({ + ReadFile(self.0, + buf.as_mut_ptr() as *mut _, + len, + ptr::null_mut(), + overlapped) + }); + match res { + Ok(_) => (), + Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) + => (), + Err(e) => return Err(e), + } + + let mut bytes = 0; + let res = ::cvt({ + GetOverlappedResult(self.0, + overlapped, + &mut bytes, + wait as BOOL) + }); + match res { + Ok(_) => Ok(Some(bytes as usize)), + Err(ref e) if e.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) && wait == FALSE + => Ok(None), + Err(e) => Err(e), + } + } + + pub unsafe fn write_overlapped(&self, buf: &[u8], + overlapped: *mut OVERLAPPED) + -> io::Result<Option<usize>> { + self.write_overlapped_helper(buf, overlapped, FALSE) + } + + pub unsafe fn write_overlapped_wait(&self, buf: &[u8], + overlapped: *mut OVERLAPPED) + -> io::Result<usize> { + match self.write_overlapped_helper(buf, overlapped, TRUE) { + Ok(Some(bytes)) => Ok(bytes), + Ok(None) => panic!("logic error"), + Err(e) => Err(e), + } + } + + unsafe fn write_overlapped_helper(&self, buf: &[u8], + overlapped: *mut OVERLAPPED, + wait: BOOLEAN) + -> io::Result<Option<usize>> { + let len = cmp::min(buf.len(), <DWORD>::max_value() as usize) as DWORD; + let res = ::cvt({ + WriteFile(self.0, + buf.as_ptr() as *const _, + len, + ptr::null_mut(), + overlapped) + }); + match res { + Ok(_) => (), + Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) + => (), + Err(e) => return Err(e), + } + + let mut bytes = 0; + let res = ::cvt({ + GetOverlappedResult(self.0, + overlapped, + &mut bytes, + wait as BOOL) + }); + match res { + Ok(_) => Ok(Some(bytes as usize)), + Err(ref e) if e.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) && wait == FALSE + => Ok(None), + Err(e) => Err(e), + } + } +} + +impl Drop for Handle { + fn drop(&mut self) { + unsafe { CloseHandle(self.0) }; + } +} diff --git a/third_party/rust/miow/src/iocp.rs b/third_party/rust/miow/src/iocp.rs new file mode 100644 index 0000000000..1404be9aab --- /dev/null +++ b/third_party/rust/miow/src/iocp.rs @@ -0,0 +1,324 @@ +//! Bindings to IOCP, I/O Completion Ports + +use std::cmp; +use std::fmt; +use std::io; +use std::mem; +use std::os::windows::io::*; +use std::time::Duration; + +use handle::Handle; +use winapi::shared::basetsd::*; +use winapi::shared::ntdef::*; +use winapi::um::minwinbase::*; +use winapi::um::handleapi::*; +use winapi::um::ioapiset::*; +use Overlapped; + +/// A handle to an Windows I/O Completion Port. +#[derive(Debug)] +pub struct CompletionPort { + handle: Handle, +} + +/// A status message received from an I/O completion port. +/// +/// These statuses can be created via the `new` or `empty` constructors and then +/// provided to a completion port, or they are read out of a completion port. +/// The fields of each status are read through its accessor methods. +#[derive(Clone, Copy)] +pub struct CompletionStatus(OVERLAPPED_ENTRY); + +impl fmt::Debug for CompletionStatus { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "CompletionStatus(OVERLAPPED_ENTRY)") + } +} + +unsafe impl Send for CompletionStatus {} +unsafe impl Sync for CompletionStatus {} + +impl CompletionPort { + /// Creates a new I/O completion port with the specified concurrency value. + /// + /// The number of threads given corresponds to the level of concurrency + /// allowed for threads associated with this port. Consult the Windows + /// documentation for more information about this value. + pub fn new(threads: u32) -> io::Result<CompletionPort> { + let ret = unsafe { + CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0 as *mut _, + 0, threads) + }; + if ret.is_null() { + Err(io::Error::last_os_error()) + } else { + Ok(CompletionPort { handle: Handle::new(ret) }) + } + } + + /// Associates a new `HANDLE` to this I/O completion port. + /// + /// This function will associate the given handle to this port with the + /// given `token` to be returned in status messages whenever it receives a + /// notification. + /// + /// Any object which is convertible to a `HANDLE` via the `AsRawHandle` + /// trait can be provided to this function, such as `std::fs::File` and + /// friends. + pub fn add_handle<T: AsRawHandle + ?Sized>(&self, token: usize, + t: &T) -> io::Result<()> { + self._add(token, t.as_raw_handle()) + } + + /// Associates a new `SOCKET` to this I/O completion port. + /// + /// This function will associate the given socket to this port with the + /// given `token` to be returned in status messages whenever it receives a + /// notification. + /// + /// Any object which is convertible to a `SOCKET` via the `AsRawSocket` + /// trait can be provided to this function, such as `std::net::TcpStream` + /// and friends. + pub fn add_socket<T: AsRawSocket + ?Sized>(&self, token: usize, + t: &T) -> io::Result<()> { + self._add(token, t.as_raw_socket() as HANDLE) + } + + fn _add(&self, token: usize, handle: HANDLE) -> io::Result<()> { + assert_eq!(mem::size_of_val(&token), mem::size_of::<ULONG_PTR>()); + let ret = unsafe { + CreateIoCompletionPort(handle, self.handle.raw(), + token as ULONG_PTR, 0) + }; + if ret.is_null() { + Err(io::Error::last_os_error()) + } else { + debug_assert_eq!(ret, self.handle.raw()); + Ok(()) + } + } + + /// Dequeue a completion status from this I/O completion port. + /// + /// This function will associate the calling thread with this completion + /// port and then wait for a status message to become available. The precise + /// semantics on when this function returns depends on the concurrency value + /// specified when the port was created. + /// + /// A timeout can optionally be specified to this function. If `None` is + /// provided this function will not time out, and otherwise it will time out + /// after the specified duration has passed. + /// + /// On success this will return the status message which was dequeued from + /// this completion port. + pub fn get(&self, timeout: Option<Duration>) -> io::Result<CompletionStatus> { + let mut bytes = 0; + let mut token = 0; + let mut overlapped = 0 as *mut _; + let timeout = ::dur2ms(timeout); + let ret = unsafe { + GetQueuedCompletionStatus(self.handle.raw(), + &mut bytes, + &mut token, + &mut overlapped, + timeout) + }; + ::cvt(ret).map(|_| { + CompletionStatus(OVERLAPPED_ENTRY { + dwNumberOfBytesTransferred: bytes, + lpCompletionKey: token, + lpOverlapped: overlapped, + Internal: 0, + }) + }) + } + + /// Dequeues a number of completion statuses from this I/O completion port. + /// + /// This function is the same as `get` except that it may return more than + /// one status. A buffer of "zero" statuses is provided (the contents are + /// not read) and then on success this function will return a sub-slice of + /// statuses which represent those which were dequeued from this port. This + /// function does not wait to fill up the entire list of statuses provided. + /// + /// Like with `get`, a timeout may be specified for this operation. + pub fn get_many<'a>(&self, + list: &'a mut [CompletionStatus], + timeout: Option<Duration>) + -> io::Result<&'a mut [CompletionStatus]> + { + debug_assert_eq!(mem::size_of::<CompletionStatus>(), + mem::size_of::<OVERLAPPED_ENTRY>()); + let mut removed = 0; + let timeout = ::dur2ms(timeout); + let len = cmp::min(list.len(), <ULONG>::max_value() as usize) as ULONG; + let ret = unsafe { + GetQueuedCompletionStatusEx(self.handle.raw(), + list.as_ptr() as *mut _, + len, + &mut removed, + timeout, + FALSE as i32) + }; + match ::cvt(ret) { + Ok(_) => Ok(&mut list[..removed as usize]), + Err(e) => Err(e), + } + } + + /// Posts a new completion status onto this I/O completion port. + /// + /// This function will post the given status, with custom parameters, to the + /// port. Threads blocked in `get` or `get_many` will eventually receive + /// this status. + pub fn post(&self, status: CompletionStatus) -> io::Result<()> { + let ret = unsafe { + PostQueuedCompletionStatus(self.handle.raw(), + status.0.dwNumberOfBytesTransferred, + status.0.lpCompletionKey, + status.0.lpOverlapped) + }; + ::cvt(ret).map(|_| ()) + } +} + +impl AsRawHandle for CompletionPort { + fn as_raw_handle(&self) -> HANDLE { + self.handle.raw() + } +} + +impl FromRawHandle for CompletionPort { + unsafe fn from_raw_handle(handle: HANDLE) -> CompletionPort { + CompletionPort { handle: Handle::new(handle) } + } +} + +impl IntoRawHandle for CompletionPort { + fn into_raw_handle(self) -> HANDLE { + self.handle.into_raw() + } +} + +impl CompletionStatus { + /// Creates a new completion status with the provided parameters. + /// + /// This function is useful when creating a status to send to a port with + /// the `post` method. The parameters are opaquely passed through and not + /// interpreted by the system at all. + pub fn new(bytes: u32, token: usize, overlapped: *mut Overlapped) + -> CompletionStatus { + assert_eq!(mem::size_of_val(&token), mem::size_of::<ULONG_PTR>()); + CompletionStatus(OVERLAPPED_ENTRY { + dwNumberOfBytesTransferred: bytes, + lpCompletionKey: token as ULONG_PTR, + lpOverlapped: overlapped as *mut _, + Internal: 0, + }) + } + + /// Creates a new borrowed completion status from the borrowed + /// `OVERLAPPED_ENTRY` argument provided. + /// + /// This method will wrap the `OVERLAPPED_ENTRY` in a `CompletionStatus`, + /// returning the wrapped structure. + pub fn from_entry(entry: &OVERLAPPED_ENTRY) -> &CompletionStatus { + unsafe { &*(entry as *const _ as *const _) } + } + + /// Creates a new "zero" completion status. + /// + /// This function is useful when creating a stack buffer or vector of + /// completion statuses to be passed to the `get_many` function. + pub fn zero() -> CompletionStatus { + CompletionStatus::new(0, 0, 0 as *mut _) + } + + /// Returns the number of bytes that were transferred for the I/O operation + /// associated with this completion status. + pub fn bytes_transferred(&self) -> u32 { + self.0.dwNumberOfBytesTransferred + } + + /// Returns the completion key value associated with the file handle whose + /// I/O operation has completed. + /// + /// A completion key is a per-handle key that is specified when it is added + /// to an I/O completion port via `add_handle` or `add_socket`. + pub fn token(&self) -> usize { + self.0.lpCompletionKey as usize + } + + /// Returns a pointer to the `Overlapped` structure that was specified when + /// the I/O operation was started. + pub fn overlapped(&self) -> *mut OVERLAPPED { + self.0.lpOverlapped + } + + /// Returns a pointer to the internal `OVERLAPPED_ENTRY` object. + pub fn entry(&self) -> &OVERLAPPED_ENTRY { + &self.0 + } +} + +#[cfg(test)] +mod tests { + use std::mem; + use std::time::Duration; + + use winapi::shared::basetsd::*; + use winapi::shared::winerror::*; + + use iocp::{CompletionPort, CompletionStatus}; + + #[test] + fn is_send_sync() { + fn is_send_sync<T: Send + Sync>() {} + is_send_sync::<CompletionPort>(); + } + + #[test] + fn token_right_size() { + assert_eq!(mem::size_of::<usize>(), mem::size_of::<ULONG_PTR>()); + } + + #[test] + fn timeout() { + let c = CompletionPort::new(1).unwrap(); + let err = c.get(Some(Duration::from_millis(1))).unwrap_err(); + assert_eq!(err.raw_os_error(), Some(WAIT_TIMEOUT as i32)); + } + + #[test] + fn get() { + let c = CompletionPort::new(1).unwrap(); + c.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap(); + let s = c.get(None).unwrap(); + assert_eq!(s.bytes_transferred(), 1); + assert_eq!(s.token(), 2); + assert_eq!(s.overlapped(), 3 as *mut _); + } + + #[test] + fn get_many() { + let c = CompletionPort::new(1).unwrap(); + + c.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap(); + c.post(CompletionStatus::new(4, 5, 6 as *mut _)).unwrap(); + + let mut s = vec![CompletionStatus::zero(); 4]; + { + let s = c.get_many(&mut s, None).unwrap(); + assert_eq!(s.len(), 2); + assert_eq!(s[0].bytes_transferred(), 1); + assert_eq!(s[0].token(), 2); + assert_eq!(s[0].overlapped(), 3 as *mut _); + assert_eq!(s[1].bytes_transferred(), 4); + assert_eq!(s[1].token(), 5); + assert_eq!(s[1].overlapped(), 6 as *mut _); + } + assert_eq!(s[2].bytes_transferred(), 0); + assert_eq!(s[2].token(), 0); + assert_eq!(s[2].overlapped(), 0 as *mut _); + } +} diff --git a/third_party/rust/miow/src/lib.rs b/third_party/rust/miow/src/lib.rs new file mode 100644 index 0000000000..a0d53113c5 --- /dev/null +++ b/third_party/rust/miow/src/lib.rs @@ -0,0 +1,57 @@ +//! A zero overhead Windows I/O library + +#![cfg(windows)] +#![deny(missing_docs)] +#![allow(bad_style)] +#![doc(html_root_url = "https://docs.rs/miow/0.3/x86_64-pc-windows-msvc/")] + +extern crate socket2; +extern crate winapi; + +#[cfg(test)] extern crate rand; + +use std::cmp; +use std::io; +use std::time::Duration; + +use winapi::shared::minwindef::*; +use winapi::um::winbase::*; + +#[cfg(test)] +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +mod handle; +mod overlapped; + +pub mod iocp; +pub mod net; +pub mod pipe; + +pub use overlapped::Overlapped; + +fn cvt(i: BOOL) -> io::Result<BOOL> { + if i == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(i) + } +} + +fn dur2ms(dur: Option<Duration>) -> u32 { + let dur = match dur { + Some(dur) => dur, + None => return INFINITE, + }; + let ms = dur.as_secs().checked_mul(1_000); + let ms_extra = dur.subsec_nanos() / 1_000_000; + ms.and_then(|ms| { + ms.checked_add(ms_extra as u64) + }).map(|ms| { + cmp::min(u32::max_value() as u64, ms) as u32 + }).unwrap_or(INFINITE - 1) +} diff --git a/third_party/rust/miow/src/net.rs b/third_party/rust/miow/src/net.rs new file mode 100644 index 0000000000..37ca51bead --- /dev/null +++ b/third_party/rust/miow/src/net.rs @@ -0,0 +1,1140 @@ +//! Extensions and types for the standard networking primitives. +//! +//! This module contains a number of extension traits for the types in +//! `std::net` for Windows-specific functionality. + +use std::cmp; +use std::io; +use std::mem; +use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; +use std::net::{TcpStream, UdpSocket, SocketAddr, TcpListener}; +use std::net::{SocketAddrV4, Ipv4Addr, SocketAddrV6, Ipv6Addr}; +use std::os::windows::prelude::*; + +use winapi::ctypes::*; +use winapi::shared::guiddef::*; +use winapi::shared::minwindef::*; +use winapi::shared::minwindef::{FALSE, TRUE}; +use winapi::shared::ntdef::*; +use winapi::shared::ws2def::*; +use winapi::shared::ws2def::SOL_SOCKET; +use winapi::shared::ws2ipdef::*; +use winapi::um::minwinbase::*; +use winapi::um::winsock2::*; + +/// A type to represent a buffer in which a socket address will be stored. +/// +/// This type is used with the `recv_from_overlapped` function on the +/// `UdpSocketExt` trait to provide space for the overlapped I/O operation to +/// fill in the address upon completion. +#[derive(Clone, Copy)] +pub struct SocketAddrBuf { + buf: SOCKADDR_STORAGE, + len: c_int, +} + +/// A type to represent a buffer in which an accepted socket's address will be +/// stored. +/// +/// This type is used with the `accept_overlapped` method on the +/// `TcpListenerExt` trait to provide space for the overlapped I/O operation to +/// fill in the socket addresses upon completion. +#[repr(C)] +pub struct AcceptAddrsBuf { + // For AcceptEx we've got the restriction that the addresses passed in that + // buffer need to be at least 16 bytes more than the maximum address length + // for the protocol in question, so add some extra here and there + local: SOCKADDR_STORAGE, + _pad1: [u8; 16], + remote: SOCKADDR_STORAGE, + _pad2: [u8; 16], +} + +/// The parsed return value of `AcceptAddrsBuf`. +pub struct AcceptAddrs<'a> { + local: LPSOCKADDR, + local_len: c_int, + remote: LPSOCKADDR, + remote_len: c_int, + _data: &'a AcceptAddrsBuf, +} + +struct WsaExtension { + guid: GUID, + val: AtomicUsize, +} + +/// Additional methods for the `TcpStream` type in the standard library. +pub trait TcpStreamExt { + /// Execute an overlapped read I/O operation on this TCP stream. + /// + /// This function will issue an overlapped I/O read (via `WSARecv`) on this + /// socket. The provided buffer will be filled in when the operation + /// completes and the given `OVERLAPPED` instance is used to track the + /// overlapped operation. + /// + /// If the operation succeeds, `Ok(Some(n))` is returned indicating how + /// many bytes were read. If the operation returns an error indicating that + /// the I/O is currently pending, `Ok(None)` is returned. Otherwise, the + /// error associated with the operation is returned and no overlapped + /// operation is enqueued. + /// + /// The number of bytes read will be returned as part of the completion + /// notification when the I/O finishes. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf` and + /// `overlapped` pointers are valid until the end of the I/O operation. The + /// kernel also requires that `overlapped` is unique for this I/O operation + /// and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that these two input + /// pointers are valid until the I/O operation is completed, typically via + /// completion ports and waiting to receive the completion notification on + /// the port. + unsafe fn read_overlapped(&self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED) + -> io::Result<Option<usize>>; + + /// Execute an overlapped write I/O operation on this TCP stream. + /// + /// This function will issue an overlapped I/O write (via `WSASend`) on this + /// socket. The provided buffer will be written when the operation completes + /// and the given `OVERLAPPED` instance is used to track the overlapped + /// operation. + /// + /// If the operation succeeds, `Ok(Some(n))` is returned where `n` is the + /// number of bytes that were written. If the operation returns an error + /// indicating that the I/O is currently pending, `Ok(None)` is returned. + /// Otherwise, the error associated with the operation is returned and no + /// overlapped operation is enqueued. + /// + /// The number of bytes written will be returned as part of the completion + /// notification when the I/O finishes. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf` and + /// `overlapped` pointers are valid until the end of the I/O operation. The + /// kernel also requires that `overlapped` is unique for this I/O operation + /// and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that these two input + /// pointers are valid until the I/O operation is completed, typically via + /// completion ports and waiting to receive the completion notification on + /// the port. + unsafe fn write_overlapped(&self, + buf: &[u8], + overlapped: *mut OVERLAPPED) + -> io::Result<Option<usize>>; + + /// Attempt to consume the internal socket in this builder by executing an + /// overlapped connect operation. + /// + /// This function will issue a connect operation to the address specified on + /// the underlying socket, flagging it as an overlapped operation which will + /// complete asynchronously. If successful this function will return the + /// corresponding TCP stream. + /// + /// The `buf` argument provided is an initial buffer of data that should be + /// sent after the connection is initiated. It's acceptable to + /// pass an empty slice here. + /// + /// This function will also return whether the connect immediately + /// succeeded or not. If `None` is returned then the I/O operation is still + /// pending and will complete at a later date, and if `Some(bytes)` is + /// returned then that many bytes were transferred. + /// + /// Note that to succeed this requires that the underlying socket has + /// previously been bound via a call to `bind` to a local address. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the + /// `overlapped` and `buf` pointers to be valid until the end of the I/O + /// operation. The kernel also requires that `overlapped` is unique for + /// this I/O operation and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that this pointer is + /// valid until the I/O operation is completed, typically via completion + /// ports and waiting to receive the completion notification on the port. + unsafe fn connect_overlapped(&self, + addr: &SocketAddr, + buf: &[u8], + overlapped: *mut OVERLAPPED) + -> io::Result<Option<usize>>; + + /// Once a `connect_overlapped` has finished, this function needs to be + /// called to finish the connect operation. + /// + /// Currently this just calls `setsockopt` with `SO_UPDATE_CONNECT_CONTEXT` + /// to ensure that further functions like `getpeername` and `getsockname` + /// work correctly. + fn connect_complete(&self) -> io::Result<()>; + + /// Calls the `GetOverlappedResult` function to get the result of an + /// overlapped operation for this handle. + /// + /// This function takes the `OVERLAPPED` argument which must have been used + /// to initiate an overlapped I/O operation, and returns either the + /// successful number of bytes transferred during the operation or an error + /// if one occurred, along with the results of the `lpFlags` parameter of + /// the relevant operation, if applicable. + /// + /// # Unsafety + /// + /// This function is unsafe as `overlapped` must have previously been used + /// to execute an operation for this handle, and it must also be a valid + /// pointer to an `OVERLAPPED` instance. + /// + /// # Panics + /// + /// This function will panic + unsafe fn result(&self, overlapped: *mut OVERLAPPED) + -> io::Result<(usize, u32)>; +} + +/// Additional methods for the `UdpSocket` type in the standard library. +pub trait UdpSocketExt { + /// Execute an overlapped receive I/O operation on this UDP socket. + /// + /// This function will issue an overlapped I/O read (via `WSARecvFrom`) on + /// this socket. The provided buffer will be filled in when the operation + /// completes, the source from where the data came from will be written to + /// `addr`, and the given `OVERLAPPED` instance is used to track the + /// overlapped operation. + /// + /// If the operation succeeds, `Ok(Some(n))` is returned where `n` is the + /// number of bytes that were read. If the operation returns an error + /// indicating that the I/O is currently pending, `Ok(None)` is returned. + /// Otherwise, the error associated with the operation is returned and no + /// overlapped operation is enqueued. + /// + /// The number of bytes read will be returned as part of the completion + /// notification when the I/O finishes. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf`, + /// `addr`, and `overlapped` pointers are valid until the end of the I/O + /// operation. The kernel also requires that `overlapped` is unique for this + /// I/O operation and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that these two input + /// pointers are valid until the I/O operation is completed, typically via + /// completion ports and waiting to receive the completion notification on + /// the port. + unsafe fn recv_from_overlapped(&self, + buf: &mut [u8], + addr: *mut SocketAddrBuf, + overlapped: *mut OVERLAPPED) + -> io::Result<Option<usize>>; + + /// Execute an overlapped receive I/O operation on this UDP socket. + /// + /// This function will issue an overlapped I/O read (via `WSARecv`) on + /// this socket. The provided buffer will be filled in when the operation + /// completes, the source from where the data came from will be written to + /// `addr`, and the given `OVERLAPPED` instance is used to track the + /// overlapped operation. + /// + /// If the operation succeeds, `Ok(Some(n))` is returned where `n` is the + /// number of bytes that were read. If the operation returns an error + /// indicating that the I/O is currently pending, `Ok(None)` is returned. + /// Otherwise, the error associated with the operation is returned and no + /// overlapped operation is enqueued. + /// + /// The number of bytes read will be returned as part of the completion + /// notification when the I/O finishes. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf`, + /// and `overlapped` pointers are valid until the end of the I/O + /// operation. The kernel also requires that `overlapped` is unique for this + /// I/O operation and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that these two input + /// pointers are valid until the I/O operation is completed, typically via + /// completion ports and waiting to receive the completion notification on + /// the port. + unsafe fn recv_overlapped(&self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED) + -> io::Result<Option<usize>>; + + /// Execute an overlapped send I/O operation on this UDP socket. + /// + /// This function will issue an overlapped I/O write (via `WSASendTo`) on + /// this socket to the address specified by `addr`. The provided buffer will + /// be written when the operation completes and the given `OVERLAPPED` + /// instance is used to track the overlapped operation. + /// + /// If the operation succeeds, `Ok(Some(n0)` is returned where `n` byte + /// were written. If the operation returns an error indicating that the I/O + /// is currently pending, `Ok(None)` is returned. Otherwise, the error + /// associated with the operation is returned and no overlapped operation + /// is enqueued. + /// + /// The number of bytes written will be returned as part of the completion + /// notification when the I/O finishes. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf` and + /// `overlapped` pointers are valid until the end of the I/O operation. The + /// kernel also requires that `overlapped` is unique for this I/O operation + /// and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that these two input + /// pointers are valid until the I/O operation is completed, typically via + /// completion ports and waiting to receive the completion notification on + /// the port. + unsafe fn send_to_overlapped(&self, + buf: &[u8], + addr: &SocketAddr, + overlapped: *mut OVERLAPPED) + -> io::Result<Option<usize>>; + + /// Execute an overlapped send I/O operation on this UDP socket. + /// + /// This function will issue an overlapped I/O write (via `WSASend`) on + /// this socket to the address it was previously connected to. The provided + /// buffer will be written when the operation completes and the given `OVERLAPPED` + /// instance is used to track the overlapped operation. + /// + /// If the operation succeeds, `Ok(Some(n0)` is returned where `n` byte + /// were written. If the operation returns an error indicating that the I/O + /// is currently pending, `Ok(None)` is returned. Otherwise, the error + /// associated with the operation is returned and no overlapped operation + /// is enqueued. + /// + /// The number of bytes written will be returned as part of the completion + /// notification when the I/O finishes. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf` and + /// `overlapped` pointers are valid until the end of the I/O operation. The + /// kernel also requires that `overlapped` is unique for this I/O operation + /// and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that these two input + /// pointers are valid until the I/O operation is completed, typically via + /// completion ports and waiting to receive the completion notification on + /// the port. + unsafe fn send_overlapped(&self, + buf: &[u8], + overlapped: *mut OVERLAPPED) + -> io::Result<Option<usize>>; + + /// Calls the `GetOverlappedResult` function to get the result of an + /// overlapped operation for this handle. + /// + /// This function takes the `OVERLAPPED` argument which must have been used + /// to initiate an overlapped I/O operation, and returns either the + /// successful number of bytes transferred during the operation or an error + /// if one occurred, along with the results of the `lpFlags` parameter of + /// the relevant operation, if applicable. + /// + /// # Unsafety + /// + /// This function is unsafe as `overlapped` must have previously been used + /// to execute an operation for this handle, and it must also be a valid + /// pointer to an `OVERLAPPED` instance. + /// + /// # Panics + /// + /// This function will panic + unsafe fn result(&self, overlapped: *mut OVERLAPPED) + -> io::Result<(usize, u32)>; +} + +/// Additional methods for the `TcpListener` type in the standard library. +pub trait TcpListenerExt { + /// Perform an accept operation on this listener, accepting a connection in + /// an overlapped fashion. + /// + /// This function will issue an I/O request to accept an incoming connection + /// with the specified overlapped instance. The `socket` provided must be a + /// configured but not bound or connected socket, and if successful this + /// will consume the internal socket of the builder to return a TCP stream. + /// + /// The `addrs` buffer provided will be filled in with the local and remote + /// addresses of the connection upon completion. + /// + /// If the accept succeeds immediately, `Ok(true)` is returned. If + /// the connect indicates that the I/O is currently pending, `Ok(false)` is + /// returned. Otherwise, the error associated with the operation is + /// returned and no overlapped operation is enqueued. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the + /// `addrs` and `overlapped` pointers are valid until the end of the I/O + /// operation. The kernel also requires that `overlapped` is unique for this + /// I/O operation and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that the pointers are + /// valid until the I/O operation is completed, typically via completion + /// ports and waiting to receive the completion notification on the port. + unsafe fn accept_overlapped(&self, + socket: &TcpStream, + addrs: &mut AcceptAddrsBuf, + overlapped: *mut OVERLAPPED) + -> io::Result<bool>; + + /// Once an `accept_overlapped` has finished, this function needs to be + /// called to finish the accept operation. + /// + /// Currently this just calls `setsockopt` with `SO_UPDATE_ACCEPT_CONTEXT` + /// to ensure that further functions like `getpeername` and `getsockname` + /// work correctly. + fn accept_complete(&self, socket: &TcpStream) -> io::Result<()>; + + /// Calls the `GetOverlappedResult` function to get the result of an + /// overlapped operation for this handle. + /// + /// This function takes the `OVERLAPPED` argument which must have been used + /// to initiate an overlapped I/O operation, and returns either the + /// successful number of bytes transferred during the operation or an error + /// if one occurred, along with the results of the `lpFlags` parameter of + /// the relevant operation, if applicable. + /// + /// # Unsafety + /// + /// This function is unsafe as `overlapped` must have previously been used + /// to execute an operation for this handle, and it must also be a valid + /// pointer to an `OVERLAPPED` instance. + /// + /// # Panics + /// + /// This function will panic + unsafe fn result(&self, overlapped: *mut OVERLAPPED) + -> io::Result<(usize, u32)>; +} + +#[doc(hidden)] +trait NetInt { + fn from_be(i: Self) -> Self; + fn to_be(&self) -> Self; +} +macro_rules! doit { + ($($t:ident)*) => ($(impl NetInt for $t { + fn from_be(i: Self) -> Self { <$t>::from_be(i) } + fn to_be(&self) -> Self { <$t>::to_be(*self) } + })*) +} +doit! { i8 i16 i32 i64 isize u8 u16 u32 u64 usize } + +// fn hton<I: NetInt>(i: I) -> I { i.to_be() } +fn ntoh<I: NetInt>(i: I) -> I { I::from_be(i) } + +fn last_err() -> io::Result<Option<usize>> { + let err = unsafe { WSAGetLastError() }; + if err == WSA_IO_PENDING as i32 { + Ok(None) + } else { + Err(io::Error::from_raw_os_error(err)) + } +} + +fn cvt(i: c_int, size: DWORD) -> io::Result<Option<usize>> { + if i == SOCKET_ERROR { + last_err() + } else { + Ok(Some(size as usize)) + } +} + +fn socket_addr_to_ptrs(addr: &SocketAddr) -> (*const SOCKADDR, c_int) { + match *addr { + SocketAddr::V4(ref a) => { + (a as *const _ as *const _, mem::size_of::<SOCKADDR_IN>() as c_int) + } + SocketAddr::V6(ref a) => { + (a as *const _ as *const _, mem::size_of::<SOCKADDR_IN6_LH>() as c_int) + } + } +} + +unsafe fn ptrs_to_socket_addr(ptr: *const SOCKADDR, + len: c_int) -> Option<SocketAddr> { + if (len as usize) < mem::size_of::<c_int>() { + return None + } + match (*ptr).sa_family as i32 { + AF_INET if len as usize >= mem::size_of::<SOCKADDR_IN>() => { + let b = &*(ptr as *const SOCKADDR_IN); + let ip = ntoh(*b.sin_addr.S_un.S_addr()); + let ip = Ipv4Addr::new((ip >> 24) as u8, + (ip >> 16) as u8, + (ip >> 8) as u8, + (ip >> 0) as u8); + Some(SocketAddr::V4(SocketAddrV4::new(ip, ntoh(b.sin_port)))) + } + AF_INET6 if len as usize >= mem::size_of::<SOCKADDR_IN6_LH>() => { + let b = &*(ptr as *const SOCKADDR_IN6_LH); + let arr = b.sin6_addr.u.Byte(); + let ip = Ipv6Addr::new( + ((arr[0] as u16) << 8) | (arr[1] as u16), + ((arr[2] as u16) << 8) | (arr[3] as u16), + ((arr[4] as u16) << 8) | (arr[5] as u16), + ((arr[6] as u16) << 8) | (arr[7] as u16), + ((arr[8] as u16) << 8) | (arr[9] as u16), + ((arr[10] as u16) << 8) | (arr[11] as u16), + ((arr[12] as u16) << 8) | (arr[13] as u16), + ((arr[14] as u16) << 8) | (arr[15] as u16)); + let addr = SocketAddrV6::new(ip, ntoh(b.sin6_port), + ntoh(b.sin6_flowinfo), + ntoh(*b.u.sin6_scope_id())); + Some(SocketAddr::V6(addr)) + } + _ => None + } +} + +unsafe fn slice2buf(slice: &[u8]) -> WSABUF { + WSABUF { + len: cmp::min(slice.len(), <u_long>::max_value() as usize) as u_long, + buf: slice.as_ptr() as *mut _, + } +} + +unsafe fn result(socket: SOCKET, overlapped: *mut OVERLAPPED) + -> io::Result<(usize, u32)> { + let mut transferred = 0; + let mut flags = 0; + let r = WSAGetOverlappedResult(socket, + overlapped, + &mut transferred, + FALSE, + &mut flags); + if r == 0 { + Err(io::Error::last_os_error()) + } else { + Ok((transferred as usize, flags)) + } +} + +impl TcpStreamExt for TcpStream { + unsafe fn read_overlapped(&self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED) + -> io::Result<Option<usize>> { + let mut buf = slice2buf(buf); + let mut flags = 0; + let mut bytes_read: DWORD = 0; + let r = WSARecv(self.as_raw_socket() as SOCKET, &mut buf, 1, + &mut bytes_read, &mut flags, overlapped, None); + cvt(r, bytes_read) + } + + unsafe fn write_overlapped(&self, + buf: &[u8], + overlapped: *mut OVERLAPPED) + -> io::Result<Option<usize>> { + let mut buf = slice2buf(buf); + let mut bytes_written = 0; + + // Note here that we capture the number of bytes written. The + // documentation on MSDN, however, states: + // + // > Use NULL for this parameter if the lpOverlapped parameter is not + // > NULL to avoid potentially erroneous results. This parameter can be + // > NULL only if the lpOverlapped parameter is not NULL. + // + // If we're not passing a null overlapped pointer here, then why are we + // then capturing the number of bytes! Well so it turns out that this is + // clearly faster to learn the bytes here rather than later calling + // `WSAGetOverlappedResult`, and in practice almost all implementations + // use this anyway [1]. + // + // As a result we use this to and report back the result. + // + // [1]: https://github.com/carllerche/mio/pull/520#issuecomment-273983823 + let r = WSASend(self.as_raw_socket() as SOCKET, &mut buf, 1, + &mut bytes_written, 0, overlapped, None); + cvt(r, bytes_written) + } + + unsafe fn connect_overlapped(&self, + addr: &SocketAddr, + buf: &[u8], + overlapped: *mut OVERLAPPED) + -> io::Result<Option<usize>> { + connect_overlapped(self.as_raw_socket() as SOCKET, addr, buf, overlapped) + } + + fn connect_complete(&self) -> io::Result<()> { + const SO_UPDATE_CONNECT_CONTEXT: c_int = 0x7010; + let result = unsafe { + setsockopt(self.as_raw_socket() as SOCKET, + SOL_SOCKET, + SO_UPDATE_CONNECT_CONTEXT, + 0 as *const _, + 0) + }; + if result == 0 { + Ok(()) + } else { + Err(io::Error::last_os_error()) + } + } + + unsafe fn result(&self, overlapped: *mut OVERLAPPED) + -> io::Result<(usize, u32)> { + result(self.as_raw_socket() as SOCKET, overlapped) + } +} + +unsafe fn connect_overlapped(socket: SOCKET, + addr: &SocketAddr, + buf: &[u8], + overlapped: *mut OVERLAPPED) + -> io::Result<Option<usize>> { + static CONNECTEX: WsaExtension = WsaExtension { + guid: GUID { + Data1: 0x25a207b9, + Data2: 0xddf3, + Data3: 0x4660, + Data4: [0x8e, 0xe9, 0x76, 0xe5, 0x8c, 0x74, 0x06, 0x3e], + }, + val: ATOMIC_USIZE_INIT, + }; + type ConnectEx = unsafe extern "system" fn(SOCKET, *const SOCKADDR, + c_int, PVOID, DWORD, LPDWORD, + LPOVERLAPPED) -> BOOL; + + let ptr = try!(CONNECTEX.get(socket)); + assert!(ptr != 0); + let connect_ex = mem::transmute::<_, ConnectEx>(ptr); + + let (addr_buf, addr_len) = socket_addr_to_ptrs(addr); + let mut bytes_sent: DWORD = 0; + let r = connect_ex(socket, addr_buf, addr_len, + buf.as_ptr() as *mut _, + buf.len() as u32, + &mut bytes_sent, overlapped); + if r == TRUE { + Ok(Some(bytes_sent as usize)) + } else { + last_err() + } +} + +impl UdpSocketExt for UdpSocket { + unsafe fn recv_from_overlapped(&self, + buf: &mut [u8], + addr: *mut SocketAddrBuf, + overlapped: *mut OVERLAPPED) + -> io::Result<Option<usize>> { + let mut buf = slice2buf(buf); + let mut flags = 0; + let mut received_bytes: DWORD = 0; + let r = WSARecvFrom(self.as_raw_socket() as SOCKET, &mut buf, 1, + &mut received_bytes, &mut flags, + &mut (*addr).buf as *mut _ as *mut _, + &mut (*addr).len, + overlapped, None); + cvt(r, received_bytes) + } + + unsafe fn recv_overlapped(&self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED) + -> io::Result<Option<usize>> { + let mut buf = slice2buf(buf); + let mut flags = 0; + let mut received_bytes: DWORD = 0; + let r = WSARecv(self.as_raw_socket() as SOCKET, &mut buf, 1, + &mut received_bytes, &mut flags, + overlapped, None); + cvt(r, received_bytes) + } + + unsafe fn send_to_overlapped(&self, + buf: &[u8], + addr: &SocketAddr, + overlapped: *mut OVERLAPPED) + -> io::Result<Option<usize>> { + let (addr_buf, addr_len) = socket_addr_to_ptrs(addr); + let mut buf = slice2buf(buf); + let mut sent_bytes = 0; + let r = WSASendTo(self.as_raw_socket() as SOCKET, &mut buf, 1, + &mut sent_bytes, 0, + addr_buf as *const _, addr_len, + overlapped, None); + cvt(r, sent_bytes) + } + + unsafe fn send_overlapped(&self, + buf: &[u8], + overlapped: *mut OVERLAPPED) + -> io::Result<Option<usize>> { + let mut buf = slice2buf(buf); + let mut sent_bytes = 0; + let r = WSASend(self.as_raw_socket() as SOCKET, &mut buf, 1, + &mut sent_bytes, 0, + overlapped, None); + cvt(r, sent_bytes) + } + + unsafe fn result(&self, overlapped: *mut OVERLAPPED) + -> io::Result<(usize, u32)> { + result(self.as_raw_socket() as SOCKET, overlapped) + } +} + +impl TcpListenerExt for TcpListener { + unsafe fn accept_overlapped(&self, + socket: &TcpStream, + addrs: &mut AcceptAddrsBuf, + overlapped: *mut OVERLAPPED) + -> io::Result<bool> { + static ACCEPTEX: WsaExtension = WsaExtension { + guid: GUID { + Data1: 0xb5367df1, + Data2: 0xcbac, + Data3: 0x11cf, + Data4: [0x95, 0xca, 0x00, 0x80, 0x5f, 0x48, 0xa1, 0x92], + }, + val: ATOMIC_USIZE_INIT, + }; + type AcceptEx = unsafe extern "system" fn(SOCKET, SOCKET, PVOID, + DWORD, DWORD, DWORD, LPDWORD, + LPOVERLAPPED) -> BOOL; + + let ptr = try!(ACCEPTEX.get(self.as_raw_socket() as SOCKET)); + assert!(ptr != 0); + let accept_ex = mem::transmute::<_, AcceptEx>(ptr); + + let mut bytes = 0; + let (a, b, c, d) = (*addrs).args(); + let r = accept_ex(self.as_raw_socket() as SOCKET, socket.as_raw_socket() as SOCKET, + a, b, c, d, &mut bytes, overlapped); + let succeeded = if r == TRUE { + true + } else { + try!(last_err()); + false + }; + Ok(succeeded) + } + + fn accept_complete(&self, socket: &TcpStream) -> io::Result<()> { + const SO_UPDATE_ACCEPT_CONTEXT: c_int = 0x700B; + let me = self.as_raw_socket(); + let result = unsafe { + setsockopt(socket.as_raw_socket() as SOCKET, + SOL_SOCKET, + SO_UPDATE_ACCEPT_CONTEXT, + &me as *const _ as *const _, + mem::size_of_val(&me) as c_int) + }; + if result == 0 { + Ok(()) + } else { + Err(io::Error::last_os_error()) + } + } + + unsafe fn result(&self, overlapped: *mut OVERLAPPED) + -> io::Result<(usize, u32)> { + result(self.as_raw_socket() as SOCKET, overlapped) + } +} + +impl SocketAddrBuf { + /// Creates a new blank socket address buffer. + /// + /// This should be used before a call to `recv_from_overlapped` overlapped + /// to create an instance to pass down. + pub fn new() -> SocketAddrBuf { + SocketAddrBuf { + buf: unsafe { mem::zeroed() }, + len: mem::size_of::<SOCKADDR_STORAGE>() as c_int, + } + } + + /// Parses this buffer to return a standard socket address. + /// + /// This function should be called after the buffer has been filled in with + /// a call to `recv_from_overlapped` being completed. It will interpret the + /// address filled in and return the standard socket address type. + /// + /// If an error is encountered then `None` is returned. + pub fn to_socket_addr(&self) -> Option<SocketAddr> { + unsafe { + ptrs_to_socket_addr(&self.buf as *const _ as *const _, self.len) + } + } +} + +static GETACCEPTEXSOCKADDRS: WsaExtension = WsaExtension { + guid: GUID { + Data1: 0xb5367df2, + Data2: 0xcbac, + Data3: 0x11cf, + Data4: [0x95, 0xca, 0x00, 0x80, 0x5f, 0x48, 0xa1, 0x92], + }, + val: ATOMIC_USIZE_INIT, +}; +type GetAcceptExSockaddrs = unsafe extern "system" fn(PVOID, DWORD, DWORD, DWORD, + *mut LPSOCKADDR, LPINT, + *mut LPSOCKADDR, LPINT); + +impl AcceptAddrsBuf { + /// Creates a new blank buffer ready to be passed to a call to + /// `accept_overlapped`. + pub fn new() -> AcceptAddrsBuf { + unsafe { mem::zeroed() } + } + + /// Parses the data contained in this address buffer, returning the parsed + /// result if successful. + /// + /// This function can be called after a call to `accept_overlapped` has + /// succeeded to parse out the data that was written in. + pub fn parse(&self, socket: &TcpListener) -> io::Result<AcceptAddrs> { + let mut ret = AcceptAddrs { + local: 0 as *mut _, local_len: 0, + remote: 0 as *mut _, remote_len: 0, + _data: self, + }; + let ptr = try!(GETACCEPTEXSOCKADDRS.get(socket.as_raw_socket() as SOCKET)); + assert!(ptr != 0); + unsafe { + let get_sockaddrs = mem::transmute::<_, GetAcceptExSockaddrs>(ptr); + let (a, b, c, d) = self.args(); + get_sockaddrs(a, b, c, d, + &mut ret.local, &mut ret.local_len, + &mut ret.remote, &mut ret.remote_len); + Ok(ret) + } + } + + fn args(&self) -> (PVOID, DWORD, DWORD, DWORD) { + let remote_offset = unsafe { + &(*(0 as *const AcceptAddrsBuf)).remote as *const _ as usize + }; + (self as *const _ as *mut _, 0, remote_offset as DWORD, + (mem::size_of_val(self) - remote_offset) as DWORD) + } +} + +impl<'a> AcceptAddrs<'a> { + /// Returns the local socket address contained in this buffer. + pub fn local(&self) -> Option<SocketAddr> { + unsafe { ptrs_to_socket_addr(self.local, self.local_len) } + } + + /// Returns the remote socket address contained in this buffer. + pub fn remote(&self) -> Option<SocketAddr> { + unsafe { ptrs_to_socket_addr(self.remote, self.remote_len) } + } +} + +impl WsaExtension { + fn get(&self, socket: SOCKET) -> io::Result<usize> { + let prev = self.val.load(Ordering::SeqCst); + if prev != 0 && !cfg!(debug_assertions) { + return Ok(prev) + } + let mut ret = 0 as usize; + let mut bytes = 0; + let r = unsafe { + WSAIoctl(socket, SIO_GET_EXTENSION_FUNCTION_POINTER, + &self.guid as *const _ as *mut _, + mem::size_of_val(&self.guid) as DWORD, + &mut ret as *mut _ as *mut _, + mem::size_of_val(&ret) as DWORD, + &mut bytes, + 0 as *mut _, None) + }; + cvt(r, 0).map(|_| { + debug_assert_eq!(bytes as usize, mem::size_of_val(&ret)); + debug_assert!(prev == 0 || prev == ret); + self.val.store(ret, Ordering::SeqCst); + ret + }) + + } +} + +#[cfg(test)] +mod tests { + use std::net::{TcpListener, UdpSocket, TcpStream, SocketAddr}; + use std::thread; + use std::io::prelude::*; + + use socket2::{Socket, Type, Domain}; + + use Overlapped; + use iocp::CompletionPort; + use net::{TcpStreamExt, UdpSocketExt, SocketAddrBuf}; + use net::{TcpListenerExt, AcceptAddrsBuf}; + + fn each_ip(f: &mut FnMut(SocketAddr)) { + f(t!("127.0.0.1:0".parse())); + f(t!("[::1]:0".parse())); + } + + #[test] + fn tcp_read() { + each_ip(&mut |addr| { + let l = t!(TcpListener::bind(addr)); + let addr = t!(l.local_addr()); + let t = thread::spawn(move || { + let mut a = t!(l.accept()).0; + t!(a.write_all(&[1, 2, 3])); + }); + + let cp = t!(CompletionPort::new(1)); + let s = t!(TcpStream::connect(addr)); + t!(cp.add_socket(1, &s)); + + let mut b = [0; 10]; + let a = Overlapped::zero(); + unsafe { + t!(s.read_overlapped(&mut b, a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + assert_eq!(&b[0..3], &[1, 2, 3]); + + t!(t.join()); + }) + } + + #[test] + fn tcp_write() { + each_ip(&mut |addr| { + let l = t!(TcpListener::bind(addr)); + let addr = t!(l.local_addr()); + let t = thread::spawn(move || { + let mut a = t!(l.accept()).0; + let mut b = [0; 10]; + let n = t!(a.read(&mut b)); + assert_eq!(n, 3); + assert_eq!(&b[0..3], &[1, 2, 3]); + }); + + let cp = t!(CompletionPort::new(1)); + let s = t!(TcpStream::connect(addr)); + t!(cp.add_socket(1, &s)); + + let b = [1, 2, 3]; + let a = Overlapped::zero(); + unsafe { + t!(s.write_overlapped(&b, a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + + t!(t.join()); + }) + } + + #[test] + fn tcp_connect() { + each_ip(&mut |addr_template| { + let l = t!(TcpListener::bind(addr_template)); + let addr = t!(l.local_addr()); + let t = thread::spawn(move || { + t!(l.accept()); + }); + + let cp = t!(CompletionPort::new(1)); + let domain = match addr { + SocketAddr::V4(..) => Domain::ipv4(), + SocketAddr::V6(..) => Domain::ipv6(), + }; + let socket = t!(Socket::new(domain, Type::stream(), None)); + t!(socket.bind(&addr_template.into())); + let socket = socket.into_tcp_stream(); + t!(cp.add_socket(1, &socket)); + + let a = Overlapped::zero(); + unsafe { + t!(socket.connect_overlapped(&addr, &[], a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 0); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + t!(socket.connect_complete()); + + t!(t.join()); + }) + } + + #[test] + fn udp_recv_from() { + each_ip(&mut |addr| { + let a = t!(UdpSocket::bind(addr)); + let b = t!(UdpSocket::bind(addr)); + let a_addr = t!(a.local_addr()); + let b_addr = t!(b.local_addr()); + let t = thread::spawn(move || { + t!(a.send_to(&[1, 2, 3], b_addr)); + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_socket(1, &b)); + + let mut buf = [0; 10]; + let a = Overlapped::zero(); + let mut addr = SocketAddrBuf::new(); + unsafe { + t!(b.recv_from_overlapped(&mut buf, &mut addr, a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + assert_eq!(&buf[..3], &[1, 2, 3]); + assert_eq!(addr.to_socket_addr(), Some(a_addr)); + + t!(t.join()); + }) + } + + #[test] + fn udp_recv() { + each_ip(&mut |addr| { + let a = t!(UdpSocket::bind(addr)); + let b = t!(UdpSocket::bind(addr)); + let a_addr = t!(a.local_addr()); + let b_addr = t!(b.local_addr()); + assert!(b.connect(a_addr).is_ok()); + assert!(a.connect(b_addr).is_ok()); + let t = thread::spawn(move || { + t!(a.send_to(&[1, 2, 3], b_addr)); + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_socket(1, &b)); + + let mut buf = [0; 10]; + let a = Overlapped::zero(); + unsafe { + t!(b.recv_overlapped(&mut buf, a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + assert_eq!(&buf[..3], &[1, 2, 3]); + + t!(t.join()); + }) + } + + #[test] + fn udp_send_to() { + each_ip(&mut |addr| { + let a = t!(UdpSocket::bind(addr)); + let b = t!(UdpSocket::bind(addr)); + let a_addr = t!(a.local_addr()); + let b_addr = t!(b.local_addr()); + let t = thread::spawn(move || { + let mut b = [0; 100]; + let (n, addr) = t!(a.recv_from(&mut b)); + assert_eq!(n, 3); + assert_eq!(addr, b_addr); + assert_eq!(&b[..3], &[1, 2, 3]); + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_socket(1, &b)); + + let a = Overlapped::zero(); + unsafe { + t!(b.send_to_overlapped(&[1, 2, 3], &a_addr, a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + + t!(t.join()); + }) + } + + #[test] + fn udp_send() { + each_ip(&mut |addr| { + let a = t!(UdpSocket::bind(addr)); + let b = t!(UdpSocket::bind(addr)); + let a_addr = t!(a.local_addr()); + let b_addr = t!(b.local_addr()); + assert!(b.connect(a_addr).is_ok()); + assert!(a.connect(b_addr).is_ok()); + let t = thread::spawn(move || { + let mut b = [0; 100]; + let (n, addr) = t!(a.recv_from(&mut b)); + assert_eq!(n, 3); + assert_eq!(addr, b_addr); + assert_eq!(&b[..3], &[1, 2, 3]); + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_socket(1, &b)); + + let a = Overlapped::zero(); + unsafe { + t!(b.send_overlapped(&[1, 2, 3], a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + + t!(t.join()); + }) + } + + #[test] + fn tcp_accept() { + each_ip(&mut |addr_template| { + let l = t!(TcpListener::bind(addr_template)); + let addr = t!(l.local_addr()); + let t = thread::spawn(move || { + let socket = t!(TcpStream::connect(addr)); + (socket.local_addr().unwrap(), socket.peer_addr().unwrap()) + }); + + let cp = t!(CompletionPort::new(1)); + let domain = match addr { + SocketAddr::V4(..) => Domain::ipv4(), + SocketAddr::V6(..) => Domain::ipv6(), + }; + let socket = t!(Socket::new(domain, Type::stream(), None)) + .into_tcp_stream(); + t!(cp.add_socket(1, &l)); + + let a = Overlapped::zero(); + let mut addrs = AcceptAddrsBuf::new(); + unsafe { + t!(l.accept_overlapped(&socket, &mut addrs, a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 0); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + t!(l.accept_complete(&socket)); + + let (remote, local) = t!(t.join()); + let addrs = addrs.parse(&l).unwrap(); + assert_eq!(addrs.local(), Some(local)); + assert_eq!(addrs.remote(), Some(remote)); + }) + } +} diff --git a/third_party/rust/miow/src/overlapped.rs b/third_party/rust/miow/src/overlapped.rs new file mode 100644 index 0000000000..68c54df3b4 --- /dev/null +++ b/third_party/rust/miow/src/overlapped.rs @@ -0,0 +1,95 @@ +use std::fmt; +use std::io; +use std::mem; +use std::ptr; + +use winapi::shared::ntdef::{ + HANDLE, + NULL, +}; +use winapi::um::minwinbase::*; +use winapi::um::synchapi::*; + +/// A wrapper around `OVERLAPPED` to provide "rustic" accessors and +/// initializers. +pub struct Overlapped(OVERLAPPED); + +impl fmt::Debug for Overlapped { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "OVERLAPPED") + } +} + +unsafe impl Send for Overlapped {} +unsafe impl Sync for Overlapped {} + +impl Overlapped { + /// Creates a new zeroed out instance of an overlapped I/O tracking state. + /// + /// This is suitable for passing to methods which will then later get + /// notified via an I/O Completion Port. + pub fn zero() -> Overlapped { + Overlapped(unsafe { mem::zeroed() }) + } + + /// Creates a new `Overlapped` with an initialized non-null `hEvent`. The caller is + /// responsible for calling `CloseHandle` on the `hEvent` field of the returned + /// `Overlapped`. The event is created with `bManualReset` set to `FALSE`, meaning after a + /// single thread waits on the event, it will be reset. + pub fn initialize_with_autoreset_event() -> io::Result<Overlapped> { + let event = unsafe {CreateEventW(ptr::null_mut(), 0i32, 0i32, ptr::null())}; + if event == NULL { + return Err(io::Error::last_os_error()); + } + let mut overlapped = Self::zero(); + overlapped.set_event(event); + Ok(overlapped) + } + + /// Creates a new `Overlapped` function pointer from the underlying + /// `OVERLAPPED`, wrapping in the "rusty" wrapper for working with + /// accessors. + /// + /// # Unsafety + /// + /// This function doesn't validate `ptr` nor the lifetime of the returned + /// pointer at all, it's recommended to use this method with extreme + /// caution. + pub unsafe fn from_raw<'a>(ptr: *mut OVERLAPPED) -> &'a mut Overlapped { + &mut *(ptr as *mut Overlapped) + } + + /// Gain access to the raw underlying data + pub fn raw(&self) -> *mut OVERLAPPED { + &self.0 as *const _ as *mut _ + } + + /// Sets the offset inside this overlapped structure. + /// + /// Note that for I/O operations in general this only has meaning for I/O + /// handles that are on a seeking device that supports the concept of an + /// offset. + pub fn set_offset(&mut self, offset: u64) { + let s = unsafe { self.0.u.s_mut() }; + s.Offset = offset as u32; + s.OffsetHigh = (offset >> 32) as u32; + } + + /// Reads the offset inside this overlapped structure. + pub fn offset(&self) -> u64 { + let s = unsafe { self.0.u.s() }; + (s.Offset as u64) | ((s.OffsetHigh as u64) << 32) + } + + /// Sets the `hEvent` field of this structure. + /// + /// The event specified can be null. + pub fn set_event(&mut self, event: HANDLE) { + self.0.hEvent = event; + } + + /// Reads the `hEvent` field of this structure, may return null. + pub fn event(&self) -> HANDLE { + self.0.hEvent + } +} diff --git a/third_party/rust/miow/src/pipe.rs b/third_party/rust/miow/src/pipe.rs new file mode 100644 index 0000000000..479789287a --- /dev/null +++ b/third_party/rust/miow/src/pipe.rs @@ -0,0 +1,716 @@ +//! Named pipes + +use std::cell::RefCell; +use std::ffi::OsStr; +use std::fs::{OpenOptions, File}; +use std::io::prelude::*; +use std::io; +use std::os::windows::ffi::*; +use std::os::windows::io::*; +use std::time::Duration; + +use winapi::shared::ntdef::HANDLE; +use winapi::shared::minwindef::*; +use winapi::shared::winerror::*; +use winapi::um::fileapi::*; +use winapi::um::handleapi::*; +use winapi::um::ioapiset::*; +use winapi::um::minwinbase::*; +use winapi::um::namedpipeapi::*; +use winapi::um::winbase::*; +use handle::Handle; +use overlapped::Overlapped; + +/// Readable half of an anonymous pipe. +#[derive(Debug)] +pub struct AnonRead(Handle); + +/// Writable half of an anonymous pipe. +#[derive(Debug)] +pub struct AnonWrite(Handle); + +/// A named pipe that can accept connections. +#[derive(Debug)] +pub struct NamedPipe(Handle); + +/// A builder structure for creating a new named pipe. +#[derive(Debug)] +pub struct NamedPipeBuilder { + name: Vec<u16>, + dwOpenMode: DWORD, + dwPipeMode: DWORD, + nMaxInstances: DWORD, + nOutBufferSize: DWORD, + nInBufferSize: DWORD, + nDefaultTimeOut: DWORD, +} + +/// Creates a new anonymous in-memory pipe, returning the read/write ends of the +/// pipe. +/// +/// The buffer size for this pipe may also be specified, but the system will +/// normally use this as a suggestion and it's not guaranteed that the buffer +/// will be precisely this size. +pub fn anonymous(buffer_size: u32) -> io::Result<(AnonRead, AnonWrite)> { + let mut read = 0 as HANDLE; + let mut write = 0 as HANDLE; + try!(::cvt(unsafe { + CreatePipe(&mut read, &mut write, 0 as *mut _, buffer_size) + })); + Ok((AnonRead(Handle::new(read)), AnonWrite(Handle::new(write)))) +} + +impl Read for AnonRead { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) } +} +impl<'a> Read for &'a AnonRead { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) } +} + +impl AsRawHandle for AnonRead { + fn as_raw_handle(&self) -> HANDLE { self.0.raw() } +} +impl FromRawHandle for AnonRead { + unsafe fn from_raw_handle(handle: HANDLE) -> AnonRead { + AnonRead(Handle::new(handle)) + } +} +impl IntoRawHandle for AnonRead { + fn into_raw_handle(self) -> HANDLE { self.0.into_raw() } +} + +impl Write for AnonWrite { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) } + fn flush(&mut self) -> io::Result<()> { Ok(()) } +} +impl<'a> Write for &'a AnonWrite { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) } + fn flush(&mut self) -> io::Result<()> { Ok(()) } +} + +impl AsRawHandle for AnonWrite { + fn as_raw_handle(&self) -> HANDLE { self.0.raw() } +} +impl FromRawHandle for AnonWrite { + unsafe fn from_raw_handle(handle: HANDLE) -> AnonWrite { + AnonWrite(Handle::new(handle)) + } +} +impl IntoRawHandle for AnonWrite { + fn into_raw_handle(self) -> HANDLE { self.0.into_raw() } +} + +/// A convenience function to connect to a named pipe. +/// +/// This function will block the calling process until it can connect to the +/// pipe server specified by `addr`. This will use `NamedPipe::wait` internally +/// to block until it can connect. +pub fn connect<A: AsRef<OsStr>>(addr: A) -> io::Result<File> { + _connect(addr.as_ref()) +} + +fn _connect(addr: &OsStr) -> io::Result<File> { + let mut r = OpenOptions::new(); + let mut w = OpenOptions::new(); + let mut rw = OpenOptions::new(); + r.read(true); + w.write(true); + rw.read(true).write(true); + loop { + let res = rw.open(addr).or_else(|_| r.open(addr)) + .or_else(|_| w.open(addr)); + match res { + Ok(f) => return Ok(f), + Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) + => {} + Err(e) => return Err(e), + } + + try!(NamedPipe::wait(addr, Some(Duration::new(20, 0)))); + } +} + +impl NamedPipe { + /// Creates a new initial named pipe. + /// + /// This function is equivalent to: + /// + /// ``` + /// use miow::pipe::NamedPipeBuilder; + /// + /// # let addr = "foo"; + /// NamedPipeBuilder::new(addr) + /// .first(true) + /// .inbound(true) + /// .outbound(true) + /// .out_buffer_size(65536) + /// .in_buffer_size(65536) + /// .create(); + /// ``` + pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> { + NamedPipeBuilder::new(addr).create() + } + + /// Waits until either a time-out interval elapses or an instance of the + /// specified named pipe is available for connection. + /// + /// If this function succeeds the process can create a `File` to connect to + /// the named pipe. + pub fn wait<A: AsRef<OsStr>>(addr: A, timeout: Option<Duration>) + -> io::Result<()> { + NamedPipe::_wait(addr.as_ref(), timeout) + } + + fn _wait(addr: &OsStr, timeout: Option<Duration>) -> io::Result<()> { + let addr = addr.encode_wide().chain(Some(0)).collect::<Vec<_>>(); + let timeout = ::dur2ms(timeout); + ::cvt(unsafe { + WaitNamedPipeW(addr.as_ptr(), timeout) + }).map(|_| ()) + } + + /// Connects this named pipe to a client, blocking until one becomes + /// available. + /// + /// This function will call the `ConnectNamedPipe` function to await for a + /// client to connect. This can be called immediately after the pipe is + /// created, or after it has been disconnected from a previous client. + pub fn connect(&self) -> io::Result<()> { + match ::cvt(unsafe { ConnectNamedPipe(self.0.raw(), 0 as *mut _) }) { + Ok(_) => Ok(()), + Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) + => Ok(()), + Err(e) => Err(e), + } + } + + /// Issue a connection request with the specified overlapped operation. + /// + /// This function will issue a request to connect a client to this server, + /// returning immediately after starting the overlapped operation. + /// + /// If this function immediately succeeds then `Ok(true)` is returned. If + /// the overlapped operation is enqueued and pending, then `Ok(false)` is + /// returned. Otherwise an error is returned indicating what went wrong. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the + /// `overlapped` pointer is valid until the end of the I/O operation. The + /// kernel also requires that `overlapped` is unique for this I/O operation + /// and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that this pointer is + /// valid until the I/O operation is completed, typically via completion + /// ports and waiting to receive the completion notification on the port. + pub unsafe fn connect_overlapped(&self, overlapped: *mut OVERLAPPED) + -> io::Result<bool> { + match ::cvt(ConnectNamedPipe(self.0.raw(), overlapped)) { + Ok(_) => Ok(true), + Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) + => Ok(true), + Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) + => Ok(false), + Err(e) => Err(e), + } + } + + /// Disconnects this named pipe from any connected client. + pub fn disconnect(&self) -> io::Result<()> { + ::cvt(unsafe { + DisconnectNamedPipe(self.0.raw()) + }).map(|_| ()) + } + + /// Issues an overlapped read operation to occur on this pipe. + /// + /// This function will issue an asynchronous read to occur in an overlapped + /// fashion, returning immediately. The `buf` provided will be filled in + /// with data and the request is tracked by the `overlapped` function + /// provided. + /// + /// If the operation succeeds immediately, `Ok(Some(n))` is returned where + /// `n` is the number of bytes read. If an asynchronous operation is + /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred + /// it is returned. + /// + /// When this operation completes (or if it completes immediately), another + /// mechanism must be used to learn how many bytes were transferred (such as + /// looking at the filed in the IOCP status message). + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf` and + /// `overlapped` pointers to be valid until the end of the I/O operation. + /// The kernel also requires that `overlapped` is unique for this I/O + /// operation and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that the pointers are + /// valid until the I/O operation is completed, typically via completion + /// ports and waiting to receive the completion notification on the port. + pub unsafe fn read_overlapped(&self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED) + -> io::Result<Option<usize>> { + self.0.read_overlapped(buf, overlapped) + } + + /// Issues an overlapped write operation to occur on this pipe. + /// + /// This function will issue an asynchronous write to occur in an overlapped + /// fashion, returning immediately. The `buf` provided will be filled in + /// with data and the request is tracked by the `overlapped` function + /// provided. + /// + /// If the operation succeeds immediately, `Ok(Some(n))` is returned where + /// `n` is the number of bytes written. If an asynchronous operation is + /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred + /// it is returned. + /// + /// When this operation completes (or if it completes immediately), another + /// mechanism must be used to learn how many bytes were transferred (such as + /// looking at the filed in the IOCP status message). + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf` and + /// `overlapped` pointers to be valid until the end of the I/O operation. + /// The kernel also requires that `overlapped` is unique for this I/O + /// operation and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that the pointers are + /// valid until the I/O operation is completed, typically via completion + /// ports and waiting to receive the completion notification on the port. + pub unsafe fn write_overlapped(&self, + buf: &[u8], + overlapped: *mut OVERLAPPED) + -> io::Result<Option<usize>> { + self.0.write_overlapped(buf, overlapped) + } + + /// Calls the `GetOverlappedResult` function to get the result of an + /// overlapped operation for this handle. + /// + /// This function takes the `OVERLAPPED` argument which must have been used + /// to initiate an overlapped I/O operation, and returns either the + /// successful number of bytes transferred during the operation or an error + /// if one occurred. + /// + /// # Unsafety + /// + /// This function is unsafe as `overlapped` must have previously been used + /// to execute an operation for this handle, and it must also be a valid + /// pointer to an `Overlapped` instance. + /// + /// # Panics + /// + /// This function will panic + pub unsafe fn result(&self, overlapped: *mut OVERLAPPED) + -> io::Result<usize> { + let mut transferred = 0; + let r = GetOverlappedResult(self.0.raw(), + overlapped, + &mut transferred, + FALSE); + if r == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(transferred as usize) + } + } +} + +thread_local! { + static NAMED_PIPE_OVERLAPPED: RefCell<Option<Overlapped>> = RefCell::new(None); +} + +/// Call a function with a threadlocal `Overlapped`. The function `f` should be +/// sure that the event is reset, either manually or by a thread being released. +fn with_threadlocal_overlapped<F>(f: F) -> io::Result<usize> + where F: FnOnce(&Overlapped) -> io::Result<usize> +{ + NAMED_PIPE_OVERLAPPED.with(|overlapped| { + let mut mborrow = overlapped.borrow_mut(); + if let None = *mborrow { + let op = Overlapped::initialize_with_autoreset_event()?; + *mborrow = Some(op); + } + f(mborrow.as_ref().unwrap()) + }) +} + +impl Read for NamedPipe { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`. + with_threadlocal_overlapped(|overlapped| unsafe { + self.0.read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED) + }) + } +} +impl<'a> Read for &'a NamedPipe { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`. + with_threadlocal_overlapped(|overlapped| unsafe { + self.0.read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED) + }) + } +} + +impl Write for NamedPipe { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`. + with_threadlocal_overlapped(|overlapped| unsafe { + self.0.write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED) + }) + } + fn flush(&mut self) -> io::Result<()> { + <&NamedPipe as Write>::flush(&mut &*self) + } +} +impl<'a> Write for &'a NamedPipe { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`. + with_threadlocal_overlapped(|overlapped| unsafe { + self.0.write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED) + }) + } + fn flush(&mut self) -> io::Result<()> { + ::cvt(unsafe { FlushFileBuffers(self.0.raw()) }).map(|_| ()) + } +} + +impl AsRawHandle for NamedPipe { + fn as_raw_handle(&self) -> HANDLE { self.0.raw() } +} +impl FromRawHandle for NamedPipe { + unsafe fn from_raw_handle(handle: HANDLE) -> NamedPipe { + NamedPipe(Handle::new(handle)) + } +} +impl IntoRawHandle for NamedPipe { + fn into_raw_handle(self) -> HANDLE { self.0.into_raw() } +} + +fn flag(slot: &mut DWORD, on: bool, val: DWORD) { + if on { + *slot |= val; + } else { + *slot &= !val; + } +} + +impl NamedPipeBuilder { + /// Creates a new named pipe builder with the default settings. + pub fn new<A: AsRef<OsStr>>(addr: A) -> NamedPipeBuilder { + NamedPipeBuilder { + name: addr.as_ref().encode_wide().chain(Some(0)).collect(), + dwOpenMode: PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | + FILE_FLAG_OVERLAPPED, + dwPipeMode: PIPE_TYPE_BYTE, + nMaxInstances: PIPE_UNLIMITED_INSTANCES, + nOutBufferSize: 65536, + nInBufferSize: 65536, + nDefaultTimeOut: 0, + } + } + + /// Indicates whether data is allowed to flow from the client to the server. + pub fn inbound(&mut self, allowed: bool) -> &mut Self { + flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_INBOUND); + self + } + + /// Indicates whether data is allowed to flow from the server to the client. + pub fn outbound(&mut self, allowed: bool) -> &mut Self { + flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_OUTBOUND); + self + } + + /// Indicates that this pipe must be the first instance. + /// + /// If set to true, then creation will fail if there's already an instance + /// elsewhere. + pub fn first(&mut self, first: bool) -> &mut Self { + flag(&mut self.dwOpenMode, first, FILE_FLAG_FIRST_PIPE_INSTANCE); + self + } + + /// Indicates whether this server can accept remote clients or not. + pub fn accept_remote(&mut self, accept: bool) -> &mut Self { + flag(&mut self.dwPipeMode, !accept, PIPE_REJECT_REMOTE_CLIENTS); + self + } + + /// Specifies the maximum number of instances of the server pipe that are + /// allowed. + /// + /// The first instance of a pipe can specify this value. A value of 255 + /// indicates that there is no limit to the number of instances. + pub fn max_instances(&mut self, instances: u8) -> &mut Self { + self.nMaxInstances = instances as DWORD; + self + } + + /// Specifies the number of bytes to reserver for the output buffer + pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self { + self.nOutBufferSize = buffer as DWORD; + self + } + + /// Specifies the number of bytes to reserver for the input buffer + pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self { + self.nInBufferSize = buffer as DWORD; + self + } + + /// Using the options in this builder, attempt to create a new named pipe. + /// + /// This function will call the `CreateNamedPipe` function and return the + /// result. + pub fn create(&mut self) -> io::Result<NamedPipe> { + unsafe { self.with_security_attributes(::std::ptr::null_mut()) } + } + + /// Using the options in the builder and the provided security attributes, attempt to create a + /// new named pipe. This function has to be called with a valid pointer to a + /// `SECURITY_ATTRIBUTES` struct that will stay valid for the lifetime of this function or a + /// null pointer. + /// + /// This function will call the `CreateNamedPipe` function and return the + /// result. + pub unsafe fn with_security_attributes(&mut self, attrs: *mut SECURITY_ATTRIBUTES) -> io::Result<NamedPipe> { + let h = CreateNamedPipeW(self.name.as_ptr(), + self.dwOpenMode, self.dwPipeMode, + self.nMaxInstances, self.nOutBufferSize, + self.nInBufferSize, self.nDefaultTimeOut, + attrs); + + if h == INVALID_HANDLE_VALUE { + Err(io::Error::last_os_error()) + } else { + Ok(NamedPipe(Handle::new(h))) + } + } +} + +#[cfg(test)] +mod tests { + use std::fs::{File, OpenOptions}; + use std::io::prelude::*; + use std::sync::mpsc::channel; + use std::thread; + use std::time::Duration; + + use rand::{thread_rng, Rng}; + + use super::{anonymous, NamedPipe, NamedPipeBuilder}; + use iocp::CompletionPort; + use Overlapped; + + fn name() -> String { + let name = thread_rng().gen_ascii_chars().take(30).collect::<String>(); + format!(r"\\.\pipe\{}", name) + } + + #[test] + fn anon() { + let (mut read, mut write) = t!(anonymous(256)); + assert_eq!(t!(write.write(&[1, 2, 3])), 3); + let mut b = [0; 10]; + assert_eq!(t!(read.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + } + + #[test] + fn named_not_first() { + let name = name(); + let _a = t!(NamedPipe::new(&name)); + assert!(NamedPipe::new(&name).is_err()); + + t!(NamedPipeBuilder::new(&name).first(false).create()); + } + + #[test] + fn named_connect() { + let name = name(); + let a = t!(NamedPipe::new(&name)); + + let t = thread::spawn(move || { + t!(File::open(name)); + }); + + t!(a.connect()); + t!(a.disconnect()); + t!(t.join()); + } + + #[test] + fn named_wait() { + let name = name(); + let a = t!(NamedPipe::new(&name)); + + let (tx, rx) = channel(); + let t = thread::spawn(move || { + t!(NamedPipe::wait(&name, None)); + t!(File::open(&name)); + assert!(NamedPipe::wait(&name, Some(Duration::from_millis(1))).is_err()); + t!(tx.send(())); + }); + + t!(a.connect()); + t!(rx.recv()); + t!(a.disconnect()); + t!(t.join()); + } + + #[test] + fn named_connect_overlapped() { + let name = name(); + let a = t!(NamedPipe::new(&name)); + + let t = thread::spawn(move || { + t!(File::open(name)); + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_handle(2, &a)); + + let over = Overlapped::zero(); + unsafe { + t!(a.connect_overlapped(over.raw())); + } + + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 0); + assert_eq!(status.token(), 2); + assert_eq!(status.overlapped(), over.raw()); + t!(t.join()); + } + + #[test] + fn named_read_write() { + let name = name(); + let mut a = t!(NamedPipe::new(&name)); + + let t = thread::spawn(move || { + let mut f = t!(OpenOptions::new().read(true).write(true).open(name)); + t!(f.write_all(&[1, 2, 3])); + let mut b = [0; 10]; + assert_eq!(t!(f.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + }); + + t!(a.connect()); + let mut b = [0; 10]; + assert_eq!(t!(a.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + t!(a.write_all(&[1, 2, 3])); + t!(a.flush()); + t!(a.disconnect()); + t!(t.join()); + } + + #[test] + fn named_read_write_multi() { + for _ in 0..5 { + named_read_write() + } + } + + #[test] + fn named_read_write_multi_same_thread() { + let name1 = name(); + let mut a1 = t!(NamedPipe::new(&name1)); + let name2 = name(); + let mut a2 = t!(NamedPipe::new(&name2)); + + let t = thread::spawn(move || { + let mut f = t!(OpenOptions::new().read(true).write(true).open(name1)); + t!(f.write_all(&[1, 2, 3])); + let mut b = [0; 10]; + assert_eq!(t!(f.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + + let mut f = t!(OpenOptions::new().read(true).write(true).open(name2)); + t!(f.write_all(&[1, 2, 3])); + let mut b = [0; 10]; + assert_eq!(t!(f.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + }); + + t!(a1.connect()); + let mut b = [0; 10]; + assert_eq!(t!(a1.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + t!(a1.write_all(&[1, 2, 3])); + t!(a1.flush()); + t!(a1.disconnect()); + + t!(a2.connect()); + let mut b = [0; 10]; + assert_eq!(t!(a2.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + t!(a2.write_all(&[1, 2, 3])); + t!(a2.flush()); + t!(a2.disconnect()); + + t!(t.join()); + } + + #[test] + fn named_read_overlapped() { + let name = name(); + let a = t!(NamedPipe::new(&name)); + + let t = thread::spawn(move || { + let mut f = t!(File::create(name)); + t!(f.write_all(&[1, 2, 3])); + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_handle(3, &a)); + t!(a.connect()); + + let mut b = [0; 10]; + let over = Overlapped::zero(); + unsafe { + t!(a.read_overlapped(&mut b, over.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 3); + assert_eq!(status.overlapped(), over.raw()); + assert_eq!(&b[..3], &[1, 2, 3]); + + t!(t.join()); + } + + #[test] + fn named_write_overlapped() { + let name = name(); + let a = t!(NamedPipe::new(&name)); + + let t = thread::spawn(move || { + let mut f = t!(super::connect(name)); + let mut b = [0; 10]; + assert_eq!(t!(f.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]) + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_handle(3, &a)); + t!(a.connect()); + + let over = Overlapped::zero(); + unsafe { + t!(a.write_overlapped(&[1, 2, 3], over.raw())); + } + + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 3); + assert_eq!(status.overlapped(), over.raw()); + + t!(t.join()); + } +} |