//! Interprocess Communication pipes //! //! A pipe is a section of shared memory that processes use for communication. //! The process that creates a pipe is the _pipe server_. A process that connects //! to a pipe is a _pipe client_. One process writes information to the pipe, then //! the other process reads the information from the pipe. This overview //! describes how to create, manage, and use pipes. //! //! There are two types of pipes: [anonymous pipes](#fn.anonymous.html) and //! [named pipes](#fn.named.html). Anonymous pipes require less overhead than //! named pipes, but offer limited services. //! //! # Anonymous pipes //! //! An anonymous pipe is an unnamed, one-way pipe that typically transfers data //! between a parent process and a child process. Anonymous pipes are always //! local; they cannot be used for communication over a network. //! //! # Named pipes //! //! A *named pipe* is a named, one-way or duplex pipe for communication between //! the pipe server and one or more pipe clients. All instances of a named pipe //! share the same pipe name, but each instance has its own buffers and handles, //! and provides a separate conduit for client/server communication. The use of //! instances enables multiple pipe clients to use the same named pipe //! simultaneously. //! //! Any process can access named pipes, subject to security checks, making named //! pipes an easy form of communication between related or unrelated processes. //! //! Any process can act as both a server and a client, making peer-to-peer //! communication possible. As used here, the term pipe server refers to a //! process that creates a named pipe, and the term pipe client refers to a //! process that connects to an instance of a named pipe. //! //! Named pipes can be used to provide communication between processes on the //! same computer or between processes on different computers across a network. //! If the server service is running, all named pipes are accessible remotely. 
If //! you intend to use a named pipe locally only, deny access to NT //! AUTHORITY\\NETWORK or switch to local RPC. //! //! # References //! //! - [win32 pipe docs](https://github.com/MicrosoftDocs/win32/blob/docs/desktop-src/ipc/pipes.md) use std::cell::RefCell; use std::ffi::OsStr; use std::fs::{File, OpenOptions}; use std::io; use std::io::prelude::*; use std::os::windows::ffi::*; use std::os::windows::io::*; use std::time::Duration; use crate::handle::Handle; use crate::overlapped::Overlapped; use winapi::shared::minwindef::*; use winapi::shared::ntdef::HANDLE; use winapi::shared::winerror::*; use winapi::um::fileapi::*; use winapi::um::handleapi::*; use winapi::um::ioapiset::*; use winapi::um::minwinbase::*; use winapi::um::namedpipeapi::*; use winapi::um::winbase::*; /// Readable half of an anonymous pipe. #[derive(Debug)] pub struct AnonRead(Handle); /// Writable half of an anonymous pipe. #[derive(Debug)] pub struct AnonWrite(Handle); /// A named pipe that can accept connections. #[derive(Debug)] pub struct NamedPipe(Handle); /// A builder structure for creating a new named pipe. #[derive(Debug)] pub struct NamedPipeBuilder { name: Vec, dwOpenMode: DWORD, dwPipeMode: DWORD, nMaxInstances: DWORD, nOutBufferSize: DWORD, nInBufferSize: DWORD, nDefaultTimeOut: DWORD, } /// Creates a new anonymous in-memory pipe, returning the read/write ends of the /// pipe. /// /// The buffer size for this pipe may also be specified, but the system will /// normally use this as a suggestion and it's not guaranteed that the buffer /// will be precisely this size. 
pub fn anonymous(buffer_size: u32) -> io::Result<(AnonRead, AnonWrite)> {
    let mut read: HANDLE = std::ptr::null_mut();
    let mut write: HANDLE = std::ptr::null_mut();
    // SAFETY: `read` and `write` are live locals used purely as out-parameters,
    // and a null security-attributes pointer is passed.
    crate::cvt(unsafe { CreatePipe(&mut read, &mut write, std::ptr::null_mut(), buffer_size) })?;
    // Each raw handle is wrapped in a `Handle` and owned by its half of the pipe.
    Ok((AnonRead(Handle::new(read)), AnonWrite(Handle::new(write))))
}

impl Read for AnonRead {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.0.read(buf)
    }
}

// Reading only needs shared access to the handle, so `&AnonRead` is readable too.
impl<'a> Read for &'a AnonRead {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.0.read(buf)
    }
}

impl AsRawHandle for AnonRead {
    fn as_raw_handle(&self) -> HANDLE {
        self.0.raw()
    }
}

impl FromRawHandle for AnonRead {
    unsafe fn from_raw_handle(handle: HANDLE) -> AnonRead {
        AnonRead(Handle::new(handle))
    }
}

