diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:18:25 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:18:25 +0000 |
commit | 5363f350887b1e5b5dd21a86f88c8af9d7fea6da (patch) | |
tree | 35ca005eb6e0e9a1ba3bb5dbc033209ad445dc17 /vendor/miow/src | |
parent | Adding debian version 1.66.0+dfsg1-1. (diff) | |
download | rustc-5363f350887b1e5b5dd21a86f88c8af9d7fea6da.tar.xz rustc-5363f350887b1e5b5dd21a86f88c8af9d7fea6da.zip |
Merging upstream version 1.67.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/miow/src')
-rw-r--r-- | vendor/miow/src/handle.rs | 347 | ||||
-rw-r--r-- | vendor/miow/src/iocp.rs | 654 | ||||
-rw-r--r-- | vendor/miow/src/lib.rs | 108 | ||||
-rw-r--r-- | vendor/miow/src/net.rs | 2596 | ||||
-rw-r--r-- | vendor/miow/src/overlapped.rs | 186 | ||||
-rw-r--r-- | vendor/miow/src/pipe.rs | 1580 |
6 files changed, 2745 insertions, 2726 deletions
diff --git a/vendor/miow/src/handle.rs b/vendor/miow/src/handle.rs index f50c49d50..5302246ea 100644 --- a/vendor/miow/src/handle.rs +++ b/vendor/miow/src/handle.rs @@ -1,173 +1,174 @@ -use crate::*;
-use std::cmp;
-use std::io;
-use std::ptr;
-
-use windows_sys::Win32::Storage::FileSystem::*;
-use windows_sys::Win32::System::IO::*;
-
-#[derive(Debug)]
-pub struct Handle(HANDLE);
-
-unsafe impl Send for Handle {}
-unsafe impl Sync for Handle {}
-
-impl Handle {
- pub fn new(handle: HANDLE) -> Handle {
- Handle(handle)
- }
-
- pub fn raw(&self) -> HANDLE {
- self.0
- }
-
- pub fn into_raw(self) -> HANDLE {
- use std::mem;
-
- let ret = self.0;
- mem::forget(self);
- ret
- }
-
- pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
- let mut bytes = 0;
- let len = cmp::min(buf.len(), <u32>::max_value() as usize) as u32;
- crate::cvt(unsafe {
- WriteFile(
- self.0,
- buf.as_ptr() as *const _,
- len,
- &mut bytes,
- 0 as *mut _,
- )
- })?;
- Ok(bytes as usize)
- }
-
- pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
- let mut bytes = 0;
- let len = cmp::min(buf.len(), <u32>::max_value() as usize) as u32;
- crate::cvt(unsafe {
- ReadFile(
- self.0,
- buf.as_mut_ptr() as *mut _,
- len,
- &mut bytes,
- 0 as *mut _,
- )
- })?;
- Ok(bytes as usize)
- }
-
- pub unsafe fn read_overlapped(
- &self,
- buf: &mut [u8],
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<Option<usize>> {
- self.read_overlapped_helper(buf, overlapped, FALSE)
- }
-
- pub unsafe fn read_overlapped_wait(
- &self,
- buf: &mut [u8],
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<usize> {
- match self.read_overlapped_helper(buf, overlapped, TRUE) {
- Ok(Some(bytes)) => Ok(bytes),
- Ok(None) => panic!("logic error"),
- Err(e) => Err(e),
- }
- }
-
- pub unsafe fn read_overlapped_helper(
- &self,
- buf: &mut [u8],
- overlapped: *mut OVERLAPPED,
- wait: BOOL,
- ) -> io::Result<Option<usize>> {
- let len = cmp::min(buf.len(), <u32>::max_value() as usize) as u32;
- let res = crate::cvt({
- ReadFile(
- self.0,
- buf.as_mut_ptr() as *mut _,
- len,
- ptr::null_mut(),
- overlapped,
- )
- });
- match res {
- Ok(_) => (),
- Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => (),
- Err(e) => return Err(e),
- }
-
- let mut bytes = 0;
- let res = crate::cvt(GetOverlappedResult(self.0, overlapped, &mut bytes, wait));
- match res {
- Ok(_) => Ok(Some(bytes as usize)),
- Err(ref e) if e.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) && wait == FALSE => {
- Ok(None)
- }
- Err(e) => Err(e),
- }
- }
-
- pub unsafe fn write_overlapped(
- &self,
- buf: &[u8],
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<Option<usize>> {
- self.write_overlapped_helper(buf, overlapped, FALSE)
- }
-
- pub unsafe fn write_overlapped_wait(
- &self,
- buf: &[u8],
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<usize> {
- match self.write_overlapped_helper(buf, overlapped, TRUE) {
- Ok(Some(bytes)) => Ok(bytes),
- Ok(None) => panic!("logic error"),
- Err(e) => Err(e),
- }
- }
-
- unsafe fn write_overlapped_helper(
- &self,
- buf: &[u8],
- overlapped: *mut OVERLAPPED,
- wait: BOOL,
- ) -> io::Result<Option<usize>> {
- let len = cmp::min(buf.len(), <u32>::max_value() as usize) as u32;
- let res = crate::cvt({
- WriteFile(
- self.0,
- buf.as_ptr() as *const _,
- len,
- ptr::null_mut(),
- overlapped,
- )
- });
- match res {
- Ok(_) => (),
- Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => (),
- Err(e) => return Err(e),
- }
-
- let mut bytes = 0;
- let res = crate::cvt(GetOverlappedResult(self.0, overlapped, &mut bytes, wait));
- match res {
- Ok(_) => Ok(Some(bytes as usize)),
- Err(ref e) if e.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) && wait == FALSE => {
- Ok(None)
- }
- Err(e) => Err(e),
- }
- }
-}
-
-impl Drop for Handle {
- fn drop(&mut self) {
- unsafe { CloseHandle(self.0) };
- }
-}
+use crate::{BOOL, FALSE, TRUE}; +use std::cmp; +use std::io; +use std::ptr; + +use windows_sys::Win32::Foundation::{CloseHandle, ERROR_IO_INCOMPLETE, ERROR_IO_PENDING, HANDLE}; +use windows_sys::Win32::Storage::FileSystem::{ReadFile, WriteFile}; +use windows_sys::Win32::System::IO::{GetOverlappedResult, OVERLAPPED}; + +#[derive(Debug)] +pub struct Handle(HANDLE); + +unsafe impl Send for Handle {} +unsafe impl Sync for Handle {} + +impl Handle { + pub fn new(handle: HANDLE) -> Handle { + Handle(handle) + } + + pub fn raw(&self) -> HANDLE { + self.0 + } + + pub fn into_raw(self) -> HANDLE { + use std::mem; + + let ret = self.0; + mem::forget(self); + ret + } + + pub fn write(&self, buf: &[u8]) -> io::Result<usize> { + let mut bytes = 0; + let len = cmp::min(buf.len(), <u32>::max_value() as usize) as u32; + crate::cvt(unsafe { + WriteFile( + self.0, + buf.as_ptr() as *const _, + len, + &mut bytes, + 0 as *mut _, + ) + })?; + Ok(bytes as usize) + } + + pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> { + let mut bytes = 0; + let len = cmp::min(buf.len(), <u32>::max_value() as usize) as u32; + crate::cvt(unsafe { + ReadFile( + self.0, + buf.as_mut_ptr() as *mut _, + len, + &mut bytes, + 0 as *mut _, + ) + })?; + Ok(bytes as usize) + } + + pub unsafe fn read_overlapped( + &self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + self.read_overlapped_helper(buf, overlapped, FALSE) + } + + pub unsafe fn read_overlapped_wait( + &self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<usize> { + match self.read_overlapped_helper(buf, overlapped, TRUE) { + Ok(Some(bytes)) => Ok(bytes), + Ok(None) => panic!("logic error"), + Err(e) => Err(e), + } + } + + pub unsafe fn read_overlapped_helper( + &self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED, + wait: BOOL, + ) -> io::Result<Option<usize>> { + let len = cmp::min(buf.len(), <u32>::max_value() as usize) as u32; + let res = crate::cvt({ + ReadFile( + self.0, + buf.as_mut_ptr() as *mut _, + len, + ptr::null_mut(), + overlapped, + ) + }); + match res { + Ok(_) => (), + Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => (), + Err(e) => return Err(e), + } + + let mut bytes = 0; + let res = crate::cvt(GetOverlappedResult(self.0, overlapped, &mut bytes, wait)); + match res { + Ok(_) => Ok(Some(bytes as usize)), + Err(ref e) if e.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) && wait == FALSE => { + Ok(None) + } + Err(e) => Err(e), + } + } + + pub unsafe fn write_overlapped( + &self, + buf: &[u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + self.write_overlapped_helper(buf, overlapped, FALSE) + } + + pub unsafe fn write_overlapped_wait( + &self, + buf: &[u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<usize> { + match self.write_overlapped_helper(buf, overlapped, TRUE) { + Ok(Some(bytes)) => Ok(bytes), + Ok(None) => panic!("logic error"), + Err(e) => Err(e), + } + } + + unsafe fn write_overlapped_helper( + &self, + buf: &[u8], + overlapped: *mut OVERLAPPED, + wait: BOOL, + ) -> io::Result<Option<usize>> { + let len = cmp::min(buf.len(), <u32>::max_value() as usize) as u32; + let res = crate::cvt({ + WriteFile( + self.0, + buf.as_ptr() as *const _, + len, + ptr::null_mut(), + overlapped, + ) + }); + match res { + Ok(_) => (), + Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => (), + Err(e) => return Err(e), + } + + let mut bytes = 0; + let res = crate::cvt(GetOverlappedResult(self.0, overlapped, &mut bytes, wait)); + match res { + Ok(_) => Ok(Some(bytes as usize)), + Err(ref e) if e.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) && wait == FALSE => { + Ok(None) + } + Err(e) => Err(e), + } + } +} + +impl Drop for Handle { + fn drop(&mut self) { + unsafe { CloseHandle(self.0) }; + } +} diff --git a/vendor/miow/src/iocp.rs b/vendor/miow/src/iocp.rs index 1e08b0495..c246c3a4d 100644 --- a/vendor/miow/src/iocp.rs +++ b/vendor/miow/src/iocp.rs @@ -1,325 +1,329 @@ -//! Bindings to IOCP, I/O Completion Ports
-
-use crate::*;
-use std::cmp;
-use std::fmt;
-use std::io;
-use std::mem;
-use std::os::windows::io::*;
-use std::time::Duration;
-
-use crate::handle::Handle;
-use crate::Overlapped;
-use windows_sys::Win32::System::IO::*;
-
-/// A handle to an Windows I/O Completion Port.
-#[derive(Debug)]
-pub struct CompletionPort {
- handle: Handle,
-}
-
-/// A status message received from an I/O completion port.
-///
-/// These statuses can be created via the `new` or `empty` constructors and then
-/// provided to a completion port, or they are read out of a completion port.
-/// The fields of each status are read through its accessor methods.
-#[derive(Clone, Copy)]
-#[repr(transparent)]
-pub struct CompletionStatus(OVERLAPPED_ENTRY);
-
-impl fmt::Debug for CompletionStatus {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, "CompletionStatus(OVERLAPPED_ENTRY)")
- }
-}
-
-unsafe impl Send for CompletionStatus {}
-unsafe impl Sync for CompletionStatus {}
-
-impl CompletionPort {
- /// Creates a new I/O completion port with the specified concurrency value.
- ///
- /// The number of threads given corresponds to the level of concurrency
- /// allowed for threads associated with this port. Consult the Windows
- /// documentation for more information about this value.
- pub fn new(threads: u32) -> io::Result<CompletionPort> {
- let ret = unsafe { CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0 as *mut _, 0, threads) };
- if ret.is_null() {
- Err(io::Error::last_os_error())
- } else {
- Ok(CompletionPort {
- handle: Handle::new(ret),
- })
- }
- }
-
- /// Associates a new `HANDLE` to this I/O completion port.
- ///
- /// This function will associate the given handle to this port with the
- /// given `token` to be returned in status messages whenever it receives a
- /// notification.
- ///
- /// Any object which is convertible to a `HANDLE` via the `AsRawHandle`
- /// trait can be provided to this function, such as `std::fs::File` and
- /// friends.
- pub fn add_handle<T: AsRawHandle + ?Sized>(&self, token: usize, t: &T) -> io::Result<()> {
- self._add(token, t.as_raw_handle())
- }
-
- /// Associates a new `SOCKET` to this I/O completion port.
- ///
- /// This function will associate the given socket to this port with the
- /// given `token` to be returned in status messages whenever it receives a
- /// notification.
- ///
- /// Any object which is convertible to a `SOCKET` via the `AsRawSocket`
- /// trait can be provided to this function, such as `std::net::TcpStream`
- /// and friends.
- pub fn add_socket<T: AsRawSocket + ?Sized>(&self, token: usize, t: &T) -> io::Result<()> {
- self._add(token, t.as_raw_socket() as HANDLE)
- }
-
- fn _add(&self, token: usize, handle: HANDLE) -> io::Result<()> {
- assert_eq!(mem::size_of_val(&token), mem::size_of::<usize>());
- let ret = unsafe { CreateIoCompletionPort(handle, self.handle.raw(), token as usize, 0) };
- if ret.is_null() {
- Err(io::Error::last_os_error())
- } else {
- debug_assert_eq!(ret, self.handle.raw());
- Ok(())
- }
- }
-
- /// Dequeue a completion status from this I/O completion port.
- ///
- /// This function will associate the calling thread with this completion
- /// port and then wait for a status message to become available. The precise
- /// semantics on when this function returns depends on the concurrency value
- /// specified when the port was created.
- ///
- /// A timeout can optionally be specified to this function. If `None` is
- /// provided this function will not time out, and otherwise it will time out
- /// after the specified duration has passed.
- ///
- /// On success this will return the status message which was dequeued from
- /// this completion port.
- pub fn get(&self, timeout: Option<Duration>) -> io::Result<CompletionStatus> {
- let mut bytes = 0;
- let mut token = 0;
- let mut overlapped = 0 as *mut _;
- let timeout = crate::dur2ms(timeout);
- let ret = unsafe {
- GetQueuedCompletionStatus(
- self.handle.raw(),
- &mut bytes,
- &mut token,
- &mut overlapped,
- timeout,
- )
- };
- crate::cvt(ret).map(|_| {
- CompletionStatus(OVERLAPPED_ENTRY {
- dwNumberOfBytesTransferred: bytes,
- lpCompletionKey: token,
- lpOverlapped: overlapped,
- Internal: 0,
- })
- })
- }
-
- /// Dequeues a number of completion statuses from this I/O completion port.
- ///
- /// This function is the same as `get` except that it may return more than
- /// one status. A buffer of "zero" statuses is provided (the contents are
- /// not read) and then on success this function will return a sub-slice of
- /// statuses which represent those which were dequeued from this port. This
- /// function does not wait to fill up the entire list of statuses provided.
- ///
- /// Like with `get`, a timeout may be specified for this operation.
- pub fn get_many<'a>(
- &self,
- list: &'a mut [CompletionStatus],
- timeout: Option<Duration>,
- ) -> io::Result<&'a mut [CompletionStatus]> {
- debug_assert_eq!(
- mem::size_of::<CompletionStatus>(),
- mem::size_of::<OVERLAPPED_ENTRY>()
- );
- let mut removed = 0;
- let timeout = crate::dur2ms(timeout);
- let len = cmp::min(list.len(), <u32>::max_value() as usize) as u32;
- let ret = unsafe {
- GetQueuedCompletionStatusEx(
- self.handle.raw(),
- list.as_ptr() as *mut _,
- len,
- &mut removed,
- timeout,
- FALSE as i32,
- )
- };
- match crate::cvt(ret) {
- Ok(_) => Ok(&mut list[..removed as usize]),
- Err(e) => Err(e),
- }
- }
-
- /// Posts a new completion status onto this I/O completion port.
- ///
- /// This function will post the given status, with custom parameters, to the
- /// port. Threads blocked in `get` or `get_many` will eventually receive
- /// this status.
- pub fn post(&self, status: CompletionStatus) -> io::Result<()> {
- let ret = unsafe {
- PostQueuedCompletionStatus(
- self.handle.raw(),
- status.0.dwNumberOfBytesTransferred,
- status.0.lpCompletionKey,
- status.0.lpOverlapped,
- )
- };
- crate::cvt(ret).map(|_| ())
- }
-}
-
-impl AsRawHandle for CompletionPort {
- fn as_raw_handle(&self) -> HANDLE {
- self.handle.raw()
- }
-}
-
-impl FromRawHandle for CompletionPort {
- unsafe fn from_raw_handle(handle: HANDLE) -> CompletionPort {
- CompletionPort {
- handle: Handle::new(handle),
- }
- }
-}
-
-impl IntoRawHandle for CompletionPort {
- fn into_raw_handle(self) -> HANDLE {
- self.handle.into_raw()
- }
-}
-
-impl CompletionStatus {
- /// Creates a new completion status with the provided parameters.
- ///
- /// This function is useful when creating a status to send to a port with
- /// the `post` method. The parameters are opaquely passed through and not
- /// interpreted by the system at all.
- pub fn new(bytes: u32, token: usize, overlapped: *mut Overlapped) -> CompletionStatus {
- assert_eq!(mem::size_of_val(&token), mem::size_of::<usize>());
- CompletionStatus(OVERLAPPED_ENTRY {
- dwNumberOfBytesTransferred: bytes,
- lpCompletionKey: token as usize,
- lpOverlapped: overlapped as *mut _,
- Internal: 0,
- })
- }
-
- /// Creates a new borrowed completion status from the borrowed
- /// `OVERLAPPED_ENTRY` argument provided.
- ///
- /// This method will wrap the `OVERLAPPED_ENTRY` in a `CompletionStatus`,
- /// returning the wrapped structure.
- pub fn from_entry(entry: &OVERLAPPED_ENTRY) -> &CompletionStatus {
- // Safety: CompletionStatus is repr(transparent) w/ OVERLAPPED_ENTRY, so
- // a reference to one is guaranteed to be layout compatible with the
- // reference to another.
- unsafe { &*(entry as *const _ as *const _) }
- }
-
- /// Creates a new "zero" completion status.
- ///
- /// This function is useful when creating a stack buffer or vector of
- /// completion statuses to be passed to the `get_many` function.
- pub fn zero() -> CompletionStatus {
- CompletionStatus::new(0, 0, 0 as *mut _)
- }
-
- /// Returns the number of bytes that were transferred for the I/O operation
- /// associated with this completion status.
- pub fn bytes_transferred(&self) -> u32 {
- self.0.dwNumberOfBytesTransferred
- }
-
- /// Returns the completion key value associated with the file handle whose
- /// I/O operation has completed.
- ///
- /// A completion key is a per-handle key that is specified when it is added
- /// to an I/O completion port via `add_handle` or `add_socket`.
- pub fn token(&self) -> usize {
- self.0.lpCompletionKey as usize
- }
-
- /// Returns a pointer to the `Overlapped` structure that was specified when
- /// the I/O operation was started.
- pub fn overlapped(&self) -> *mut OVERLAPPED {
- self.0.lpOverlapped
- }
-
- /// Returns a pointer to the internal `OVERLAPPED_ENTRY` object.
- pub fn entry(&self) -> &OVERLAPPED_ENTRY {
- &self.0
- }
-}
-
-#[cfg(test)]
-mod tests {
- use crate::iocp::{CompletionPort, CompletionStatus};
- use std::mem;
- use std::time::Duration;
- use windows_sys::Win32::Foundation::*;
-
- #[test]
- fn is_send_sync() {
- fn is_send_sync<T: Send + Sync>() {}
- is_send_sync::<CompletionPort>();
- }
-
- #[test]
- fn token_right_size() {
- assert_eq!(mem::size_of::<usize>(), mem::size_of::<usize>());
- }
-
- #[test]
- fn timeout() {
- let c = CompletionPort::new(1).unwrap();
- let err = c.get(Some(Duration::from_millis(1))).unwrap_err();
- assert_eq!(err.raw_os_error(), Some(WAIT_TIMEOUT as i32));
- }
-
- #[test]
- fn get() {
- let c = CompletionPort::new(1).unwrap();
- c.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap();
- let s = c.get(None).unwrap();
- assert_eq!(s.bytes_transferred(), 1);
- assert_eq!(s.token(), 2);
- assert_eq!(s.overlapped(), 3 as *mut _);
- }
-
- #[test]
- fn get_many() {
- let c = CompletionPort::new(1).unwrap();
-
- c.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap();
- c.post(CompletionStatus::new(4, 5, 6 as *mut _)).unwrap();
-
- let mut s = vec![CompletionStatus::zero(); 4];
- {
- let s = c.get_many(&mut s, None).unwrap();
- assert_eq!(s.len(), 2);
- assert_eq!(s[0].bytes_transferred(), 1);
- assert_eq!(s[0].token(), 2);
- assert_eq!(s[0].overlapped(), 3 as *mut _);
- assert_eq!(s[1].bytes_transferred(), 4);
- assert_eq!(s[1].token(), 5);
- assert_eq!(s[1].overlapped(), 6 as *mut _);
- }
- assert_eq!(s[2].bytes_transferred(), 0);
- assert_eq!(s[2].token(), 0);
- assert_eq!(s[2].overlapped(), 0 as *mut _);
- }
-}
+//! Bindings to IOCP, I/O Completion Ports + +use crate::FALSE; +use std::cmp; +use std::fmt; +use std::io; +use std::mem; +use std::os::windows::io::{AsRawHandle, AsRawSocket, FromRawHandle, IntoRawHandle, RawHandle}; +use std::time::Duration; + +use crate::handle::Handle; +use crate::Overlapped; +use windows_sys::Win32::Foundation::{HANDLE, INVALID_HANDLE_VALUE}; +use windows_sys::Win32::System::IO::{ + CreateIoCompletionPort, GetQueuedCompletionStatus, GetQueuedCompletionStatusEx, + PostQueuedCompletionStatus, OVERLAPPED, OVERLAPPED_ENTRY, +}; + +/// A handle to an Windows I/O Completion Port. +#[derive(Debug)] +pub struct CompletionPort { + handle: Handle, +} + +/// A status message received from an I/O completion port. +/// +/// These statuses can be created via the `new` or `empty` constructors and then +/// provided to a completion port, or they are read out of a completion port. +/// The fields of each status are read through its accessor methods. +#[derive(Clone, Copy)] +#[repr(transparent)] +pub struct CompletionStatus(OVERLAPPED_ENTRY); + +impl fmt::Debug for CompletionStatus { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "CompletionStatus(OVERLAPPED_ENTRY)") + } +} + +unsafe impl Send for CompletionStatus {} +unsafe impl Sync for CompletionStatus {} + +impl CompletionPort { + /// Creates a new I/O completion port with the specified concurrency value. + /// + /// The number of threads given corresponds to the level of concurrency + /// allowed for threads associated with this port. Consult the Windows + /// documentation for more information about this value. + pub fn new(threads: u32) -> io::Result<CompletionPort> { + let ret = unsafe { CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, threads) }; + if ret == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(CompletionPort { + handle: Handle::new(ret), + }) + } + } + + /// Associates a new `HANDLE` to this I/O completion port. + /// + /// This function will associate the given handle to this port with the + /// given `token` to be returned in status messages whenever it receives a + /// notification. + /// + /// Any object which is convertible to a `HANDLE` via the `AsRawHandle` + /// trait can be provided to this function, such as `std::fs::File` and + /// friends. + pub fn add_handle<T: AsRawHandle + ?Sized>(&self, token: usize, t: &T) -> io::Result<()> { + self._add(token, t.as_raw_handle() as HANDLE) + } + + /// Associates a new `SOCKET` to this I/O completion port. + /// + /// This function will associate the given socket to this port with the + /// given `token` to be returned in status messages whenever it receives a + /// notification. + /// + /// Any object which is convertible to a `SOCKET` via the `AsRawSocket` + /// trait can be provided to this function, such as `std::net::TcpStream` + /// and friends. + pub fn add_socket<T: AsRawSocket + ?Sized>(&self, token: usize, t: &T) -> io::Result<()> { + self._add(token, t.as_raw_socket() as HANDLE) + } + + fn _add(&self, token: usize, handle: HANDLE) -> io::Result<()> { + assert_eq!(mem::size_of_val(&token), mem::size_of::<usize>()); + let ret = unsafe { CreateIoCompletionPort(handle, self.handle.raw(), token as usize, 0) }; + if ret == 0 { + Err(io::Error::last_os_error()) + } else { + debug_assert_eq!(ret, self.handle.raw()); + Ok(()) + } + } + + /// Dequeue a completion status from this I/O completion port. + /// + /// This function will associate the calling thread with this completion + /// port and then wait for a status message to become available. The precise + /// semantics on when this function returns depends on the concurrency value + /// specified when the port was created. + /// + /// A timeout can optionally be specified to this function. If `None` is + /// provided this function will not time out, and otherwise it will time out + /// after the specified duration has passed. + /// + /// On success this will return the status message which was dequeued from + /// this completion port. + pub fn get(&self, timeout: Option<Duration>) -> io::Result<CompletionStatus> { + let mut bytes = 0; + let mut token = 0; + let mut overlapped = 0 as *mut _; + let timeout = crate::dur2ms(timeout); + let ret = unsafe { + GetQueuedCompletionStatus( + self.handle.raw(), + &mut bytes, + &mut token, + &mut overlapped, + timeout, + ) + }; + crate::cvt(ret).map(|_| { + CompletionStatus(OVERLAPPED_ENTRY { + dwNumberOfBytesTransferred: bytes, + lpCompletionKey: token, + lpOverlapped: overlapped, + Internal: 0, + }) + }) + } + + /// Dequeues a number of completion statuses from this I/O completion port. + /// + /// This function is the same as `get` except that it may return more than + /// one status. A buffer of "zero" statuses is provided (the contents are + /// not read) and then on success this function will return a sub-slice of + /// statuses which represent those which were dequeued from this port. This + /// function does not wait to fill up the entire list of statuses provided. + /// + /// Like with `get`, a timeout may be specified for this operation. + pub fn get_many<'a>( + &self, + list: &'a mut [CompletionStatus], + timeout: Option<Duration>, + ) -> io::Result<&'a mut [CompletionStatus]> { + debug_assert_eq!( + mem::size_of::<CompletionStatus>(), + mem::size_of::<OVERLAPPED_ENTRY>() + ); + let mut removed = 0; + let timeout = crate::dur2ms(timeout); + let len = cmp::min(list.len(), <u32>::max_value() as usize) as u32; + let ret = unsafe { + GetQueuedCompletionStatusEx( + self.handle.raw(), + list.as_ptr() as *mut _, + len, + &mut removed, + timeout, + FALSE as i32, + ) + }; + match crate::cvt(ret) { + Ok(_) => Ok(&mut list[..removed as usize]), + Err(e) => Err(e), + } + } + + /// Posts a new completion status onto this I/O completion port. + /// + /// This function will post the given status, with custom parameters, to the + /// port. Threads blocked in `get` or `get_many` will eventually receive + /// this status. + pub fn post(&self, status: CompletionStatus) -> io::Result<()> { + let ret = unsafe { + PostQueuedCompletionStatus( + self.handle.raw(), + status.0.dwNumberOfBytesTransferred, + status.0.lpCompletionKey, + status.0.lpOverlapped, + ) + }; + crate::cvt(ret).map(|_| ()) + } +} + +impl AsRawHandle for CompletionPort { + fn as_raw_handle(&self) -> RawHandle { + self.handle.raw() as RawHandle + } +} + +impl FromRawHandle for CompletionPort { + unsafe fn from_raw_handle(handle: RawHandle) -> CompletionPort { + CompletionPort { + handle: Handle::new(handle as HANDLE), + } + } +} + +impl IntoRawHandle for CompletionPort { + fn into_raw_handle(self) -> RawHandle { + self.handle.into_raw() as RawHandle + } +} + +impl CompletionStatus { + /// Creates a new completion status with the provided parameters. + /// + /// This function is useful when creating a status to send to a port with + /// the `post` method. The parameters are opaquely passed through and not + /// interpreted by the system at all. + pub fn new(bytes: u32, token: usize, overlapped: *mut Overlapped) -> CompletionStatus { + assert_eq!(mem::size_of_val(&token), mem::size_of::<usize>()); + CompletionStatus(OVERLAPPED_ENTRY { + dwNumberOfBytesTransferred: bytes, + lpCompletionKey: token as usize, + lpOverlapped: overlapped as *mut _, + Internal: 0, + }) + } + + /// Creates a new borrowed completion status from the borrowed + /// `OVERLAPPED_ENTRY` argument provided. + /// + /// This method will wrap the `OVERLAPPED_ENTRY` in a `CompletionStatus`, + /// returning the wrapped structure. + pub fn from_entry(entry: &OVERLAPPED_ENTRY) -> &CompletionStatus { + // Safety: CompletionStatus is repr(transparent) w/ OVERLAPPED_ENTRY, so + // a reference to one is guaranteed to be layout compatible with the + // reference to another. + unsafe { &*(entry as *const _ as *const _) } + } + + /// Creates a new "zero" completion status. + /// + /// This function is useful when creating a stack buffer or vector of + /// completion statuses to be passed to the `get_many` function. + pub fn zero() -> CompletionStatus { + CompletionStatus::new(0, 0, 0 as *mut _) + } + + /// Returns the number of bytes that were transferred for the I/O operation + /// associated with this completion status. + pub fn bytes_transferred(&self) -> u32 { + self.0.dwNumberOfBytesTransferred + } + + /// Returns the completion key value associated with the file handle whose + /// I/O operation has completed. + /// + /// A completion key is a per-handle key that is specified when it is added + /// to an I/O completion port via `add_handle` or `add_socket`. + pub fn token(&self) -> usize { + self.0.lpCompletionKey as usize + } + + /// Returns a pointer to the `Overlapped` structure that was specified when + /// the I/O operation was started. + pub fn overlapped(&self) -> *mut OVERLAPPED { + self.0.lpOverlapped + } + + /// Returns a pointer to the internal `OVERLAPPED_ENTRY` object. + pub fn entry(&self) -> &OVERLAPPED_ENTRY { + &self.0 + } +} + +#[cfg(test)] +mod tests { + use crate::iocp::{CompletionPort, CompletionStatus}; + use std::mem; + use std::time::Duration; + use windows_sys::Win32::Foundation::*; + + #[test] + fn is_send_sync() { + fn is_send_sync<T: Send + Sync>() {} + is_send_sync::<CompletionPort>(); + } + + #[test] + fn token_right_size() { + assert_eq!(mem::size_of::<usize>(), mem::size_of::<usize>()); + } + + #[test] + fn timeout() { + let c = CompletionPort::new(1).unwrap(); + let err = c.get(Some(Duration::from_millis(1))).unwrap_err(); + assert_eq!(err.raw_os_error(), Some(WAIT_TIMEOUT as i32)); + } + + #[test] + fn get() { + let c = CompletionPort::new(1).unwrap(); + c.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap(); + let s = c.get(None).unwrap(); + assert_eq!(s.bytes_transferred(), 1); + assert_eq!(s.token(), 2); + assert_eq!(s.overlapped(), 3 as *mut _); + } + + #[test] + fn get_many() { + let c = CompletionPort::new(1).unwrap(); + + c.post(CompletionStatus::new(1, 2, 3 as *mut _)).unwrap(); + c.post(CompletionStatus::new(4, 5, 6 as *mut _)).unwrap(); + + let mut s = vec![CompletionStatus::zero(); 4]; + { + let s = c.get_many(&mut s, None).unwrap(); + assert_eq!(s.len(), 2); + assert_eq!(s[0].bytes_transferred(), 1); + assert_eq!(s[0].token(), 2); + assert_eq!(s[0].overlapped(), 3 as *mut _); + assert_eq!(s[1].bytes_transferred(), 4); + assert_eq!(s[1].token(), 5); + assert_eq!(s[1].overlapped(), 6 as *mut _); + } + assert_eq!(s[2].bytes_transferred(), 0); + assert_eq!(s[2].token(), 0); + assert_eq!(s[2].overlapped(), 0 as *mut _); + } +} diff --git a/vendor/miow/src/lib.rs b/vendor/miow/src/lib.rs index 815537fe7..c8f78a544 100644 --- a/vendor/miow/src/lib.rs +++ b/vendor/miow/src/lib.rs @@ -1,54 +1,54 @@ -//! A zero overhead Windows I/O library
-
-#![cfg(windows)]
-#![deny(missing_docs)]
-#![allow(bad_style)]
-#![doc(html_root_url = "https://docs.rs/miow/0.3/x86_64-pc-windows-msvc/")]
-
-use std::cmp;
-use std::io;
-use std::time::Duration;
-
-use windows_sys::Win32::Foundation::*;
-use windows_sys::Win32::System::WindowsProgramming::*;
-
-#[cfg(test)]
-macro_rules! t {
- ($e:expr) => {
- match $e {
- Ok(e) => e,
- Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
- }
- };
-}
-
-mod handle;
-mod overlapped;
-
-pub mod iocp;
-pub mod net;
-pub mod pipe;
-
-pub use crate::overlapped::Overlapped;
-pub(crate) const TRUE: BOOL = 1;
-pub(crate) const FALSE: BOOL = 0;
-
-fn cvt(i: BOOL) -> io::Result<BOOL> {
- if i == 0 {
- Err(io::Error::last_os_error())
- } else {
- Ok(i)
- }
-}
-
-fn dur2ms(dur: Option<Duration>) -> u32 {
- let dur = match dur {
- Some(dur) => dur,
- None => return INFINITE,
- };
- let ms = dur.as_secs().checked_mul(1_000);
- let ms_extra = dur.subsec_nanos() / 1_000_000;
- ms.and_then(|ms| ms.checked_add(ms_extra as u64))
- .map(|ms| cmp::min(u32::max_value() as u64, ms) as u32)
- .unwrap_or(INFINITE - 1)
-}
+//! A zero overhead Windows I/O library + +#![cfg(windows)] +#![deny(missing_docs)] +#![allow(bad_style)] +#![doc(html_root_url = "https://docs.rs/miow/0.3/x86_64-pc-windows-msvc/")] + +use std::cmp; +use std::io; +use std::time::Duration; + +use windows_sys::Win32::Foundation::BOOL; +use windows_sys::Win32::System::WindowsProgramming::INFINITE; + +#[cfg(test)] +macro_rules! t { + ($e:expr) => { + match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + } + }; +} + +mod handle; +mod overlapped; + +pub mod iocp; +pub mod net; +pub mod pipe; + +pub use crate::overlapped::Overlapped; +pub(crate) const TRUE: BOOL = 1; +pub(crate) const FALSE: BOOL = 0; + +fn cvt(i: BOOL) -> io::Result<BOOL> { + if i == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(i) + } +} + +fn dur2ms(dur: Option<Duration>) -> u32 { + let dur = match dur { + Some(dur) => dur, + None => return INFINITE, + }; + let ms = dur.as_secs().checked_mul(1_000); + let ms_extra = dur.subsec_nanos() / 1_000_000; + ms.and_then(|ms| ms.checked_add(ms_extra as u64)) + .map(|ms| cmp::min(u32::max_value() as u64, ms) as u32) + .unwrap_or(INFINITE - 1) +} diff --git a/vendor/miow/src/net.rs b/vendor/miow/src/net.rs index 262d8612f..a84b389a6 100644 --- a/vendor/miow/src/net.rs +++ b/vendor/miow/src/net.rs @@ -1,1296 +1,1300 @@ -//! Extensions and types for the standard networking primitives.
-//!
-//! This module contains a number of extension traits for the types in
-//! `std::net` for Windows-specific functionality.
-
-use crate::*;
-use std::cmp;
-use std::io;
-use std::mem;
-use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6};
-use std::net::{SocketAddr, TcpListener, TcpStream, UdpSocket};
-use std::os::windows::prelude::*;
-use std::sync::atomic::{AtomicUsize, Ordering};
-
-use windows_sys::core::*;
-use windows_sys::Win32::NetworkManagement::IpHelper::*;
-use windows_sys::Win32::Networking::WinSock::*;
-use windows_sys::Win32::System::IO::*;
-
-/// A type to represent a buffer in which a socket address will be stored.
-///
-/// This type is used with the `recv_from_overlapped` function on the
-/// `UdpSocketExt` trait to provide space for the overlapped I/O operation to
-/// fill in the address upon completion.
-#[derive(Clone, Copy)]
-pub struct SocketAddrBuf {
- buf: SOCKADDR_STORAGE,
- len: i32,
-}
-
-/// A type to represent a buffer in which an accepted socket's address will be
-/// stored.
-///
-/// This type is used with the `accept_overlapped` method on the
-/// `TcpListenerExt` trait to provide space for the overlapped I/O operation to
-/// fill in the socket addresses upon completion.
-#[repr(C)]
-pub struct AcceptAddrsBuf {
- // For AcceptEx we've got the restriction that the addresses passed in that
- // buffer need to be at least 16 bytes more than the maximum address length
- // for the protocol in question, so add some extra here and there
- local: SOCKADDR_STORAGE,
- _pad1: [u8; 16],
- remote: SOCKADDR_STORAGE,
- _pad2: [u8; 16],
-}
-
-/// The parsed return value of `AcceptAddrsBuf`.
-pub struct AcceptAddrs<'a> {
- local: *mut SOCKADDR,
- local_len: i32,
- remote: *mut SOCKADDR,
- remote_len: i32,
- _data: &'a AcceptAddrsBuf,
-}
-
-struct WsaExtension {
- guid: GUID,
- val: AtomicUsize,
-}
-
-/// Additional methods for the `TcpStream` type in the standard library.
-pub trait TcpStreamExt {
- /// Execute an overlapped read I/O operation on this TCP stream.
- ///
- /// This function will issue an overlapped I/O read (via `WSARecv`) on this
- /// socket. The provided buffer will be filled in when the operation
- /// completes and the given `OVERLAPPED` instance is used to track the
- /// overlapped operation.
- ///
- /// If the operation succeeds, `Ok(Some(n))` is returned indicating how
- /// many bytes were read. If the operation returns an error indicating that
- /// the I/O is currently pending, `Ok(None)` is returned. Otherwise, the
- /// error associated with the operation is returned and no overlapped
- /// operation is enqueued.
- ///
- /// The number of bytes read will be returned as part of the completion
- /// notification when the I/O finishes.
- ///
- /// # Unsafety
- ///
- /// This function is unsafe because the kernel requires that the `buf` and
- /// `overlapped` pointers are valid until the end of the I/O operation. The
- /// kernel also requires that `overlapped` is unique for this I/O operation
- /// and is not in use for any other I/O.
- ///
- /// To safely use this function callers must ensure that these two input
- /// pointers are valid until the I/O operation is completed, typically via
- /// completion ports and waiting to receive the completion notification on
- /// the port.
- unsafe fn read_overlapped(
- &self,
- buf: &mut [u8],
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<Option<usize>>;
-
- /// Execute an overlapped write I/O operation on this TCP stream.
- ///
- /// This function will issue an overlapped I/O write (via `WSASend`) on this
- /// socket. The provided buffer will be written when the operation completes
- /// and the given `OVERLAPPED` instance is used to track the overlapped
- /// operation.
- ///
- /// If the operation succeeds, `Ok(Some(n))` is returned where `n` is the
- /// number of bytes that were written. If the operation returns an error
- /// indicating that the I/O is currently pending, `Ok(None)` is returned.
- /// Otherwise, the error associated with the operation is returned and no
- /// overlapped operation is enqueued.
- ///
- /// The number of bytes written will be returned as part of the completion
- /// notification when the I/O finishes.
- ///
- /// # Unsafety
- ///
- /// This function is unsafe because the kernel requires that the `buf` and
- /// `overlapped` pointers are valid until the end of the I/O operation. The
- /// kernel also requires that `overlapped` is unique for this I/O operation
- /// and is not in use for any other I/O.
- ///
- /// To safely use this function callers must ensure that these two input
- /// pointers are valid until the I/O operation is completed, typically via
- /// completion ports and waiting to receive the completion notification on
- /// the port.
- unsafe fn write_overlapped(
- &self,
- buf: &[u8],
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<Option<usize>>;
-
- /// Attempt to consume the internal socket in this builder by executing an
- /// overlapped connect operation.
- ///
- /// This function will issue a connect operation to the address specified on
- /// the underlying socket, flagging it as an overlapped operation which will
- /// complete asynchronously. If successful this function will return the
- /// corresponding TCP stream.
- ///
- /// The `buf` argument provided is an initial buffer of data that should be
- /// sent after the connection is initiated. It's acceptable to
- /// pass an empty slice here.
- ///
- /// This function will also return whether the connect immediately
- /// succeeded or not. If `None` is returned then the I/O operation is still
- /// pending and will complete at a later date, and if `Some(bytes)` is
- /// returned then that many bytes were transferred.
- ///
- /// Note that to succeed this requires that the underlying socket has
- /// previously been bound via a call to `bind` to a local address.
- ///
- /// # Unsafety
- ///
- /// This function is unsafe because the kernel requires that the
- /// `overlapped` and `buf` pointers to be valid until the end of the I/O
- /// operation. The kernel also requires that `overlapped` is unique for
- /// this I/O operation and is not in use for any other I/O.
- ///
- /// To safely use this function callers must ensure that this pointer is
- /// valid until the I/O operation is completed, typically via completion
- /// ports and waiting to receive the completion notification on the port.
- unsafe fn connect_overlapped(
- &self,
- addr: &SocketAddr,
- buf: &[u8],
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<Option<usize>>;
-
- /// Once a `connect_overlapped` has finished, this function needs to be
- /// called to finish the connect operation.
- ///
- /// Currently this just calls `setsockopt` with `SO_UPDATE_CONNECT_CONTEXT`
- /// to ensure that further functions like `getpeername` and `getsockname`
- /// work correctly.
- fn connect_complete(&self) -> io::Result<()>;
-
- /// Calls the `GetOverlappedResult` function to get the result of an
- /// overlapped operation for this handle.
- ///
- /// This function takes the `OVERLAPPED` argument which must have been used
- /// to initiate an overlapped I/O operation, and returns either the
- /// successful number of bytes transferred during the operation or an error
- /// if one occurred, along with the results of the `lpFlags` parameter of
- /// the relevant operation, if applicable.
- ///
- /// # Unsafety
- ///
- /// This function is unsafe as `overlapped` must have previously been used
- /// to execute an operation for this handle, and it must also be a valid
- /// pointer to an `OVERLAPPED` instance.
- ///
- /// # Panics
- ///
- /// This function will panic
- unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)>;
-}
-
-/// Additional methods for the `UdpSocket` type in the standard library.
-pub trait UdpSocketExt {
- /// Execute an overlapped receive I/O operation on this UDP socket.
- ///
- /// This function will issue an overlapped I/O read (via `WSARecvFrom`) on
- /// this socket. The provided buffer will be filled in when the operation
- /// completes, the source from where the data came from will be written to
- /// `addr`, and the given `OVERLAPPED` instance is used to track the
- /// overlapped operation.
- ///
- /// If the operation succeeds, `Ok(Some(n))` is returned where `n` is the
- /// number of bytes that were read. If the operation returns an error
- /// indicating that the I/O is currently pending, `Ok(None)` is returned.
- /// Otherwise, the error associated with the operation is returned and no
- /// overlapped operation is enqueued.
- ///
- /// The number of bytes read will be returned as part of the completion
- /// notification when the I/O finishes.
- ///
- /// # Unsafety
- ///
- /// This function is unsafe because the kernel requires that the `buf`,
- /// `addr`, and `overlapped` pointers are valid until the end of the I/O
- /// operation. The kernel also requires that `overlapped` is unique for this
- /// I/O operation and is not in use for any other I/O.
- ///
- /// To safely use this function callers must ensure that these two input
- /// pointers are valid until the I/O operation is completed, typically via
- /// completion ports and waiting to receive the completion notification on
- /// the port.
- unsafe fn recv_from_overlapped(
- &self,
- buf: &mut [u8],
- addr: *mut SocketAddrBuf,
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<Option<usize>>;
-
- /// Execute an overlapped receive I/O operation on this UDP socket.
- ///
- /// This function will issue an overlapped I/O read (via `WSARecv`) on
- /// this socket. The provided buffer will be filled in when the operation
- /// completes, the source from where the data came from will be written to
- /// `addr`, and the given `OVERLAPPED` instance is used to track the
- /// overlapped operation.
- ///
- /// If the operation succeeds, `Ok(Some(n))` is returned where `n` is the
- /// number of bytes that were read. If the operation returns an error
- /// indicating that the I/O is currently pending, `Ok(None)` is returned.
- /// Otherwise, the error associated with the operation is returned and no
- /// overlapped operation is enqueued.
- ///
- /// The number of bytes read will be returned as part of the completion
- /// notification when the I/O finishes.
- ///
- /// # Unsafety
- ///
- /// This function is unsafe because the kernel requires that the `buf`,
- /// and `overlapped` pointers are valid until the end of the I/O
- /// operation. The kernel also requires that `overlapped` is unique for this
- /// I/O operation and is not in use for any other I/O.
- ///
- /// To safely use this function callers must ensure that these two input
- /// pointers are valid until the I/O operation is completed, typically via
- /// completion ports and waiting to receive the completion notification on
- /// the port.
- unsafe fn recv_overlapped(
- &self,
- buf: &mut [u8],
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<Option<usize>>;
-
- /// Execute an overlapped send I/O operation on this UDP socket.
- ///
- /// This function will issue an overlapped I/O write (via `WSASendTo`) on
- /// this socket to the address specified by `addr`. The provided buffer will
- /// be written when the operation completes and the given `OVERLAPPED`
- /// instance is used to track the overlapped operation.
- ///
- /// If the operation succeeds, `Ok(Some(n0)` is returned where `n` byte
- /// were written. If the operation returns an error indicating that the I/O
- /// is currently pending, `Ok(None)` is returned. Otherwise, the error
- /// associated with the operation is returned and no overlapped operation
- /// is enqueued.
- ///
- /// The number of bytes written will be returned as part of the completion
- /// notification when the I/O finishes.
- ///
- /// # Unsafety
- ///
- /// This function is unsafe because the kernel requires that the `buf` and
- /// `overlapped` pointers are valid until the end of the I/O operation. The
- /// kernel also requires that `overlapped` is unique for this I/O operation
- /// and is not in use for any other I/O.
- ///
- /// To safely use this function callers must ensure that these two input
- /// pointers are valid until the I/O operation is completed, typically via
- /// completion ports and waiting to receive the completion notification on
- /// the port.
- unsafe fn send_to_overlapped(
- &self,
- buf: &[u8],
- addr: &SocketAddr,
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<Option<usize>>;
-
- /// Execute an overlapped send I/O operation on this UDP socket.
- ///
- /// This function will issue an overlapped I/O write (via `WSASend`) on
- /// this socket to the address it was previously connected to. The provided
- /// buffer will be written when the operation completes and the given `OVERLAPPED`
- /// instance is used to track the overlapped operation.
- ///
- /// If the operation succeeds, `Ok(Some(n0)` is returned where `n` byte
- /// were written. If the operation returns an error indicating that the I/O
- /// is currently pending, `Ok(None)` is returned. Otherwise, the error
- /// associated with the operation is returned and no overlapped operation
- /// is enqueued.
- ///
- /// The number of bytes written will be returned as part of the completion
- /// notification when the I/O finishes.
- ///
- /// # Unsafety
- ///
- /// This function is unsafe because the kernel requires that the `buf` and
- /// `overlapped` pointers are valid until the end of the I/O operation. The
- /// kernel also requires that `overlapped` is unique for this I/O operation
- /// and is not in use for any other I/O.
- ///
- /// To safely use this function callers must ensure that these two input
- /// pointers are valid until the I/O operation is completed, typically via
- /// completion ports and waiting to receive the completion notification on
- /// the port.
- unsafe fn send_overlapped(
- &self,
- buf: &[u8],
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<Option<usize>>;
-
- /// Calls the `GetOverlappedResult` function to get the result of an
- /// overlapped operation for this handle.
- ///
- /// This function takes the `OVERLAPPED` argument which must have been used
- /// to initiate an overlapped I/O operation, and returns either the
- /// successful number of bytes transferred during the operation or an error
- /// if one occurred, along with the results of the `lpFlags` parameter of
- /// the relevant operation, if applicable.
- ///
- /// # Unsafety
- ///
- /// This function is unsafe as `overlapped` must have previously been used
- /// to execute an operation for this handle, and it must also be a valid
- /// pointer to an `OVERLAPPED` instance.
- ///
- /// # Panics
- ///
- /// This function will panic
- unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)>;
-}
-
-/// Additional methods for the `TcpListener` type in the standard library.
-pub trait TcpListenerExt {
- /// Perform an accept operation on this listener, accepting a connection in
- /// an overlapped fashion.
- ///
- /// This function will issue an I/O request to accept an incoming connection
- /// with the specified overlapped instance. The `socket` provided must be a
- /// configured but not bound or connected socket, and if successful this
- /// will consume the internal socket of the builder to return a TCP stream.
- ///
- /// The `addrs` buffer provided will be filled in with the local and remote
- /// addresses of the connection upon completion.
- ///
- /// If the accept succeeds immediately, `Ok(true)` is returned. If
- /// the connect indicates that the I/O is currently pending, `Ok(false)` is
- /// returned. Otherwise, the error associated with the operation is
- /// returned and no overlapped operation is enqueued.
- ///
- /// # Unsafety
- ///
- /// This function is unsafe because the kernel requires that the
- /// `addrs` and `overlapped` pointers are valid until the end of the I/O
- /// operation. The kernel also requires that `overlapped` is unique for this
- /// I/O operation and is not in use for any other I/O.
- ///
- /// To safely use this function callers must ensure that the pointers are
- /// valid until the I/O operation is completed, typically via completion
- /// ports and waiting to receive the completion notification on the port.
- unsafe fn accept_overlapped(
- &self,
- socket: &TcpStream,
- addrs: &mut AcceptAddrsBuf,
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<bool>;
-
- /// Once an `accept_overlapped` has finished, this function needs to be
- /// called to finish the accept operation.
- ///
- /// Currently this just calls `setsockopt` with `SO_UPDATE_ACCEPT_CONTEXT`
- /// to ensure that further functions like `getpeername` and `getsockname`
- /// work correctly.
- fn accept_complete(&self, socket: &TcpStream) -> io::Result<()>;
-
- /// Calls the `GetOverlappedResult` function to get the result of an
- /// overlapped operation for this handle.
- ///
- /// This function takes the `OVERLAPPED` argument which must have been used
- /// to initiate an overlapped I/O operation, and returns either the
- /// successful number of bytes transferred during the operation or an error
- /// if one occurred, along with the results of the `lpFlags` parameter of
- /// the relevant operation, if applicable.
- ///
- /// # Unsafety
- ///
- /// This function is unsafe as `overlapped` must have previously been used
- /// to execute an operation for this handle, and it must also be a valid
- /// pointer to an `OVERLAPPED` instance.
- ///
- /// # Panics
- ///
- /// This function will panic
- unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)>;
-}
-
-#[doc(hidden)]
-trait NetInt {
- fn from_be(i: Self) -> Self;
- fn to_be(&self) -> Self;
-}
-macro_rules! doit {
- ($($t:ident)*) => ($(impl NetInt for $t {
- fn from_be(i: Self) -> Self { <$t>::from_be(i) }
- fn to_be(&self) -> Self { <$t>::to_be(*self) }
- })*)
-}
-doit! { i8 i16 i32 i64 isize u8 u16 u32 u64 usize }
-
-// fn hton<I: NetInt>(i: I) -> I { i.to_be() }
-fn ntoh<I: NetInt>(i: I) -> I {
- I::from_be(i)
-}
-
-fn last_err() -> io::Result<Option<usize>> {
- let err = unsafe { WSAGetLastError() };
- if err == WSA_IO_PENDING as i32 {
- Ok(None)
- } else {
- Err(io::Error::from_raw_os_error(err))
- }
-}
-
-fn cvt(i: i32, size: u32) -> io::Result<Option<usize>> {
- if i == SOCKET_ERROR {
- last_err()
- } else {
- Ok(Some(size as usize))
- }
-}
-
-/// A type with the same memory layout as `SOCKADDR`. Used in converting Rust level
-/// SocketAddr* types into their system representation. The benefit of this specific
-/// type over using `SOCKADDR_STORAGE` is that this type is exactly as large as it
-/// needs to be and not a lot larger. And it can be initialized cleaner from Rust.
-#[repr(C)]
-pub(crate) union SocketAddrCRepr {
- v4: SOCKADDR_IN,
- v6: SOCKADDR_IN6,
-}
-
-impl SocketAddrCRepr {
- pub(crate) fn as_ptr(&self) -> *const SOCKADDR {
- self as *const _ as *const SOCKADDR
- }
-}
-
-fn socket_addr_to_ptrs(addr: &SocketAddr) -> (SocketAddrCRepr, i32) {
- match *addr {
- SocketAddr::V4(ref a) => {
- let sin_addr = IN_ADDR {
- S_un: IN_ADDR_0 {
- S_addr: u32::from_ne_bytes(a.ip().octets()),
- },
- };
-
- let sockaddr_in = SOCKADDR_IN {
- sin_family: AF_INET as _,
- sin_port: a.port().to_be(),
- sin_addr,
- sin_zero: [0; 8],
- };
-
- let sockaddr = SocketAddrCRepr { v4: sockaddr_in };
- (sockaddr, mem::size_of::<SOCKADDR_IN>() as i32)
- }
- SocketAddr::V6(ref a) => {
- let sockaddr_in6 = SOCKADDR_IN6 {
- sin6_family: AF_INET6 as _,
- sin6_port: a.port().to_be(),
- sin6_addr: IN6_ADDR {
- u: IN6_ADDR_0 {
- Byte: a.ip().octets(),
- },
- },
- sin6_flowinfo: a.flowinfo(),
- Anonymous: SOCKADDR_IN6_0 {
- sin6_scope_id: a.scope_id(),
- },
- };
-
- let sockaddr = SocketAddrCRepr { v6: sockaddr_in6 };
- (sockaddr, mem::size_of::<SOCKADDR_IN6>() as i32)
- }
- }
-}
-
-unsafe fn ptrs_to_socket_addr(ptr: *const SOCKADDR, len: i32) -> Option<SocketAddr> {
- if (len as usize) < mem::size_of::<i32>() {
- return None;
- }
- match (*ptr).sa_family as _ {
- AF_INET if len as usize >= mem::size_of::<SOCKADDR_IN>() => {
- let b = &*(ptr as *const SOCKADDR_IN);
- let ip = ntoh(b.sin_addr.S_un.S_addr);
- let ip = Ipv4Addr::new(
- (ip >> 24) as u8,
- (ip >> 16) as u8,
- (ip >> 8) as u8,
- ip as u8,
- );
- Some(SocketAddr::V4(SocketAddrV4::new(ip, ntoh(b.sin_port))))
- }
- AF_INET6 if len as usize >= mem::size_of::<SOCKADDR_IN6>() => {
- let b = &*(ptr as *const SOCKADDR_IN6);
- let arr = &b.sin6_addr.u.Byte;
- let ip = Ipv6Addr::new(
- ((arr[0] as u16) << 8) | (arr[1] as u16),
- ((arr[2] as u16) << 8) | (arr[3] as u16),
- ((arr[4] as u16) << 8) | (arr[5] as u16),
- ((arr[6] as u16) << 8) | (arr[7] as u16),
- ((arr[8] as u16) << 8) | (arr[9] as u16),
- ((arr[10] as u16) << 8) | (arr[11] as u16),
- ((arr[12] as u16) << 8) | (arr[13] as u16),
- ((arr[14] as u16) << 8) | (arr[15] as u16),
- );
- let addr = SocketAddrV6::new(
- ip,
- ntoh(b.sin6_port),
- ntoh(b.sin6_flowinfo),
- ntoh(b.Anonymous.sin6_scope_id),
- );
- Some(SocketAddr::V6(addr))
- }
- _ => None,
- }
-}
-
-unsafe fn slice2buf(slice: &[u8]) -> WSABUF {
- WSABUF {
- len: cmp::min(slice.len(), <u32>::max_value() as usize) as u32,
- buf: slice.as_ptr() as *mut _,
- }
-}
-
-unsafe fn result(socket: SOCKET, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)> {
- let mut transferred = 0;
- let mut flags = 0;
- let r = WSAGetOverlappedResult(socket, overlapped, &mut transferred, FALSE, &mut flags);
- if r == 0 {
- Err(io::Error::last_os_error())
- } else {
- Ok((transferred as usize, flags))
- }
-}
-
-impl TcpStreamExt for TcpStream {
- unsafe fn read_overlapped(
- &self,
- buf: &mut [u8],
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<Option<usize>> {
- let mut buf = slice2buf(buf);
- let mut flags = 0;
- let mut bytes_read: u32 = 0;
- let r = WSARecv(
- self.as_raw_socket() as SOCKET,
- &mut buf,
- 1,
- &mut bytes_read,
- &mut flags,
- overlapped,
- None,
- );
- cvt(r, bytes_read)
- }
-
- unsafe fn write_overlapped(
- &self,
- buf: &[u8],
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<Option<usize>> {
- let mut buf = slice2buf(buf);
- let mut bytes_written = 0;
-
- // Note here that we capture the number of bytes written. The
- // documentation on MSDN, however, states:
- //
- // > Use NULL for this parameter if the lpOverlapped parameter is not
- // > NULL to avoid potentially erroneous results. This parameter can be
- // > NULL only if the lpOverlapped parameter is not NULL.
- //
- // If we're not passing a null overlapped pointer here, then why are we
- // then capturing the number of bytes! Well so it turns out that this is
- // clearly faster to learn the bytes here rather than later calling
- // `WSAGetOverlappedResult`, and in practice almost all implementations
- // use this anyway [1].
- //
- // As a result we use this to and report back the result.
- //
- // [1]: https://github.com/carllerche/mio/pull/520#issuecomment-273983823
- let r = WSASend(
- self.as_raw_socket() as SOCKET,
- &mut buf,
- 1,
- &mut bytes_written,
- 0,
- overlapped,
- None,
- );
- cvt(r, bytes_written)
- }
-
- unsafe fn connect_overlapped(
- &self,
- addr: &SocketAddr,
- buf: &[u8],
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<Option<usize>> {
- connect_overlapped(self.as_raw_socket() as SOCKET, addr, buf, overlapped)
- }
-
- fn connect_complete(&self) -> io::Result<()> {
- const SO_UPDATE_CONNECT_CONTEXT: i32 = 0x7010;
- let result = unsafe {
- setsockopt(
- self.as_raw_socket() as SOCKET,
- SOL_SOCKET as _,
- SO_UPDATE_CONNECT_CONTEXT,
- 0 as *mut _,
- 0,
- )
- };
- if result == 0 {
- Ok(())
- } else {
- Err(io::Error::last_os_error())
- }
- }
-
- unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)> {
- result(self.as_raw_socket() as SOCKET, overlapped)
- }
-}
-
-unsafe fn connect_overlapped(
- socket: SOCKET,
- addr: &SocketAddr,
- buf: &[u8],
- overlapped: *mut OVERLAPPED,
-) -> io::Result<Option<usize>> {
- static CONNECTEX: WsaExtension = WsaExtension {
- guid: GUID {
- data1: 0x25a207b9,
- data2: 0xddf3,
- data3: 0x4660,
- data4: [0x8e, 0xe9, 0x76, 0xe5, 0x8c, 0x74, 0x06, 0x3e],
- },
- val: AtomicUsize::new(0),
- };
-
- let ptr = CONNECTEX.get(socket)?;
- assert!(ptr != 0);
- let connect_ex = mem::transmute::<_, LPFN_CONNECTEX>(ptr);
-
- let (addr_buf, addr_len) = socket_addr_to_ptrs(addr);
- let mut bytes_sent: u32 = 0;
- let r = connect_ex(
- socket,
- addr_buf.as_ptr(),
- addr_len,
- buf.as_ptr() as *mut _,
- buf.len() as u32,
- &mut bytes_sent,
- overlapped,
- );
- if r == TRUE {
- Ok(Some(bytes_sent as usize))
- } else {
- last_err()
- }
-}
-
-impl UdpSocketExt for UdpSocket {
- unsafe fn recv_from_overlapped(
- &self,
- buf: &mut [u8],
- addr: *mut SocketAddrBuf,
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<Option<usize>> {
- let mut buf = slice2buf(buf);
- let mut flags = 0;
- let mut received_bytes: u32 = 0;
- let r = WSARecvFrom(
- self.as_raw_socket() as SOCKET,
- &mut buf,
- 1,
- &mut received_bytes,
- &mut flags,
- &mut (*addr).buf as *mut _ as *mut _,
- &mut (*addr).len,
- overlapped,
- None,
- );
- cvt(r, received_bytes)
- }
-
- unsafe fn recv_overlapped(
- &self,
- buf: &mut [u8],
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<Option<usize>> {
- let mut buf = slice2buf(buf);
- let mut flags = 0;
- let mut received_bytes: u32 = 0;
- let r = WSARecv(
- self.as_raw_socket() as SOCKET,
- &mut buf,
- 1,
- &mut received_bytes,
- &mut flags,
- overlapped,
- None,
- );
- cvt(r, received_bytes)
- }
-
- unsafe fn send_to_overlapped(
- &self,
- buf: &[u8],
- addr: &SocketAddr,
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<Option<usize>> {
- let (addr_buf, addr_len) = socket_addr_to_ptrs(addr);
- let mut buf = slice2buf(buf);
- let mut sent_bytes = 0;
- let r = WSASendTo(
- self.as_raw_socket() as SOCKET,
- &mut buf,
- 1,
- &mut sent_bytes,
- 0,
- addr_buf.as_ptr() as *const _,
- addr_len,
- overlapped,
- None,
- );
- cvt(r, sent_bytes)
- }
-
- unsafe fn send_overlapped(
- &self,
- buf: &[u8],
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<Option<usize>> {
- let mut buf = slice2buf(buf);
- let mut sent_bytes = 0;
- let r = WSASend(
- self.as_raw_socket() as SOCKET,
- &mut buf,
- 1,
- &mut sent_bytes,
- 0,
- overlapped,
- None,
- );
- cvt(r, sent_bytes)
- }
-
- unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)> {
- result(self.as_raw_socket() as SOCKET, overlapped)
- }
-}
-
-impl TcpListenerExt for TcpListener {
- unsafe fn accept_overlapped(
- &self,
- socket: &TcpStream,
- addrs: &mut AcceptAddrsBuf,
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<bool> {
- static ACCEPTEX: WsaExtension = WsaExtension {
- guid: GUID {
- data1: 0xb5367df1,
- data2: 0xcbac,
- data3: 0x11cf,
- data4: [0x95, 0xca, 0x00, 0x80, 0x5f, 0x48, 0xa1, 0x92],
- },
- val: AtomicUsize::new(0),
- };
-
- let ptr = ACCEPTEX.get(self.as_raw_socket() as SOCKET)?;
- assert!(ptr != 0);
- let accept_ex = mem::transmute::<_, LPFN_ACCEPTEX>(ptr);
-
- let mut bytes = 0;
- let (a, b, c, d) = (*addrs).args();
- let r = accept_ex(
- self.as_raw_socket() as SOCKET,
- socket.as_raw_socket() as SOCKET,
- a,
- b,
- c,
- d,
- &mut bytes,
- overlapped,
- );
- let succeeded = if r == TRUE {
- true
- } else {
- last_err()?;
- false
- };
- Ok(succeeded)
- }
-
- fn accept_complete(&self, socket: &TcpStream) -> io::Result<()> {
- const SO_UPDATE_ACCEPT_CONTEXT: i32 = 0x700B;
- let me = self.as_raw_socket();
- let result = unsafe {
- setsockopt(
- socket.as_raw_socket() as SOCKET,
- SOL_SOCKET as _,
- SO_UPDATE_ACCEPT_CONTEXT,
- &me as *const _ as *mut _,
- mem::size_of_val(&me) as i32,
- )
- };
- if result == 0 {
- Ok(())
- } else {
- Err(io::Error::last_os_error())
- }
- }
-
- unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)> {
- result(self.as_raw_socket() as SOCKET, overlapped)
- }
-}
-
-impl SocketAddrBuf {
- /// Creates a new blank socket address buffer.
- ///
- /// This should be used before a call to `recv_from_overlapped` overlapped
- /// to create an instance to pass down.
- pub fn new() -> SocketAddrBuf {
- SocketAddrBuf {
- buf: unsafe { mem::zeroed() },
- len: mem::size_of::<SOCKADDR_STORAGE>() as i32,
- }
- }
-
- /// Parses this buffer to return a standard socket address.
- ///
- /// This function should be called after the buffer has been filled in with
- /// a call to `recv_from_overlapped` being completed. It will interpret the
- /// address filled in and return the standard socket address type.
- ///
- /// If an error is encountered then `None` is returned.
- pub fn to_socket_addr(&self) -> Option<SocketAddr> {
- unsafe { ptrs_to_socket_addr(&self.buf as *const _ as *const _, self.len) }
- }
-}
-
-static GETACCEPTEXSOCKADDRS: WsaExtension = WsaExtension {
- guid: GUID {
- data1: 0xb5367df2,
- data2: 0xcbac,
- data3: 0x11cf,
- data4: [0x95, 0xca, 0x00, 0x80, 0x5f, 0x48, 0xa1, 0x92],
- },
- val: AtomicUsize::new(0),
-};
-
-impl AcceptAddrsBuf {
- /// Creates a new blank buffer ready to be passed to a call to
- /// `accept_overlapped`.
- pub fn new() -> AcceptAddrsBuf {
- unsafe { mem::zeroed() }
- }
-
- /// Parses the data contained in this address buffer, returning the parsed
- /// result if successful.
- ///
- /// This function can be called after a call to `accept_overlapped` has
- /// succeeded to parse out the data that was written in.
- pub fn parse(&self, socket: &TcpListener) -> io::Result<AcceptAddrs> {
- let mut ret = AcceptAddrs {
- local: 0 as *mut _,
- local_len: 0,
- remote: 0 as *mut _,
- remote_len: 0,
- _data: self,
- };
- let ptr = GETACCEPTEXSOCKADDRS.get(socket.as_raw_socket() as SOCKET)?;
- assert!(ptr != 0);
- unsafe {
- let get_sockaddrs = mem::transmute::<_, LPFN_GETACCEPTEXSOCKADDRS>(ptr);
- let (a, b, c, d) = self.args();
- get_sockaddrs(
- a,
- b,
- c,
- d,
- &mut ret.local,
- &mut ret.local_len,
- &mut ret.remote,
- &mut ret.remote_len,
- );
- Ok(ret)
- }
- }
-
- #[allow(deref_nullptr)]
- fn args(&self) -> (*mut std::ffi::c_void, u32, u32, u32) {
- let remote_offset = unsafe { &(*(0 as *const AcceptAddrsBuf)).remote as *const _ as usize };
- (
- self as *const _ as *mut _,
- 0,
- remote_offset as u32,
- (mem::size_of_val(self) - remote_offset) as u32,
- )
- }
-}
-
-impl<'a> AcceptAddrs<'a> {
- /// Returns the local socket address contained in this buffer.
- pub fn local(&self) -> Option<SocketAddr> {
- unsafe { ptrs_to_socket_addr(self.local, self.local_len) }
- }
-
- /// Returns the remote socket address contained in this buffer.
- pub fn remote(&self) -> Option<SocketAddr> {
- unsafe { ptrs_to_socket_addr(self.remote, self.remote_len) }
- }
-}
-
-impl WsaExtension {
- fn get(&self, socket: SOCKET) -> io::Result<usize> {
- let prev = self.val.load(Ordering::SeqCst);
- if prev != 0 && !cfg!(debug_assertions) {
- return Ok(prev);
- }
- let mut ret = 0 as usize;
- let mut bytes = 0;
-
- // https://github.com/microsoft/win32metadata/issues/671
- const SIO_GET_EXTENSION_FUNCTION_POINTER: u32 = 33_5544_3206u32;
-
- let r = unsafe {
- WSAIoctl(
- socket,
- SIO_GET_EXTENSION_FUNCTION_POINTER,
- &self.guid as *const _ as *mut _,
- mem::size_of_val(&self.guid) as u32,
- &mut ret as *mut _ as *mut _,
- mem::size_of_val(&ret) as u32,
- &mut bytes,
- 0 as *mut _,
- None,
- )
- };
- cvt(r, 0).map(|_| {
- debug_assert_eq!(bytes as usize, mem::size_of_val(&ret));
- debug_assert!(prev == 0 || prev == ret);
- self.val.store(ret, Ordering::SeqCst);
- ret
- })
- }
-}
-
-#[cfg(test)]
-mod tests {
- use std::io::prelude::*;
- use std::net::{
- IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV6, TcpListener, TcpStream, UdpSocket,
- };
- use std::slice;
- use std::thread;
-
- use socket2::{Domain, Socket, Type};
-
- use crate::iocp::CompletionPort;
- use crate::net::{AcceptAddrsBuf, TcpListenerExt};
- use crate::net::{SocketAddrBuf, TcpStreamExt, UdpSocketExt};
- use crate::Overlapped;
-
- fn each_ip(f: &mut dyn FnMut(SocketAddr)) {
- f(t!("127.0.0.1:0".parse()));
- f(t!("[::1]:0".parse()));
- }
-
- #[test]
- fn tcp_read() {
- each_ip(&mut |addr| {
- let l = t!(TcpListener::bind(addr));
- let addr = t!(l.local_addr());
- let t = thread::spawn(move || {
- let mut a = t!(l.accept()).0;
- t!(a.write_all(&[1, 2, 3]));
- });
-
- let cp = t!(CompletionPort::new(1));
- let s = t!(TcpStream::connect(addr));
- t!(cp.add_socket(1, &s));
-
- let mut b = [0; 10];
- let a = Overlapped::zero();
- unsafe {
- t!(s.read_overlapped(&mut b, a.raw()));
- }
- let status = t!(cp.get(None));
- assert_eq!(status.bytes_transferred(), 3);
- assert_eq!(status.token(), 1);
- assert_eq!(status.overlapped(), a.raw());
- assert_eq!(&b[0..3], &[1, 2, 3]);
-
- t!(t.join());
- })
- }
-
- #[test]
- fn tcp_write() {
- each_ip(&mut |addr| {
- let l = t!(TcpListener::bind(addr));
- let addr = t!(l.local_addr());
- let t = thread::spawn(move || {
- let mut a = t!(l.accept()).0;
- let mut b = [0; 10];
- let n = t!(a.read(&mut b));
- assert_eq!(n, 3);
- assert_eq!(&b[0..3], &[1, 2, 3]);
- });
-
- let cp = t!(CompletionPort::new(1));
- let s = t!(TcpStream::connect(addr));
- t!(cp.add_socket(1, &s));
-
- let b = [1, 2, 3];
- let a = Overlapped::zero();
- unsafe {
- t!(s.write_overlapped(&b, a.raw()));
- }
- let status = t!(cp.get(None));
- assert_eq!(status.bytes_transferred(), 3);
- assert_eq!(status.token(), 1);
- assert_eq!(status.overlapped(), a.raw());
-
- t!(t.join());
- })
- }
-
- #[test]
- fn tcp_connect() {
- each_ip(&mut |addr_template| {
- let l = t!(TcpListener::bind(addr_template));
- let addr = t!(l.local_addr());
- let t = thread::spawn(move || {
- t!(l.accept());
- });
-
- let cp = t!(CompletionPort::new(1));
- let domain = Domain::for_address(addr);
- let socket = t!(Socket::new(domain, Type::STREAM, None));
- t!(socket.bind(&addr_template.into()));
- let socket = TcpStream::from(socket);
- t!(cp.add_socket(1, &socket));
-
- let a = Overlapped::zero();
- unsafe {
- t!(socket.connect_overlapped(&addr, &[], a.raw()));
- }
- let status = t!(cp.get(None));
- assert_eq!(status.bytes_transferred(), 0);
- assert_eq!(status.token(), 1);
- assert_eq!(status.overlapped(), a.raw());
- t!(socket.connect_complete());
-
- t!(t.join());
- })
- }
-
- #[test]
- fn udp_recv_from() {
- each_ip(&mut |addr| {
- let a = t!(UdpSocket::bind(addr));
- let b = t!(UdpSocket::bind(addr));
- let a_addr = t!(a.local_addr());
- let b_addr = t!(b.local_addr());
- let t = thread::spawn(move || {
- t!(a.send_to(&[1, 2, 3], b_addr));
- });
-
- let cp = t!(CompletionPort::new(1));
- t!(cp.add_socket(1, &b));
-
- let mut buf = [0; 10];
- let a = Overlapped::zero();
- let mut addr = SocketAddrBuf::new();
- unsafe {
- t!(b.recv_from_overlapped(&mut buf, &mut addr, a.raw()));
- }
- let status = t!(cp.get(None));
- assert_eq!(status.bytes_transferred(), 3);
- assert_eq!(status.token(), 1);
- assert_eq!(status.overlapped(), a.raw());
- assert_eq!(&buf[..3], &[1, 2, 3]);
- assert_eq!(addr.to_socket_addr(), Some(a_addr));
-
- t!(t.join());
- })
- }
-
- #[test]
- fn udp_recv() {
- each_ip(&mut |addr| {
- let a = t!(UdpSocket::bind(addr));
- let b = t!(UdpSocket::bind(addr));
- let a_addr = t!(a.local_addr());
- let b_addr = t!(b.local_addr());
- assert!(b.connect(a_addr).is_ok());
- assert!(a.connect(b_addr).is_ok());
- let t = thread::spawn(move || {
- t!(a.send_to(&[1, 2, 3], b_addr));
- });
-
- let cp = t!(CompletionPort::new(1));
- t!(cp.add_socket(1, &b));
-
- let mut buf = [0; 10];
- let a = Overlapped::zero();
- unsafe {
- t!(b.recv_overlapped(&mut buf, a.raw()));
- }
- let status = t!(cp.get(None));
- assert_eq!(status.bytes_transferred(), 3);
- assert_eq!(status.token(), 1);
- assert_eq!(status.overlapped(), a.raw());
- assert_eq!(&buf[..3], &[1, 2, 3]);
-
- t!(t.join());
- })
- }
-
- #[test]
- fn udp_send_to() {
- each_ip(&mut |addr| {
- let a = t!(UdpSocket::bind(addr));
- let b = t!(UdpSocket::bind(addr));
- let a_addr = t!(a.local_addr());
- let b_addr = t!(b.local_addr());
- let t = thread::spawn(move || {
- let mut b = [0; 100];
- let (n, addr) = t!(a.recv_from(&mut b));
- assert_eq!(n, 3);
- assert_eq!(addr, b_addr);
- assert_eq!(&b[..3], &[1, 2, 3]);
- });
-
- let cp = t!(CompletionPort::new(1));
- t!(cp.add_socket(1, &b));
-
- let a = Overlapped::zero();
- unsafe {
- t!(b.send_to_overlapped(&[1, 2, 3], &a_addr, a.raw()));
- }
- let status = t!(cp.get(None));
- assert_eq!(status.bytes_transferred(), 3);
- assert_eq!(status.token(), 1);
- assert_eq!(status.overlapped(), a.raw());
-
- t!(t.join());
- })
- }
-
- #[test]
- fn udp_send() {
- each_ip(&mut |addr| {
- let a = t!(UdpSocket::bind(addr));
- let b = t!(UdpSocket::bind(addr));
- let a_addr = t!(a.local_addr());
- let b_addr = t!(b.local_addr());
- assert!(b.connect(a_addr).is_ok());
- assert!(a.connect(b_addr).is_ok());
- let t = thread::spawn(move || {
- let mut b = [0; 100];
- let (n, addr) = t!(a.recv_from(&mut b));
- assert_eq!(n, 3);
- assert_eq!(addr, b_addr);
- assert_eq!(&b[..3], &[1, 2, 3]);
- });
-
- let cp = t!(CompletionPort::new(1));
- t!(cp.add_socket(1, &b));
-
- let a = Overlapped::zero();
- unsafe {
- t!(b.send_overlapped(&[1, 2, 3], a.raw()));
- }
- let status = t!(cp.get(None));
- assert_eq!(status.bytes_transferred(), 3);
- assert_eq!(status.token(), 1);
- assert_eq!(status.overlapped(), a.raw());
-
- t!(t.join());
- })
- }
-
- #[test]
- fn tcp_accept() {
- each_ip(&mut |addr_template| {
- let l = t!(TcpListener::bind(addr_template));
- let addr = t!(l.local_addr());
- let t = thread::spawn(move || {
- let socket = t!(TcpStream::connect(addr));
- (socket.local_addr().unwrap(), socket.peer_addr().unwrap())
- });
-
- let cp = t!(CompletionPort::new(1));
- let domain = Domain::for_address(addr);
- let socket = TcpStream::from(t!(Socket::new(domain, Type::STREAM, None)));
- t!(cp.add_socket(1, &l));
-
- let a = Overlapped::zero();
- let mut addrs = AcceptAddrsBuf::new();
- unsafe {
- t!(l.accept_overlapped(&socket, &mut addrs, a.raw()));
- }
- let status = t!(cp.get(None));
- assert_eq!(status.bytes_transferred(), 0);
- assert_eq!(status.token(), 1);
- assert_eq!(status.overlapped(), a.raw());
- t!(l.accept_complete(&socket));
-
- let (remote, local) = t!(t.join());
- let addrs = addrs.parse(&l).unwrap();
- assert_eq!(addrs.local(), Some(local));
- assert_eq!(addrs.remote(), Some(remote));
- })
- }
-
- #[test]
- fn sockaddr_convert_4() {
- let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(3, 4, 5, 6)), 0xabcd);
- let (raw_addr, addr_len) = super::socket_addr_to_ptrs(&addr);
- assert_eq!(addr_len, 16);
- let addr_bytes =
- unsafe { slice::from_raw_parts(raw_addr.as_ptr() as *const u8, addr_len as usize) };
- assert_eq!(
- addr_bytes,
- &[2, 0, 0xab, 0xcd, 3, 4, 5, 6, 0, 0, 0, 0, 0, 0, 0, 0]
- );
- }
-
- #[test]
- fn sockaddr_convert_v6() {
- let port = 0xabcd;
- let flowinfo = 0x12345678;
- let scope_id = 0x87654321;
- let addr = SocketAddr::V6(SocketAddrV6::new(
- Ipv6Addr::new(
- 0x0102, 0x0304, 0x0506, 0x0708, 0x090a, 0x0b0c, 0x0d0e, 0x0f10,
- ),
- port,
- flowinfo,
- scope_id,
- ));
- let (raw_addr, addr_len) = super::socket_addr_to_ptrs(&addr);
- assert_eq!(addr_len, 28);
- let addr_bytes =
- unsafe { slice::from_raw_parts(raw_addr.as_ptr() as *const u8, addr_len as usize) };
- assert_eq!(
- addr_bytes,
- &[
- 23, 0, // AF_INET6
- 0xab, 0xcd, // Port
- 0x78, 0x56, 0x34, 0x12, // flowinfo
- 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e,
- 0x0f, 0x10, // IP
- 0x21, 0x43, 0x65, 0x87, // scope_id
- ]
- );
- }
-}
+//! Extensions and types for the standard networking primitives. +//! +//! This module contains a number of extension traits for the types in +//! `std::net` for Windows-specific functionality. + +use crate::{FALSE, TRUE}; +use std::cmp; +use std::io; +use std::mem; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6}; +use std::net::{SocketAddr, TcpListener, TcpStream, UdpSocket}; +use std::os::windows::io::AsRawSocket; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use windows_sys::core::GUID; +use windows_sys::Win32::Networking::WinSock::{ + setsockopt, WSAGetLastError, WSAGetOverlappedResult, WSAIoctl, WSARecv, WSARecvFrom, WSASend, + WSASendTo, AF_INET, AF_INET6, IN6_ADDR, IN6_ADDR_0, IN_ADDR, IN_ADDR_0, LPFN_ACCEPTEX, + LPFN_CONNECTEX, LPFN_GETACCEPTEXSOCKADDRS, SOCKADDR, SOCKADDR_IN, SOCKADDR_IN6, SOCKADDR_IN6_0, + SOCKADDR_STORAGE, SOCKET, SOCKET_ERROR, SOL_SOCKET, WSABUF, WSA_IO_PENDING, +}; +use windows_sys::Win32::System::IO::OVERLAPPED; + +/// A type to represent a buffer in which a socket address will be stored. +/// +/// This type is used with the `recv_from_overlapped` function on the +/// `UdpSocketExt` trait to provide space for the overlapped I/O operation to +/// fill in the address upon completion. +#[derive(Clone, Copy)] +pub struct SocketAddrBuf { + buf: SOCKADDR_STORAGE, + len: i32, +} + +/// A type to represent a buffer in which an accepted socket's address will be +/// stored. +/// +/// This type is used with the `accept_overlapped` method on the +/// `TcpListenerExt` trait to provide space for the overlapped I/O operation to +/// fill in the socket addresses upon completion. +#[repr(C)] +pub struct AcceptAddrsBuf { + // For AcceptEx we've got the restriction that the addresses passed in that + // buffer need to be at least 16 bytes more than the maximum address length + // for the protocol in question, so add some extra here and there + local: SOCKADDR_STORAGE, + _pad1: [u8; 16], + remote: SOCKADDR_STORAGE, + _pad2: [u8; 16], +} + +/// The parsed return value of `AcceptAddrsBuf`. +pub struct AcceptAddrs<'a> { + local: *mut SOCKADDR, + local_len: i32, + remote: *mut SOCKADDR, + remote_len: i32, + _data: &'a AcceptAddrsBuf, +} + +struct WsaExtension { + guid: GUID, + val: AtomicUsize, +} + +/// Additional methods for the `TcpStream` type in the standard library. +pub trait TcpStreamExt { + /// Execute an overlapped read I/O operation on this TCP stream. + /// + /// This function will issue an overlapped I/O read (via `WSARecv`) on this + /// socket. The provided buffer will be filled in when the operation + /// completes and the given `OVERLAPPED` instance is used to track the + /// overlapped operation. + /// + /// If the operation succeeds, `Ok(Some(n))` is returned indicating how + /// many bytes were read. If the operation returns an error indicating that + /// the I/O is currently pending, `Ok(None)` is returned. Otherwise, the + /// error associated with the operation is returned and no overlapped + /// operation is enqueued. + /// + /// The number of bytes read will be returned as part of the completion + /// notification when the I/O finishes. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf` and + /// `overlapped` pointers are valid until the end of the I/O operation. The + /// kernel also requires that `overlapped` is unique for this I/O operation + /// and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that these two input + /// pointers are valid until the I/O operation is completed, typically via + /// completion ports and waiting to receive the completion notification on + /// the port. + unsafe fn read_overlapped( + &self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>>; + + /// Execute an overlapped write I/O operation on this TCP stream. + /// + /// This function will issue an overlapped I/O write (via `WSASend`) on this + /// socket. The provided buffer will be written when the operation completes + /// and the given `OVERLAPPED` instance is used to track the overlapped + /// operation. + /// + /// If the operation succeeds, `Ok(Some(n))` is returned where `n` is the + /// number of bytes that were written. If the operation returns an error + /// indicating that the I/O is currently pending, `Ok(None)` is returned. + /// Otherwise, the error associated with the operation is returned and no + /// overlapped operation is enqueued. + /// + /// The number of bytes written will be returned as part of the completion + /// notification when the I/O finishes. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf` and + /// `overlapped` pointers are valid until the end of the I/O operation. The + /// kernel also requires that `overlapped` is unique for this I/O operation + /// and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that these two input + /// pointers are valid until the I/O operation is completed, typically via + /// completion ports and waiting to receive the completion notification on + /// the port. + unsafe fn write_overlapped( + &self, + buf: &[u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>>; + + /// Attempt to consume the internal socket in this builder by executing an + /// overlapped connect operation. + /// + /// This function will issue a connect operation to the address specified on + /// the underlying socket, flagging it as an overlapped operation which will + /// complete asynchronously. If successful this function will return the + /// corresponding TCP stream. + /// + /// The `buf` argument provided is an initial buffer of data that should be + /// sent after the connection is initiated. It's acceptable to + /// pass an empty slice here. + /// + /// This function will also return whether the connect immediately + /// succeeded or not. If `None` is returned then the I/O operation is still + /// pending and will complete at a later date, and if `Some(bytes)` is + /// returned then that many bytes were transferred. + /// + /// Note that to succeed this requires that the underlying socket has + /// previously been bound via a call to `bind` to a local address. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the + /// `overlapped` and `buf` pointers to be valid until the end of the I/O + /// operation. The kernel also requires that `overlapped` is unique for + /// this I/O operation and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that this pointer is + /// valid until the I/O operation is completed, typically via completion + /// ports and waiting to receive the completion notification on the port. + unsafe fn connect_overlapped( + &self, + addr: &SocketAddr, + buf: &[u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>>; + + /// Once a `connect_overlapped` has finished, this function needs to be + /// called to finish the connect operation. + /// + /// Currently this just calls `setsockopt` with `SO_UPDATE_CONNECT_CONTEXT` + /// to ensure that further functions like `getpeername` and `getsockname` + /// work correctly. + fn connect_complete(&self) -> io::Result<()>; + + /// Calls the `GetOverlappedResult` function to get the result of an + /// overlapped operation for this handle. + /// + /// This function takes the `OVERLAPPED` argument which must have been used + /// to initiate an overlapped I/O operation, and returns either the + /// successful number of bytes transferred during the operation or an error + /// if one occurred, along with the results of the `lpFlags` parameter of + /// the relevant operation, if applicable. + /// + /// # Unsafety + /// + /// This function is unsafe as `overlapped` must have previously been used + /// to execute an operation for this handle, and it must also be a valid + /// pointer to an `OVERLAPPED` instance. + /// + /// # Panics + /// + /// This function will panic + unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)>; +} + +/// Additional methods for the `UdpSocket` type in the standard library. +pub trait UdpSocketExt { + /// Execute an overlapped receive I/O operation on this UDP socket. + /// + /// This function will issue an overlapped I/O read (via `WSARecvFrom`) on + /// this socket. The provided buffer will be filled in when the operation + /// completes, the source from where the data came from will be written to + /// `addr`, and the given `OVERLAPPED` instance is used to track the + /// overlapped operation. + /// + /// If the operation succeeds, `Ok(Some(n))` is returned where `n` is the + /// number of bytes that were read. If the operation returns an error + /// indicating that the I/O is currently pending, `Ok(None)` is returned. + /// Otherwise, the error associated with the operation is returned and no + /// overlapped operation is enqueued. + /// + /// The number of bytes read will be returned as part of the completion + /// notification when the I/O finishes. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf`, + /// `addr`, and `overlapped` pointers are valid until the end of the I/O + /// operation. The kernel also requires that `overlapped` is unique for this + /// I/O operation and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that these two input + /// pointers are valid until the I/O operation is completed, typically via + /// completion ports and waiting to receive the completion notification on + /// the port. + unsafe fn recv_from_overlapped( + &self, + buf: &mut [u8], + addr: *mut SocketAddrBuf, + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>>; + + /// Execute an overlapped receive I/O operation on this UDP socket. + /// + /// This function will issue an overlapped I/O read (via `WSARecv`) on + /// this socket. The provided buffer will be filled in when the operation + /// completes, the source from where the data came from will be written to + /// `addr`, and the given `OVERLAPPED` instance is used to track the + /// overlapped operation. + /// + /// If the operation succeeds, `Ok(Some(n))` is returned where `n` is the + /// number of bytes that were read. If the operation returns an error + /// indicating that the I/O is currently pending, `Ok(None)` is returned. + /// Otherwise, the error associated with the operation is returned and no + /// overlapped operation is enqueued. + /// + /// The number of bytes read will be returned as part of the completion + /// notification when the I/O finishes. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf`, + /// and `overlapped` pointers are valid until the end of the I/O + /// operation. The kernel also requires that `overlapped` is unique for this + /// I/O operation and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that these two input + /// pointers are valid until the I/O operation is completed, typically via + /// completion ports and waiting to receive the completion notification on + /// the port. + unsafe fn recv_overlapped( + &self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>>; + + /// Execute an overlapped send I/O operation on this UDP socket. + /// + /// This function will issue an overlapped I/O write (via `WSASendTo`) on + /// this socket to the address specified by `addr`. The provided buffer will + /// be written when the operation completes and the given `OVERLAPPED` + /// instance is used to track the overlapped operation. + /// + /// If the operation succeeds, `Ok(Some(n0)` is returned where `n` byte + /// were written. If the operation returns an error indicating that the I/O + /// is currently pending, `Ok(None)` is returned. Otherwise, the error + /// associated with the operation is returned and no overlapped operation + /// is enqueued. + /// + /// The number of bytes written will be returned as part of the completion + /// notification when the I/O finishes. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf` and + /// `overlapped` pointers are valid until the end of the I/O operation. The + /// kernel also requires that `overlapped` is unique for this I/O operation + /// and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that these two input + /// pointers are valid until the I/O operation is completed, typically via + /// completion ports and waiting to receive the completion notification on + /// the port. + unsafe fn send_to_overlapped( + &self, + buf: &[u8], + addr: &SocketAddr, + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>>; + + /// Execute an overlapped send I/O operation on this UDP socket. + /// + /// This function will issue an overlapped I/O write (via `WSASend`) on + /// this socket to the address it was previously connected to. The provided + /// buffer will be written when the operation completes and the given `OVERLAPPED` + /// instance is used to track the overlapped operation. + /// + /// If the operation succeeds, `Ok(Some(n0)` is returned where `n` byte + /// were written. If the operation returns an error indicating that the I/O + /// is currently pending, `Ok(None)` is returned. Otherwise, the error + /// associated with the operation is returned and no overlapped operation + /// is enqueued. + /// + /// The number of bytes written will be returned as part of the completion + /// notification when the I/O finishes. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf` and + /// `overlapped` pointers are valid until the end of the I/O operation. The + /// kernel also requires that `overlapped` is unique for this I/O operation + /// and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that these two input + /// pointers are valid until the I/O operation is completed, typically via + /// completion ports and waiting to receive the completion notification on + /// the port. + unsafe fn send_overlapped( + &self, + buf: &[u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>>; + + /// Calls the `GetOverlappedResult` function to get the result of an + /// overlapped operation for this handle. + /// + /// This function takes the `OVERLAPPED` argument which must have been used + /// to initiate an overlapped I/O operation, and returns either the + /// successful number of bytes transferred during the operation or an error + /// if one occurred, along with the results of the `lpFlags` parameter of + /// the relevant operation, if applicable. + /// + /// # Unsafety + /// + /// This function is unsafe as `overlapped` must have previously been used + /// to execute an operation for this handle, and it must also be a valid + /// pointer to an `OVERLAPPED` instance. + /// + /// # Panics + /// + /// This function will panic + unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)>; +} + +/// Additional methods for the `TcpListener` type in the standard library. +pub trait TcpListenerExt { + /// Perform an accept operation on this listener, accepting a connection in + /// an overlapped fashion. + /// + /// This function will issue an I/O request to accept an incoming connection + /// with the specified overlapped instance. The `socket` provided must be a + /// configured but not bound or connected socket, and if successful this + /// will consume the internal socket of the builder to return a TCP stream. + /// + /// The `addrs` buffer provided will be filled in with the local and remote + /// addresses of the connection upon completion. + /// + /// If the accept succeeds immediately, `Ok(true)` is returned. If + /// the connect indicates that the I/O is currently pending, `Ok(false)` is + /// returned. Otherwise, the error associated with the operation is + /// returned and no overlapped operation is enqueued. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the + /// `addrs` and `overlapped` pointers are valid until the end of the I/O + /// operation. The kernel also requires that `overlapped` is unique for this + /// I/O operation and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that the pointers are + /// valid until the I/O operation is completed, typically via completion + /// ports and waiting to receive the completion notification on the port. + unsafe fn accept_overlapped( + &self, + socket: &TcpStream, + addrs: &mut AcceptAddrsBuf, + overlapped: *mut OVERLAPPED, + ) -> io::Result<bool>; + + /// Once an `accept_overlapped` has finished, this function needs to be + /// called to finish the accept operation. + /// + /// Currently this just calls `setsockopt` with `SO_UPDATE_ACCEPT_CONTEXT` + /// to ensure that further functions like `getpeername` and `getsockname` + /// work correctly. + fn accept_complete(&self, socket: &TcpStream) -> io::Result<()>; + + /// Calls the `GetOverlappedResult` function to get the result of an + /// overlapped operation for this handle. + /// + /// This function takes the `OVERLAPPED` argument which must have been used + /// to initiate an overlapped I/O operation, and returns either the + /// successful number of bytes transferred during the operation or an error + /// if one occurred, along with the results of the `lpFlags` parameter of + /// the relevant operation, if applicable. + /// + /// # Unsafety + /// + /// This function is unsafe as `overlapped` must have previously been used + /// to execute an operation for this handle, and it must also be a valid + /// pointer to an `OVERLAPPED` instance. + /// + /// # Panics + /// + /// This function will panic + unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)>; +} + +#[doc(hidden)] +trait NetInt { + fn from_be(i: Self) -> Self; + fn to_be(&self) -> Self; +} +macro_rules! doit { + ($($t:ident)*) => ($(impl NetInt for $t { + fn from_be(i: Self) -> Self { <$t>::from_be(i) } + fn to_be(&self) -> Self { <$t>::to_be(*self) } + })*) +} +doit! { i8 i16 i32 i64 isize u8 u16 u32 u64 usize } + +// fn hton<I: NetInt>(i: I) -> I { i.to_be() } +fn ntoh<I: NetInt>(i: I) -> I { + I::from_be(i) +} + +fn last_err() -> io::Result<Option<usize>> { + let err = unsafe { WSAGetLastError() }; + if err == WSA_IO_PENDING as i32 { + Ok(None) + } else { + Err(io::Error::from_raw_os_error(err)) + } +} + +fn cvt(i: i32, size: u32) -> io::Result<Option<usize>> { + if i == SOCKET_ERROR { + last_err() + } else { + Ok(Some(size as usize)) + } +} + +/// A type with the same memory layout as `SOCKADDR`. Used in converting Rust level +/// SocketAddr* types into their system representation. The benefit of this specific +/// type over using `SOCKADDR_STORAGE` is that this type is exactly as large as it +/// needs to be and not a lot larger. And it can be initialized cleaner from Rust. +#[repr(C)] +pub(crate) union SocketAddrCRepr { + v4: SOCKADDR_IN, + v6: SOCKADDR_IN6, +} + +impl SocketAddrCRepr { + pub(crate) fn as_ptr(&self) -> *const SOCKADDR { + self as *const _ as *const SOCKADDR + } +} + +fn socket_addr_to_ptrs(addr: &SocketAddr) -> (SocketAddrCRepr, i32) { + match *addr { + SocketAddr::V4(ref a) => { + let sin_addr = IN_ADDR { + S_un: IN_ADDR_0 { + S_addr: u32::from_ne_bytes(a.ip().octets()), + }, + }; + + let sockaddr_in = SOCKADDR_IN { + sin_family: AF_INET as _, + sin_port: a.port().to_be(), + sin_addr, + sin_zero: [0; 8], + }; + + let sockaddr = SocketAddrCRepr { v4: sockaddr_in }; + (sockaddr, mem::size_of::<SOCKADDR_IN>() as i32) + } + SocketAddr::V6(ref a) => { + let sockaddr_in6 = SOCKADDR_IN6 { + sin6_family: AF_INET6 as _, + sin6_port: a.port().to_be(), + sin6_addr: IN6_ADDR { + u: IN6_ADDR_0 { + Byte: a.ip().octets(), + }, + }, + sin6_flowinfo: a.flowinfo(), + Anonymous: SOCKADDR_IN6_0 { + sin6_scope_id: a.scope_id(), + }, + }; + + let sockaddr = SocketAddrCRepr { v6: sockaddr_in6 }; + (sockaddr, mem::size_of::<SOCKADDR_IN6>() as i32) + } + } +} + +unsafe fn ptrs_to_socket_addr(ptr: *const SOCKADDR, len: i32) -> Option<SocketAddr> { + if (len as usize) < mem::size_of::<i32>() { + return None; + } + match (*ptr).sa_family as _ { + AF_INET if len as usize >= mem::size_of::<SOCKADDR_IN>() => { + let b = &*(ptr as *const SOCKADDR_IN); + let ip = ntoh(b.sin_addr.S_un.S_addr); + let ip = Ipv4Addr::new( + (ip >> 24) as u8, + (ip >> 16) as u8, + (ip >> 8) as u8, + ip as u8, + ); + Some(SocketAddr::V4(SocketAddrV4::new(ip, ntoh(b.sin_port)))) + } + AF_INET6 if len as usize >= mem::size_of::<SOCKADDR_IN6>() => { + let b = &*(ptr as *const SOCKADDR_IN6); + let arr = &b.sin6_addr.u.Byte; + let ip = Ipv6Addr::new( + ((arr[0] as u16) << 8) | (arr[1] as u16), + ((arr[2] as u16) << 8) | (arr[3] as u16), + ((arr[4] as u16) << 8) | (arr[5] as u16), + ((arr[6] as u16) << 8) | (arr[7] as u16), + ((arr[8] as u16) << 8) | (arr[9] as u16), + ((arr[10] as u16) << 8) | (arr[11] as u16), + ((arr[12] as u16) << 8) | (arr[13] as u16), + ((arr[14] as u16) << 8) | (arr[15] as u16), + ); + let addr = SocketAddrV6::new( + ip, + ntoh(b.sin6_port), + ntoh(b.sin6_flowinfo), + ntoh(b.Anonymous.sin6_scope_id), + ); + Some(SocketAddr::V6(addr)) + } + _ => None, + } +} + +unsafe fn slice2buf(slice: &[u8]) -> WSABUF { + WSABUF { + len: cmp::min(slice.len(), <u32>::max_value() as usize) as u32, + buf: slice.as_ptr() as *mut _, + } +} + +unsafe fn result(socket: SOCKET, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)> { + let mut transferred = 0; + let mut flags = 0; + let r = WSAGetOverlappedResult(socket, overlapped, &mut transferred, FALSE, &mut flags); + if r == 0 { + Err(io::Error::last_os_error()) + } else { + Ok((transferred as usize, flags)) + } +} + +impl TcpStreamExt for TcpStream { + unsafe fn read_overlapped( + &self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + let mut buf = slice2buf(buf); + let mut flags = 0; + let mut bytes_read: u32 = 0; + let r = WSARecv( + self.as_raw_socket() as SOCKET, + &mut buf, + 1, + &mut bytes_read, + &mut flags, + overlapped, + None, + ); + cvt(r, bytes_read) + } + + unsafe fn write_overlapped( + &self, + buf: &[u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + let mut buf = slice2buf(buf); + let mut bytes_written = 0; + + // Note here that we capture the number of bytes written. The + // documentation on MSDN, however, states: + // + // > Use NULL for this parameter if the lpOverlapped parameter is not + // > NULL to avoid potentially erroneous results. This parameter can be + // > NULL only if the lpOverlapped parameter is not NULL. + // + // If we're not passing a null overlapped pointer here, then why are we + // then capturing the number of bytes! Well so it turns out that this is + // clearly faster to learn the bytes here rather than later calling + // `WSAGetOverlappedResult`, and in practice almost all implementations + // use this anyway [1]. + // + // As a result we use this to and report back the result. + // + // [1]: https://github.com/carllerche/mio/pull/520#issuecomment-273983823 + let r = WSASend( + self.as_raw_socket() as SOCKET, + &mut buf, + 1, + &mut bytes_written, + 0, + overlapped, + None, + ); + cvt(r, bytes_written) + } + + unsafe fn connect_overlapped( + &self, + addr: &SocketAddr, + buf: &[u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + connect_overlapped(self.as_raw_socket() as SOCKET, addr, buf, overlapped) + } + + fn connect_complete(&self) -> io::Result<()> { + const SO_UPDATE_CONNECT_CONTEXT: i32 = 0x7010; + let result = unsafe { + setsockopt( + self.as_raw_socket() as SOCKET, + SOL_SOCKET as _, + SO_UPDATE_CONNECT_CONTEXT, + 0 as *mut _, + 0, + ) + }; + if result == 0 { + Ok(()) + } else { + Err(io::Error::last_os_error()) + } + } + + unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)> { + result(self.as_raw_socket() as SOCKET, overlapped) + } +} + +unsafe fn connect_overlapped( + socket: SOCKET, + addr: &SocketAddr, + buf: &[u8], + overlapped: *mut OVERLAPPED, +) -> io::Result<Option<usize>> { + static CONNECTEX: WsaExtension = WsaExtension { + guid: GUID { + data1: 0x25a207b9, + data2: 0xddf3, + data3: 0x4660, + data4: [0x8e, 0xe9, 0x76, 0xe5, 0x8c, 0x74, 0x06, 0x3e], + }, + val: AtomicUsize::new(0), + }; + + let ptr = CONNECTEX.get(socket)?; + assert!(ptr != 0); + let connect_ex = mem::transmute::<_, LPFN_CONNECTEX>(ptr).unwrap(); + + let (addr_buf, addr_len) = socket_addr_to_ptrs(addr); + let mut bytes_sent: u32 = 0; + let r = connect_ex( + socket, + addr_buf.as_ptr(), + addr_len, + buf.as_ptr() as *mut _, + buf.len() as u32, + &mut bytes_sent, + overlapped, + ); + if r == TRUE { + Ok(Some(bytes_sent as usize)) + } else { + last_err() + } +} + +impl UdpSocketExt for UdpSocket { + unsafe fn recv_from_overlapped( + &self, + buf: &mut [u8], + addr: *mut SocketAddrBuf, + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + let mut buf = slice2buf(buf); + let mut flags = 0; + let mut received_bytes: u32 = 0; + let r = WSARecvFrom( + self.as_raw_socket() as SOCKET, + &mut buf, + 1, + &mut received_bytes, + &mut flags, + &mut (*addr).buf as *mut _ as *mut _, + &mut (*addr).len, + overlapped, + None, + ); + cvt(r, received_bytes) + } + + unsafe fn recv_overlapped( + &self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + let mut buf = slice2buf(buf); + let mut flags = 0; + let mut received_bytes: u32 = 0; + let r = WSARecv( + self.as_raw_socket() as SOCKET, + &mut buf, + 1, + &mut received_bytes, + &mut flags, + overlapped, + None, + ); + cvt(r, received_bytes) + } + + unsafe fn send_to_overlapped( + &self, + buf: &[u8], + addr: &SocketAddr, + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + let (addr_buf, addr_len) = socket_addr_to_ptrs(addr); + let mut buf = slice2buf(buf); + let mut sent_bytes = 0; + let r = WSASendTo( + self.as_raw_socket() as SOCKET, + &mut buf, + 1, + &mut sent_bytes, + 0, + addr_buf.as_ptr() as *const _, + addr_len, + overlapped, + None, + ); + cvt(r, sent_bytes) + } + + unsafe fn send_overlapped( + &self, + buf: &[u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + let mut buf = slice2buf(buf); + let mut sent_bytes = 0; + let r = WSASend( + self.as_raw_socket() as SOCKET, + &mut buf, + 1, + &mut sent_bytes, + 0, + overlapped, + None, + ); + cvt(r, sent_bytes) + } + + unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)> { + result(self.as_raw_socket() as SOCKET, overlapped) + } +} + +impl TcpListenerExt for TcpListener { + unsafe fn accept_overlapped( + &self, + socket: &TcpStream, + addrs: &mut AcceptAddrsBuf, + overlapped: *mut OVERLAPPED, + ) -> io::Result<bool> { + static ACCEPTEX: WsaExtension = WsaExtension { + guid: GUID { + data1: 0xb5367df1, + data2: 0xcbac, + data3: 0x11cf, + data4: [0x95, 0xca, 0x00, 0x80, 0x5f, 0x48, 0xa1, 0x92], + }, + val: AtomicUsize::new(0), + }; + + let ptr = ACCEPTEX.get(self.as_raw_socket() as SOCKET)?; + assert!(ptr != 0); + let accept_ex = mem::transmute::<_, LPFN_ACCEPTEX>(ptr).unwrap(); + + let mut bytes = 0; + let (a, b, c, d) = (*addrs).args(); + let r = accept_ex( + self.as_raw_socket() as SOCKET, + socket.as_raw_socket() as SOCKET, + a, + b, + c, + d, + &mut bytes, + overlapped, + ); + let succeeded = if r == TRUE { + true + } else { + last_err()?; + false + }; + Ok(succeeded) + } + + fn accept_complete(&self, socket: &TcpStream) -> io::Result<()> { + const SO_UPDATE_ACCEPT_CONTEXT: i32 = 0x700B; + let me = self.as_raw_socket(); + let result = unsafe { + setsockopt( + socket.as_raw_socket() as SOCKET, + SOL_SOCKET as _, + SO_UPDATE_ACCEPT_CONTEXT, + &me as *const _ as *mut _, + mem::size_of_val(&me) as i32, + ) + }; + if result == 0 { + Ok(()) + } else { + Err(io::Error::last_os_error()) + } + } + + unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<(usize, u32)> { + result(self.as_raw_socket() as SOCKET, overlapped) + } +} + +impl SocketAddrBuf { + /// Creates a new blank socket address buffer. + /// + /// This should be used before a call to `recv_from_overlapped` overlapped + /// to create an instance to pass down. + pub fn new() -> SocketAddrBuf { + SocketAddrBuf { + buf: unsafe { mem::zeroed() }, + len: mem::size_of::<SOCKADDR_STORAGE>() as i32, + } + } + + /// Parses this buffer to return a standard socket address. + /// + /// This function should be called after the buffer has been filled in with + /// a call to `recv_from_overlapped` being completed. It will interpret the + /// address filled in and return the standard socket address type. + /// + /// If an error is encountered then `None` is returned. + pub fn to_socket_addr(&self) -> Option<SocketAddr> { + unsafe { ptrs_to_socket_addr(&self.buf as *const _ as *const _, self.len) } + } +} + +static GETACCEPTEXSOCKADDRS: WsaExtension = WsaExtension { + guid: GUID { + data1: 0xb5367df2, + data2: 0xcbac, + data3: 0x11cf, + data4: [0x95, 0xca, 0x00, 0x80, 0x5f, 0x48, 0xa1, 0x92], + }, + val: AtomicUsize::new(0), +}; + +impl AcceptAddrsBuf { + /// Creates a new blank buffer ready to be passed to a call to + /// `accept_overlapped`. + pub fn new() -> AcceptAddrsBuf { + unsafe { mem::zeroed() } + } + + /// Parses the data contained in this address buffer, returning the parsed + /// result if successful. + /// + /// This function can be called after a call to `accept_overlapped` has + /// succeeded to parse out the data that was written in. + pub fn parse(&self, socket: &TcpListener) -> io::Result<AcceptAddrs> { + let mut ret = AcceptAddrs { + local: 0 as *mut _, + local_len: 0, + remote: 0 as *mut _, + remote_len: 0, + _data: self, + }; + let ptr = GETACCEPTEXSOCKADDRS.get(socket.as_raw_socket() as SOCKET)?; + assert!(ptr != 0); + unsafe { + let get_sockaddrs = mem::transmute::<_, LPFN_GETACCEPTEXSOCKADDRS>(ptr).unwrap(); + let (a, b, c, d) = self.args(); + get_sockaddrs( + a, + b, + c, + d, + &mut ret.local, + &mut ret.local_len, + &mut ret.remote, + &mut ret.remote_len, + ); + Ok(ret) + } + } + + #[allow(deref_nullptr)] + fn args(&self) -> (*mut std::ffi::c_void, u32, u32, u32) { + let remote_offset = unsafe { &(*(0 as *const AcceptAddrsBuf)).remote as *const _ as usize }; + ( + self as *const _ as *mut _, + 0, + remote_offset as u32, + (mem::size_of_val(self) - remote_offset) as u32, + ) + } +} + +impl<'a> AcceptAddrs<'a> { + /// Returns the local socket address contained in this buffer. + pub fn local(&self) -> Option<SocketAddr> { + unsafe { ptrs_to_socket_addr(self.local, self.local_len) } + } + + /// Returns the remote socket address contained in this buffer. + pub fn remote(&self) -> Option<SocketAddr> { + unsafe { ptrs_to_socket_addr(self.remote, self.remote_len) } + } +} + +impl WsaExtension { + fn get(&self, socket: SOCKET) -> io::Result<usize> { + let prev = self.val.load(Ordering::SeqCst); + if prev != 0 && !cfg!(debug_assertions) { + return Ok(prev); + } + let mut ret = 0 as usize; + let mut bytes = 0; + + // https://github.com/microsoft/win32metadata/issues/671 + const SIO_GET_EXTENSION_FUNCTION_POINTER: u32 = 33_5544_3206u32; + + let r = unsafe { + WSAIoctl( + socket, + SIO_GET_EXTENSION_FUNCTION_POINTER, + &self.guid as *const _ as *mut _, + mem::size_of_val(&self.guid) as u32, + &mut ret as *mut _ as *mut _, + mem::size_of_val(&ret) as u32, + &mut bytes, + 0 as *mut _, + None, + ) + }; + cvt(r, 0).map(|_| { + debug_assert_eq!(bytes as usize, mem::size_of_val(&ret)); + debug_assert!(prev == 0 || prev == ret); + self.val.store(ret, Ordering::SeqCst); + ret + }) + } +} + +#[cfg(test)] +mod tests { + use std::io::prelude::*; + use std::net::{ + IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV6, TcpListener, TcpStream, UdpSocket, + }; + use std::slice; + use std::thread; + + use socket2::{Domain, Socket, Type}; + + use crate::iocp::CompletionPort; + use crate::net::{AcceptAddrsBuf, TcpListenerExt}; + use crate::net::{SocketAddrBuf, TcpStreamExt, UdpSocketExt}; + use crate::Overlapped; + + fn each_ip(f: &mut dyn FnMut(SocketAddr)) { + f(t!("127.0.0.1:0".parse())); + f(t!("[::1]:0".parse())); + } + + #[test] + fn tcp_read() { + each_ip(&mut |addr| { + let l = t!(TcpListener::bind(addr)); + let addr = t!(l.local_addr()); + let t = thread::spawn(move || { + let mut a = t!(l.accept()).0; + t!(a.write_all(&[1, 2, 3])); + }); + + let cp = t!(CompletionPort::new(1)); + let s = t!(TcpStream::connect(addr)); + t!(cp.add_socket(1, &s)); + + let mut b = [0; 10]; + let a = Overlapped::zero(); + unsafe { + t!(s.read_overlapped(&mut b, a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + assert_eq!(&b[0..3], &[1, 2, 3]); + + t!(t.join()); + }) + } + + #[test] + fn tcp_write() { + each_ip(&mut |addr| { + let l = t!(TcpListener::bind(addr)); + let addr = t!(l.local_addr()); + let t = thread::spawn(move || { + let mut a = t!(l.accept()).0; + let mut b = [0; 10]; + let n = t!(a.read(&mut b)); + assert_eq!(n, 3); + assert_eq!(&b[0..3], &[1, 2, 3]); + }); + + let cp = t!(CompletionPort::new(1)); + let s = t!(TcpStream::connect(addr)); + t!(cp.add_socket(1, &s)); + + let b = [1, 2, 3]; + let a = Overlapped::zero(); + unsafe { + t!(s.write_overlapped(&b, a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + + t!(t.join()); + }) + } + + #[test] + fn tcp_connect() { + each_ip(&mut |addr_template| { + let l = t!(TcpListener::bind(addr_template)); + let addr = t!(l.local_addr()); + let t = thread::spawn(move || { + t!(l.accept()); + }); + + let cp = t!(CompletionPort::new(1)); + let domain = Domain::for_address(addr); + let socket = t!(Socket::new(domain, Type::STREAM, None)); + t!(socket.bind(&addr_template.into())); + let socket = TcpStream::from(socket); + t!(cp.add_socket(1, &socket)); + + let a = Overlapped::zero(); + unsafe { + t!(socket.connect_overlapped(&addr, &[], a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 0); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + t!(socket.connect_complete()); + + t!(t.join()); + }) + } + + #[test] + fn udp_recv_from() { + each_ip(&mut |addr| { + let a = t!(UdpSocket::bind(addr)); + let b = t!(UdpSocket::bind(addr)); + let a_addr = t!(a.local_addr()); + let b_addr = t!(b.local_addr()); + let t = thread::spawn(move || { + t!(a.send_to(&[1, 2, 3], b_addr)); + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_socket(1, &b)); + + let mut buf = [0; 10]; + let a = Overlapped::zero(); + let mut addr = SocketAddrBuf::new(); + unsafe { + t!(b.recv_from_overlapped(&mut buf, &mut addr, a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + assert_eq!(&buf[..3], &[1, 2, 3]); + assert_eq!(addr.to_socket_addr(), Some(a_addr)); + + t!(t.join()); + }) + } + + #[test] + fn udp_recv() { + each_ip(&mut |addr| { + let a = t!(UdpSocket::bind(addr)); + let b = t!(UdpSocket::bind(addr)); + let a_addr = t!(a.local_addr()); + let b_addr = t!(b.local_addr()); + assert!(b.connect(a_addr).is_ok()); + assert!(a.connect(b_addr).is_ok()); + let t = thread::spawn(move || { + t!(a.send_to(&[1, 2, 3], b_addr)); + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_socket(1, &b)); + + let mut buf = [0; 10]; + let a = Overlapped::zero(); + unsafe { + t!(b.recv_overlapped(&mut buf, a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + assert_eq!(&buf[..3], &[1, 2, 3]); + + t!(t.join()); + }) + } + + #[test] + fn udp_send_to() { + each_ip(&mut |addr| { + let a = t!(UdpSocket::bind(addr)); + let b = t!(UdpSocket::bind(addr)); + let a_addr = t!(a.local_addr()); + let b_addr = t!(b.local_addr()); + let t = thread::spawn(move || { + let mut b = [0; 100]; + let (n, addr) = t!(a.recv_from(&mut b)); + assert_eq!(n, 3); + assert_eq!(addr, b_addr); + assert_eq!(&b[..3], &[1, 2, 3]); + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_socket(1, &b)); + + let a = Overlapped::zero(); + unsafe { + t!(b.send_to_overlapped(&[1, 2, 3], &a_addr, a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + + t!(t.join()); + }) + } + + #[test] + fn udp_send() { + each_ip(&mut |addr| { + let a = t!(UdpSocket::bind(addr)); + let b = t!(UdpSocket::bind(addr)); + let a_addr = t!(a.local_addr()); + let b_addr = t!(b.local_addr()); + assert!(b.connect(a_addr).is_ok()); + assert!(a.connect(b_addr).is_ok()); + let t = thread::spawn(move || { + let mut b = [0; 100]; + let (n, addr) = t!(a.recv_from(&mut b)); + assert_eq!(n, 3); + assert_eq!(addr, b_addr); + assert_eq!(&b[..3], &[1, 2, 3]); + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_socket(1, &b)); + + let a = Overlapped::zero(); + unsafe { + t!(b.send_overlapped(&[1, 2, 3], a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + + t!(t.join()); + }) + } + + #[test] + fn tcp_accept() { + each_ip(&mut |addr_template| { + let l = t!(TcpListener::bind(addr_template)); + let addr = t!(l.local_addr()); + let t = thread::spawn(move || { + let socket = t!(TcpStream::connect(addr)); + (socket.local_addr().unwrap(), socket.peer_addr().unwrap()) + }); + + let cp = t!(CompletionPort::new(1)); + let domain = Domain::for_address(addr); + let socket = TcpStream::from(t!(Socket::new(domain, Type::STREAM, None))); + t!(cp.add_socket(1, &l)); + + let a = Overlapped::zero(); + let mut addrs = AcceptAddrsBuf::new(); + unsafe { + t!(l.accept_overlapped(&socket, &mut addrs, a.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 0); + assert_eq!(status.token(), 1); + assert_eq!(status.overlapped(), a.raw()); + t!(l.accept_complete(&socket)); + + let (remote, local) = t!(t.join()); + let addrs = addrs.parse(&l).unwrap(); + assert_eq!(addrs.local(), Some(local)); + assert_eq!(addrs.remote(), Some(remote)); + }) + } + + #[test] + fn sockaddr_convert_4() { + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(3, 4, 5, 6)), 0xabcd); + let (raw_addr, addr_len) = super::socket_addr_to_ptrs(&addr); + assert_eq!(addr_len, 16); + let addr_bytes = + unsafe { slice::from_raw_parts(raw_addr.as_ptr() as *const u8, addr_len as usize) }; + assert_eq!( + addr_bytes, + &[2, 0, 0xab, 0xcd, 3, 4, 5, 6, 0, 0, 0, 0, 0, 0, 0, 0] + ); + } + + #[test] + fn sockaddr_convert_v6() { + let port = 0xabcd; + let flowinfo = 0x12345678; + let scope_id = 0x87654321; + let addr = SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::new( + 0x0102, 0x0304, 0x0506, 0x0708, 0x090a, 0x0b0c, 0x0d0e, 0x0f10, + ), + port, + flowinfo, + scope_id, + )); + let (raw_addr, addr_len) = super::socket_addr_to_ptrs(&addr); + assert_eq!(addr_len, 28); + let addr_bytes = + unsafe { slice::from_raw_parts(raw_addr.as_ptr() as *const u8, addr_len as usize) }; + assert_eq!( + addr_bytes, + &[ + 23, 0, // AF_INET6 + 0xab, 0xcd, // Port + 0x78, 0x56, 0x34, 0x12, // flowinfo + 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, + 0x0f, 0x10, // IP + 0x21, 0x43, 0x65, 0x87, // scope_id + ] + ); + } +} diff --git a/vendor/miow/src/overlapped.rs b/vendor/miow/src/overlapped.rs index a5a538893..c84baa67a 100644 --- a/vendor/miow/src/overlapped.rs +++ b/vendor/miow/src/overlapped.rs @@ -1,93 +1,93 @@ -use std::fmt;
-use std::io;
-use std::mem;
-use std::ptr;
-
-use windows_sys::Win32::Foundation::*;
-use windows_sys::Win32::System::Threading::*;
-use windows_sys::Win32::System::IO::*;
-
-/// A wrapper around `OVERLAPPED` to provide "rustic" accessors and
-/// initializers.
-pub struct Overlapped(OVERLAPPED);
-
-impl fmt::Debug for Overlapped {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, "OVERLAPPED")
- }
-}
-
-unsafe impl Send for Overlapped {}
-unsafe impl Sync for Overlapped {}
-
-impl Overlapped {
- /// Creates a new zeroed out instance of an overlapped I/O tracking state.
- ///
- /// This is suitable for passing to methods which will then later get
- /// notified via an I/O Completion Port.
- pub fn zero() -> Overlapped {
- Overlapped(unsafe { mem::zeroed() })
- }
-
- /// Creates a new `Overlapped` with an initialized non-null `hEvent`. The caller is
- /// responsible for calling `CloseHandle` on the `hEvent` field of the returned
- /// `Overlapped`. The event is created with `bManualReset` set to `FALSE`, meaning after a
- /// single thread waits on the event, it will be reset.
- pub fn initialize_with_autoreset_event() -> io::Result<Overlapped> {
- let event = unsafe { CreateEventW(ptr::null_mut(), 0i32, 0i32, ptr::null_mut()) };
- if event.is_null() {
- return Err(io::Error::last_os_error());
- }
- let mut overlapped = Self::zero();
- overlapped.set_event(event);
- Ok(overlapped)
- }
-
- /// Creates a new `Overlapped` function pointer from the underlying
- /// `OVERLAPPED`, wrapping in the "rusty" wrapper for working with
- /// accessors.
- ///
- /// # Unsafety
- ///
- /// This function doesn't validate `ptr` nor the lifetime of the returned
- /// pointer at all, it's recommended to use this method with extreme
- /// caution.
- pub unsafe fn from_raw<'a>(ptr: *mut OVERLAPPED) -> &'a mut Overlapped {
- &mut *(ptr as *mut Overlapped)
- }
-
- /// Gain access to the raw underlying data
- pub fn raw(&self) -> *mut OVERLAPPED {
- &self.0 as *const _ as *mut _
- }
-
- /// Sets the offset inside this overlapped structure.
- ///
- /// Note that for I/O operations in general this only has meaning for I/O
- /// handles that are on a seeking device that supports the concept of an
- /// offset.
- pub fn set_offset(&mut self, offset: u64) {
- self.0.Anonymous.Anonymous.Offset = offset as u32;
- self.0.Anonymous.Anonymous.OffsetHigh = (offset >> 32) as u32;
- }
-
- /// Reads the offset inside this overlapped structure.
- pub fn offset(&self) -> u64 {
- unsafe {
- (self.0.Anonymous.Anonymous.Offset as u64)
- | ((self.0.Anonymous.Anonymous.OffsetHigh as u64) << 32)
- }
- }
-
- /// Sets the `hEvent` field of this structure.
- ///
- /// The event specified can be null.
- pub fn set_event(&mut self, event: HANDLE) {
- self.0.hEvent = event;
- }
-
- /// Reads the `hEvent` field of this structure, may return null.
- pub fn event(&self) -> HANDLE {
- self.0.hEvent
- }
-}
+use std::fmt; +use std::io; +use std::mem; +use std::ptr; + +use windows_sys::Win32::Foundation::HANDLE; +use windows_sys::Win32::System::Threading::CreateEventW; +use windows_sys::Win32::System::IO::OVERLAPPED; + +/// A wrapper around `OVERLAPPED` to provide "rustic" accessors and +/// initializers. +pub struct Overlapped(OVERLAPPED); + +impl fmt::Debug for Overlapped { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "OVERLAPPED") + } +} + +unsafe impl Send for Overlapped {} +unsafe impl Sync for Overlapped {} + +impl Overlapped { + /// Creates a new zeroed out instance of an overlapped I/O tracking state. + /// + /// This is suitable for passing to methods which will then later get + /// notified via an I/O Completion Port. + pub fn zero() -> Overlapped { + Overlapped(unsafe { mem::zeroed() }) + } + + /// Creates a new `Overlapped` with an initialized non-null `hEvent`. The caller is + /// responsible for calling `CloseHandle` on the `hEvent` field of the returned + /// `Overlapped`. The event is created with `bManualReset` set to `FALSE`, meaning after a + /// single thread waits on the event, it will be reset. + pub fn initialize_with_autoreset_event() -> io::Result<Overlapped> { + let event = unsafe { CreateEventW(ptr::null_mut(), 0i32, 0i32, ptr::null_mut()) }; + if event == 0 { + return Err(io::Error::last_os_error()); + } + let mut overlapped = Self::zero(); + overlapped.set_event(event); + Ok(overlapped) + } + + /// Creates a new `Overlapped` function pointer from the underlying + /// `OVERLAPPED`, wrapping in the "rusty" wrapper for working with + /// accessors. + /// + /// # Unsafety + /// + /// This function doesn't validate `ptr` nor the lifetime of the returned + /// pointer at all, it's recommended to use this method with extreme + /// caution. + pub unsafe fn from_raw<'a>(ptr: *mut OVERLAPPED) -> &'a mut Overlapped { + &mut *(ptr as *mut Overlapped) + } + + /// Gain access to the raw underlying data + pub fn raw(&self) -> *mut OVERLAPPED { + &self.0 as *const _ as *mut _ + } + + /// Sets the offset inside this overlapped structure. + /// + /// Note that for I/O operations in general this only has meaning for I/O + /// handles that are on a seeking device that supports the concept of an + /// offset. + pub fn set_offset(&mut self, offset: u64) { + self.0.Anonymous.Anonymous.Offset = offset as u32; + self.0.Anonymous.Anonymous.OffsetHigh = (offset >> 32) as u32; + } + + /// Reads the offset inside this overlapped structure. + pub fn offset(&self) -> u64 { + unsafe { + (self.0.Anonymous.Anonymous.Offset as u64) + | ((self.0.Anonymous.Anonymous.OffsetHigh as u64) << 32) + } + } + + /// Sets the `hEvent` field of this structure. + /// + /// The event specified can be null. + pub fn set_event(&mut self, event: HANDLE) { + self.0.hEvent = event; + } + + /// Reads the `hEvent` field of this structure, may return null. + pub fn event(&self) -> HANDLE { + self.0.hEvent + } +} diff --git a/vendor/miow/src/pipe.rs b/vendor/miow/src/pipe.rs index 68c2fd396..4ff425b29 100644 --- a/vendor/miow/src/pipe.rs +++ b/vendor/miow/src/pipe.rs @@ -1,785 +1,795 @@ -//! Interprocess Communication pipes
-//!
-//! A pipe is a section of shared memory that processes use for communication.
-//! The process that creates a pipe is the _pipe server_. A process that connects
-//! to a pipe is a _pipe client_. One process writes information to the pipe, then
-//! the other process reads the information from the pipe. This overview
-//! describes how to create, manage, and use pipes.
-//!
-//! There are two types of pipes: [anonymous pipes](#fn.anonymous.html) and
-//! [named pipes](#fn.named.html). Anonymous pipes require less overhead than
-//! named pipes, but offer limited services.
-//!
-//! # Anonymous pipes
-//!
-//! An anonymous pipe is an unnamed, one-way pipe that typically transfers data
-//! between a parent process and a child process. Anonymous pipes are always
-//! local; they cannot be used for communication over a network.
-//!
-//! # Named pipes
-//!
-//! A *named pipe* is a named, one-way or duplex pipe for communication between
-//! the pipe server and one or more pipe clients. All instances of a named pipe
-//! share the same pipe name, but each instance has its own buffers and handles,
-//! and provides a separate conduit for client/server communication. The use of
-//! instances enables multiple pipe clients to use the same named pipe
-//! simultaneously.
-//!
-//! Any process can access named pipes, subject to security checks, making named
-//! pipes an easy form of communication between related or unrelated processes.
-//!
-//! Any process can act as both a server and a client, making peer-to-peer
-//! communication possible. As used here, the term pipe server refers to a
-//! process that creates a named pipe, and the term pipe client refers to a
-//! process that connects to an instance of a named pipe.
-//!
-//! Named pipes can be used to provide communication between processes on the
-//! same computer or between processes on different computers across a network.
-//! If the server service is running, all named pipes are accessible remotely. If
-//! you intend to use a named pipe locally only, deny access to NT
-//! AUTHORITY\\NETWORK or switch to local RPC.
-//!
-//! # References
-//!
-//! - [win32 pipe docs](https://github.com/MicrosoftDocs/win32/blob/docs/desktop-src/ipc/pipes.md)
-
-use crate::*;
-use std::cell::RefCell;
-use std::ffi::OsStr;
-use std::fs::{File, OpenOptions};
-use std::io;
-use std::io::prelude::*;
-use std::os::windows::ffi::*;
-use std::os::windows::io::*;
-use std::time::Duration;
-
-use crate::handle::Handle;
-use crate::overlapped::Overlapped;
-
-use windows_sys::Win32::Security::*;
-use windows_sys::Win32::Storage::FileSystem::*;
-use windows_sys::Win32::System::Pipes::*;
-use windows_sys::Win32::System::IO::*;
-
-/// Readable half of an anonymous pipe.
-#[derive(Debug)]
-pub struct AnonRead(Handle);
-
-/// Writable half of an anonymous pipe.
-#[derive(Debug)]
-pub struct AnonWrite(Handle);
-
-/// A named pipe that can accept connections.
-#[derive(Debug)]
-pub struct NamedPipe(Handle);
-
-/// A builder structure for creating a new named pipe.
-#[derive(Debug)]
-pub struct NamedPipeBuilder {
- name: Vec<u16>,
- dwOpenMode: u32,
- dwPipeMode: u32,
- nMaxInstances: u32,
- nOutBufferSize: u32,
- nInBufferSize: u32,
- nDefaultTimeOut: u32,
-}
-
-/// Creates a new anonymous in-memory pipe, returning the read/write ends of the
-/// pipe.
-///
-/// The buffer size for this pipe may also be specified, but the system will
-/// normally use this as a suggestion and it's not guaranteed that the buffer
-/// will be precisely this size.
-pub fn anonymous(buffer_size: u32) -> io::Result<(AnonRead, AnonWrite)> {
- let mut read = 0 as HANDLE;
- let mut write = 0 as HANDLE;
- crate::cvt(unsafe { CreatePipe(&mut read, &mut write, 0 as *mut _, buffer_size) })?;
- Ok((AnonRead(Handle::new(read)), AnonWrite(Handle::new(write))))
-}
-
-impl Read for AnonRead {
- fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- self.0.read(buf)
- }
-}
-impl<'a> Read for &'a AnonRead {
- fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- self.0.read(buf)
- }
-}
-
-impl AsRawHandle for AnonRead {
- fn as_raw_handle(&self) -> HANDLE {
- self.0.raw()
- }
-}
-impl FromRawHandle for AnonRead {
- unsafe fn from_raw_handle(handle: HANDLE) -> AnonRead {
- AnonRead(Handle::new(handle))
- }
-}
-impl IntoRawHandle for AnonRead {
- fn into_raw_handle(self) -> HANDLE {
- self.0.into_raw()
- }
-}
-
-impl Write for AnonWrite {
- fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- self.0.write(buf)
- }
- fn flush(&mut self) -> io::Result<()> {
- Ok(())
- }
-}
-impl<'a> Write for &'a AnonWrite {
- fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- self.0.write(buf)
- }
- fn flush(&mut self) -> io::Result<()> {
- Ok(())
- }
-}
-
-impl AsRawHandle for AnonWrite {
- fn as_raw_handle(&self) -> HANDLE {
- self.0.raw()
- }
-}
-impl FromRawHandle for AnonWrite {
- unsafe fn from_raw_handle(handle: HANDLE) -> AnonWrite {
- AnonWrite(Handle::new(handle))
- }
-}
-impl IntoRawHandle for AnonWrite {
- fn into_raw_handle(self) -> HANDLE {
- self.0.into_raw()
- }
-}
-
-/// A convenience function to connect to a named pipe.
-///
-/// This function will block the calling process until it can connect to the
-/// pipe server specified by `addr`. This will use `NamedPipe::wait` internally
-/// to block until it can connect.
-pub fn connect<A: AsRef<OsStr>>(addr: A) -> io::Result<File> {
- _connect(addr.as_ref())
-}
-
-fn _connect(addr: &OsStr) -> io::Result<File> {
- let mut r = OpenOptions::new();
- let mut w = OpenOptions::new();
- let mut rw = OpenOptions::new();
- r.read(true);
- w.write(true);
- rw.read(true).write(true);
- loop {
- let res = rw
- .open(addr)
- .or_else(|_| r.open(addr))
- .or_else(|_| w.open(addr));
- match res {
- Ok(f) => return Ok(f),
- Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => {}
- Err(e) => return Err(e),
- }
-
- NamedPipe::wait(addr, Some(Duration::new(20, 0)))?;
- }
-}
-
-impl NamedPipe {
- /// Creates a new initial named pipe.
- ///
- /// This function is equivalent to:
- ///
- /// ```
- /// use miow::pipe::NamedPipeBuilder;
- ///
- /// # let addr = "foo";
- /// NamedPipeBuilder::new(addr)
- /// .first(true)
- /// .inbound(true)
- /// .outbound(true)
- /// .out_buffer_size(65536)
- /// .in_buffer_size(65536)
- /// .create();
- /// ```
- pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
- NamedPipeBuilder::new(addr).create()
- }
-
- /// Waits until either a time-out interval elapses or an instance of the
- /// specified named pipe is available for connection.
- ///
- /// If this function succeeds the process can create a `File` to connect to
- /// the named pipe.
- pub fn wait<A: AsRef<OsStr>>(addr: A, timeout: Option<Duration>) -> io::Result<()> {
- NamedPipe::_wait(addr.as_ref(), timeout)
- }
-
- fn _wait(addr: &OsStr, timeout: Option<Duration>) -> io::Result<()> {
- let addr = addr.encode_wide().chain(Some(0)).collect::<Vec<_>>();
- let timeout = crate::dur2ms(timeout);
- crate::cvt(unsafe { WaitNamedPipeW(addr.as_ptr() as _, timeout) }).map(|_| ())
- }
-
- /// Connects this named pipe to a client, blocking until one becomes
- /// available.
- ///
- /// This function will call the `ConnectNamedPipe` function to await for a
- /// client to connect. This can be called immediately after the pipe is
- /// created, or after it has been disconnected from a previous client.
- pub fn connect(&self) -> io::Result<()> {
- match crate::cvt(unsafe { ConnectNamedPipe(self.0.raw(), 0 as *mut _) }) {
- Ok(_) => Ok(()),
- Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => Ok(()),
- Err(e) => Err(e),
- }
- }
-
- /// Issue a connection request with the specified overlapped operation.
- ///
- /// This function will issue a request to connect a client to this server,
- /// returning immediately after starting the overlapped operation.
- ///
- /// If this function immediately succeeds then `Ok(true)` is returned. If
- /// the overlapped operation is enqueued and pending, then `Ok(false)` is
- /// returned. Otherwise an error is returned indicating what went wrong.
- ///
- /// # Unsafety
- ///
- /// This function is unsafe because the kernel requires that the
- /// `overlapped` pointer is valid until the end of the I/O operation. The
- /// kernel also requires that `overlapped` is unique for this I/O operation
- /// and is not in use for any other I/O.
- ///
- /// To safely use this function callers must ensure that this pointer is
- /// valid until the I/O operation is completed, typically via completion
- /// ports and waiting to receive the completion notification on the port.
- pub unsafe fn connect_overlapped(&self, overlapped: *mut OVERLAPPED) -> io::Result<bool> {
- match crate::cvt(ConnectNamedPipe(self.0.raw(), overlapped)) {
- Ok(_) => Ok(true),
- Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => Ok(true),
- Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => Ok(false),
- Err(ref e) if e.raw_os_error() == Some(ERROR_NO_DATA as i32) => Ok(true),
- Err(e) => Err(e),
- }
- }
-
- /// Disconnects this named pipe from any connected client.
- pub fn disconnect(&self) -> io::Result<()> {
- crate::cvt(unsafe { DisconnectNamedPipe(self.0.raw()) }).map(|_| ())
- }
-
- /// Issues an overlapped read operation to occur on this pipe.
- ///
- /// This function will issue an asynchronous read to occur in an overlapped
- /// fashion, returning immediately. The `buf` provided will be filled in
- /// with data and the request is tracked by the `overlapped` function
- /// provided.
- ///
- /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
- /// `n` is the number of bytes read. If an asynchronous operation is
- /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
- /// it is returned.
- ///
- /// When this operation completes (or if it completes immediately), another
- /// mechanism must be used to learn how many bytes were transferred (such as
- /// looking at the filed in the IOCP status message).
- ///
- /// # Unsafety
- ///
- /// This function is unsafe because the kernel requires that the `buf` and
- /// `overlapped` pointers to be valid until the end of the I/O operation.
- /// The kernel also requires that `overlapped` is unique for this I/O
- /// operation and is not in use for any other I/O.
- ///
- /// To safely use this function callers must ensure that the pointers are
- /// valid until the I/O operation is completed, typically via completion
- /// ports and waiting to receive the completion notification on the port.
- pub unsafe fn read_overlapped(
- &self,
- buf: &mut [u8],
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<Option<usize>> {
- self.0.read_overlapped(buf, overlapped)
- }
-
- /// Issues an overlapped write operation to occur on this pipe.
- ///
- /// This function will issue an asynchronous write to occur in an overlapped
- /// fashion, returning immediately. The `buf` provided will be filled in
- /// with data and the request is tracked by the `overlapped` function
- /// provided.
- ///
- /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
- /// `n` is the number of bytes written. If an asynchronous operation is
- /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
- /// it is returned.
- ///
- /// When this operation completes (or if it completes immediately), another
- /// mechanism must be used to learn how many bytes were transferred (such as
- /// looking at the filed in the IOCP status message).
- ///
- /// # Unsafety
- ///
- /// This function is unsafe because the kernel requires that the `buf` and
- /// `overlapped` pointers to be valid until the end of the I/O operation.
- /// The kernel also requires that `overlapped` is unique for this I/O
- /// operation and is not in use for any other I/O.
- ///
- /// To safely use this function callers must ensure that the pointers are
- /// valid until the I/O operation is completed, typically via completion
- /// ports and waiting to receive the completion notification on the port.
- pub unsafe fn write_overlapped(
- &self,
- buf: &[u8],
- overlapped: *mut OVERLAPPED,
- ) -> io::Result<Option<usize>> {
- self.0.write_overlapped(buf, overlapped)
- }
-
- /// Calls the `GetOverlappedResult` function to get the result of an
- /// overlapped operation for this handle.
- ///
- /// This function takes the `OVERLAPPED` argument which must have been used
- /// to initiate an overlapped I/O operation, and returns either the
- /// successful number of bytes transferred during the operation or an error
- /// if one occurred.
- ///
- /// # Unsafety
- ///
- /// This function is unsafe as `overlapped` must have previously been used
- /// to execute an operation for this handle, and it must also be a valid
- /// pointer to an `Overlapped` instance.
- ///
- /// # Panics
- ///
- /// This function will panic
- pub unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<usize> {
- let mut transferred = 0;
- let r = GetOverlappedResult(self.0.raw(), overlapped, &mut transferred, FALSE);
- if r == 0 {
- Err(io::Error::last_os_error())
- } else {
- Ok(transferred as usize)
- }
- }
-}
-
-thread_local! {
- static NAMED_PIPE_OVERLAPPED: RefCell<Option<Overlapped>> = RefCell::new(None);
-}
-
-/// Call a function with a threadlocal `Overlapped`. The function `f` should be
-/// sure that the event is reset, either manually or by a thread being released.
-fn with_threadlocal_overlapped<F>(f: F) -> io::Result<usize>
-where
- F: FnOnce(&Overlapped) -> io::Result<usize>,
-{
- NAMED_PIPE_OVERLAPPED.with(|overlapped| {
- let mut mborrow = overlapped.borrow_mut();
- if let None = *mborrow {
- let op = Overlapped::initialize_with_autoreset_event()?;
- *mborrow = Some(op);
- }
- f(mborrow.as_ref().unwrap())
- })
-}
-
-impl Read for NamedPipe {
- fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
- with_threadlocal_overlapped(|overlapped| unsafe {
- self.0
- .read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
- })
- }
-}
-impl<'a> Read for &'a NamedPipe {
- fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
- with_threadlocal_overlapped(|overlapped| unsafe {
- self.0
- .read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
- })
- }
-}
-
-impl Write for NamedPipe {
- fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
- with_threadlocal_overlapped(|overlapped| unsafe {
- self.0
- .write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
- })
- }
- fn flush(&mut self) -> io::Result<()> {
- <&NamedPipe as Write>::flush(&mut &*self)
- }
-}
-impl<'a> Write for &'a NamedPipe {
- fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
- with_threadlocal_overlapped(|overlapped| unsafe {
- self.0
- .write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
- })
- }
- fn flush(&mut self) -> io::Result<()> {
- crate::cvt(unsafe { FlushFileBuffers(self.0.raw()) }).map(|_| ())
- }
-}
-
-impl AsRawHandle for NamedPipe {
- fn as_raw_handle(&self) -> HANDLE {
- self.0.raw()
- }
-}
-impl FromRawHandle for NamedPipe {
- unsafe fn from_raw_handle(handle: HANDLE) -> NamedPipe {
- NamedPipe(Handle::new(handle))
- }
-}
-impl IntoRawHandle for NamedPipe {
- fn into_raw_handle(self) -> HANDLE {
- self.0.into_raw()
- }
-}
-
-fn flag(slot: &mut u32, on: bool, val: u32) {
- if on {
- *slot |= val;
- } else {
- *slot &= !val;
- }
-}
-
-impl NamedPipeBuilder {
- /// Creates a new named pipe builder with the default settings.
- pub fn new<A: AsRef<OsStr>>(addr: A) -> NamedPipeBuilder {
- NamedPipeBuilder {
- name: addr.as_ref().encode_wide().chain(Some(0)).collect(),
- dwOpenMode: PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED,
- dwPipeMode: PIPE_TYPE_BYTE,
- nMaxInstances: PIPE_UNLIMITED_INSTANCES,
- nOutBufferSize: 65536,
- nInBufferSize: 65536,
- nDefaultTimeOut: 0,
- }
- }
-
- /// Indicates whether data is allowed to flow from the client to the server.
- pub fn inbound(&mut self, allowed: bool) -> &mut Self {
- flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_INBOUND);
- self
- }
-
- /// Indicates whether data is allowed to flow from the server to the client.
- pub fn outbound(&mut self, allowed: bool) -> &mut Self {
- flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_OUTBOUND);
- self
- }
-
- /// Indicates that this pipe must be the first instance.
- ///
- /// If set to true, then creation will fail if there's already an instance
- /// elsewhere.
- pub fn first(&mut self, first: bool) -> &mut Self {
- flag(&mut self.dwOpenMode, first, FILE_FLAG_FIRST_PIPE_INSTANCE);
- self
- }
-
- /// Indicates whether this server can accept remote clients or not.
- pub fn accept_remote(&mut self, accept: bool) -> &mut Self {
- flag(&mut self.dwPipeMode, !accept, PIPE_REJECT_REMOTE_CLIENTS);
- self
- }
-
- /// Specifies the maximum number of instances of the server pipe that are
- /// allowed.
- ///
- /// The first instance of a pipe can specify this value. A value of 255
- /// indicates that there is no limit to the number of instances.
- pub fn max_instances(&mut self, instances: u8) -> &mut Self {
- self.nMaxInstances = instances as u32;
- self
- }
-
- /// Specifies the number of bytes to reserver for the output buffer
- pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
- self.nOutBufferSize = buffer as u32;
- self
- }
-
- /// Specifies the number of bytes to reserver for the input buffer
- pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
- self.nInBufferSize = buffer as u32;
- self
- }
-
- /// Using the options in this builder, attempt to create a new named pipe.
- ///
- /// This function will call the `CreateNamedPipe` function and return the
- /// result.
- pub fn create(&mut self) -> io::Result<NamedPipe> {
- unsafe { self.with_security_attributes(::std::ptr::null_mut()) }
- }
-
- /// Using the options in the builder and the provided security attributes, attempt to create a
- /// new named pipe. This function has to be called with a valid pointer to a
- /// `SECURITY_ATTRIBUTES` struct that will stay valid for the lifetime of this function or a
- /// null pointer.
- ///
- /// This function will call the `CreateNamedPipe` function and return the
- /// result.
- pub unsafe fn with_security_attributes(
- &mut self,
- attrs: *mut SECURITY_ATTRIBUTES,
- ) -> io::Result<NamedPipe> {
- let h = CreateNamedPipeW(
- self.name.as_mut_ptr(),
- self.dwOpenMode,
- self.dwPipeMode,
- self.nMaxInstances,
- self.nOutBufferSize,
- self.nInBufferSize,
- self.nDefaultTimeOut,
- attrs,
- );
-
- if h == INVALID_HANDLE_VALUE {
- Err(io::Error::last_os_error())
- } else {
- Ok(NamedPipe(Handle::new(h)))
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use std::fs::{File, OpenOptions};
- use std::io::prelude::*;
- use std::sync::mpsc::channel;
- use std::thread;
- use std::time::Duration;
-
- use rand::{distributions::Alphanumeric, thread_rng, Rng};
-
- use super::{anonymous, NamedPipe, NamedPipeBuilder};
- use crate::iocp::CompletionPort;
- use crate::Overlapped;
-
- fn name() -> String {
- let name = thread_rng()
- .sample_iter(Alphanumeric)
- .take(30)
- .map(char::from)
- .collect::<String>();
- format!(r"\\.\pipe\{}", name)
- }
-
- #[test]
- fn anon() {
- let (mut read, mut write) = t!(anonymous(256));
- assert_eq!(t!(write.write(&[1, 2, 3])), 3);
- let mut b = [0; 10];
- assert_eq!(t!(read.read(&mut b)), 3);
- assert_eq!(&b[..3], &[1, 2, 3]);
- }
-
- #[test]
- fn named_not_first() {
- let name = name();
- let _a = t!(NamedPipe::new(&name));
- assert!(NamedPipe::new(&name).is_err());
-
- t!(NamedPipeBuilder::new(&name).first(false).create());
- }
-
- #[test]
- fn named_connect() {
- let name = name();
- let a = t!(NamedPipe::new(&name));
-
- let t = thread::spawn(move || {
- t!(File::open(name));
- });
-
- t!(a.connect());
- t!(a.disconnect());
- t!(t.join());
- }
-
- #[test]
- fn named_wait() {
- let name = name();
- let a = t!(NamedPipe::new(&name));
-
- let (tx, rx) = channel();
- let t = thread::spawn(move || {
- t!(NamedPipe::wait(&name, None));
- t!(File::open(&name));
- assert!(NamedPipe::wait(&name, Some(Duration::from_millis(1))).is_err());
- t!(tx.send(()));
- });
-
- t!(a.connect());
- t!(rx.recv());
- t!(a.disconnect());
- t!(t.join());
- }
-
- #[test]
- fn named_connect_overlapped() {
- let name = name();
- let a = t!(NamedPipe::new(&name));
-
- let t = thread::spawn(move || {
- t!(File::open(name));
- });
-
- let cp = t!(CompletionPort::new(1));
- t!(cp.add_handle(2, &a));
-
- let over = Overlapped::zero();
- unsafe {
- t!(a.connect_overlapped(over.raw()));
- }
-
- let status = t!(cp.get(None));
- assert_eq!(status.bytes_transferred(), 0);
- assert_eq!(status.token(), 2);
- assert_eq!(status.overlapped(), over.raw());
- t!(t.join());
- }
-
- #[test]
- fn named_read_write() {
- let name = name();
- let mut a = t!(NamedPipe::new(&name));
-
- let t = thread::spawn(move || {
- let mut f = t!(OpenOptions::new().read(true).write(true).open(name));
- t!(f.write_all(&[1, 2, 3]));
- let mut b = [0; 10];
- assert_eq!(t!(f.read(&mut b)), 3);
- assert_eq!(&b[..3], &[1, 2, 3]);
- });
-
- t!(a.connect());
- let mut b = [0; 10];
- assert_eq!(t!(a.read(&mut b)), 3);
- assert_eq!(&b[..3], &[1, 2, 3]);
- t!(a.write_all(&[1, 2, 3]));
- t!(a.flush());
- t!(a.disconnect());
- t!(t.join());
- }
-
- #[test]
- fn named_read_write_multi() {
- for _ in 0..5 {
- named_read_write()
- }
- }
-
- #[test]
- fn named_read_write_multi_same_thread() {
- let name1 = name();
- let mut a1 = t!(NamedPipe::new(&name1));
- let name2 = name();
- let mut a2 = t!(NamedPipe::new(&name2));
-
- let t = thread::spawn(move || {
- let mut f = t!(OpenOptions::new().read(true).write(true).open(name1));
- t!(f.write_all(&[1, 2, 3]));
- let mut b = [0; 10];
- assert_eq!(t!(f.read(&mut b)), 3);
- assert_eq!(&b[..3], &[1, 2, 3]);
-
- let mut f = t!(OpenOptions::new().read(true).write(true).open(name2));
- t!(f.write_all(&[1, 2, 3]));
- let mut b = [0; 10];
- assert_eq!(t!(f.read(&mut b)), 3);
- assert_eq!(&b[..3], &[1, 2, 3]);
- });
-
- t!(a1.connect());
- let mut b = [0; 10];
- assert_eq!(t!(a1.read(&mut b)), 3);
- assert_eq!(&b[..3], &[1, 2, 3]);
- t!(a1.write_all(&[1, 2, 3]));
- t!(a1.flush());
- t!(a1.disconnect());
-
- t!(a2.connect());
- let mut b = [0; 10];
- assert_eq!(t!(a2.read(&mut b)), 3);
- assert_eq!(&b[..3], &[1, 2, 3]);
- t!(a2.write_all(&[1, 2, 3]));
- t!(a2.flush());
- t!(a2.disconnect());
-
- t!(t.join());
- }
-
- #[test]
- fn named_read_overlapped() {
- let name = name();
- let a = t!(NamedPipe::new(&name));
-
- let t = thread::spawn(move || {
- let mut f = t!(File::create(name));
- t!(f.write_all(&[1, 2, 3]));
- });
-
- let cp = t!(CompletionPort::new(1));
- t!(cp.add_handle(3, &a));
- t!(a.connect());
-
- let mut b = [0; 10];
- let over = Overlapped::zero();
- unsafe {
- t!(a.read_overlapped(&mut b, over.raw()));
- }
- let status = t!(cp.get(None));
- assert_eq!(status.bytes_transferred(), 3);
- assert_eq!(status.token(), 3);
- assert_eq!(status.overlapped(), over.raw());
- assert_eq!(&b[..3], &[1, 2, 3]);
-
- t!(t.join());
- }
-
- #[test]
- fn named_write_overlapped() {
- let name = name();
- let a = t!(NamedPipe::new(&name));
-
- let t = thread::spawn(move || {
- let mut f = t!(super::connect(name));
- let mut b = [0; 10];
- assert_eq!(t!(f.read(&mut b)), 3);
- assert_eq!(&b[..3], &[1, 2, 3])
- });
-
- let cp = t!(CompletionPort::new(1));
- t!(cp.add_handle(3, &a));
- t!(a.connect());
-
- let over = Overlapped::zero();
- unsafe {
- t!(a.write_overlapped(&[1, 2, 3], over.raw()));
- }
-
- let status = t!(cp.get(None));
- assert_eq!(status.bytes_transferred(), 3);
- assert_eq!(status.token(), 3);
- assert_eq!(status.overlapped(), over.raw());
-
- t!(t.join());
- }
-}
+//! Interprocess Communication pipes +//! +//! A pipe is a section of shared memory that processes use for communication. +//! The process that creates a pipe is the _pipe server_. A process that connects +//! to a pipe is a _pipe client_. One process writes information to the pipe, then +//! the other process reads the information from the pipe. This overview +//! describes how to create, manage, and use pipes. +//! +//! There are two types of pipes: [anonymous pipes](#fn.anonymous.html) and +//! [named pipes](#fn.named.html). Anonymous pipes require less overhead than +//! named pipes, but offer limited services. +//! +//! # Anonymous pipes +//! +//! An anonymous pipe is an unnamed, one-way pipe that typically transfers data +//! between a parent process and a child process. Anonymous pipes are always +//! local; they cannot be used for communication over a network. +//! +//! # Named pipes +//! +//! A *named pipe* is a named, one-way or duplex pipe for communication between +//! the pipe server and one or more pipe clients. All instances of a named pipe +//! share the same pipe name, but each instance has its own buffers and handles, +//! and provides a separate conduit for client/server communication. The use of +//! instances enables multiple pipe clients to use the same named pipe +//! simultaneously. +//! +//! Any process can access named pipes, subject to security checks, making named +//! pipes an easy form of communication between related or unrelated processes. +//! +//! Any process can act as both a server and a client, making peer-to-peer +//! communication possible. As used here, the term pipe server refers to a +//! process that creates a named pipe, and the term pipe client refers to a +//! process that connects to an instance of a named pipe. +//! +//! Named pipes can be used to provide communication between processes on the +//! same computer or between processes on different computers across a network. +//! If the server service is running, all named pipes are accessible remotely. If +//! you intend to use a named pipe locally only, deny access to NT +//! AUTHORITY\\NETWORK or switch to local RPC. +//! +//! # References +//! +//! - [win32 pipe docs](https://github.com/MicrosoftDocs/win32/blob/docs/desktop-src/ipc/pipes.md) + +use crate::FALSE; +use std::cell::RefCell; +use std::ffi::OsStr; +use std::fs::{File, OpenOptions}; +use std::io; +use std::io::{Read, Write}; +use std::os::windows::ffi::OsStrExt; +use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle}; +use std::time::Duration; + +use crate::handle::Handle; +use crate::overlapped::Overlapped; + +use windows_sys::Win32::Foundation::{ + ERROR_IO_PENDING, ERROR_NO_DATA, ERROR_PIPE_BUSY, ERROR_PIPE_CONNECTED, HANDLE, + INVALID_HANDLE_VALUE, +}; +use windows_sys::Win32::Security::SECURITY_ATTRIBUTES; +use windows_sys::Win32::Storage::FileSystem::{ + FlushFileBuffers, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX, + PIPE_ACCESS_INBOUND, PIPE_ACCESS_OUTBOUND, +}; +use windows_sys::Win32::System::Pipes::{ + ConnectNamedPipe, CreateNamedPipeW, CreatePipe, DisconnectNamedPipe, WaitNamedPipeW, + PIPE_REJECT_REMOTE_CLIENTS, PIPE_TYPE_BYTE, PIPE_UNLIMITED_INSTANCES, +}; +use windows_sys::Win32::System::IO::{GetOverlappedResult, OVERLAPPED}; + +/// Readable half of an anonymous pipe. +#[derive(Debug)] +pub struct AnonRead(Handle); + +/// Writable half of an anonymous pipe. +#[derive(Debug)] +pub struct AnonWrite(Handle); + +/// A named pipe that can accept connections. +#[derive(Debug)] +pub struct NamedPipe(Handle); + +/// A builder structure for creating a new named pipe. +#[derive(Debug)] +pub struct NamedPipeBuilder { + name: Vec<u16>, + dwOpenMode: u32, + dwPipeMode: u32, + nMaxInstances: u32, + nOutBufferSize: u32, + nInBufferSize: u32, + nDefaultTimeOut: u32, +} + +/// Creates a new anonymous in-memory pipe, returning the read/write ends of the +/// pipe. +/// +/// The buffer size for this pipe may also be specified, but the system will +/// normally use this as a suggestion and it's not guaranteed that the buffer +/// will be precisely this size. +pub fn anonymous(buffer_size: u32) -> io::Result<(AnonRead, AnonWrite)> { + let mut read = 0 as HANDLE; + let mut write = 0 as HANDLE; + crate::cvt(unsafe { CreatePipe(&mut read, &mut write, 0 as *mut _, buffer_size) })?; + Ok((AnonRead(Handle::new(read)), AnonWrite(Handle::new(write)))) +} + +impl Read for AnonRead { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.0.read(buf) + } +} +impl<'a> Read for &'a AnonRead { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.0.read(buf) + } +} + +impl AsRawHandle for AnonRead { + fn as_raw_handle(&self) -> RawHandle { + self.0.raw() as RawHandle + } +} +impl FromRawHandle for AnonRead { + unsafe fn from_raw_handle(handle: RawHandle) -> AnonRead { + AnonRead(Handle::new(handle as HANDLE)) + } +} +impl IntoRawHandle for AnonRead { + fn into_raw_handle(self) -> RawHandle { + self.0.into_raw() as RawHandle + } +} + +impl Write for AnonWrite { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.0.write(buf) + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} +impl<'a> Write for &'a AnonWrite { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.0.write(buf) + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl AsRawHandle for AnonWrite { + fn as_raw_handle(&self) -> RawHandle { + self.0.raw() as RawHandle + } +} +impl FromRawHandle for AnonWrite { + unsafe fn from_raw_handle(handle: RawHandle) -> AnonWrite { + AnonWrite(Handle::new(handle as HANDLE)) + } +} +impl IntoRawHandle for AnonWrite { + fn into_raw_handle(self) -> RawHandle { + self.0.into_raw() as RawHandle + } +} + +/// A convenience function to connect to a named pipe. +/// +/// This function will block the calling process until it can connect to the +/// pipe server specified by `addr`. This will use `NamedPipe::wait` internally +/// to block until it can connect. +pub fn connect<A: AsRef<OsStr>>(addr: A) -> io::Result<File> { + _connect(addr.as_ref()) +} + +fn _connect(addr: &OsStr) -> io::Result<File> { + let mut r = OpenOptions::new(); + let mut w = OpenOptions::new(); + let mut rw = OpenOptions::new(); + r.read(true); + w.write(true); + rw.read(true).write(true); + loop { + let res = rw + .open(addr) + .or_else(|_| r.open(addr)) + .or_else(|_| w.open(addr)); + match res { + Ok(f) => return Ok(f), + Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => {} + Err(e) => return Err(e), + } + + NamedPipe::wait(addr, Some(Duration::new(20, 0)))?; + } +} + +impl NamedPipe { + /// Creates a new initial named pipe. + /// + /// This function is equivalent to: + /// + /// ``` + /// use miow::pipe::NamedPipeBuilder; + /// + /// # let addr = "foo"; + /// NamedPipeBuilder::new(addr) + /// .first(true) + /// .inbound(true) + /// .outbound(true) + /// .out_buffer_size(65536) + /// .in_buffer_size(65536) + /// .create(); + /// ``` + pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> { + NamedPipeBuilder::new(addr).create() + } + + /// Waits until either a time-out interval elapses or an instance of the + /// specified named pipe is available for connection. + /// + /// If this function succeeds the process can create a `File` to connect to + /// the named pipe. + pub fn wait<A: AsRef<OsStr>>(addr: A, timeout: Option<Duration>) -> io::Result<()> { + NamedPipe::_wait(addr.as_ref(), timeout) + } + + fn _wait(addr: &OsStr, timeout: Option<Duration>) -> io::Result<()> { + let addr = addr.encode_wide().chain(Some(0)).collect::<Vec<_>>(); + let timeout = crate::dur2ms(timeout); + crate::cvt(unsafe { WaitNamedPipeW(addr.as_ptr() as _, timeout) }).map(|_| ()) + } + + /// Connects this named pipe to a client, blocking until one becomes + /// available. + /// + /// This function will call the `ConnectNamedPipe` function to await for a + /// client to connect. This can be called immediately after the pipe is + /// created, or after it has been disconnected from a previous client. + pub fn connect(&self) -> io::Result<()> { + match crate::cvt(unsafe { ConnectNamedPipe(self.0.raw(), 0 as *mut _) }) { + Ok(_) => Ok(()), + Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => Ok(()), + Err(e) => Err(e), + } + } + + /// Issue a connection request with the specified overlapped operation. + /// + /// This function will issue a request to connect a client to this server, + /// returning immediately after starting the overlapped operation. + /// + /// If this function immediately succeeds then `Ok(true)` is returned. If + /// the overlapped operation is enqueued and pending, then `Ok(false)` is + /// returned. Otherwise an error is returned indicating what went wrong. + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the + /// `overlapped` pointer is valid until the end of the I/O operation. The + /// kernel also requires that `overlapped` is unique for this I/O operation + /// and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that this pointer is + /// valid until the I/O operation is completed, typically via completion + /// ports and waiting to receive the completion notification on the port. + pub unsafe fn connect_overlapped(&self, overlapped: *mut OVERLAPPED) -> io::Result<bool> { + match crate::cvt(ConnectNamedPipe(self.0.raw(), overlapped)) { + Ok(_) => Ok(true), + Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => Ok(true), + Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => Ok(false), + Err(ref e) if e.raw_os_error() == Some(ERROR_NO_DATA as i32) => Ok(true), + Err(e) => Err(e), + } + } + + /// Disconnects this named pipe from any connected client. + pub fn disconnect(&self) -> io::Result<()> { + crate::cvt(unsafe { DisconnectNamedPipe(self.0.raw()) }).map(|_| ()) + } + + /// Issues an overlapped read operation to occur on this pipe. + /// + /// This function will issue an asynchronous read to occur in an overlapped + /// fashion, returning immediately. The `buf` provided will be filled in + /// with data and the request is tracked by the `overlapped` function + /// provided. + /// + /// If the operation succeeds immediately, `Ok(Some(n))` is returned where + /// `n` is the number of bytes read. If an asynchronous operation is + /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred + /// it is returned. + /// + /// When this operation completes (or if it completes immediately), another + /// mechanism must be used to learn how many bytes were transferred (such as + /// looking at the filed in the IOCP status message). + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf` and + /// `overlapped` pointers to be valid until the end of the I/O operation. + /// The kernel also requires that `overlapped` is unique for this I/O + /// operation and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that the pointers are + /// valid until the I/O operation is completed, typically via completion + /// ports and waiting to receive the completion notification on the port. + pub unsafe fn read_overlapped( + &self, + buf: &mut [u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + self.0.read_overlapped(buf, overlapped) + } + + /// Issues an overlapped write operation to occur on this pipe. + /// + /// This function will issue an asynchronous write to occur in an overlapped + /// fashion, returning immediately. The `buf` provided will be filled in + /// with data and the request is tracked by the `overlapped` function + /// provided. + /// + /// If the operation succeeds immediately, `Ok(Some(n))` is returned where + /// `n` is the number of bytes written. If an asynchronous operation is + /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred + /// it is returned. + /// + /// When this operation completes (or if it completes immediately), another + /// mechanism must be used to learn how many bytes were transferred (such as + /// looking at the filed in the IOCP status message). + /// + /// # Unsafety + /// + /// This function is unsafe because the kernel requires that the `buf` and + /// `overlapped` pointers to be valid until the end of the I/O operation. + /// The kernel also requires that `overlapped` is unique for this I/O + /// operation and is not in use for any other I/O. + /// + /// To safely use this function callers must ensure that the pointers are + /// valid until the I/O operation is completed, typically via completion + /// ports and waiting to receive the completion notification on the port. + pub unsafe fn write_overlapped( + &self, + buf: &[u8], + overlapped: *mut OVERLAPPED, + ) -> io::Result<Option<usize>> { + self.0.write_overlapped(buf, overlapped) + } + + /// Calls the `GetOverlappedResult` function to get the result of an + /// overlapped operation for this handle. + /// + /// This function takes the `OVERLAPPED` argument which must have been used + /// to initiate an overlapped I/O operation, and returns either the + /// successful number of bytes transferred during the operation or an error + /// if one occurred. + /// + /// # Unsafety + /// + /// This function is unsafe as `overlapped` must have previously been used + /// to execute an operation for this handle, and it must also be a valid + /// pointer to an `Overlapped` instance. + /// + /// # Panics + /// + /// This function will panic + pub unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<usize> { + let mut transferred = 0; + let r = GetOverlappedResult(self.0.raw(), overlapped, &mut transferred, FALSE); + if r == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(transferred as usize) + } + } +} + +thread_local! { + static NAMED_PIPE_OVERLAPPED: RefCell<Option<Overlapped>> = RefCell::new(None); +} + +/// Call a function with a threadlocal `Overlapped`. The function `f` should be +/// sure that the event is reset, either manually or by a thread being released. +fn with_threadlocal_overlapped<F>(f: F) -> io::Result<usize> +where + F: FnOnce(&Overlapped) -> io::Result<usize>, +{ + NAMED_PIPE_OVERLAPPED.with(|overlapped| { + let mut mborrow = overlapped.borrow_mut(); + if let None = *mborrow { + let op = Overlapped::initialize_with_autoreset_event()?; + *mborrow = Some(op); + } + f(mborrow.as_ref().unwrap()) + }) +} + +impl Read for NamedPipe { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`. + with_threadlocal_overlapped(|overlapped| unsafe { + self.0 + .read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED) + }) + } +} +impl<'a> Read for &'a NamedPipe { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`. + with_threadlocal_overlapped(|overlapped| unsafe { + self.0 + .read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED) + }) + } +} + +impl Write for NamedPipe { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`. + with_threadlocal_overlapped(|overlapped| unsafe { + self.0 + .write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED) + }) + } + fn flush(&mut self) -> io::Result<()> { + <&NamedPipe as Write>::flush(&mut &*self) + } +} +impl<'a> Write for &'a NamedPipe { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`. + with_threadlocal_overlapped(|overlapped| unsafe { + self.0 + .write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED) + }) + } + fn flush(&mut self) -> io::Result<()> { + crate::cvt(unsafe { FlushFileBuffers(self.0.raw()) }).map(|_| ()) + } +} + +impl AsRawHandle for NamedPipe { + fn as_raw_handle(&self) -> RawHandle { + self.0.raw() as RawHandle + } +} +impl FromRawHandle for NamedPipe { + unsafe fn from_raw_handle(handle: RawHandle) -> NamedPipe { + NamedPipe(Handle::new(handle as HANDLE)) + } +} +impl IntoRawHandle for NamedPipe { + fn into_raw_handle(self) -> RawHandle { + self.0.into_raw() as RawHandle + } +} + +fn flag(slot: &mut u32, on: bool, val: u32) { + if on { + *slot |= val; + } else { + *slot &= !val; + } +} + +impl NamedPipeBuilder { + /// Creates a new named pipe builder with the default settings. + pub fn new<A: AsRef<OsStr>>(addr: A) -> NamedPipeBuilder { + NamedPipeBuilder { + name: addr.as_ref().encode_wide().chain(Some(0)).collect(), + dwOpenMode: PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED, + dwPipeMode: PIPE_TYPE_BYTE, + nMaxInstances: PIPE_UNLIMITED_INSTANCES, + nOutBufferSize: 65536, + nInBufferSize: 65536, + nDefaultTimeOut: 0, + } + } + + /// Indicates whether data is allowed to flow from the client to the server. + pub fn inbound(&mut self, allowed: bool) -> &mut Self { + flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_INBOUND); + self + } + + /// Indicates whether data is allowed to flow from the server to the client. + pub fn outbound(&mut self, allowed: bool) -> &mut Self { + flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_OUTBOUND); + self + } + + /// Indicates that this pipe must be the first instance. + /// + /// If set to true, then creation will fail if there's already an instance + /// elsewhere. + pub fn first(&mut self, first: bool) -> &mut Self { + flag(&mut self.dwOpenMode, first, FILE_FLAG_FIRST_PIPE_INSTANCE); + self + } + + /// Indicates whether this server can accept remote clients or not. + pub fn accept_remote(&mut self, accept: bool) -> &mut Self { + flag(&mut self.dwPipeMode, !accept, PIPE_REJECT_REMOTE_CLIENTS); + self + } + + /// Specifies the maximum number of instances of the server pipe that are + /// allowed. + /// + /// The first instance of a pipe can specify this value. A value of 255 + /// indicates that there is no limit to the number of instances. + pub fn max_instances(&mut self, instances: u8) -> &mut Self { + self.nMaxInstances = instances as u32; + self + } + + /// Specifies the number of bytes to reserver for the output buffer + pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self { + self.nOutBufferSize = buffer as u32; + self + } + + /// Specifies the number of bytes to reserver for the input buffer + pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self { + self.nInBufferSize = buffer as u32; + self + } + + /// Using the options in this builder, attempt to create a new named pipe. + /// + /// This function will call the `CreateNamedPipe` function and return the + /// result. + pub fn create(&mut self) -> io::Result<NamedPipe> { + unsafe { self.with_security_attributes(::std::ptr::null_mut()) } + } + + /// Using the options in the builder and the provided security attributes, attempt to create a + /// new named pipe. This function has to be called with a valid pointer to a + /// `SECURITY_ATTRIBUTES` struct that will stay valid for the lifetime of this function or a + /// null pointer. + /// + /// This function will call the `CreateNamedPipe` function and return the + /// result. + pub unsafe fn with_security_attributes( + &mut self, + attrs: *mut SECURITY_ATTRIBUTES, + ) -> io::Result<NamedPipe> { + let h = CreateNamedPipeW( + self.name.as_mut_ptr(), + self.dwOpenMode, + self.dwPipeMode, + self.nMaxInstances, + self.nOutBufferSize, + self.nInBufferSize, + self.nDefaultTimeOut, + attrs, + ); + + if h == INVALID_HANDLE_VALUE { + Err(io::Error::last_os_error()) + } else { + Ok(NamedPipe(Handle::new(h))) + } + } +} + +#[cfg(test)] +mod tests { + use std::fs::{File, OpenOptions}; + use std::io::prelude::*; + use std::sync::mpsc::channel; + use std::thread; + use std::time::Duration; + + use rand::{distributions::Alphanumeric, thread_rng, Rng}; + + use super::{anonymous, NamedPipe, NamedPipeBuilder}; + use crate::iocp::CompletionPort; + use crate::Overlapped; + + fn name() -> String { + let name = thread_rng() + .sample_iter(Alphanumeric) + .take(30) + .map(char::from) + .collect::<String>(); + format!(r"\\.\pipe\{}", name) + } + + #[test] + fn anon() { + let (mut read, mut write) = t!(anonymous(256)); + assert_eq!(t!(write.write(&[1, 2, 3])), 3); + let mut b = [0; 10]; + assert_eq!(t!(read.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + } + + #[test] + fn named_not_first() { + let name = name(); + let _a = t!(NamedPipe::new(&name)); + assert!(NamedPipe::new(&name).is_err()); + + t!(NamedPipeBuilder::new(&name).first(false).create()); + } + + #[test] + fn named_connect() { + let name = name(); + let a = t!(NamedPipe::new(&name)); + + let t = thread::spawn(move || { + t!(File::open(name)); + }); + + t!(a.connect()); + t!(a.disconnect()); + t!(t.join()); + } + + #[test] + fn named_wait() { + let name = name(); + let a = t!(NamedPipe::new(&name)); + + let (tx, rx) = channel(); + let t = thread::spawn(move || { + t!(NamedPipe::wait(&name, None)); + t!(File::open(&name)); + assert!(NamedPipe::wait(&name, Some(Duration::from_millis(1))).is_err()); + t!(tx.send(())); + }); + + t!(a.connect()); + t!(rx.recv()); + t!(a.disconnect()); + t!(t.join()); + } + + #[test] + fn named_connect_overlapped() { + let name = name(); + let a = t!(NamedPipe::new(&name)); + + let t = thread::spawn(move || { + t!(File::open(name)); + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_handle(2, &a)); + + let over = Overlapped::zero(); + unsafe { + t!(a.connect_overlapped(over.raw())); + } + + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 0); + assert_eq!(status.token(), 2); + assert_eq!(status.overlapped(), over.raw()); + t!(t.join()); + } + + #[test] + fn named_read_write() { + let name = name(); + let mut a = t!(NamedPipe::new(&name)); + + let t = thread::spawn(move || { + let mut f = t!(OpenOptions::new().read(true).write(true).open(name)); + t!(f.write_all(&[1, 2, 3])); + let mut b = [0; 10]; + assert_eq!(t!(f.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + }); + + t!(a.connect()); + let mut b = [0; 10]; + assert_eq!(t!(a.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + t!(a.write_all(&[1, 2, 3])); + t!(a.flush()); + t!(a.disconnect()); + t!(t.join()); + } + + #[test] + fn named_read_write_multi() { + for _ in 0..5 { + named_read_write() + } + } + + #[test] + fn named_read_write_multi_same_thread() { + let name1 = name(); + let mut a1 = t!(NamedPipe::new(&name1)); + let name2 = name(); + let mut a2 = t!(NamedPipe::new(&name2)); + + let t = thread::spawn(move || { + let mut f = t!(OpenOptions::new().read(true).write(true).open(name1)); + t!(f.write_all(&[1, 2, 3])); + let mut b = [0; 10]; + assert_eq!(t!(f.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + + let mut f = t!(OpenOptions::new().read(true).write(true).open(name2)); + t!(f.write_all(&[1, 2, 3])); + let mut b = [0; 10]; + assert_eq!(t!(f.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + }); + + t!(a1.connect()); + let mut b = [0; 10]; + assert_eq!(t!(a1.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + t!(a1.write_all(&[1, 2, 3])); + t!(a1.flush()); + t!(a1.disconnect()); + + t!(a2.connect()); + let mut b = [0; 10]; + assert_eq!(t!(a2.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]); + t!(a2.write_all(&[1, 2, 3])); + t!(a2.flush()); + t!(a2.disconnect()); + + t!(t.join()); + } + + #[test] + fn named_read_overlapped() { + let name = name(); + let a = t!(NamedPipe::new(&name)); + + let t = thread::spawn(move || { + let mut f = t!(File::create(name)); + t!(f.write_all(&[1, 2, 3])); + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_handle(3, &a)); + t!(a.connect()); + + let mut b = [0; 10]; + let over = Overlapped::zero(); + unsafe { + t!(a.read_overlapped(&mut b, over.raw())); + } + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 3); + assert_eq!(status.overlapped(), over.raw()); + assert_eq!(&b[..3], &[1, 2, 3]); + + t!(t.join()); + } + + #[test] + fn named_write_overlapped() { + let name = name(); + let a = t!(NamedPipe::new(&name)); + + let t = thread::spawn(move || { + let mut f = t!(super::connect(name)); + let mut b = [0; 10]; + assert_eq!(t!(f.read(&mut b)), 3); + assert_eq!(&b[..3], &[1, 2, 3]) + }); + + let cp = t!(CompletionPort::new(1)); + t!(cp.add_handle(3, &a)); + t!(a.connect()); + + let over = Overlapped::zero(); + unsafe { + t!(a.write_overlapped(&[1, 2, 3], over.raw())); + } + + let status = t!(cp.get(None)); + assert_eq!(status.bytes_transferred(), 3); + assert_eq!(status.token(), 3); + assert_eq!(status.overlapped(), over.raw()); + + t!(t.join()); + } +} |