use std::fmt;
use std::io::{self, Read, ErrorKind};
use std::mem;
use std::net::{self, SocketAddr, Shutdown};
use std::os::windows::prelude::*;
use std::sync::{Mutex, MutexGuard};
use std::time::Duration;

use miow::iocp::CompletionStatus;
use miow::net::*;
use net2::{TcpBuilder, TcpStreamExt as Net2TcpExt};
use winapi::um::minwinbase::OVERLAPPED_ENTRY;
use winapi::um::winnt::HANDLE;
use iovec::IoVec;

use {poll, Ready, Poll, PollOpt, Token};
use event::Evented;
use sys::windows::from_raw_arc::FromRawArc;
use sys::windows::selector::{Overlapped, ReadyBinding};
use sys::windows::Family;

pub struct TcpStream {
    /// Separately stored implementation to ensure that the `Drop`
    /// implementation on this type is only executed when it's actually dropped
    /// (many clones of this `imp` are made).
    imp: StreamImp,
    registration: Mutex<Option<poll::Registration>>,
}

pub struct TcpListener {
    imp: ListenerImp,
    registration: Mutex<Option<poll::Registration>>,
}

#[derive(Clone)]
struct StreamImp {
    /// A stable address and synchronized access for all internals. This serves
    /// to ensure that all `Overlapped` pointers are valid for a long period of
    /// time as well as allowing completion callbacks to have access to the
    /// internals without having ownership.
    ///
    /// Note that the reference count also allows us to "loan out" copies to
    /// completion ports while I/O is running to guarantee that this stays alive
    /// until the I/O completes. You'll notice a number of calls to
    /// `mem::forget` below, and these only happen on successful scheduling of
    /// I/O and are paired with `overlapped2arc!` macro invocations in the
    /// completion callbacks (to have a decrement match the increment).
    inner: FromRawArc<StreamIo>,
}

#[derive(Clone)]
struct ListenerImp {
    inner: FromRawArc<ListenerIo>,
}

struct StreamIo {
    inner: Mutex<StreamInner>,
    read: Overlapped, // also used for connect
    write: Overlapped,
    socket: net::TcpStream,
}

struct ListenerIo {
    inner: Mutex<ListenerInner>,
    accept: Overlapped,
    family: Family,
    socket: net::TcpListener,
}

struct StreamInner {
    iocp: ReadyBinding,
    deferred_connect: Option<SocketAddr>,
    read: State<(), ()>,
    write: State<(Vec<u8>, usize), (Vec<u8>, usize)>,
    /// whether we are instantly notified of success
    /// (FILE_SKIP_COMPLETION_PORT_ON_SUCCESS,
    /// without a roundtrip through the event loop)
    instant_notify: bool,
}

struct ListenerInner {
    iocp: ReadyBinding,
    accept: State<net::TcpStream, (net::TcpStream, SocketAddr)>,
    accept_buf: AcceptAddrsBuf,
}

enum State<T, U> {
    Empty,            // no I/O operation in progress
    Pending(T),       // an I/O operation is in progress
    Ready(U),         // I/O has finished with this value
    Error(io::Error), // there was an I/O error
}

impl TcpStream {
    fn new(socket: net::TcpStream,
           deferred_connect: Option<SocketAddr>) -> TcpStream {
        TcpStream {
            registration: Mutex::new(None),
            imp: StreamImp {
                inner: FromRawArc::new(StreamIo {
                    read: Overlapped::new(read_done),
                    write: Overlapped::new(write_done),
                    socket: socket,
                    inner: Mutex::new(StreamInner {
                        iocp: ReadyBinding::new(),
                        deferred_connect: deferred_connect,
                        read: State::Empty,
                        write: State::Empty,
                        instant_notify: false,
                    }),
                }),
            },
        }
    }

    pub fn connect(socket: net::TcpStream, addr: &SocketAddr)
                   -> io::Result<TcpStream> {
        socket.set_nonblocking(true)?;
        Ok(TcpStream::new(socket, Some(*addr)))
    }
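
    // A rough caller-side sketch of the deferred-connect flow: the actual
    // connect is only issued once the stream is registered, and its outcome is
    // observed via a writable event plus `take_error` (see the comments in
    // `take_error` below). Names such as `poll`, `events`, and `TOKEN` are
    // illustrative only, and the unconnected `net::TcpStream` would typically
    // come from something like `net2::TcpBuilder`.
    //
    //     let stream = TcpStream::connect(unconnected_socket, &addr)?;
    //     poll.register(&stream, TOKEN, Ready::writable(), PollOpt::edge())?;
    //     poll.poll(&mut events, None)?;          // wait for writable
    //     match stream.take_error()? {
    //         Some(e) => return Err(e),           // connect failed
    //         None => { /* connected, ready for I/O */ }
    //     }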
    pub fn from_stream(stream: net::TcpStream) -> TcpStream {
        TcpStream::new(stream, None)
    }

    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
        self.imp.inner.socket.peer_addr()
    }

    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        self.imp.inner.socket.local_addr()
    }

    pub fn try_clone(&self) -> io::Result<TcpStream> {
        self.imp.inner.socket.try_clone().map(|s| TcpStream::new(s, None))
    }

    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
        self.imp.inner.socket.shutdown(how)
    }

    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
        self.imp.inner.socket.set_nodelay(nodelay)
    }

    pub fn nodelay(&self) -> io::Result<bool> {
        self.imp.inner.socket.nodelay()
    }

    pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> {
        self.imp.inner.socket.set_recv_buffer_size(size)
    }

    pub fn recv_buffer_size(&self) -> io::Result<usize> {
        self.imp.inner.socket.recv_buffer_size()
    }

    pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> {
        self.imp.inner.socket.set_send_buffer_size(size)
    }

    pub fn send_buffer_size(&self) -> io::Result<usize> {
        self.imp.inner.socket.send_buffer_size()
    }

    pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> {
        self.imp.inner.socket.set_keepalive(keepalive)
    }

    pub fn keepalive(&self) -> io::Result<Option<Duration>> {
        self.imp.inner.socket.keepalive()
    }

    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
        self.imp.inner.socket.set_ttl(ttl)
    }

    pub fn ttl(&self) -> io::Result<u32> {
        self.imp.inner.socket.ttl()
    }

    pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
        self.imp.inner.socket.set_only_v6(only_v6)
    }

    pub fn only_v6(&self) -> io::Result<bool> {
        self.imp.inner.socket.only_v6()
    }

    pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
        Net2TcpExt::set_linger(&self.imp.inner.socket, dur)
    }

    pub fn linger(&self) -> io::Result<Option<Duration>> {
        Net2TcpExt::linger(&self.imp.inner.socket)
    }

    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
        if let Some(e) = self.imp.inner.socket.take_error()? {
            return Ok(Some(e))
        }

        // If the syscall didn't return anything then also check to see if
        // we've squirreled away an error elsewhere, for example as part of a
        // connect operation.
        //
        // Typically this is used like so:
        //
        // 1. A `connect` is issued
        // 2. Wait for the socket to be writable
        // 3. Call `take_error` to see if the connect succeeded.
        //
        // Right now the `connect` operation finishes in `read_done` below and
        // will fill in `State::Error` in the `read` slot if it fails, so we
        // extract that here.
        let mut me = self.inner();
        match mem::replace(&mut me.read, State::Empty) {
            State::Error(e) => {
                self.imp.schedule_read(&mut me);
                Ok(Some(e))
            }
            other => {
                me.read = other;
                Ok(None)
            }
        }
    }

    fn inner(&self) -> MutexGuard<StreamInner> {
        self.imp.inner()
    }

    fn before_read(&self) -> io::Result<MutexGuard<StreamInner>> {
        let mut me = self.inner();

        match me.read {
            // Empty == we're not associated yet, and if we're pending then
            // these are both cases where we return "would block"
            State::Empty |
            State::Pending(()) => return Err(io::ErrorKind::WouldBlock.into()),

            // If we got a delayed error as part of a `read_overlapped` below,
            // return that here. Also schedule another read in case it was
            // transient.
            State::Error(_) => {
                let e = match mem::replace(&mut me.read, State::Empty) {
                    State::Error(e) => e,
                    _ => panic!(),
                };
                self.imp.schedule_read(&mut me);
                return Err(e)
            }

            // If we're ready for a read then some previous 0-byte read has
            // completed. In that case the OS's socket buffer has something for
            // us, so we just keep pulling out bytes while we can in the loop
            // below.
            State::Ready(()) => {}
        }

        Ok(me)
    }

    fn post_register(&self, interest: Ready, me: &mut StreamInner) {
        if interest.is_readable() {
            self.imp.schedule_read(me);
        }

        // At least with epoll, if a socket is registered with an interest in
        // writing and it's immediately writable then a writable event is
        // generated immediately, so do so here.
        if interest.is_writable() {
            if let State::Empty = me.write {
                self.imp.add_readiness(me, Ready::writable());
            }
        }
    }
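
    // A rough caller-side sketch of the read path below, under the usual mio
    // nonblocking contract: after a readable event, keep calling `read` until
    // it returns `WouldBlock`, which also reschedules the internal 0-byte
    // read. `stream`, `buf`, and `process` are illustrative names.
    //
    //     loop {
    //         match stream.read(&mut buf) {
    //             Ok(0) => break,                   // EOF (or empty buffer)
    //             Ok(n) => process(&buf[..n]),
    //             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
    //             Err(e) => return Err(e),
    //         }
    //     }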
    pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
        match IoVec::from_bytes_mut(buf) {
            Some(vec) => self.readv(&mut [vec]),
            None => Ok(0),
        }
    }

    pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
        let mut me = self.before_read()?;

        match (&self.imp.inner.socket).peek(buf) {
            Ok(n) => Ok(n),
            Err(e) => {
                me.read = State::Empty;
                self.imp.schedule_read(&mut me);
                Err(e)
            }
        }
    }

    pub fn readv(&self, bufs: &mut [&mut IoVec]) -> io::Result<usize> {
        let mut me = self.before_read()?;

        // TODO: Does WSARecv work on a nonblocking socket? We ideally want to
        //       call that instead of looping over all the buffers and calling
        //       `recv` on each buffer. I'm not sure though if an overlapped
        //       socket in nonblocking mode would work with that use case,
        //       however, so for now we just call `recv`.

        let mut amt = 0;
        for buf in bufs {
            match (&self.imp.inner.socket).read(buf) {
                // If we did a partial read, then return what we've read so far
                Ok(n) if n < buf.len() => return Ok(amt + n),

                // Otherwise we filled this buffer entirely, so try to fill the
                // next one as well.
                Ok(n) => amt += n,

                // If we hit an error then things get tricky if we've already
                // read some data. If the error is "would block" then we just
                // return the data we've read so far while scheduling another
                // 0-byte read.
                //
                // If we've read data and the error kind is not "would block",
                // then we stash away the error to get returned later and
                // return the data that we've read.
                //
                // Finally if we haven't actually read any data we just
                // reschedule a 0-byte read to happen again and then return the
                // error upwards.
                Err(e) => {
                    if amt > 0 && e.kind() == io::ErrorKind::WouldBlock {
                        me.read = State::Empty;
                        self.imp.schedule_read(&mut me);
                        return Ok(amt)
                    } else if amt > 0 {
                        me.read = State::Error(e);
                        return Ok(amt)
                    } else {
                        me.read = State::Empty;
                        self.imp.schedule_read(&mut me);
                        return Err(e)
                    }
                }
            }
        }

        Ok(amt)
    }

    pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
        match IoVec::from_bytes(buf) {
            Some(vec) => self.writev(&[vec]),
            None => Ok(0),
        }
    }

    pub fn writev(&self, bufs: &[&IoVec]) -> io::Result<usize> {
        let mut me = self.inner();
        let me = &mut *me;

        match mem::replace(&mut me.write, State::Empty) {
            State::Empty => {}
            State::Error(e) => return Err(e),
            other => {
                me.write = other;
                return Err(io::ErrorKind::WouldBlock.into())
            }
        }

        if !me.iocp.registered() {
            return Err(io::ErrorKind::WouldBlock.into())
        }

        if bufs.is_empty() {
            return Ok(0)
        }

        let len = bufs.iter().map(|b| b.len()).fold(0, |a, b| a + b);
        let mut intermediate = me.iocp.get_buffer(len);
        for buf in bufs {
            intermediate.extend_from_slice(buf);
        }
        self.imp.schedule_write(intermediate, 0, me);
        Ok(len)
    }

    pub fn flush(&self) -> io::Result<()> {
        Ok(())
    }
}
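
// A rough caller-side sketch of the write path above: `write`/`writev` copy
// the whole input into an internal buffer and report the full length, so a
// `WouldBlock` result simply means a previous write is still in flight (or the
// socket isn't registered yet) and the caller should retry after the next
// writable event. `stream` and `data` are illustrative names.
//
//     match stream.write(data) {
//         Ok(n) => assert_eq!(n, data.len()),   // buffered in full
//         Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
//             // wait for the next writable event, then retry
//         }
//         Err(e) => return Err(e),
//     }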
impl StreamImp {
    fn inner(&self) -> MutexGuard<StreamInner> {
        self.inner.inner.lock().unwrap()
    }

    fn schedule_connect(&self, addr: &SocketAddr) -> io::Result<()> {
        unsafe {
            trace!("scheduling a connect");
            self.inner.socket.connect_overlapped(addr, &[],
                                                 self.inner.read.as_mut_ptr())?;
        }
        // see docs above on StreamImp.inner for rationale on forget
        mem::forget(self.clone());
        Ok(())
    }

    /// Schedule a read to happen on this socket, enqueuing us to receive a
    /// notification when a read is ready.
    ///
    /// Note that this does *not* work with a buffer. When reading a TCP stream
    /// we actually read into a 0-byte buffer so Windows will send us a
    /// notification when the socket is otherwise ready for reading. This
    /// allows us to avoid buffer allocations for in-flight reads.
    fn schedule_read(&self, me: &mut StreamInner) {
        match me.read {
            State::Empty => {}
            State::Ready(_) | State::Error(_) => {
                self.add_readiness(me, Ready::readable());
                return;
            }
            _ => return,
        }

        me.iocp.set_readiness(me.iocp.readiness() - Ready::readable());

        trace!("scheduling a read");
        let res = unsafe {
            self.inner.socket.read_overlapped(&mut [],
                                              self.inner.read.as_mut_ptr())
        };
        match res {
            // Note that `Ok(true)` means that this completed immediately and
            // our socket is readable. This typically means that the caller of
            // this function (likely `read` above) can try again as an
            // optimization and return bytes quickly.
            //
            // Normally, though, although the read completed immediately
            // there's still an IOCP completion packet enqueued that we're
            // going to receive.
            //
            // You can configure this behavior (miow) with
            // SetFileCompletionNotificationModes to indicate that `Ok(true)`
            // does **not** enqueue a completion packet. (This is the case
            // for me.instant_notify)
            //
            // Note that libuv apparently has scary code to work around bugs in
            // `WSARecv` for UDP sockets on handles which have had the
            // `SetFileCompletionNotificationModes` function called on them,
            // which is worth looking into!
            Ok(Some(_)) if me.instant_notify => {
                me.read = State::Ready(());
                self.add_readiness(me, Ready::readable());
            }
            Ok(_) => {
                // see docs above on StreamImp.inner for rationale on forget
                me.read = State::Pending(());
                mem::forget(self.clone());
            }
            Err(e) => {
                me.read = State::Error(e);
                self.add_readiness(me, Ready::readable());
            }
        }
    }

    /// Similar to `schedule_read`, except that this issues, well, writes.
    ///
    /// This function will continually attempt to write the entire contents of
    /// the buffer `buf` until they have all been written. The `pos` argument
    /// is the current offset within the buffer up to which the contents have
    /// already been written.
    ///
    /// A new writable event (e.g. allowing another write) will only happen
    /// once the buffer has been written completely (or hit an error).
    fn schedule_write(&self,
                      buf: Vec<u8>,
                      mut pos: usize,
                      me: &mut StreamInner) {
        // About to write, clear any pending level triggered events
        me.iocp.set_readiness(me.iocp.readiness() - Ready::writable());

        loop {
            trace!("scheduling a write of {} bytes", buf[pos..].len());
            let ret = unsafe {
                self.inner.socket.write_overlapped(&buf[pos..],
                                                   self.inner.write.as_mut_ptr())
            };
            match ret {
                Ok(Some(transferred_bytes)) if me.instant_notify => {
                    trace!("done immediately with {} bytes", transferred_bytes);
                    if transferred_bytes == buf.len() - pos {
                        self.add_readiness(me, Ready::writable());
                        me.write = State::Empty;
                        break;
                    }
                    pos += transferred_bytes;
                }
                Ok(_) => {
                    trace!("scheduled for later");
                    // see docs above on StreamImp.inner for rationale on forget
                    me.write = State::Pending((buf, pos));
                    mem::forget(self.clone());
                    break;
                }
                Err(e) => {
                    trace!("write error: {}", e);
                    me.write = State::Error(e);
                    self.add_readiness(me, Ready::writable());
                    me.iocp.put_buffer(buf);
                    break;
                }
            }
        }
    }

    /// Pushes an event for this socket onto the selector it's registered for.
    ///
    /// When an event is generated on this socket, if it happened after the
    /// socket was closed then we don't want to actually push the event onto
    /// our selector as otherwise it's just a spurious notification.
    fn add_readiness(&self, me: &mut StreamInner, set: Ready) {
        me.iocp.set_readiness(set | me.iocp.readiness());
    }
}

fn read_done(status: &OVERLAPPED_ENTRY) {
    let status = CompletionStatus::from_entry(status);
    let me2 = StreamImp {
        inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, read) },
    };

    let mut me = me2.inner();
    match mem::replace(&mut me.read, State::Empty) {
        State::Pending(()) => {
            trace!("finished a read: {}", status.bytes_transferred());
            assert_eq!(status.bytes_transferred(), 0);
            me.read = State::Ready(());
            return me2.add_readiness(&mut me, Ready::readable())
        }
        s => me.read = s,
    }

    // If a read didn't complete, then the connect must have just finished.
    trace!("finished a connect");

    // By guarding with socket.result(), we ensure that a connection was
    // successfully made before performing operations requiring a connected
    // socket.
    match unsafe { me2.inner.socket.result(status.overlapped()) }
        .and_then(|_| me2.inner.socket.connect_complete())
    {
        Ok(()) => {
            me2.add_readiness(&mut me, Ready::writable());
            me2.schedule_read(&mut me);
        }
        Err(e) => {
            me2.add_readiness(&mut me, Ready::readable() | Ready::writable());
            me.read = State::Error(e);
        }
    }
}

fn write_done(status: &OVERLAPPED_ENTRY) {
    let status = CompletionStatus::from_entry(status);
    trace!("finished a write {}", status.bytes_transferred());
    let me2 = StreamImp {
        inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, write) },
    };
    let mut me = me2.inner();
    let (buf, pos) = match mem::replace(&mut me.write, State::Empty) {
        State::Pending(pair) => pair,
        _ => unreachable!(),
    };
    let new_pos = pos + (status.bytes_transferred() as usize);
    if new_pos == buf.len() {
        me2.add_readiness(&mut me, Ready::writable());
    } else {
        me2.schedule_write(buf, new_pos, &mut me);
    }
}

impl Evented for TcpStream {
    fn register(&self, poll: &Poll, token: Token,
                interest: Ready, opts: PollOpt) -> io::Result<()> {
        let mut me = self.inner();
        me.iocp.register_socket(&self.imp.inner.socket, poll, token,
                                interest, opts, &self.registration)?;

        unsafe {
            super::no_notify_on_instant_completion(self.imp.inner.socket.as_raw_socket() as HANDLE)?;
            me.instant_notify = true;
        }

        // If we were connected before being registered process that request
        // here and go along our merry ways. Note that the callback for a
        // successful connect will worry about generating writable/readable
        // events and scheduling a new read.
        if let Some(addr) = me.deferred_connect.take() {
            return self.imp.schedule_connect(&addr).map(|_| ())
        }
        self.post_register(interest, &mut me);
        Ok(())
    }

    fn reregister(&self, poll: &Poll, token: Token,
                  interest: Ready, opts: PollOpt) -> io::Result<()> {
        let mut me = self.inner();
        me.iocp.reregister_socket(&self.imp.inner.socket, poll, token,
                                  interest, opts, &self.registration)?;
        self.post_register(interest, &mut me);
        Ok(())
    }

    fn deregister(&self, poll: &Poll) -> io::Result<()> {
        self.inner().iocp.deregister(&self.imp.inner.socket,
                                     poll, &self.registration)
    }
}

impl fmt::Debug for TcpStream {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("TcpStream")
            .finish()
    }
}

impl Drop for TcpStream {
    fn drop(&mut self) {
        // If we're still internally reading, we're no longer interested. Note
        // though that we don't cancel any writes which may have been issued to
        // preserve the same semantics as Unix.
        //
        // Note that "Empty" here may mean that a connect is pending, so we
        // cancel even if that happens as well.
        unsafe {
            match self.inner().read {
                State::Pending(_) | State::Empty => {
                    trace!("cancelling active TCP read");
                    drop(super::cancel(&self.imp.inner.socket,
                                       &self.imp.inner.read));
                }
                State::Ready(_) | State::Error(_) => {}
            }
        }
    }
}

impl TcpListener {
    pub fn new(socket: net::TcpListener) -> io::Result<TcpListener> {
        let addr = socket.local_addr()?;
        Ok(TcpListener::new_family(socket, match addr {
            SocketAddr::V4(..) => Family::V4,
            SocketAddr::V6(..) => Family::V6,
        }))
    }

    fn new_family(socket: net::TcpListener, family: Family) -> TcpListener {
        TcpListener {
            registration: Mutex::new(None),
            imp: ListenerImp {
                inner: FromRawArc::new(ListenerIo {
                    accept: Overlapped::new(accept_done),
                    family: family,
                    socket: socket,
                    inner: Mutex::new(ListenerInner {
                        iocp: ReadyBinding::new(),
                        accept: State::Empty,
                        accept_buf: AcceptAddrsBuf::new(),
                    }),
                }),
            },
        }
    }

    pub fn accept(&self) -> io::Result<(net::TcpStream, SocketAddr)> {
        let mut me = self.inner();

        let ret = match mem::replace(&mut me.accept, State::Empty) {
            State::Empty => return Err(io::ErrorKind::WouldBlock.into()),
            State::Pending(t) => {
                me.accept = State::Pending(t);
                return Err(io::ErrorKind::WouldBlock.into());
            }
            State::Ready((s, a)) => Ok((s, a)),
            State::Error(e) => Err(e),
        };

        self.imp.schedule_accept(&mut me);

        return ret
    }

    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        self.imp.inner.socket.local_addr()
    }

    pub fn try_clone(&self) -> io::Result<TcpListener> {
        self.imp.inner.socket.try_clone().map(|s| {
            TcpListener::new_family(s, self.imp.inner.family)
        })
    }

    #[allow(deprecated)]
    pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
        self.imp.inner.socket.set_only_v6(only_v6)
    }

    #[allow(deprecated)]
    pub fn only_v6(&self) -> io::Result<bool> {
        self.imp.inner.socket.only_v6()
    }

    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
        self.imp.inner.socket.set_ttl(ttl)
    }

    pub fn ttl(&self) -> io::Result<u32> {
        self.imp.inner.socket.ttl()
    }

    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
        self.imp.inner.socket.take_error()
    }

    fn inner(&self) -> MutexGuard<ListenerInner> {
        self.imp.inner()
    }
}

impl ListenerImp {
    fn inner(&self) -> MutexGuard<ListenerInner> {
        self.inner.inner.lock().unwrap()
    }

    fn schedule_accept(&self, me: &mut ListenerInner) {
        match me.accept {
            State::Empty => {}
            _ => return
        }

        me.iocp.set_readiness(me.iocp.readiness() - Ready::readable());

        let res = match self.inner.family {
            Family::V4 => TcpBuilder::new_v4(),
            Family::V6 => TcpBuilder::new_v6(),
        }.and_then(|builder| builder.to_tcp_stream())
         .and_then(|stream| unsafe {
            trace!("scheduling an accept");
            self.inner.socket
                .accept_overlapped(&stream, &mut me.accept_buf,
                                   self.inner.accept.as_mut_ptr())
                .map(|x| (stream, x))
        });
        match res {
            Ok((socket, _)) => {
                // see docs above on StreamImp.inner for rationale on forget
                me.accept = State::Pending(socket);
                mem::forget(self.clone());
            }
            Err(e) => {
                me.accept = State::Error(e);
                self.add_readiness(me, Ready::readable());
            }
        }
    }

    // See comments in StreamImp::add_readiness
    fn add_readiness(&self, me: &mut ListenerInner, set: Ready) {
        me.iocp.set_readiness(set | me.iocp.readiness());
    }
}
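
// A rough caller-side sketch of the accept path above: after a readable event
// on the listener, call `accept` repeatedly until it returns `WouldBlock`;
// each call that takes a result also schedules the next overlapped accept.
// `listener` and `handle_connection` are illustrative names.
//
//     loop {
//         match listener.accept() {
//             Ok((stream, peer)) => handle_connection(stream, peer),
//             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
//             Err(e) => return Err(e),
//         }
//     }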
fn accept_done(status: &OVERLAPPED_ENTRY) {
    let status = CompletionStatus::from_entry(status);
    let me2 = ListenerImp {
        inner: unsafe { overlapped2arc!(status.overlapped(), ListenerIo, accept) },
    };

    let mut me = me2.inner();
    let socket = match mem::replace(&mut me.accept, State::Empty) {
        State::Pending(s) => s,
        _ => unreachable!(),
    };
    trace!("finished an accept");
    let result = me2.inner.socket.accept_complete(&socket).and_then(|()| {
        me.accept_buf.parse(&me2.inner.socket)
    }).and_then(|buf| {
        buf.remote().ok_or_else(|| {
            io::Error::new(ErrorKind::Other, "could not obtain remote address")
        })
    });
    me.accept = match result {
        Ok(remote_addr) => State::Ready((socket, remote_addr)),
        Err(e) => State::Error(e),
    };
    me2.add_readiness(&mut me, Ready::readable());
}

impl Evented for TcpListener {
    fn register(&self, poll: &Poll, token: Token,
                interest: Ready, opts: PollOpt) -> io::Result<()> {
        let mut me = self.inner();
        me.iocp.register_socket(&self.imp.inner.socket, poll, token,
                                interest, opts, &self.registration)?;

        unsafe {
            super::no_notify_on_instant_completion(self.imp.inner.socket.as_raw_socket() as HANDLE)?;
        }

        self.imp.schedule_accept(&mut me);
        Ok(())
    }

    fn reregister(&self, poll: &Poll, token: Token,
                  interest: Ready, opts: PollOpt) -> io::Result<()> {
        let mut me = self.inner();
        me.iocp.reregister_socket(&self.imp.inner.socket, poll, token,
                                  interest, opts, &self.registration)?;
        self.imp.schedule_accept(&mut me);
        Ok(())
    }

    fn deregister(&self, poll: &Poll) -> io::Result<()> {
        self.inner().iocp.deregister(&self.imp.inner.socket,
                                     poll, &self.registration)
    }
}

impl fmt::Debug for TcpListener {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("TcpListener")
            .finish()
    }
}

impl Drop for TcpListener {
    fn drop(&mut self) {
        // If we're still internally reading, we're no longer interested.
        unsafe {
            match self.inner().accept {
                State::Pending(_) => {
                    trace!("cancelling active TCP accept");
                    drop(super::cancel(&self.imp.inner.socket,
                                       &self.imp.inner.accept));
                }
                State::Empty |
                State::Ready(_) |
                State::Error(_) => {}
            }
        }
    }
}