impl IntoRawHandle for AnonRead {
    fn into_raw_handle(self) -> HANDLE {
        self.0.into_raw()
    }
}

impl Write for AnonWrite {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.write(buf)
    }

    /// No-op: this type keeps no buffer of its own, so there is nothing to flush.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

// Writing only needs shared access to the handle, so `&AnonWrite` is writable too.
impl<'a> Write for &'a AnonWrite {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.write(buf)
    }

    /// No-op: this type keeps no buffer of its own, so there is nothing to flush.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl AsRawHandle for AnonWrite {
    fn as_raw_handle(&self) -> HANDLE {
        self.0.raw()
    }
}

impl FromRawHandle for AnonWrite {
    unsafe fn from_raw_handle(handle: HANDLE) -> AnonWrite {
        AnonWrite(Handle::new(handle))
    }
}

impl IntoRawHandle for AnonWrite {
    fn into_raw_handle(self) -> HANDLE {
        self.0.into_raw()
    }
}

/// A convenience function to connect to a named pipe.
///
/// This function will block the calling process until it can connect to the
/// pipe server specified by `addr`. This will use `NamedPipe::wait` internally
/// to block until it can connect.
pub fn connect<A: AsRef<OsStr>>(addr: A) -> io::Result<File> {
    _connect(addr.as_ref())
}

/// Non-generic body of [`connect`]: keeps retrying the open for as long as the
/// failure is `ERROR_PIPE_BUSY`, waiting on the pipe between attempts.
fn _connect(addr: &OsStr) -> io::Result<File> {
    let mut r = OpenOptions::new();
    let mut w = OpenOptions::new();
    let mut rw = OpenOptions::new();
    r.read(true);
    w.write(true);
    rw.read(true).write(true);
    loop {
        // Try the most capable access mode first, then fall back to read-only
        // and finally write-only. Only the last failure is kept.
        let res = rw
            .open(addr)
            .or_else(|_| r.open(addr))
            .or_else(|_| w.open(addr));
        match res {
            Ok(f) => return Ok(f),
            // Busy pipe: fall through to the wait below and retry.
            Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => {}
            Err(e) => return Err(e),
        }

        // A failed or timed-out wait aborts the whole connect attempt.
        NamedPipe::wait(addr, Some(Duration::new(20, 0)))?;
    }
}

impl NamedPipe {
    /// Creates a new initial named pipe.
    ///
    /// This function is equivalent to:
    ///
    /// ```
    /// use miow::pipe::NamedPipeBuilder;
    ///
    /// # let addr = "foo";
    /// NamedPipeBuilder::new(addr)
    ///     .first(true)
    ///     .inbound(true)
    ///     .outbound(true)
    ///     .out_buffer_size(65536)
    ///     .in_buffer_size(65536)
    ///     .create();
    /// ```
    pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
        NamedPipeBuilder::new(addr).create()
    }

    /// Waits until either a time-out interval elapses or an instance of the
    /// specified named pipe is available for connection.
    ///
    /// If this function succeeds the process can create a `File` to connect to
    /// the named pipe.
    pub fn wait<A: AsRef<OsStr>>(addr: A, timeout: Option<Duration>) -> io::Result<()> {
        NamedPipe::_wait(addr.as_ref(), timeout)
    }

    /// Non-generic body of `wait`.
    fn _wait(addr: &OsStr, timeout: Option<Duration>) -> io::Result<()> {
        // `WaitNamedPipeW` takes a NUL-terminated wide string.
        let addr = addr.encode_wide().chain(Some(0)).collect::<Vec<_>>();
        // NOTE(review): `dur2ms` presumably maps `None` to an infinite wait - confirm.
        let timeout = crate::dur2ms(timeout);
        // SAFETY: `addr` is NUL-terminated and stays alive for the whole call.
        crate::cvt(unsafe { WaitNamedPipeW(addr.as_ptr(), timeout) }).map(|_| ())
    }

    /// Connects this named pipe to a client, blocking until one becomes
    /// available.
    ///
    /// This function will call the `ConnectNamedPipe` function to await for a
    /// client to connect. This can be called immediately after the pipe is
    /// created, or after it has been disconnected from a previous client.
