diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/miow/src | |
parent | Initial commit. (diff) | |
download | firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esr
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/miow/src')
-rw-r--r-- | third_party/rust/miow/src/handle.rs | 177 | ||||
-rw-r--r-- | third_party/rust/miow/src/iocp.rs | 328 | ||||
-rw-r--r-- | third_party/rust/miow/src/lib.rs | 52 | ||||
-rw-r--r-- | third_party/rust/miow/src/net.rs | 1332 | ||||
-rw-r--r-- | third_party/rust/miow/src/overlapped.rs | 92 | ||||
-rw-r--r-- | third_party/rust/miow/src/pipe.rs | 788 |
6 files changed, 2769 insertions, 0 deletions
diff --git a/third_party/rust/miow/src/handle.rs b/third_party/rust/miow/src/handle.rs new file mode 100644 index 0000000000..a749fb3269 --- /dev/null +++ b/third_party/rust/miow/src/handle.rs @@ -0,0 +1,177 @@ +use std::cmp; +use std::io; +use std::ptr; + +use winapi::shared::minwindef::*; +use winapi::shared::ntdef::{BOOLEAN, FALSE, HANDLE, TRUE}; +use winapi::shared::winerror::*; +use winapi::um::fileapi::*; +use winapi::um::handleapi::*; +use winapi::um::ioapiset::*; +use winapi::um::minwinbase::*; + +#[derive(Debug)] +pub struct Handle(HANDLE); + +unsafe impl Send for Handle {} +unsafe impl Sync for Handle {} + +impl Handle { + pub fn new(handle: HANDLE) -> Handle { + Handle(handle) + } + + pub fn raw(&self) -> HANDLE { + self.0 + } + + pub fn into_raw(self) -> HANDLE { + use std::mem; + + let ret = self.0; + mem::forget(self); + ret + } + + pub fn write(&self, buf: &[u8]) -> io::Result<usize> { + let mut bytes = 0; + let len = cmp::min(buf.len(), <DWORD>::max_value() as usize) as DWORD; + crate::cvt(unsafe { + WriteFile( + self.0, + buf.as_ptr() as *const _, + len, + &mut bytes, + 0 as *mut _, + ) + })?; + Ok(bytes as usize) + } + + pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> { + let mut bytes = 0; + let len = cmp::min(buf.len(), <DWORD>::max_value() as usize) as DWORD; + crate::cvt(unsafe { + ReadFile( + self.0, + buf.as_mut_ptr() as *mut _, + len, + &mut bytes, + 0 as *mut _, + ) + })?; + Ok(bytes as usize) + } + + pub unsafe fn read_overlapped( + &self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + self.read_overlapped_helper(buf, overlapped, FALSE) + } + + pub unsafe fn read_overlapped_wait( + &self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<usize> { + match self.read_overlapped_helper(buf, overlapped, TRUE) { + Ok(Some(bytes)) => Ok(bytes), + Ok(None) => panic!("logic error"), + Err(e) => Err(e), + } + } + + pub unsafe fn read_overlapped_helper( + &self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED, + wait: BOOLEAN, + ) -> io::Result<Option<usize>> { + let len = cmp::min(buf.len(), <DWORD>::max_value() as usize) as DWORD; + let res = crate::cvt({ + ReadFile( + self.0, + buf.as_mut_ptr() as *mut _, + len, + ptr::null_mut(), + overlapped, + ) + }); + match res { + Ok(_) => (), + Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => (), + Err(e) => return Err(e), + } + + let mut bytes = 0; + let res = crate::cvt({ GetOverlappedResult(self.0, overlapped, &mut bytes, wait as BOOL) }); + match res { + Ok(_) => Ok(Some(bytes as usize)), + Err(ref e) if e.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) && wait == FALSE => { + Ok(None) + } + Err(e) => Err(e), + } + } + + pub unsafe fn write_overlapped( + &self, + buf: &[u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + self.write_overlapped_helper(buf, overlapped, FALSE) + } + + pub unsafe fn write_overlapped_wait( + &self, + buf: &[u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<usize> { + match self.write_overlapped_helper(buf, overlapped, TRUE) { + Ok(Some(bytes)) => Ok(bytes), + Ok(None) => panic!("logic error"), + Err(e) => Err(e), + } + } + + unsafe fn write_overlapped_helper( + &self, + buf: &[u8], + overlapped: *mut OVERLAPPED, + wait: BOOLEAN, + ) -> io::Result<Option<usize>> { + let len = cmp::min(buf.len(), <DWORD>::max_value() as usize) as DWORD; + let res = crate::cvt({ + WriteFile( + self.0, + buf.as_ptr() as *const _, + len, + ptr::null_mut(), + overlapped, + ) + }); + match res { + Ok(_) => (), + Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => (), + Err(e) => return Err(e), + } + + let mut bytes = 0; + let res = crate::cvt({ GetOverlappedResult(self.0, overlapped, &mut bytes, wait as BOOL) }); + match res { + Ok(_) => Ok(Some(bytes as usize)), + Err(ref e) if e.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) && wait == FALSE => { + Ok(None) + } + Err(e) => Err(e), + } + } +} + +impl Drop for Handle { + fn drop(&mut self) { + unsafe { CloseHandle(self.0) }; + } +} diff --git a/third_party/rust/miow/src/iocp.rs b/third_party/rust/miow/src/iocp.rs new file mode 100644 index 0000000000..d862d6bcfa --- /dev/null +++ b/third_party/rust/miow/src/iocp.rs @@ -0,0 +1,328 @@ +//! Bindings to IOCP, I/O Completion Ports + +use std::cmp; +use std::fmt; +use std::io; +use std::mem; +use std::os::windows::io::*; +use std::time::Duration; + +use crate::handle::Handle; +use crate::Overlapped; +use winapi::shared::basetsd::*; +use winapi::shared::ntdef::*; +use winapi::um::handleapi::*; +use winapi::um::ioapiset::*; +use winapi::um::minwinbase::*; + +/// A handle to an Windows I/O Completion Port. +#[derive(Debug)] +pub struct CompletionPort { + handle: Handle, +} + +/// A status message received from an I/O completion port. +/// +/// These statuses can be created via the `new` or `empty` constructors and then +/// provided to a completion port, or they are read out of a completion port. +/// The fields of each status are read through its accessor methods. +#[derive(Clone, Copy)] +pub struct CompletionStatus(OVERLAPPED_ENTRY); + +impl fmt::Debug for CompletionStatus { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "CompletionStatus(OVERLAPPED_ENTRY)") + } +} + +unsafe impl Send for CompletionStatus {} +unsafe impl Sync for CompletionStatus {} + +impl CompletionPort { + /// Creates a new I/O completion port with the specified concurrency value. + /// + /// The number of threads given corresponds to the level of concurrency + /// allowed for threads associated with this port. Consult the Windows + /// documentation for more information about this value. + pub fn new(threads: u32) -> io::Result<CompletionPort> { + let ret = unsafe { CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0 as *mut _, 0, threads) }; + if ret.is_null() { + Err(io::Error::last_os_error()) + } else { + Ok(CompletionPort { + handle: Handle::new(ret), + }) + } + } + + /// Associates a new `HANDLE` to this I/O completion port. + /// + /// This function will associate the given handle to this port with the + /// given `token` to be returned in status messages whenever it receives a + /// notification. + /// + /// Any object which is convertible to a `HANDLE` via the `AsRawHandle` + /// trait can be provided to this function, such as `std::fs::File` and + /// friends. + pub fn add_handle<T: AsRawHandle + ?Sized>(&self, token: usize, t: &T) -> io::Result<()> { + self._add(token, t.as_raw_handle()) + } + + /// Associates a new `SOCKET` to this I/O completion port. + /// + /// This function will associate the given socket to this port with the + /// given `token` to be returned in status messages whenever it receives a + /// notification. + /// + /// Any object which is convertible to a `SOCKET` via the `AsRawSocket` + /// trait can be provided to this function, such as `std::net::TcpStream` + /// and friends. + pub fn add_socket<T: AsRawSocket + ?Sized>(&self, token: usize, t: &T) -> io::Result<()> { + self._add(token, t.as_raw_socket() as HANDLE) + } + + fn _add(&self, token: usize, handle: HANDLE) -> io::Result<()> { + assert_eq!(mem::size_of_val(&token), mem::size_of::<ULONG_PTR>()); + let ret = + unsafe { CreateIoCompletionPort(handle, self.handle.raw(), token as ULONG_PTR, 0) }; + if ret.is_null() { + Err(io::Error::last_os_error()) + } else { + debug_assert_eq!(ret, self.handle.raw()); + Ok(()) + } + } + + /// Dequeue a completion status from this I/O completion port. + /// + /// This function will associate the calling thread with this completion + /// port and then wait for a status message to become available. The precise + /// semantics on when this function returns depends on the concurrency value + /// specified when the port was created. + /// + /// A timeout can optionally be specified to this function. If `None` is + /// provided this function will not time out, and otherwise it will time out + /// after the specified duration has passed. + /// + /// On success this will return the status message which was dequeued from + /// this completion port. + pub fn get(&self, timeout: Option<Duration>) -> io::Result<CompletionStatus> { + let mut bytes = 0; + let mut token = 0; + let mut overlapped = 0 as *mut _; + let timeout = crate::dur2ms(timeout); + let ret = unsafe { + GetQueuedCompletionStatus( + self.handle.raw(), + &mut bytes, + &mut token, + &mut overlapped, + timeout, + ) + }; + crate::cvt(ret).map(|_| { + CompletionStatus(OVERLAPPED_ENTRY { + dwNumberOfBytesTransferred: bytes, + lpCompletionKey: token, + lpOverlapped: overlapped, + Internal: 0, + }) + }) + } + + /// Dequeues a number of completion statuses from this I/O completion port. + /// + /// This function is the same as `get` except that it may return more than + /// one status. A buffer of "zero" statuses is provided (the contents are + /// not read) and then on success this function will return a sub-slice of + /// statuses which represent those which were dequeued from this port. This + /// function does not wait to fill up the entire list of statuses provided. + /// + /// Like with `get`, a timeout may be specified for this operation. + pub fn get_many<'a>( + &self, + list: &'a mut [CompletionStatus], + timeout: Option<Duration>, + ) -> io::Result<&'a mut [CompletionStatus]> { + debug_assert_eq!( + mem::size_of::<CompletionStatus>(), + mem::size_of::<OVERLAPPED_ENTRY>() + ); + let mut removed = 0; + let timeout = crate::dur2ms(timeout); + let len = cmp::min(list.len(), <ULONG>::max_value() as usize) as ULONG; + let ret = unsafe { + GetQueuedCompletionStatusEx( + self.handle.raw(), + list.as_ptr() as *mut _, + len, + &mut removed, + timeout, + FALSE as i32, + ) + }; + match crate::cvt(ret) { + Ok(_) => Ok(&mut list[..removed as usize]), + Err(e) => Err(e), + } + } + + /// Posts a new completion status onto this I/O completion port. + /// + /// This function will post the given status, with custom parameters, to the + /// port. Threads blocked in `get` or `get_many` will eventually receive + /// this status. + pub fn post(&self, status: CompletionStatus) -> io::Result<()> { + let ret = unsafe { + PostQueuedCompletionStatus( + self.handle.raw(), + status.0.dwNumberOfBytesTransferred, + status.0.lpCompletionKey, + status.0.lpOverlapped, + ) + }; + crate::cvt(ret).map(|_| ()) + } +} + +impl AsRawHandle for CompletionPort { + fn as_raw_handle(&self) -> HANDLE { + self.handle.raw() + } +} + +impl FromRawHandle for CompletionPort { + unsafe fn from_raw_handle(handle: HANDLE) -> CompletionPort { + CompletionPort { + handle: Handle::new(handle), + } + } +} + +impl IntoRawHandle for CompletionPort { + fn into_raw_handle(self) -> HANDLE { + self.handle.into_raw() + } +} + +impl CompletionStatus { + /// Creates a new completion status with the provided parameters. + /// + /// This function is useful when creating a status to send to a port with + /// the `post` method. The parameters are opaquely passed through and not + /// interpreted by the system at all. + pub fn new(bytes: u32, token: usize, overlapped: *mut Overlapped) -> CompletionStatus { + assert_eq!(mem::size_of_val(&token), mem::size_of::<ULONG_PTR>()); + CompletionStatus(OVERLAPPED_ENTRY { + dwNumberOfBytesTransferred: bytes, + lpCompletionKey: token as ULONG_PTR, + lpOverlapped: overlapped as *mut _, + Internal: 0, + }) + } + + /// Creates a new borrowed completion status from the borrowed + /// `OVERLAPPED_ENTRY` argument provided. + /// + /// This method will wrap the `OVERLAPPED_ENTRY` in a `CompletionStatus`, + /// returning the wrapped structure. + pub fn from_entry(entry: &OVERLAPPED_ENTRY) -> &CompletionStatus { + unsafe { &*(entry as *const _ as *const _) } + } + + /// Creates a new "zero" completion status. + /// + /// This function is useful when creating a stack buffer or vector of + /// completion statuses to be passed to the `get_many` function. + pub fn zero() -> CompletionStatus { + CompletionStatus::new(0, 0, 0 as *mut _) + } + + /// Returns the number of bytes that were transferred for the I/O operation + /// associated with this completion status. + pub fn bytes_transferred(&self) -> u32 { + self.0.dwNumberOfBytesTransferred + } + + /// Returns the completion key value associated with the file handle whose + /// I/O operation has completed. + /// + /// A completion key is a per-handle key that is specified when it is added + /// to an I/O completion port via `add_handle` or `add_socket`. + pub fn token(&self) -> usize { + self.0.lpCompletionKey as usize + } + + /// Returns a pointer to the `Overlapped` structure that was specified when + /// the I/O operation was started. + pub fn overlapped(&self) -> *mut OVERLAPPED { + self.0.lpOverlapped + } + + /// Returns a pointer to the internal `OVERLAPPED_ENTRY` object. + pub fn entry(&self) -> &OVERLAPPED_ENTRY { + &self.0 + } +} + +#[cfg(test)] +mod tests { + use std::mem; + use std::time::Duration; + + use winapi::shared::basetsd::*; + use winapi::shared::winerror::*; + + use crate::iocp::{CompletionPort, CompletionStatus}; + + #[test] + fn is_send_sync() { + fn is_send_sync<T: Send + Sync>() {} + is_send_sync::<CompletionPort>(); + } + + #[test] + fn token_right_size() { + assert_eq!(mem::size_of::<usize>(), mem::size_of::<ULONG_PTR>()); + } + + #[test] + fn timeout() { + let c = CompletionPort::new(1).unwrap(); + let err = c.get(Some(Duration::from_millis(1))).unwrap_err(); + assert_eq!(err.raw_os_error(), Some(WAIT_TIMEOUT as i32)); + } + + #[test] + fn get() { + let c = CompletionPort::new(1).unwrap(); + c.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap(); + let s = c.get(None).unwrap(); + assert_eq!(s.bytes_transferred(), 1); + assert_eq!(s.token(), 2); + assert_eq!(s.overlapped(), 3 as *mut _); + } + + #[test] + fn get_many() { + let c = CompletionPort::new(1).unwrap(); + + c.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap(); + c.post(CompletionStatus::new(4, 5, 6 as *mut _)).unwrap(); + + let mut s = vec![CompletionStatus::zero(); 4]; + { + let s = c.get_many(&mut s, None).unwrap(); + assert_eq!(s.len(), 2); + assert_eq!(s[0].bytes_transferred(), 1); + assert_eq!(s[0].token(), 2); + assert_eq!(s[0].overlapped(), 3 as *mut _); + assert_eq!(s[1].bytes_transferred(), 4); + assert_eq!(s[1].token(), 5); + assert_eq!(s[1].overlapped(), 6 as *mut _); + } + assert_eq!(s[2].bytes_transferred(), 0); + assert_eq!(s[2].token(), 0); + assert_eq!(s[2].overlapped(), 0 as *mut _); + } +} diff --git a/third_party/rust/miow/src/lib.rs b/third_party/rust/miow/src/lib.rs new file mode 100644 index 0000000000..53c01aeeae --- /dev/null +++ b/third_party/rust/miow/src/lib.rs @@ -0,0 +1,52 @@ +//! A zero overhead Windows I/O library + +#![cfg(windows)] +#![deny(missing_docs)] +#![allow(bad_style)] +#![doc(html_root_url = "https://docs.rs/miow/0.3/x86_64-pc-windows-msvc/")] + +use std::cmp; +use std::io; +use std::time::Duration; + +use winapi::shared::minwindef::*; +use winapi::um::winbase::*; + +#[cfg(test)] +macro_rules! t { + ($e:expr) => { + match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + } + }; +} + +mod handle; +mod overlapped; + +pub mod iocp; +pub mod net; +pub mod pipe; + +pub use crate::overlapped::Overlapped; + +fn cvt(i: BOOL) -> io::Result<BOOL> { + if i == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(i) + } +} + +fn dur2ms(dur: Option<Duration>) -> u32 { + let dur = match dur { + Some(dur) => dur, + None => return INFINITE, + }; + let ms = dur.as_secs().checked_mul(1_000); + let ms_extra = dur.subsec_nanos() / 1_000_000; + ms.and_then(|ms| ms.checked_add(ms_extra as u64)) + .map(|ms| cmp::min(u32::max_value() as u64, ms) as u32) + .unwrap_or(INFINITE - 1) +} diff --git a/third_party/rust/miow/src/net.rs b/third_party/rust/miow/src/net.rs new file mode 100644 index 0000000000..e30bc2d2bf --- /dev/null +++ b/third_party/rust/miow/src/net.rs @@ -0,0 +1,1332 @@ +//! Extensions and types for the standard networking primitives. +//! +//! This module contains a number of extension traits for the types in +//! `std::net` for Windows-specific functionality. + +use std::cmp; +use std::io; +use std::mem; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6}; +use std::net::{SocketAddr, TcpListener, TcpStream, UdpSocket}; +use std::os::windows::prelude::*; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use winapi::ctypes::*; +use winapi::shared::guiddef::*; +use winapi::shared::in6addr::{in6_addr_u, IN6_ADDR}; +use winapi::shared::inaddr::{in_addr_S_un, IN_ADDR}; +use winapi::shared::minwindef::*; +use winapi::shared::minwindef::{FALSE, TRUE}; +use winapi::shared::ntdef::*; +use winapi::shared::ws2def::SOL_SOCKET; +use winapi::shared::ws2def::*; +use winapi::shared::ws2ipdef::*; +use winapi::um::minwinbase::*; +use winapi::um::winsock2::*; + +/// A type to represent a buffer in which a socket address will be stored. +/// +/// This type is used with the `recv_from_overlapped` function on the +/// `UdpSocketExt` trait to provide space for the overlapped I/O operation to +/// fill in the address upon completion. +#[derive(Clone, Copy)] +pub struct SocketAddrBuf { + buf: SOCKADDR_STORAGE, + len: c_int, +} + +/// A type to represent a buffer in which an accepted socket's address will be +/// stored. +/// +/// This type is used with the `accept_overlapped` method on the +/// `TcpListenerExt` trait to provide space for the overlapped I/O operation to +/// fill in the socket addresses upon completion. +#[repr(C)] +pub struct AcceptAddrsBuf { + // For AcceptEx we've got the restriction that the addresses passed in that + // buffer need to be at least 16 bytes more than the maximum address length + // for the protocol in question, so add some extra here and there + local: SOCKADDR_STORAGE, + _pad1: [u8; 16], + remote: SOCKADDR_STORAGE, + _pad2: [u8; 16], +} + +/// The parsed return value of `AcceptAddrsBuf`. +pub struct AcceptAddrs<'a> { + local: LPSOCKADDR, + local_len: c_int, + remote: LPSOCKADDR, + remote_len: c_int, + _data: &'a AcceptAddrsBuf, +} + +struct WsaExtension { + guid: GUID, + val: AtomicUsize, +} + +/// Additional methods for the `TcpStream` type in the standard library. +pub trait TcpStreamExt { + /// Execute an overlapped read I/O operation on this TCP stream. + /// + /// This function will issue an overlapped I/O read (via `WSARecv`) on this + /// socket. The provided buffer will be filled in when the operation + /// completes and the given `OVERLAPPED` instance is used to track the + /// overlapped operation. + /// + /// If the operation succeeds, `Ok(Some(n))` is returned indicating how + /// many bytes were read. If the operation returns an error indicating that + /// the I/O is currently pending, `Ok(None)` is returned. Otherwise, the + /// error associated with the operation is returned and no overlapped + /// operation is enqueued. + /// + /// The number of bytes read will be returned as part of the completion + /// notification when the I/O finishes. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf` and + /// `overlapped` pointers are valid until the end of the I/O operation. The + /// kernel also requires that `overlapped` is unique for this I/O operation + /// and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that these two input + /// pointers are valid until the I/O operation is completed, typically via + /// completion ports and waiting to receive the completion notification on + /// the port. + unsafe fn read_overlapped( + &self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>>; + + /// Execute an overlapped write I/O operation on this TCP stream. + /// + /// This function will issue an overlapped I/O write (via `WSASend`) on this + /// socket. The provided buffer will be written when the operation completes + /// and the given `OVERLAPPED` instance is used to track the overlapped + /// operation. + /// + /// If the operation succeeds, `Ok(Some(n))` is returned where `n` is the + /// number of bytes that were written. If the operation returns an error + /// indicating that the I/O is currently pending, `Ok(None)` is returned. + /// Otherwise, the error associated with the operation is returned and no + /// overlapped operation is enqueued. + /// + /// The number of bytes written will be returned as part of the completion + /// notification when the I/O finishes. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf` and + /// `overlapped` pointers are valid until the end of the I/O operation. The + /// kernel also requires that `overlapped` is unique for this I/O operation + /// and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that these two input + /// pointers are valid until the I/O operation is completed, typically via + /// completion ports and waiting to receive the completion notification on + /// the port. + unsafe fn write_overlapped( + &self, + buf: &[u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>>; + + /// Attempt to consume the internal socket in this builder by executing an + /// overlapped connect operation. + /// + /// This function will issue a connect operation to the address specified on + /// the underlying socket, flagging it as an overlapped operation which will + /// complete asynchronously. If successful this function will return the + /// corresponding TCP stream. + /// + /// The `buf` argument provided is an initial buffer of data that should be + /// sent after the connection is initiated. It's acceptable to + /// pass an empty slice here. + /// + /// This function will also return whether the connect immediately + /// succeeded or not. If `None` is returned then the I/O operation is still + /// pending and will complete at a later date, and if `Some(bytes)` is + /// returned then that many bytes were transferred. + /// + /// Note that to succeed this requires that the underlying socket has + /// previously been bound via a call to `bind` to a local address. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the + /// `overlapped` and `buf` pointers to be valid until the end of the I/O + /// operation. The kernel also requires that `overlapped` is unique for + /// this I/O operation and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that this pointer is + /// valid until the I/O operation is completed, typically via completion + /// ports and waiting to receive the completion notification on the port. + unsafe fn connect_overlapped( + &self, + addr: &SocketAddr, + buf: &[u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>>; + + /// Once a `connect_overlapped` has finished, this function needs to be + /// called to finish the connect operation. + /// + /// Currently this just calls `setsockopt` with `SO_UPDATE_CONNECT_CONTEXT` + /// to ensure that further functions like `getpeername` and `getsockname` + /// work correctly. + fn connect_complete(&self) -> io::Result<()>; + + /// Calls the `GetOverlappedResult` function to get the result of an + /// overlapped operation for this handle. + /// + /// This function takes the `OVERLAPPED` argument which must have been used + /// to initiate an overlapped I/O operation, and returns either the + /// successful number of bytes transferred during the operation or an error + /// if one occurred, along with the results of the `lpFlags` parameter of + /// the relevant operation, if applicable. + /// + /// # Unsafety + /// + /// This function is unsafe as `overlapped` must have previously been used + /// to execute an operation for this handle, and it must also be a valid + /// pointer to an `OVERLAPPED` instance. + /// + /// # Panics + /// + /// This function will panic + unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)>; +} + +/// Additional methods for the `UdpSocket` type in the standard library. +pub trait UdpSocketExt { + /// Execute an overlapped receive I/O operation on this UDP socket. + /// + /// This function will issue an overlapped I/O read (via `WSARecvFrom`) on + /// this socket. The provided buffer will be filled in when the operation + /// completes, the source from where the data came from will be written to + /// `addr`, and the given `OVERLAPPED` instance is used to track the + /// overlapped operation. + /// + /// If the operation succeeds, `Ok(Some(n))` is returned where `n` is the + /// number of bytes that were read. If the operation returns an error + /// indicating that the I/O is currently pending, `Ok(None)` is returned. + /// Otherwise, the error associated with the operation is returned and no + /// overlapped operation is enqueued. + /// + /// The number of bytes read will be returned as part of the completion + /// notification when the I/O finishes. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf`, + /// `addr`, and `overlapped` pointers are valid until the end of the I/O + /// operation. The kernel also requires that `overlapped` is unique for this + /// I/O operation and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that these two input + /// pointers are valid until the I/O operation is completed, typically via + /// completion ports and waiting to receive the completion notification on + /// the port. + unsafe fn recv_from_overlapped( + &self, + buf: &mut [u8], + addr: *mut SocketAddrBuf, + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>>; + + /// Execute an overlapped receive I/O operation on this UDP socket. + /// + /// This function will issue an overlapped I/O read (via `WSARecv`) on + /// this socket. The provided buffer will be filled in when the operation + /// completes, the source from where the data came from will be written to + /// `addr`, and the given `OVERLAPPED` instance is used to track the + /// overlapped operation. + /// + /// If the operation succeeds, `Ok(Some(n))` is returned where `n` is the + /// number of bytes that were read. If the operation returns an error + /// indicating that the I/O is currently pending, `Ok(None)` is returned. + /// Otherwise, the error associated with the operation is returned and no + /// overlapped operation is enqueued. + /// + /// The number of bytes read will be returned as part of the completion + /// notification when the I/O finishes. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf`, + /// and `overlapped` pointers are valid until the end of the I/O + /// operation. The kernel also requires that `overlapped` is unique for this + /// I/O operation and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that these two input + /// pointers are valid until the I/O operation is completed, typically via + /// completion ports and waiting to receive the completion notification on + /// the port. + unsafe fn recv_overlapped( + &self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>>; + + /// Execute an overlapped send I/O operation on this UDP socket. + /// + /// This function will issue an overlapped I/O write (via `WSASendTo`) on + /// this socket to the address specified by `addr`. The provided buffer will + /// be written when the operation completes and the given `OVERLAPPED` + /// instance is used to track the overlapped operation. + /// + /// If the operation succeeds, `Ok(Some(n0)` is returned where `n` byte + /// were written. If the operation returns an error indicating that the I/O + /// is currently pending, `Ok(None)` is returned. Otherwise, the error + /// associated with the operation is returned and no overlapped operation + /// is enqueued. + /// + /// The number of bytes written will be returned as part of the completion + /// notification when the I/O finishes. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf` and + /// `overlapped` pointers are valid until the end of the I/O operation. The + /// kernel also requires that `overlapped` is unique for this I/O operation + /// and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that these two input + /// pointers are valid until the I/O operation is completed, typically via + /// completion ports and waiting to receive the completion notification on + /// the port. + unsafe fn send_to_overlapped( + &self, + buf: &[u8], + addr: &SocketAddr, + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>>; + + /// Execute an overlapped send I/O operation on this UDP socket. + /// + /// This function will issue an overlapped I/O write (via `WSASend`) on + /// this socket to the address it was previously connected to. The provided + /// buffer will be written when the operation completes and the given `OVERLAPPED` + /// instance is used to track the overlapped operation. + /// + /// If the operation succeeds, `Ok(Some(n0)` is returned where `n` byte + /// were written. If the operation returns an error indicating that the I/O + /// is currently pending, `Ok(None)` is returned. Otherwise, the error + /// associated with the operation is returned and no overlapped operation + /// is enqueued. + /// + /// The number of bytes written will be returned as part of the completion + /// notification when the I/O finishes. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf` and + /// `overlapped` pointers are valid until the end of the I/O operation. The + /// kernel also requires that `overlapped` is unique for this I/O operation + /// and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that these two input + /// pointers are valid until the I/O operation is completed, typically via + /// completion ports and waiting to receive the completion notification on + /// the port. + unsafe fn send_overlapped( + &self, + buf: &[u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>>; + + /// Calls the `GetOverlappedResult` function to get the result of an + /// overlapped operation for this handle. + /// + /// This function takes the `OVERLAPPED` argument which must have been used + /// to initiate an overlapped I/O operation, and returns either the + /// successful number of bytes transferred during the operation or an error + /// if one occurred, along with the results of the `lpFlags` parameter of + /// the relevant operation, if applicable. + /// + /// # Unsafety + /// + /// This function is unsafe as `overlapped` must have previously been used + /// to execute an operation for this handle, and it must also be a valid + /// pointer to an `OVERLAPPED` instance. + /// + /// # Panics + /// + /// This function will panic + unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)>; +} + +/// Additional methods for the `TcpListener` type in the standard library. +pub trait TcpListenerExt { + /// Perform an accept operation on this listener, accepting a connection in + /// an overlapped fashion. + /// + /// This function will issue an I/O request to accept an incoming connection + /// with the specified overlapped instance. The `socket` provided must be a + /// configured but not bound or connected socket, and if successful this + /// will consume the internal socket of the builder to return a TCP stream. + /// + /// The `addrs` buffer provided will be filled in with the local and remote + /// addresses of the connection upon completion. + /// + /// If the accept succeeds immediately, `Ok(true)` is returned. If + /// the connect indicates that the I/O is currently pending, `Ok(false)` is + /// returned. Otherwise, the error associated with the operation is + /// returned and no overlapped operation is enqueued. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the + /// `addrs` and `overlapped` pointers are valid until the end of the I/O + /// operation. The kernel also requires that `overlapped` is unique for this + /// I/O operation and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that the pointers are + /// valid until the I/O operation is completed, typically via completion + /// ports and waiting to receive the completion notification on the port. + unsafe fn accept_overlapped( + &self, + socket: &TcpStream, + addrs: &mut AcceptAddrsBuf, + overlapped: *mut OVERLAPPED, + ) -> io::Result<bool>; + + /// Once an `accept_overlapped` has finished, this function needs to be + /// called to finish the accept operation. + /// + /// Currently this just calls `setsockopt` with `SO_UPDATE_ACCEPT_CONTEXT` + /// to ensure that further functions like `getpeername` and `getsockname` + /// work correctly. + fn accept_complete(&self, socket: &TcpStream) -> io::Result<()>; + + /// Calls the `GetOverlappedResult` function to get the result of an + /// overlapped operation for this handle. + /// + /// This function takes the `OVERLAPPED` argument which must have been used + /// to initiate an overlapped I/O operation, and returns either the + /// successful number of bytes transferred during the operation or an error + /// if one occurred, along with the results of the `lpFlags` parameter of + /// the relevant operation, if applicable. + /// + /// # Unsafety + /// + /// This function is unsafe as `overlapped` must have previously been used + /// to execute an operation for this handle, and it must also be a valid + /// pointer to an `OVERLAPPED` instance. + /// + /// # Panics + /// + /// This function will panic + unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)>; +} + +#[doc(hidden)] +trait NetInt { + fn from_be(i: Self) -> Self; + fn to_be(&self) -> Self; +} +macro_rules! doit { + ($($t:ident)*) => ($(impl NetInt for $t { + fn from_be(i: Self) -> Self { <$t>::from_be(i) } + fn to_be(&self) -> Self { <$t>::to_be(*self) } + })*) +} +doit! { i8 i16 i32 i64 isize u8 u16 u32 u64 usize } + +// fn hton<I: NetInt>(i: I) -> I { i.to_be() } +fn ntoh<I: NetInt>(i: I) -> I { + I::from_be(i) +} + +fn last_err() -> io::Result<Option<usize>> { + let err = unsafe { WSAGetLastError() }; + if err == WSA_IO_PENDING as i32 { + Ok(None) + } else { + Err(io::Error::from_raw_os_error(err)) + } +} + +fn cvt(i: c_int, size: DWORD) -> io::Result<Option<usize>> { + if i == SOCKET_ERROR { + last_err() + } else { + Ok(Some(size as usize)) + } +} + +/// A type with the same memory layout as `SOCKADDR`. Used in converting Rust level +/// SocketAddr* types into their system representation. The benefit of this specific +/// type over using `SOCKADDR_STORAGE` is that this type is exactly as large as it +/// needs to be and not a lot larger. And it can be initialized cleaner from Rust. +#[repr(C)] +pub(crate) union SocketAddrCRepr { + v4: SOCKADDR_IN, + v6: SOCKADDR_IN6_LH, +} + +impl SocketAddrCRepr { + pub(crate) fn as_ptr(&self) -> *const SOCKADDR { + self as *const _ as *const SOCKADDR + } +} + +fn socket_addr_to_ptrs(addr: &SocketAddr) -> (SocketAddrCRepr, c_int) { + match *addr { + SocketAddr::V4(ref a) => { + let sin_addr = unsafe { + let mut s_un = mem::zeroed::<in_addr_S_un>(); + *s_un.S_addr_mut() = u32::from_ne_bytes(a.ip().octets()); + IN_ADDR { S_un: s_un } + }; + + let sockaddr_in = SOCKADDR_IN { + sin_family: AF_INET as ADDRESS_FAMILY, + sin_port: a.port().to_be(), + sin_addr, + sin_zero: [0; 8], + }; + + let sockaddr = SocketAddrCRepr { v4: sockaddr_in }; + (sockaddr, mem::size_of::<SOCKADDR_IN>() as c_int) + } + SocketAddr::V6(ref a) => { + let sin6_addr = unsafe { + let mut u = mem::zeroed::<in6_addr_u>(); + *u.Byte_mut() = a.ip().octets(); + IN6_ADDR { u } + }; + let u = unsafe { + let mut u = mem::zeroed::<SOCKADDR_IN6_LH_u>(); + *u.sin6_scope_id_mut() = a.scope_id(); + u + }; + + let sockaddr_in6 = SOCKADDR_IN6_LH { + sin6_family: AF_INET6 as ADDRESS_FAMILY, + sin6_port: a.port().to_be(), + sin6_addr, + sin6_flowinfo: a.flowinfo(), + u, + }; + + let sockaddr = SocketAddrCRepr { v6: sockaddr_in6 }; + (sockaddr, mem::size_of::<SOCKADDR_IN6_LH>() as c_int) + } + } +} + +unsafe fn ptrs_to_socket_addr(ptr: *const SOCKADDR, len: c_int) -> Option<SocketAddr> { + if (len as usize) < mem::size_of::<c_int>() { + return None; + } + match (*ptr).sa_family as i32 { + AF_INET if len as usize >= mem::size_of::<SOCKADDR_IN>() => { + let b = &*(ptr as *const SOCKADDR_IN); + let ip = ntoh(*b.sin_addr.S_un.S_addr()); + let ip = Ipv4Addr::new( + (ip >> 24) as u8, + (ip >> 16) as u8, + (ip >> 8) as u8, + (ip >> 0) as u8, + ); + Some(SocketAddr::V4(SocketAddrV4::new(ip, ntoh(b.sin_port)))) + } + AF_INET6 if len as usize >= mem::size_of::<SOCKADDR_IN6_LH>() => { + let b = &*(ptr as *const SOCKADDR_IN6_LH); + let arr = b.sin6_addr.u.Byte(); + let ip = Ipv6Addr::new( + ((arr[0] as u16) << 8) | (arr[1] as u16), + ((arr[2] as u16) << 8) | (arr[3] as u16), + ((arr[4] as u16) << 8) | (arr[5] as u16), + ((arr[6] as u16) << 8) | (arr[7] as u16), + ((arr[8] as u16) << 8) | (arr[9] as u16), + ((arr[10] as u16) << 8) | (arr[11] as u16), + ((arr[12] as u16) << 8) | (arr[13] as u16), + ((arr[14] as u16) << 8) | (arr[15] as u16), + ); + let addr = SocketAddrV6::new( + ip, + ntoh(b.sin6_port), + ntoh(b.sin6_flowinfo), + ntoh(*b.u.sin6_scope_id()), + ); + Some(SocketAddr::V6(addr)) + } + _ => None, + } +} + +unsafe fn slice2buf(slice: &[u8]) -> WSABUF { + WSABUF { + len: cmp::min(slice.len(), <u_long>::max_value() as usize) as u_long, + buf: slice.as_ptr() as *mut _, + } +} + +unsafe fn result(socket: SOCKET, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)> { + let mut transferred = 0; + let mut flags = 0; + let r = WSAGetOverlappedResult(socket, overlapped, &mut transferred, FALSE, &mut flags); + if r == 0 { + Err(io::Error::last_os_error()) + } else { + Ok((transferred as usize, flags)) + } +} + +impl TcpStreamExt for TcpStream { + unsafe fn read_overlapped( + &self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + let mut buf = slice2buf(buf); + let mut flags = 0; + let mut bytes_read: DWORD = 0; + let r = WSARecv( + self.as_raw_socket() as SOCKET, + &mut buf, + 1, + &mut bytes_read, + &mut flags, + overlapped, + None, + ); + cvt(r, bytes_read) + } + + unsafe fn write_overlapped( + &self, + buf: &[u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + let mut buf = slice2buf(buf); + let mut bytes_written = 0; + + // Note here that we capture the number of bytes written. The + // documentation on MSDN, however, states: + // + // > Use NULL for this parameter if the lpOverlapped parameter is not + // > NULL to avoid potentially erroneous results. This parameter can be + // > NULL only if the lpOverlapped parameter is not NULL. + // + // If we're not passing a null overlapped pointer here, then why are we + // then capturing the number of bytes! Well so it turns out that this is + // clearly faster to learn the bytes here rather than later calling + // `WSAGetOverlappedResult`, and in practice almost all implementations + // use this anyway [1]. + // + // As a result we use this to and report back the result. + // + // [1]: https://github.com/carllerche/mio/pull/520#issuecomment-273983823 + let r = WSASend( + self.as_raw_socket() as SOCKET, + &mut buf, + 1, + &mut bytes_written, + 0, + overlapped, + None, + ); + cvt(r, bytes_written) + } + + unsafe fn connect_overlapped( + &self, + addr: &SocketAddr, + buf: &[u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + connect_overlapped(self.as_raw_socket() as SOCKET, addr, buf, overlapped) + } + + fn connect_complete(&self) -> io::Result<()> { + const SO_UPDATE_CONNECT_CONTEXT: c_int = 0x7010; + let result = unsafe { + setsockopt( + self.as_raw_socket() as SOCKET, + SOL_SOCKET, + SO_UPDATE_CONNECT_CONTEXT, + 0 as *const _, + 0, + ) + }; + if result == 0 { + Ok(()) + } else { + Err(io::Error::last_os_error()) + } + } + + unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)> { + result(self.as_raw_socket() as SOCKET, overlapped) + } +} + +unsafe fn connect_overlapped( + socket: SOCKET, + addr: &SocketAddr, + buf: &[u8], + overlapped: *mut OVERLAPPED, +) -> io::Result<Option<usize>> { + static CONNECTEX: WsaExtension = WsaExtension { + guid: GUID { + Data1: 0x25a207b9, + Data2: 0xddf3, + Data3: 0x4660, + Data4: [0x8e, 0xe9, 0x76, 0xe5, 0x8c, 0x74, 0x06, 0x3e], + }, + val: AtomicUsize::new(0), + }; + type ConnectEx = unsafe extern "system" fn( + SOCKET, + *const SOCKADDR, + c_int, + PVOID, + DWORD, + LPDWORD, + LPOVERLAPPED, + ) -> BOOL; + + let ptr = CONNECTEX.get(socket)?; + assert!(ptr != 0); + let connect_ex = mem::transmute::<_, ConnectEx>(ptr); + + let (addr_buf, addr_len) = socket_addr_to_ptrs(addr); + let mut bytes_sent: DWORD = 0; + let r = connect_ex( + socket, + addr_buf.as_ptr(), + addr_len, + buf.as_ptr() as *mut _, + buf.len() as u32, + &mut bytes_sent, + overlapped, + ); + if r == TRUE { + Ok(Some(bytes_sent as usize)) + } else { + last_err() + } +} + +impl UdpSocketExt for UdpSocket { + unsafe fn recv_from_overlapped( + &self, + buf: &mut [u8], + addr: *mut SocketAddrBuf, + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + let mut buf = slice2buf(buf); + let mut flags = 0; + let mut received_bytes: DWORD = 0; + let r = WSARecvFrom( + self.as_raw_socket() as SOCKET, + &mut buf, + 1, + &mut received_bytes, + &mut flags, + &mut (*addr).buf as *mut _ as *mut _, + &mut (*addr).len, + overlapped, + None, + ); + cvt(r, received_bytes) + } + + unsafe fn recv_overlapped( + &self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + let mut buf = slice2buf(buf); + let mut flags = 0; + let mut received_bytes: DWORD = 0; + let r = WSARecv( + self.as_raw_socket() as SOCKET, + &mut buf, + 1, + &mut received_bytes, + &mut flags, + overlapped, + None, + ); + cvt(r, received_bytes) + } + + unsafe fn send_to_overlapped( + &self, + buf: &[u8], + addr: &SocketAddr, + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + let (addr_buf, addr_len) = socket_addr_to_ptrs(addr); + let mut buf = slice2buf(buf); + let mut sent_bytes = 0; + let r = WSASendTo( + self.as_raw_socket() as SOCKET, + &mut buf, + 1, + &mut sent_bytes, + 0, + addr_buf.as_ptr() as *const _, + addr_len, + overlapped, + None, + ); + cvt(r, sent_bytes) + } + + unsafe fn send_overlapped( + &self, + buf: &[u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + let mut buf = slice2buf(buf); + let mut sent_bytes = 0; + let r = WSASend( + self.as_raw_socket() as SOCKET, + &mut buf, + 1, + &mut sent_bytes, + 0, + overlapped, + None, + ); + cvt(r, sent_bytes) + } + + unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)> { + result(self.as_raw_socket() as SOCKET, overlapped) + } +} + +impl TcpListenerExt for TcpListener { + unsafe fn accept_overlapped( + &self, + socket: &TcpStream, + addrs: &mut AcceptAddrsBuf, + overlapped: *mut OVERLAPPED, + ) -> io::Result<bool> { + static ACCEPTEX: WsaExtension = WsaExtension { + guid: GUID { + Data1: 0xb5367df1, + Data2: 0xcbac, + Data3: 0x11cf, + Data4: [0x95, 0xca, 0x00, 0x80, 0x5f, 0x48, 0xa1, 0x92], + }, + val: AtomicUsize::new(0), + }; + type AcceptEx = unsafe extern "system" fn( + SOCKET, + SOCKET, + PVOID, + DWORD, + DWORD, + DWORD, + LPDWORD, + LPOVERLAPPED, + ) -> BOOL; + + let ptr = ACCEPTEX.get(self.as_raw_socket() as SOCKET)?; + assert!(ptr != 0); + let accept_ex = mem::transmute::<_, AcceptEx>(ptr); + + let mut bytes = 0; + let (a, b, c, d) = (*addrs).args(); + let r = accept_ex( + self.as_raw_socket() as SOCKET, + socket.as_raw_socket() as SOCKET, + a, + b, + c, + d, + &mut bytes, + overlapped, + ); + let succeeded = if r == TRUE { + true + } else { + last_err()?; + false + }; + Ok(succeeded) + } + + fn accept_complete(&self, socket: &TcpStream) -> io::Result<()> { + const SO_UPDATE_ACCEPT_CONTEXT: c_int = 0x700B; + let me = self.as_raw_socket(); + let result = unsafe { + setsockopt( + socket.as_raw_socket() as SOCKET, + SOL_SOCKET, + SO_UPDATE_ACCEPT_CONTEXT, + &me as *const _ as *const _, + mem::size_of_val(&me) as c_int, + ) + }; + if result == 0 { + Ok(()) + } else { + Err(io::Error::last_os_error()) + } + } + + unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)> { + result(self.as_raw_socket() as SOCKET, overlapped) + } +} + +impl SocketAddrBuf { + /// Creates a new blank socket address buffer. + /// + /// This should be used before a call to `recv_from_overlapped` overlapped + /// to create an instance to pass down. + pub fn new() -> SocketAddrBuf { + SocketAddrBuf { + buf: unsafe { mem::zeroed() }, + len: mem::size_of::<SOCKADDR_STORAGE>() as c_int, + } + } + + /// Parses this buffer to return a standard socket address. + /// + /// This function should be called after the buffer has been filled in with + /// a call to `recv_from_overlapped` being completed. It will interpret the + /// address filled in and return the standard socket address type. + /// + /// If an error is encountered then `None` is returned. + pub fn to_socket_addr(&self) -> Option<SocketAddr> { + unsafe { ptrs_to_socket_addr(&self.buf as *const _ as *const _, self.len) } + } +} + +static GETACCEPTEXSOCKADDRS: WsaExtension = WsaExtension { + guid: GUID { + Data1: 0xb5367df2, + Data2: 0xcbac, + Data3: 0x11cf, + Data4: [0x95, 0xca, 0x00, 0x80, 0x5f, 0x48, 0xa1, 0x92], + }, + val: AtomicUsize::new(0), +}; +type GetAcceptExSockaddrs = unsafe extern "system" fn( + PVOID, + DWORD, + DWORD, + DWORD, + *mut LPSOCKADDR, + LPINT, + *mut LPSOCKADDR, + LPINT, +); + +impl AcceptAddrsBuf { + /// Creates a new blank buffer ready to be passed to a call to + /// `accept_overlapped`. + pub fn new() -> AcceptAddrsBuf { + unsafe { mem::zeroed() } + } + + /// Parses the data contained in this address buffer, returning the parsed + /// result if successful. + /// + /// This function can be called after a call to `accept_overlapped` has + /// succeeded to parse out the data that was written in. + pub fn parse(&self, socket: &TcpListener) -> io::Result<AcceptAddrs> { + let mut ret = AcceptAddrs { + local: 0 as *mut _, + local_len: 0, + remote: 0 as *mut _, + remote_len: 0, + _data: self, + }; + let ptr = GETACCEPTEXSOCKADDRS.get(socket.as_raw_socket() as SOCKET)?; + assert!(ptr != 0); + unsafe { + let get_sockaddrs = mem::transmute::<_, GetAcceptExSockaddrs>(ptr); + let (a, b, c, d) = self.args(); + get_sockaddrs( + a, + b, + c, + d, + &mut ret.local, + &mut ret.local_len, + &mut ret.remote, + &mut ret.remote_len, + ); + Ok(ret) + } + } + + fn args(&self) -> (PVOID, DWORD, DWORD, DWORD) { + let remote_offset = unsafe { &(*(0 as *const AcceptAddrsBuf)).remote as *const _ as usize }; + ( + self as *const _ as *mut _, + 0, + remote_offset as DWORD, + (mem::size_of_val(self) - remote_offset) as DWORD, + ) + } +} + +impl<'a> AcceptAddrs<'a> { + /// Returns the local socket address contained in this buffer. + pub fn local(&self) -> Option<SocketAddr> { + unsafe { ptrs_to_socket_addr(self.local, self.local_len) } + } + + /// Returns the remote socket address contained in this buffer. + pub fn remote(&self) -> Option<SocketAddr> { + unsafe { ptrs_to_socket_addr(self.remote, self.remote_len) } + } +} + +impl WsaExtension { + fn get(&self, socket: SOCKET) -> io::Result<usize> { + let prev = self.val.load(Ordering::SeqCst); + if prev != 0 && !cfg!(debug_assertions) { + return Ok(prev); + } + let mut ret = 0 as usize; + let mut bytes = 0; + let r = unsafe { + WSAIoctl( + socket, + SIO_GET_EXTENSION_FUNCTION_POINTER, + &self.guid as *const _ as *mut _, + mem::size_of_val(&self.guid) as DWORD, + &mut ret as *mut _ as *mut _, + mem::size_of_val(&ret) as DWORD, + &mut bytes, + 0 as *mut _, + None, + ) + }; + cvt(r, 0).map(|_| { + debug_assert_eq!(bytes as usize, mem::size_of_val(&ret)); + debug_assert!(prev == 0 || prev == ret); + self.val.store(ret, Ordering::SeqCst); + ret + }) + } +} + +#[cfg(test)] +mod tests { + use std::io::prelude::*; + use std::net::{ + IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV6, TcpListener, TcpStream, UdpSocket, + }; + use std::slice; + use std::thread; + + use socket2::{Domain, Socket, Type}; + + use crate::iocp::CompletionPort; + use crate::net::{AcceptAddrsBuf, TcpListenerExt}; + use crate::net::{SocketAddrBuf, TcpStreamExt, UdpSocketExt}; + use crate::Overlapped; + + fn each_ip(f: &mut dyn FnMut(SocketAddr)) { + f(t!("127.0.0.1:0".parse())); + f(t!("[::1]:0".parse())); + } + + #[test] + fn tcp_read() { + each_ip(&mut |addr| { + let l = t!(TcpListener::bind(addr)); + let addr = t!(l.local_addr()); + let t = thread::spawn(move || { + let mut a = t!(l.accept()).0; + t!(a.write_all(&[1, 2, 3])); + }); + + let cp = t!(CompletionPort::new(1)); + let s = t!(TcpStream::connect(addr)); + t!(cp.add_socket(1, &s)); + + let mut b = [0; 10]; + let a = Overlapped::zero(); + unsafe { + t!(s.read_overlapped(&mut b, a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + assert_eq!(&b[0..3], &[1, 2, 3]); + + t!(t.join()); + }) + } + + #[test] + fn tcp_write() { + each_ip(&mut |addr| { + let l = t!(TcpListener::bind(addr)); + let addr = t!(l.local_addr()); + let t = thread::spawn(move || { + let mut a = t!(l.accept()).0; + let mut b = [0; 10]; + let n = t!(a.read(&mut b)); + assert_eq!(n, 3); + assert_eq!(&b[0..3], &[1, 2, 3]); + }); + + let cp = t!(CompletionPort::new(1)); + let s = t!(TcpStream::connect(addr)); + t!(cp.add_socket(1, &s)); + + let b = [1, 2, 3]; + let a = Overlapped::zero(); + unsafe { + t!(s.write_overlapped(&b, a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + + t!(t.join()); + }) + } + + #[test] + fn tcp_connect() { + each_ip(&mut |addr_template| { + let l = t!(TcpListener::bind(addr_template)); + let addr = t!(l.local_addr()); + let t = thread::spawn(move || { + t!(l.accept()); + }); + + let cp = t!(CompletionPort::new(1)); + let domain = Domain::for_address(addr); + let socket = t!(Socket::new(domain, Type::STREAM, None)); + t!(socket.bind(&addr_template.into())); + let socket = TcpStream::from(socket); + t!(cp.add_socket(1, &socket)); + + let a = Overlapped::zero(); + unsafe { + t!(socket.connect_overlapped(&addr, &[], a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 0); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + t!(socket.connect_complete()); + + t!(t.join()); + }) + } + + #[test] + fn udp_recv_from() { + each_ip(&mut |addr| { + let a = t!(UdpSocket::bind(addr)); + let b = t!(UdpSocket::bind(addr)); + let a_addr = t!(a.local_addr()); + let b_addr = t!(b.local_addr()); + let t = thread::spawn(move || { + t!(a.send_to(&[1, 2, 3], b_addr)); + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_socket(1, &b)); + + let mut buf = [0; 10]; + let a = Overlapped::zero(); + let mut addr = SocketAddrBuf::new(); + unsafe { + t!(b.recv_from_overlapped(&mut buf, &mut addr, a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + assert_eq!(&buf[..3], &[1, 2, 3]); + assert_eq!(addr.to_socket_addr(), Some(a_addr)); + + t!(t.join()); + }) + } + + #[test] + fn udp_recv() { + each_ip(&mut |addr| { + let a = t!(UdpSocket::bind(addr)); + let b = t!(UdpSocket::bind(addr)); + let a_addr = t!(a.local_addr()); + let b_addr = t!(b.local_addr()); + assert!(b.connect(a_addr).is_ok()); + assert!(a.connect(b_addr).is_ok()); + let t = thread::spawn(move || { + t!(a.send_to(&[1, 2, 3], b_addr)); + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_socket(1, &b)); + + let mut buf = [0; 10]; + let a = Overlapped::zero(); + unsafe { + t!(b.recv_overlapped(&mut buf, a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + assert_eq!(&buf[..3], &[1, 2, 3]); + + t!(t.join()); + }) + } + + #[test] + fn udp_send_to() { + each_ip(&mut |addr| { + let a = t!(UdpSocket::bind(addr)); + let b = t!(UdpSocket::bind(addr)); + let a_addr = t!(a.local_addr()); + let b_addr = t!(b.local_addr()); + let t = thread::spawn(move || { + let mut b = [0; 100]; + let (n, addr) = t!(a.recv_from(&mut b)); + assert_eq!(n, 3); + assert_eq!(addr, b_addr); + assert_eq!(&b[..3], &[1, 2, 3]); + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_socket(1, &b)); + + let a = Overlapped::zero(); + unsafe { + t!(b.send_to_overlapped(&[1, 2, 3], &a_addr, a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + + t!(t.join()); + }) + } + + #[test] + fn udp_send() { + each_ip(&mut |addr| { + let a = t!(UdpSocket::bind(addr)); + let b = t!(UdpSocket::bind(addr)); + let a_addr = t!(a.local_addr()); + let b_addr = t!(b.local_addr()); + assert!(b.connect(a_addr).is_ok()); + assert!(a.connect(b_addr).is_ok()); + let t = thread::spawn(move || { + let mut b = [0; 100]; + let (n, addr) = t!(a.recv_from(&mut b)); + assert_eq!(n, 3); + assert_eq!(addr, b_addr); + assert_eq!(&b[..3], &[1, 2, 3]); + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_socket(1, &b)); + + let a = Overlapped::zero(); + unsafe { + t!(b.send_overlapped(&[1, 2, 3], a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + + t!(t.join()); + }) + } + + #[test] + fn tcp_accept() { + each_ip(&mut |addr_template| { + let l = t!(TcpListener::bind(addr_template)); + let addr = t!(l.local_addr()); + let t = thread::spawn(move || { + let socket = t!(TcpStream::connect(addr)); + (socket.local_addr().unwrap(), socket.peer_addr().unwrap()) + }); + + let cp = t!(CompletionPort::new(1)); + let domain = Domain::for_address(addr); + let socket = TcpStream::from(t!(Socket::new(domain, Type::STREAM, None))); + t!(cp.add_socket(1, &l)); + + let a = Overlapped::zero(); + let mut addrs = AcceptAddrsBuf::new(); + unsafe { + t!(l.accept_overlapped(&socket, &mut addrs, a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 0); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + t!(l.accept_complete(&socket)); + + let (remote, local) = t!(t.join()); + let addrs = addrs.parse(&l).unwrap(); + assert_eq!(addrs.local(), Some(local)); + assert_eq!(addrs.remote(), Some(remote)); + }) + } + + #[test] + fn sockaddr_convert_4() { + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(3, 4, 5, 6)), 0xabcd); + let (raw_addr, addr_len) = super::socket_addr_to_ptrs(&addr); + assert_eq!(addr_len, 16); + let addr_bytes = + unsafe { slice::from_raw_parts(raw_addr.as_ptr() as *const u8, addr_len as usize) }; + assert_eq!( + addr_bytes, + &[2, 0, 0xab, 0xcd, 3, 4, 5, 6, 0, 0, 0, 0, 0, 0, 0, 0] + ); + } + + #[test] + fn sockaddr_convert_v6() { + let port = 0xabcd; + let flowinfo = 0x12345678; + let scope_id = 0x87654321; + let addr = SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::new( + 0x0102, 0x0304, 0x0506, 0x0708, 0x090a, 0x0b0c, 0x0d0e, 0x0f10, + ), + port, + flowinfo, + scope_id, + )); + let (raw_addr, addr_len) = super::socket_addr_to_ptrs(&addr); + assert_eq!(addr_len, 28); + let addr_bytes = + unsafe { slice::from_raw_parts(raw_addr.as_ptr() as *const u8, addr_len as usize) }; + assert_eq!( + addr_bytes, + &[ + 23, 0, // AF_INET6 + 0xab, 0xcd, // Port + 0x78, 0x56, 0x34, 0x12, // flowinfo + 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, + 0x0f, 0x10, // IP + 0x21, 0x43, 0x65, 0x87, // scope_id + ] + ); + } +} diff --git a/third_party/rust/miow/src/overlapped.rs b/third_party/rust/miow/src/overlapped.rs new file mode 100644 index 0000000000..abe2d37cbb --- /dev/null +++ b/third_party/rust/miow/src/overlapped.rs @@ -0,0 +1,92 @@ +use std::fmt; +use std::io; +use std::mem; +use std::ptr; + +use winapi::shared::ntdef::{HANDLE, NULL}; +use winapi::um::minwinbase::*; +use winapi::um::synchapi::*; + +/// A wrapper around `OVERLAPPED` to provide "rustic" accessors and +/// initializers. +pub struct Overlapped(OVERLAPPED); + +impl fmt::Debug for Overlapped { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "OVERLAPPED") + } +} + +unsafe impl Send for Overlapped {} +unsafe impl Sync for Overlapped {} + +impl Overlapped { + /// Creates a new zeroed out instance of an overlapped I/O tracking state. + /// + /// This is suitable for passing to methods which will then later get + /// notified via an I/O Completion Port. + pub fn zero() -> Overlapped { + Overlapped(unsafe { mem::zeroed() }) + } + + /// Creates a new `Overlapped` with an initialized non-null `hEvent`. The caller is + /// responsible for calling `CloseHandle` on the `hEvent` field of the returned + /// `Overlapped`. The event is created with `bManualReset` set to `FALSE`, meaning after a + /// single thread waits on the event, it will be reset. + pub fn initialize_with_autoreset_event() -> io::Result<Overlapped> { + let event = unsafe { CreateEventW(ptr::null_mut(), 0i32, 0i32, ptr::null()) }; + if event == NULL { + return Err(io::Error::last_os_error()); + } + let mut overlapped = Self::zero(); + overlapped.set_event(event); + Ok(overlapped) + } + + /// Creates a new `Overlapped` function pointer from the underlying + /// `OVERLAPPED`, wrapping in the "rusty" wrapper for working with + /// accessors. + /// + /// # Unsafety + /// + /// This function doesn't validate `ptr` nor the lifetime of the returned + /// pointer at all, it's recommended to use this method with extreme + /// caution. + pub unsafe fn from_raw<'a>(ptr: *mut OVERLAPPED) -> &'a mut Overlapped { + &mut *(ptr as *mut Overlapped) + } + + /// Gain access to the raw underlying data + pub fn raw(&self) -> *mut OVERLAPPED { + &self.0 as *const _ as *mut _ + } + + /// Sets the offset inside this overlapped structure. + /// + /// Note that for I/O operations in general this only has meaning for I/O + /// handles that are on a seeking device that supports the concept of an + /// offset. + pub fn set_offset(&mut self, offset: u64) { + let s = unsafe { self.0.u.s_mut() }; + s.Offset = offset as u32; + s.OffsetHigh = (offset >> 32) as u32; + } + + /// Reads the offset inside this overlapped structure. + pub fn offset(&self) -> u64 { + let s = unsafe { self.0.u.s() }; + (s.Offset as u64) | ((s.OffsetHigh as u64) << 32) + } + + /// Sets the `hEvent` field of this structure. + /// + /// The event specified can be null. + pub fn set_event(&mut self, event: HANDLE) { + self.0.hEvent = event; + } + + /// Reads the `hEvent` field of this structure, may return null. + pub fn event(&self) -> HANDLE { + self.0.hEvent + } +} diff --git a/third_party/rust/miow/src/pipe.rs b/third_party/rust/miow/src/pipe.rs new file mode 100644 index 0000000000..5088021a86 --- /dev/null +++ b/third_party/rust/miow/src/pipe.rs @@ -0,0 +1,788 @@ +//! Interprocess Communication pipes +//! +//! A pipe is a section of shared memory that processes use for communication. +//! The process that creates a pipe is the _pipe server_. A process that connects +//! to a pipe is a _pipe client_. One process writes information to the pipe, then +//! the other process reads the information from the pipe. This overview +//! describes how to create, manage, and use pipes. +//! +//! There are two types of pipes: [anonymous pipes](#fn.anonymous.html) and +//! [named pipes](#fn.named.html). Anonymous pipes require less overhead than +//! named pipes, but offer limited services. +//! +//! # Anonymous pipes +//! +//! An anonymous pipe is an unnamed, one-way pipe that typically transfers data +//! between a parent process and a child process. Anonymous pipes are always +//! local; they cannot be used for communication over a network. +//! +//! # Named pipes +//! +//! A *named pipe* is a named, one-way or duplex pipe for communication between +//! the pipe server and one or more pipe clients. All instances of a named pipe +//! share the same pipe name, but each instance has its own buffers and handles, +//! and provides a separate conduit for client/server communication. The use of +//! instances enables multiple pipe clients to use the same named pipe +//! simultaneously. +//! +//! Any process can access named pipes, subject to security checks, making named +//! pipes an easy form of communication between related or unrelated processes. +//! +//! Any process can act as both a server and a client, making peer-to-peer +//! communication possible. As used here, the term pipe server refers to a +//! process that creates a named pipe, and the term pipe client refers to a +//! process that connects to an instance of a named pipe. +//! +//! Named pipes can be used to provide communication between processes on the +//! same computer or between processes on different computers across a network. +//! If the server service is running, all named pipes are accessible remotely. If +//! you intend to use a named pipe locally only, deny access to NT +//! AUTHORITY\\NETWORK or switch to local RPC. +//! +//! # References +//! +//! - [win32 pipe docs](https://github.com/MicrosoftDocs/win32/blob/docs/desktop-src/ipc/pipes.md) + +use std::cell::RefCell; +use std::ffi::OsStr; +use std::fs::{File, OpenOptions}; +use std::io; +use std::io::prelude::*; +use std::os::windows::ffi::*; +use std::os::windows::io::*; +use std::time::Duration; + +use crate::handle::Handle; +use crate::overlapped::Overlapped; +use winapi::shared::minwindef::*; +use winapi::shared::ntdef::HANDLE; +use winapi::shared::winerror::*; +use winapi::um::fileapi::*; +use winapi::um::handleapi::*; +use winapi::um::ioapiset::*; +use winapi::um::minwinbase::*; +use winapi::um::namedpipeapi::*; +use winapi::um::winbase::*; + +/// Readable half of an anonymous pipe. +#[derive(Debug)] +pub struct AnonRead(Handle); + +/// Writable half of an anonymous pipe. +#[derive(Debug)] +pub struct AnonWrite(Handle); + +/// A named pipe that can accept connections. +#[derive(Debug)] +pub struct NamedPipe(Handle); + +/// A builder structure for creating a new named pipe. +#[derive(Debug)] +pub struct NamedPipeBuilder { + name: Vec<u16>, + dwOpenMode: DWORD, + dwPipeMode: DWORD, + nMaxInstances: DWORD, + nOutBufferSize: DWORD, + nInBufferSize: DWORD, + nDefaultTimeOut: DWORD, +} + +/// Creates a new anonymous in-memory pipe, returning the read/write ends of the +/// pipe. +/// +/// The buffer size for this pipe may also be specified, but the system will +/// normally use this as a suggestion and it's not guaranteed that the buffer +/// will be precisely this size. +pub fn anonymous(buffer_size: u32) -> io::Result<(AnonRead, AnonWrite)> { + let mut read = 0 as HANDLE; + let mut write = 0 as HANDLE; + crate::cvt(unsafe { CreatePipe(&mut read, &mut write, 0 as *mut _, buffer_size) })?; + Ok((AnonRead(Handle::new(read)), AnonWrite(Handle::new(write)))) +} + +impl Read for AnonRead { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.0.read(buf) + } +} +impl<'a> Read for &'a AnonRead { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.0.read(buf) + } +} + +impl AsRawHandle for AnonRead { + fn as_raw_handle(&self) -> HANDLE { + self.0.raw() + } +} +impl FromRawHandle for AnonRead { + unsafe fn from_raw_handle(handle: HANDLE) -> AnonRead { + AnonRead(Handle::new(handle)) + } +} +impl IntoRawHandle for AnonRead { + fn into_raw_handle(self) -> HANDLE { + self.0.into_raw() + } +} + +impl Write for AnonWrite { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.0.write(buf) + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} +impl<'a> Write for &'a AnonWrite { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.0.write(buf) + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl AsRawHandle for AnonWrite { + fn as_raw_handle(&self) -> HANDLE { + self.0.raw() + } +} +impl FromRawHandle for AnonWrite { + unsafe fn from_raw_handle(handle: HANDLE) -> AnonWrite { + AnonWrite(Handle::new(handle)) + } +} +impl IntoRawHandle for AnonWrite { + fn into_raw_handle(self) -> HANDLE { + self.0.into_raw() + } +} + +/// A convenience function to connect to a named pipe. +/// +/// This function will block the calling process until it can connect to the +/// pipe server specified by `addr`. This will use `NamedPipe::wait` internally +/// to block until it can connect. +pub fn connect<A: AsRef<OsStr>>(addr: A) -> io::Result<File> { + _connect(addr.as_ref()) +} + +fn _connect(addr: &OsStr) -> io::Result<File> { + let mut r = OpenOptions::new(); + let mut w = OpenOptions::new(); + let mut rw = OpenOptions::new(); + r.read(true); + w.write(true); + rw.read(true).write(true); + loop { + let res = rw + .open(addr) + .or_else(|_| r.open(addr)) + .or_else(|_| w.open(addr)); + match res { + Ok(f) => return Ok(f), + Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => {} + Err(e) => return Err(e), + } + + NamedPipe::wait(addr, Some(Duration::new(20, 0)))?; + } +} + +impl NamedPipe { + /// Creates a new initial named pipe. + /// + /// This function is equivalent to: + /// + /// ``` + /// use miow::pipe::NamedPipeBuilder; + /// + /// # let addr = "foo"; + /// NamedPipeBuilder::new(addr) + /// .first(true) + /// .inbound(true) + /// .outbound(true) + /// .out_buffer_size(65536) + /// .in_buffer_size(65536) + /// .create(); + /// ``` + pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> { + NamedPipeBuilder::new(addr).create() + } + + /// Waits until either a time-out interval elapses or an instance of the + /// specified named pipe is available for connection. + /// + /// If this function succeeds the process can create a `File` to connect to + /// the named pipe. + pub fn wait<A: AsRef<OsStr>>(addr: A, timeout: Option<Duration>) -> io::Result<()> { + NamedPipe::_wait(addr.as_ref(), timeout) + } + + fn _wait(addr: &OsStr, timeout: Option<Duration>) -> io::Result<()> { + let addr = addr.encode_wide().chain(Some(0)).collect::<Vec<_>>(); + let timeout = crate::dur2ms(timeout); + crate::cvt(unsafe { WaitNamedPipeW(addr.as_ptr(), timeout) }).map(|_| ()) + } + + /// Connects this named pipe to a client, blocking until one becomes + /// available. + /// + /// This function will call the `ConnectNamedPipe` function to await for a + /// client to connect. This can be called immediately after the pipe is + /// created, or after it has been disconnected from a previous client. + pub fn connect(&self) -> io::Result<()> { + match crate::cvt(unsafe { ConnectNamedPipe(self.0.raw(), 0 as *mut _) }) { + Ok(_) => Ok(()), + Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => Ok(()), + Err(e) => Err(e), + } + } + + /// Issue a connection request with the specified overlapped operation. + /// + /// This function will issue a request to connect a client to this server, + /// returning immediately after starting the overlapped operation. + /// + /// If this function immediately succeeds then `Ok(true)` is returned. If + /// the overlapped operation is enqueued and pending, then `Ok(false)` is + /// returned. Otherwise an error is returned indicating what went wrong. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the + /// `overlapped` pointer is valid until the end of the I/O operation. The + /// kernel also requires that `overlapped` is unique for this I/O operation + /// and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that this pointer is + /// valid until the I/O operation is completed, typically via completion + /// ports and waiting to receive the completion notification on the port. + pub unsafe fn connect_overlapped(&self, overlapped: *mut OVERLAPPED) -> io::Result<bool> { + match crate::cvt(ConnectNamedPipe(self.0.raw(), overlapped)) { + Ok(_) => Ok(true), + Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => Ok(true), + Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => Ok(false), + Err(ref e) if e.raw_os_error() == Some(ERROR_NO_DATA as i32) => Ok(true), + Err(e) => Err(e), + } + } + + /// Disconnects this named pipe from any connected client. + pub fn disconnect(&self) -> io::Result<()> { + crate::cvt(unsafe { DisconnectNamedPipe(self.0.raw()) }).map(|_| ()) + } + + /// Issues an overlapped read operation to occur on this pipe. + /// + /// This function will issue an asynchronous read to occur in an overlapped + /// fashion, returning immediately. The `buf` provided will be filled in + /// with data and the request is tracked by the `overlapped` function + /// provided. + /// + /// If the operation succeeds immediately, `Ok(Some(n))` is returned where + /// `n` is the number of bytes read. If an asynchronous operation is + /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred + /// it is returned. + /// + /// When this operation completes (or if it completes immediately), another + /// mechanism must be used to learn how many bytes were transferred (such as + /// looking at the filed in the IOCP status message). + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf` and + /// `overlapped` pointers to be valid until the end of the I/O operation. + /// The kernel also requires that `overlapped` is unique for this I/O + /// operation and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that the pointers are + /// valid until the I/O operation is completed, typically via completion + /// ports and waiting to receive the completion notification on the port. + pub unsafe fn read_overlapped( + &self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + self.0.read_overlapped(buf, overlapped) + } + + /// Issues an overlapped write operation to occur on this pipe. + /// + /// This function will issue an asynchronous write to occur in an overlapped + /// fashion, returning immediately. The `buf` provided will be filled in + /// with data and the request is tracked by the `overlapped` function + /// provided. + /// + /// If the operation succeeds immediately, `Ok(Some(n))` is returned where + /// `n` is the number of bytes written. If an asynchronous operation is + /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred + /// it is returned. + /// + /// When this operation completes (or if it completes immediately), another + /// mechanism must be used to learn how many bytes were transferred (such as + /// looking at the filed in the IOCP status message). + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf` and + /// `overlapped` pointers to be valid until the end of the I/O operation. + /// The kernel also requires that `overlapped` is unique for this I/O + /// operation and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that the pointers are + /// valid until the I/O operation is completed, typically via completion + /// ports and waiting to receive the completion notification on the port. + pub unsafe fn write_overlapped( + &self, + buf: &[u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + self.0.write_overlapped(buf, overlapped) + } + + /// Calls the `GetOverlappedResult` function to get the result of an + /// overlapped operation for this handle. + /// + /// This function takes the `OVERLAPPED` argument which must have been used + /// to initiate an overlapped I/O operation, and returns either the + /// successful number of bytes transferred during the operation or an error + /// if one occurred. + /// + /// # Unsafety + /// + /// This function is unsafe as `overlapped` must have previously been used + /// to execute an operation for this handle, and it must also be a valid + /// pointer to an `Overlapped` instance. + /// + /// # Panics + /// + /// This function will panic + pub unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<usize> { + let mut transferred = 0; + let r = GetOverlappedResult(self.0.raw(), overlapped, &mut transferred, FALSE); + if r == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(transferred as usize) + } + } +} + +thread_local! { + static NAMED_PIPE_OVERLAPPED: RefCell<Option<Overlapped>> = RefCell::new(None); +} + +/// Call a function with a threadlocal `Overlapped`. The function `f` should be +/// sure that the event is reset, either manually or by a thread being released. +fn with_threadlocal_overlapped<F>(f: F) -> io::Result<usize> +where + F: FnOnce(&Overlapped) -> io::Result<usize>, +{ + NAMED_PIPE_OVERLAPPED.with(|overlapped| { + let mut mborrow = overlapped.borrow_mut(); + if let None = *mborrow { + let op = Overlapped::initialize_with_autoreset_event()?; + *mborrow = Some(op); + } + f(mborrow.as_ref().unwrap()) + }) +} + +impl Read for NamedPipe { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`. + with_threadlocal_overlapped(|overlapped| unsafe { + self.0 + .read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED) + }) + } +} +impl<'a> Read for &'a NamedPipe { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`. + with_threadlocal_overlapped(|overlapped| unsafe { + self.0 + .read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED) + }) + } +} + +impl Write for NamedPipe { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`. + with_threadlocal_overlapped(|overlapped| unsafe { + self.0 + .write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED) + }) + } + fn flush(&mut self) -> io::Result<()> { + <&NamedPipe as Write>::flush(&mut &*self) + } +} +impl<'a> Write for &'a NamedPipe { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`. + with_threadlocal_overlapped(|overlapped| unsafe { + self.0 + .write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED) + }) + } + fn flush(&mut self) -> io::Result<()> { + crate::cvt(unsafe { FlushFileBuffers(self.0.raw()) }).map(|_| ()) + } +} + +impl AsRawHandle for NamedPipe { + fn as_raw_handle(&self) -> HANDLE { + self.0.raw() + } +} +impl FromRawHandle for NamedPipe { + unsafe fn from_raw_handle(handle: HANDLE) -> NamedPipe { + NamedPipe(Handle::new(handle)) + } +} +impl IntoRawHandle for NamedPipe { + fn into_raw_handle(self) -> HANDLE { + self.0.into_raw() + } +} + +fn flag(slot: &mut DWORD, on: bool, val: DWORD) { + if on { + *slot |= val; + } else { + *slot &= !val; + } +} + +impl NamedPipeBuilder { + /// Creates a new named pipe builder with the default settings. + pub fn new<A: AsRef<OsStr>>(addr: A) -> NamedPipeBuilder { + NamedPipeBuilder { + name: addr.as_ref().encode_wide().chain(Some(0)).collect(), + dwOpenMode: PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED, + dwPipeMode: PIPE_TYPE_BYTE, + nMaxInstances: PIPE_UNLIMITED_INSTANCES, + nOutBufferSize: 65536, + nInBufferSize: 65536, + nDefaultTimeOut: 0, + } + } + + /// Indicates whether data is allowed to flow from the client to the server. + pub fn inbound(&mut self, allowed: bool) -> &mut Self { + flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_INBOUND); + self + } + + /// Indicates whether data is allowed to flow from the server to the client. + pub fn outbound(&mut self, allowed: bool) -> &mut Self { + flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_OUTBOUND); + self + } + + /// Indicates that this pipe must be the first instance. + /// + /// If set to true, then creation will fail if there's already an instance + /// elsewhere. + pub fn first(&mut self, first: bool) -> &mut Self { + flag(&mut self.dwOpenMode, first, FILE_FLAG_FIRST_PIPE_INSTANCE); + self + } + + /// Indicates whether this server can accept remote clients or not. + pub fn accept_remote(&mut self, accept: bool) -> &mut Self { + flag(&mut self.dwPipeMode, !accept, PIPE_REJECT_REMOTE_CLIENTS); + self + } + + /// Specifies the maximum number of instances of the server pipe that are + /// allowed. + /// + /// The first instance of a pipe can specify this value. A value of 255 + /// indicates that there is no limit to the number of instances. + pub fn max_instances(&mut self, instances: u8) -> &mut Self { + self.nMaxInstances = instances as DWORD; + self + } + + /// Specifies the number of bytes to reserver for the output buffer + pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self { + self.nOutBufferSize = buffer as DWORD; + self + } + + /// Specifies the number of bytes to reserver for the input buffer + pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self { + self.nInBufferSize = buffer as DWORD; + self + } + + /// Using the options in this builder, attempt to create a new named pipe. + /// + /// This function will call the `CreateNamedPipe` function and return the + /// result. + pub fn create(&mut self) -> io::Result<NamedPipe> { + unsafe { self.with_security_attributes(::std::ptr::null_mut()) } + } + + /// Using the options in the builder and the provided security attributes, attempt to create a + /// new named pipe. This function has to be called with a valid pointer to a + /// `SECURITY_ATTRIBUTES` struct that will stay valid for the lifetime of this function or a + /// null pointer. + /// + /// This function will call the `CreateNamedPipe` function and return the + /// result. + pub unsafe fn with_security_attributes( + &mut self, + attrs: *mut SECURITY_ATTRIBUTES, + ) -> io::Result<NamedPipe> { + let h = CreateNamedPipeW( + self.name.as_ptr(), + self.dwOpenMode, + self.dwPipeMode, + self.nMaxInstances, + self.nOutBufferSize, + self.nInBufferSize, + self.nDefaultTimeOut, + attrs, + ); + + if h == INVALID_HANDLE_VALUE { + Err(io::Error::last_os_error()) + } else { + Ok(NamedPipe(Handle::new(h))) + } + } +} + +#[cfg(test)] +mod tests { + use std::fs::{File, OpenOptions}; + use std::io::prelude::*; + use std::sync::mpsc::channel; + use std::thread; + use std::time::Duration; + + use rand::{distributions::Alphanumeric, thread_rng, Rng}; + + use super::{anonymous, NamedPipe, NamedPipeBuilder}; + use crate::iocp::CompletionPort; + use crate::Overlapped; + + fn name() -> String { + let name = thread_rng() + .sample_iter(Alphanumeric) + .take(30) + .map(char::from) + .collect::<String>(); + format!(r"\\.\pipe\{}", name) + } + + #[test] + fn anon() { + let (mut read, mut write) = t!(anonymous(256)); + assert_eq!(t!(write.write(&[1, 2, 3])), 3); + let mut b = [0; 10]; + assert_eq!(t!(read.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + } + + #[test] + fn named_not_first() { + let name = name(); + let _a = t!(NamedPipe::new(&name)); + assert!(NamedPipe::new(&name).is_err()); + + t!(NamedPipeBuilder::new(&name).first(false).create()); + } + + #[test] + fn named_connect() { + let name = name(); + let a = t!(NamedPipe::new(&name)); + + let t = thread::spawn(move || { + t!(File::open(name)); + }); + + t!(a.connect()); + t!(a.disconnect()); + t!(t.join()); + } + + #[test] + fn named_wait() { + let name = name(); + let a = t!(NamedPipe::new(&name)); + + let (tx, rx) = channel(); + let t = thread::spawn(move || { + t!(NamedPipe::wait(&name, None)); + t!(File::open(&name)); + assert!(NamedPipe::wait(&name, Some(Duration::from_millis(1))).is_err()); + t!(tx.send(())); + }); + + t!(a.connect()); + t!(rx.recv()); + t!(a.disconnect()); + t!(t.join()); + } + + #[test] + fn named_connect_overlapped() { + let name = name(); + let a = t!(NamedPipe::new(&name)); + + let t = thread::spawn(move || { + t!(File::open(name)); + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_handle(2, &a)); + + let over = Overlapped::zero(); + unsafe { + t!(a.connect_overlapped(over.raw())); + } + + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 0); + assert_eq!(status.token(), 2); + assert_eq!(status.overlapped(), over.raw()); + t!(t.join()); + } + + #[test] + fn named_read_write() { + let name = name(); + let mut a = t!(NamedPipe::new(&name)); + + let t = thread::spawn(move || { + let mut f = t!(OpenOptions::new().read(true).write(true).open(name)); + t!(f.write_all(&[1, 2, 3])); + let mut b = [0; 10]; + assert_eq!(t!(f.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + }); + + t!(a.connect()); + let mut b = [0; 10]; + assert_eq!(t!(a.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + t!(a.write_all(&[1, 2, 3])); + t!(a.flush()); + t!(a.disconnect()); + t!(t.join()); + } + + #[test] + fn named_read_write_multi() { + for _ in 0..5 { + named_read_write() + } + } + + #[test] + fn named_read_write_multi_same_thread() { + let name1 = name(); + let mut a1 = t!(NamedPipe::new(&name1)); + let name2 = name(); + let mut a2 = t!(NamedPipe::new(&name2)); + + let t = thread::spawn(move || { + let mut f = t!(OpenOptions::new().read(true).write(true).open(name1)); + t!(f.write_all(&[1, 2, 3])); + let mut b = [0; 10]; + assert_eq!(t!(f.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + + let mut f = t!(OpenOptions::new().read(true).write(true).open(name2)); + t!(f.write_all(&[1, 2, 3])); + let mut b = [0; 10]; + assert_eq!(t!(f.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + }); + + t!(a1.connect()); + let mut b = [0; 10]; + assert_eq!(t!(a1.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + t!(a1.write_all(&[1, 2, 3])); + t!(a1.flush()); + t!(a1.disconnect()); + + t!(a2.connect()); + let mut b = [0; 10]; + assert_eq!(t!(a2.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + t!(a2.write_all(&[1, 2, 3])); + t!(a2.flush()); + t!(a2.disconnect()); + + t!(t.join()); + } + + #[test] + fn named_read_overlapped() { + let name = name(); + let a = t!(NamedPipe::new(&name)); + + let t = thread::spawn(move || { + let mut f = t!(File::create(name)); + t!(f.write_all(&[1, 2, 3])); + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_handle(3, &a)); + t!(a.connect()); + + let mut b = [0; 10]; + let over = Overlapped::zero(); + unsafe { + t!(a.read_overlapped(&mut b, over.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 3); + assert_eq!(status.overlapped(), over.raw()); + assert_eq!(&b[..3], &[1, 2, 3]); + + t!(t.join()); + } + + #[test] + fn named_write_overlapped() { + let name = name(); + let a = t!(NamedPipe::new(&name)); + + let t = thread::spawn(move || { + let mut f = t!(super::connect(name)); + let mut b = [0; 10]; + assert_eq!(t!(f.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]) + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_handle(3, &a)); + t!(a.connect()); + + let over = Overlapped::zero(); + unsafe { + t!(a.write_overlapped(&[1, 2, 3], over.raw())); + } + + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 3); + assert_eq!(status.overlapped(), over.raw()); + + t!(t.join()); + } +} |