pub fn connect(&self) -> io::Result<()> { match crate::cvt(unsafe { ConnectNamedPipe(self.0.raw(), 0 as *mut _) }) { Ok(_) => Ok(()), Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => Ok(()), Err(e) => Err(e), } } /// Issue a connection request with the specified overlapped operation. /// /// This function will issue a request to connect a client to this server, /// returning immediately after starting the overlapped operation. /// /// If this function immediately succeeds then `Ok(true)` is returned. If /// the overlapped operation is enqueued and pending, then `Ok(false)` is /// returned. Otherwise an error is returned indicating what went wrong. /// /// # Unsafety /// /// This function is unsafe because the kernel requires that the /// `overlapped` pointer is valid until the end of the I/O operation. The /// kernel also requires that `overlapped` is unique for this I/O operation /// and is not in use for any other I/O. /// /// To safely use this function callers must ensure that this pointer is /// valid until the I/O operation is completed, typically via completion /// ports and waiting to receive the completion notification on the port. pub unsafe fn connect_overlapped(&self, overlapped: *mut OVERLAPPED) -> io::Result { match crate::cvt(ConnectNamedPipe(self.0.raw(), overlapped)) { Ok(_) => Ok(true), Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => Ok(true), Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => Ok(false), Err(ref e) if e.raw_os_error() == Some(ERROR_NO_DATA as i32) => Ok(true), Err(e) => Err(e), } } /// Disconnects this named pipe from any connected client. pub fn disconnect(&self) -> io::Result<()> { crate::cvt(unsafe { DisconnectNamedPipe(self.0.raw()) }).map(|_| ()) } /// Issues an overlapped read operation to occur on this pipe. /// /// This function will issue an asynchronous read to occur in an overlapped /// fashion, returning immediately. 
The `buf` provided will be filled in /// with data and the request is tracked by the `overlapped` function /// provided. /// /// If the operation succeeds immediately, `Ok(Some(n))` is returned where /// `n` is the number of bytes read. If an asynchronous operation is /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred /// it is returned. /// /// When this operation completes (or if it completes immediately), another /// mechanism must be used to learn how many bytes were transferred (such as /// looking at the filed in the IOCP status message). /// /// # Unsafety /// /// This function is unsafe because the kernel requires that the `buf` and /// `overlapped` pointers to be valid until the end of the I/O operation. /// The kernel also requires that `overlapped` is unique for this I/O /// operation and is not in use for any other I/O. /// /// To safely use this function callers must ensure that the pointers are /// valid until the I/O operation is completed, typically via completion /// ports and waiting to receive the completion notification on the port. pub unsafe fn read_overlapped( &self, buf: &mut [u8], overlapped: *mut OVERLAPPED, ) -> io::Result> { self.0.read_overlapped(buf, overlapped) } /// Issues an overlapped write operation to occur on this pipe. /// /// This function will issue an asynchronous write to occur in an overlapped /// fashion, returning immediately. The `buf` provided will be filled in /// with data and the request is tracked by the `overlapped` function /// provided. /// /// If the operation succeeds immediately, `Ok(Some(n))` is returned where /// `n` is the number of bytes written. If an asynchronous operation is /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred /// it is returned. /// /// When this operation completes (or if it completes immediately), another /// mechanism must be used to learn how many bytes were transferred (such as /// looking at the filed in the IOCP status message). 
/// /// # Unsafety /// /// This function is unsafe because the kernel requires that the `buf` and /// `overlapped` pointers to be valid until the end of the I/O operation. /// The kernel also requires that `overlapped` is unique for this I/O /// operation and is not in use for any other I/O. /// /// To safely use this function callers must ensure that the pointers are /// valid until the I/O operation is completed, typically via completion /// ports and waiting to receive the completion notification on the port. pub unsafe fn write_overlapped( &self, buf: &[u8], overlapped: *mut OVERLAPPED, ) -> io::Result> { self.0.write_overlapped(buf, overlapped) } /// Calls the `GetOverlappedResult` function to get the result of an /// overlapped operation for this handle. /// /// This function takes the `OVERLAPPED` argument which must have been used /// to initiate an overlapped I/O operation, and returns either the /// successful number of bytes transferred during the operation or an error /// if one occurred. /// /// # Unsafety /// /// This function is unsafe as `overlapped` must have previously been used /// to execute an operation for this handle, and it must also be a valid /// pointer to an `Overlapped` instance. /// /// # Panics /// /// This function will panic pub unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result { let mut transferred = 0; let r = GetOverlappedResult(self.0.raw(), overlapped, &mut transferred, FALSE); if r == 0 { Err(io::Error::last_os_error()) } else { Ok(transferred as usize) } } } thread_local! { static NAMED_PIPE_OVERLAPPED: RefCell> = RefCell::new(None); } /// Call a function with a threadlocal `Overlapped`. The function `f` should be /// sure that the event is reset, either manually or by a thread being released. 
fn with_threadlocal_overlapped<F>(f: F) -> io::Result<usize>
where
    F: FnOnce(&Overlapped) -> io::Result<usize>,
{
    NAMED_PIPE_OVERLAPPED.with(|overlapped| {
        let mut slot = overlapped.borrow_mut();
        // Create the thread's `Overlapped` on first use; a failure to create
        // it is returned to the caller.
        if slot.is_none() {
            *slot = Some(Overlapped::initialize_with_autoreset_event()?);
        }
        // The mutable borrow is held while `f` runs, so `f` must not re-enter
        // this function on the same thread.
        f(slot
            .as_ref()
            .expect("thread-local Overlapped was initialised just above"))
    })
}

impl Read for NamedPipe {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Share the `&NamedPipe` implementation instead of duplicating it.
        <&NamedPipe as Read>::read(&mut &*self, buf)
    }
}

impl<'a> Read for &'a NamedPipe {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
        with_threadlocal_overlapped(|overlapped| unsafe {
            self.0
                .read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
        })
    }
}

impl Write for NamedPipe {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Share the `&NamedPipe` implementation instead of duplicating it.
        <&NamedPipe as Write>::write(&mut &*self, buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        <&NamedPipe as Write>::flush(&mut &*self)
    }
}

impl<'a> Write for &'a NamedPipe {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
        with_threadlocal_overlapped(|overlapped| unsafe {
            self.0
                .write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
        })
    }

    fn flush(&mut self) -> io::Result<()> {
        // SAFETY: the handle is owned by the pipe and valid for the call.
        crate::cvt(unsafe { FlushFileBuffers(self.0.raw()) }).map(|_| ())
    }
}

impl AsRawHandle for NamedPipe {
    fn as_raw_handle(&self) -> HANDLE {
        self.0.raw()
    }
}

impl FromRawHandle for NamedPipe {
    unsafe fn from_raw_handle(handle: HANDLE) -> NamedPipe {
        NamedPipe(Handle::new(handle))
    }
}

impl IntoRawHandle for NamedPipe {
    fn into_raw_handle(self) -> HANDLE {
        self.0.into_raw()
    }
}

/// Sets the bits of `val` in `slot` when `on` is true, and clears them otherwise.
fn flag(slot: &mut DWORD, on: bool, val: DWORD) {
    if on {
        *slot |= val;
    } else {
        *slot &= !val;
    }
}

impl NamedPipeBuilder {
    /// Creates a new named pipe builder with the default settings.
    ///
    /// The defaults are a duplex, overlapped, byte-type pipe that must be the
    /// first instance, with unlimited instances and 65536-byte buffers in
    /// both directions.
    pub fn new<A: AsRef<OsStr>>(addr: A) -> NamedPipeBuilder {
        NamedPipeBuilder {
            // Stored NUL-terminated so it can be passed straight to the OS.
            name: addr.as_ref().encode_wide().chain(Some(0)).collect(),
            dwOpenMode: PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED,
            dwPipeMode: PIPE_TYPE_BYTE,
            nMaxInstances: PIPE_UNLIMITED_INSTANCES,
            nOutBufferSize: 65536,
            nInBufferSize: 65536,
            nDefaultTimeOut: 0,
        }
    }

    /// Indicates whether data is allowed to flow from the client to the server.
    pub fn inbound(&mut self, allowed: bool) -> &mut Self {
        flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_INBOUND);
        self
    }

    /// Indicates whether data is allowed to flow from the server to the client.
    pub fn outbound(&mut self, allowed: bool) -> &mut Self {
        flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_OUTBOUND);
        self
    }

    /// Indicates that this pipe must be the first instance.
    ///
    /// If set to true, then creation will fail if there's already an instance
    /// elsewhere.
    pub fn first(&mut self, first: bool) -> &mut Self {
        flag(&mut self.dwOpenMode, first, FILE_FLAG_FIRST_PIPE_INSTANCE);
        self
    }

    /// Indicates whether this server can accept remote clients or not.
pub fn accept_remote(&mut self, accept: bool) -> &mut Self {
        // The OS flag is a *reject* flag, hence the inverted boolean.
        flag(&mut self.dwPipeMode, !accept, PIPE_REJECT_REMOTE_CLIENTS);
        self
    }

    /// Specifies the maximum number of instances of the server pipe that are
    /// allowed.
    ///
    /// The first instance of a pipe can specify this value. A value of 255
    /// indicates that there is no limit to the number of instances.
    pub fn max_instances(&mut self, instances: u8) -> &mut Self {
        self.nMaxInstances = instances as DWORD;
        self
    }

    /// Specifies the number of bytes to reserve for the output buffer.
    pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
        self.nOutBufferSize = buffer as DWORD;
        self
    }

    /// Specifies the number of bytes to reserve for the input buffer.
    pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
        self.nInBufferSize = buffer as DWORD;
        self
    }

    /// Using the options in this builder, attempt to create a new named pipe.
    ///
    /// This function will call the `CreateNamedPipe` function and return the
    /// result.
    pub fn create(&mut self) -> io::Result<NamedPipe> {
        // SAFETY: a null pointer is one of the two inputs that
        // `with_security_attributes` documents as acceptable.
        unsafe { self.with_security_attributes(::std::ptr::null_mut()) }
    }

    /// Using the options in the builder and the provided security attributes,
    /// attempt to create a new named pipe.
    ///
    /// This function will call the `CreateNamedPipe` function and return the
    /// result.
    ///
    /// # Safety
    ///
    /// This function has to be called with either a null pointer or a valid
    /// pointer to a `SECURITY_ATTRIBUTES` struct that will stay valid for the
    /// lifetime of this function.
pub unsafe fn with_security_attributes(
        &mut self,
        attrs: *mut SECURITY_ATTRIBUTES,
    ) -> io::Result<NamedPipe> {
        // `self.name` was NUL-terminated when the builder was constructed.
        let h = CreateNamedPipeW(
            self.name.as_ptr(),
            self.dwOpenMode,
            self.dwPipeMode,
            self.nMaxInstances,
            self.nOutBufferSize,
            self.nInBufferSize,
            self.nDefaultTimeOut,
            attrs,
        );

        // Failure is signalled by `INVALID_HANDLE_VALUE`, not by null.
        if h == INVALID_HANDLE_VALUE {
            Err(io::Error::last_os_error())
        } else {
            Ok(NamedPipe(Handle::new(h)))
        }
    }
}

#[cfg(test)]
mod tests {
    use std::fs::{File, OpenOptions};
    use std::io::prelude::*;
    use std::sync::mpsc::channel;
    use std::thread;
    use std::time::Duration;

    use rand::{distributions::Alphanumeric, thread_rng, Rng};

    use super::{anonymous, NamedPipe, NamedPipeBuilder};
    use crate::iocp::CompletionPort;
    use crate::Overlapped;

    /// Builds a random pipe path so that tests do not collide with each other.
    fn name() -> String {
        let name = thread_rng()
            .sample_iter(Alphanumeric)
            .take(30)
            .map(char::from)
            .collect::<String>();
        format!(r"\\.\pipe\{}", name)
    }

    #[test]
    fn anon() {
        let (mut read, mut write) = t!(anonymous(256));
        assert_eq!(t!(write.write(&[1, 2, 3])), 3);
        let mut b = [0; 10];
        assert_eq!(t!(read.read(&mut b)), 3);
        assert_eq!(&b[..3], &[1, 2, 3]);
    }

    #[test]
    fn named_not_first() {
        let name = name();
        let _a = t!(NamedPipe::new(&name));
        // A second "first instance" must be rejected while `_a` is alive.
        assert!(NamedPipe::new(&name).is_err());

        t!(NamedPipeBuilder::new(&name).first(false).create());
    }

    #[test]
    fn named_connect() {
        let name = name();
        let a = t!(NamedPipe::new(&name));

        let t = thread::spawn(move || {
            t!(File::open(name));
        });

        t!(a.connect());
        t!(a.disconnect());
        t!(t.join());
    }

    #[test]
    fn named_wait() {
        let name = name();
        let a = t!(NamedPipe::new(&name));

        let (tx, rx) = channel();
        let t = thread::spawn(move || {
            t!(NamedPipe::wait(&name, None));
            t!(File::open(&name));
            // The only instance is now taken, so a short wait must fail.
            assert!(NamedPipe::wait(&name, Some(Duration::from_millis(1))).is_err());
            t!(tx.send(()));
        });

        t!(a.connect());
        t!(rx.recv());
        t!(a.disconnect());
        t!(t.join());
    }

    #[test]
    fn named_connect_overlapped() {
        let name = name();
        let a = t!(NamedPipe::new(&name));

        let t = thread::spawn(move || {
            t!(File::open(name));
        });

        let cp = t!(CompletionPort::new(1));
        t!(cp.add_handle(2, &a));

        let over = Overlapped::zero();
        unsafe {
            t!(a.connect_overlapped(over.raw()));
        }

        let status = t!(cp.get(None));
        assert_eq!(status.bytes_transferred(), 0);
        assert_eq!(status.token(), 2);
        assert_eq!(status.overlapped(), over.raw());
        t!(t.join());
    }

    #[test]
    fn named_read_write() {
        let name = name();
        let mut a = t!(NamedPipe::new(&name));

        let t = thread::spawn(move || {
            let mut f = t!(OpenOptions::new().read(true).write(true).open(name));
            t!(f.write_all(&[1, 2, 3]));
            let mut b = [0; 10];
            assert_eq!(t!(f.read(&mut b)), 3);
            assert_eq!(&b[..3], &[1, 2, 3]);
        });

        t!(a.connect());
        let mut b = [0; 10];
        assert_eq!(t!(a.read(&mut b)), 3);
        assert_eq!(&b[..3], &[1, 2, 3]);
        t!(a.write_all(&[1, 2, 3]));
        t!(a.flush());
        t!(a.disconnect());
        t!(t.join());
    }

    #[test]
    fn named_read_write_multi() {
        for _ in 0..5 {
            named_read_write()
        }
    }

    #[test]
    fn named_read_write_multi_same_thread() {
        let name1 = name();
        let mut a1 = t!(NamedPipe::new(&name1));
        let name2 = name();
        let mut a2 = t!(NamedPipe::new(&name2));

        let t = thread::spawn(move || {
            let mut f = t!(OpenOptions::new().read(true).write(true).open(name1));
            t!(f.write_all(&[1, 2, 3]));
            let mut b = [0; 10];
            assert_eq!(t!(f.read(&mut b)), 3);
            assert_eq!(&b[..3], &[1, 2, 3]);

            let mut f = t!(OpenOptions::new().read(true).write(true).open(name2));
            t!(f.write_all(&[1, 2, 3]));
            let mut b = [0; 10];
            assert_eq!(t!(f.read(&mut b)), 3);
            assert_eq!(&b[..3], &[1, 2, 3]);
        });

        t!(a1.connect());
        let mut b = [0; 10];
        assert_eq!(t!(a1.read(&mut b)), 3);
        assert_eq!(&b[..3], &[1, 2, 3]);
        t!(a1.write_all(&[1, 2, 3]));
        t!(a1.flush());
        t!(a1.disconnect());

        t!(a2.connect());
        let mut b = [0; 10];
        assert_eq!(t!(a2.read(&mut b)), 3);
        assert_eq!(&b[..3], &[1, 2, 3]);
        t!(a2.write_all(&[1, 2, 3]));
        t!(a2.flush());
        t!(a2.disconnect());

        t!(t.join());
    }

    #[test]
    fn named_read_overlapped() {
        let name = name();
        let a = t!(NamedPipe::new(&name));

        let t = thread::spawn(move || {
            let mut f = t!(File::create(name));
            t!(f.write_all(&[1, 2, 3]));
        });

        let cp = t!(CompletionPort::new(1));
        t!(cp.add_handle(3, &a));
        t!(a.connect());

        let mut b = [0; 10];
        let over = Overlapped::zero();
        unsafe {
            t!(a.read_overlapped(&mut b, over.raw()));
        }
        let status = t!(cp.get(None));
        assert_eq!(status.bytes_transferred(), 3);
        assert_eq!(status.token(), 3);
        assert_eq!(status.overlapped(), over.raw());
        assert_eq!(&b[..3], &[1, 2, 3]);

        t!(t.join());
    }

    #[test]
    fn named_write_overlapped() {
        let name = name();
        let a = t!(NamedPipe::new(&name));

        let t = thread::spawn(move || {
            let mut f = t!(super::connect(name));
            let mut b = [0; 10];
            assert_eq!(t!(f.read(&mut b)), 3);
            assert_eq!(&b[..3], &[1, 2, 3])
        });

        let cp = t!(CompletionPort::new(1));
        t!(cp.add_handle(3, &a));
        t!(a.connect());

        let over = Overlapped::zero();
        unsafe {
            t!(a.write_overlapped(&[1, 2, 3], over.raw()));
        }

        let status = t!(cp.get(None));
        assert_eq!(status.bytes_transferred(), 3);
        assert_eq!(status.token(), 3);
        assert_eq!(status.overlapped(), over.raw());

        t!(t.join());
    }
}