diff options
Diffstat (limited to 'third_party/rust/mio/src/sys/unix')
-rw-r--r-- | third_party/rust/mio/src/sys/unix/awakener.rs | 74 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/dlsym.rs | 47 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/epoll.rs | 260 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/eventedfd.rs | 107 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/io.rs | 107 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/kqueue.rs | 360 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/mod.rs | 95 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/ready.rs | 499 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/tcp.rs | 286 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/udp.rs | 181 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/uds.rs | 262 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/uio.rs | 44 |
12 files changed, 2322 insertions, 0 deletions
diff --git a/third_party/rust/mio/src/sys/unix/awakener.rs b/third_party/rust/mio/src/sys/unix/awakener.rs new file mode 100644 index 0000000000..9cc367a78c --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/awakener.rs @@ -0,0 +1,74 @@ +pub use self::pipe::Awakener; + +/// Default awakener backed by a pipe +mod pipe { + use sys::unix; + use {io, Ready, Poll, PollOpt, Token}; + use event::Evented; + use std::io::{Read, Write}; + + /* + * + * ===== Awakener ===== + * + */ + + pub struct Awakener { + reader: unix::Io, + writer: unix::Io, + } + + impl Awakener { + pub fn new() -> io::Result<Awakener> { + let (rd, wr) = unix::pipe()?; + + Ok(Awakener { + reader: rd, + writer: wr, + }) + } + + pub fn wakeup(&self) -> io::Result<()> { + match (&self.writer).write(&[1]) { + Ok(_) => Ok(()), + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock { + Ok(()) + } else { + Err(e) + } + } + } + } + + pub fn cleanup(&self) { + let mut buf = [0; 128]; + + loop { + // Consume data until all bytes are purged + match (&self.reader).read(&mut buf) { + Ok(i) if i > 0 => {}, + _ => return, + } + } + } + + fn reader(&self) -> &unix::Io { + &self.reader + } + } + + impl Evented for Awakener { + fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + self.reader().register(poll, token, interest, opts) + } + + fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + self.reader().reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + self.reader().deregister(poll) + } + } +} diff --git a/third_party/rust/mio/src/sys/unix/dlsym.rs b/third_party/rust/mio/src/sys/unix/dlsym.rs new file mode 100644 index 0000000000..e88c595fc9 --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/dlsym.rs @@ -0,0 +1,47 @@ +use std::marker; +use std::mem; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use libc; + +macro_rules! dlsym { + (fn $name:ident($($t:ty),*) -> $ret:ty) => ( + #[allow(bad_style)] + static $name: ::sys::unix::dlsym::DlSym<unsafe extern fn($($t),*) -> $ret> = + ::sys::unix::dlsym::DlSym { + name: concat!(stringify!($name), "\0"), + addr: ::std::sync::atomic::ATOMIC_USIZE_INIT, + _marker: ::std::marker::PhantomData, + }; + ) +} + +pub struct DlSym<F> { + pub name: &'static str, + pub addr: AtomicUsize, + pub _marker: marker::PhantomData<F>, +} + +impl<F> DlSym<F> { + pub fn get(&self) -> Option<&F> { + assert_eq!(mem::size_of::<F>(), mem::size_of::<usize>()); + unsafe { + if self.addr.load(Ordering::SeqCst) == 0 { + self.addr.store(fetch(self.name), Ordering::SeqCst); + } + if self.addr.load(Ordering::SeqCst) == 1 { + None + } else { + mem::transmute::<&AtomicUsize, Option<&F>>(&self.addr) + } + } + } +} + +unsafe fn fetch(name: &str) -> usize { + assert_eq!(name.as_bytes()[name.len() - 1], 0); + match libc::dlsym(libc::RTLD_DEFAULT, name.as_ptr() as *const _) as usize { + 0 => 1, + n => n, + } +} diff --git a/third_party/rust/mio/src/sys/unix/epoll.rs b/third_party/rust/mio/src/sys/unix/epoll.rs new file mode 100644 index 0000000000..03b0ebd5b3 --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/epoll.rs @@ -0,0 +1,260 @@ +#![allow(deprecated)] +use std::os::unix::io::AsRawFd; +use std::os::unix::io::RawFd; +use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; +use std::time::Duration; +use std::{cmp, i32}; + +use libc::{self, c_int}; +use libc::{EPOLLERR, EPOLLHUP, EPOLLONESHOT}; +use libc::{EPOLLET, EPOLLOUT, EPOLLIN, EPOLLPRI}; + +use {io, Ready, PollOpt, Token}; +use event_imp::Event; +use sys::unix::{cvt, UnixReady}; +use sys::unix::io::set_cloexec; + +/// Each Selector has a globally unique(ish) ID associated with it. This ID +/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first +/// registered with the `Selector`. If a type that is previously associated with +/// a `Selector` attempts to register itself with a different `Selector`, the +/// operation will return with an error. This matches windows behavior. +static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT; + +#[derive(Debug)] +pub struct Selector { + id: usize, + epfd: RawFd, +} + +impl Selector { + pub fn new() -> io::Result<Selector> { + let epfd = unsafe { + // Emulate `epoll_create` by using `epoll_create1` if it's available + // and otherwise falling back to `epoll_create` followed by a call to + // set the CLOEXEC flag. + dlsym!(fn epoll_create1(c_int) -> c_int); + + match epoll_create1.get() { + Some(epoll_create1_fn) => { + cvt(epoll_create1_fn(libc::EPOLL_CLOEXEC))? + } + None => { + let fd = cvt(libc::epoll_create(1024))?; + drop(set_cloexec(fd)); + fd + } + } + }; + + // offset by 1 to avoid choosing 0 as the id of a selector + let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1; + + Ok(Selector { + id: id, + epfd: epfd, + }) + } + + pub fn id(&self) -> usize { + self.id + } + + /// Wait for events from the OS + pub fn select(&self, evts: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool> { + let timeout_ms = timeout + .map(|to| cmp::min(millis(to), i32::MAX as u64) as i32) + .unwrap_or(-1); + + // Wait for epoll events for at most timeout_ms milliseconds + evts.clear(); + unsafe { + let cnt = cvt(libc::epoll_wait(self.epfd, + evts.events.as_mut_ptr(), + evts.events.capacity() as i32, + timeout_ms))?; + let cnt = cnt as usize; + evts.events.set_len(cnt); + + for i in 0..cnt { + if evts.events[i].u64 as usize == awakener.into() { + evts.events.remove(i); + return Ok(true); + } + } + } + + Ok(false) + } + + /// Register event interests for the given IO handle with the OS + pub fn register(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> { + let mut info = libc::epoll_event { + events: ioevent_to_epoll(interests, opts), + u64: usize::from(token) as u64 + }; + + unsafe { + cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, fd, &mut info))?; + Ok(()) + } + } + + /// Register event interests for the given IO handle with the OS + pub fn reregister(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> { + let mut info = libc::epoll_event { + events: ioevent_to_epoll(interests, opts), + u64: usize::from(token) as u64 + }; + + unsafe { + cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_MOD, fd, &mut info))?; + Ok(()) + } + } + + /// Deregister event interests for the given IO handle with the OS + pub fn deregister(&self, fd: RawFd) -> io::Result<()> { + // The &info argument should be ignored by the system, + // but linux < 2.6.9 required it to be not null. + // For compatibility, we provide a dummy EpollEvent. + let mut info = libc::epoll_event { + events: 0, + u64: 0, + }; + + unsafe { + cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_DEL, fd, &mut info))?; + Ok(()) + } + } +} + +fn ioevent_to_epoll(interest: Ready, opts: PollOpt) -> u32 { + let mut kind = 0; + + if interest.is_readable() { + kind |= EPOLLIN; + } + + if interest.is_writable() { + kind |= EPOLLOUT; + } + + if UnixReady::from(interest).is_priority() { + kind |= EPOLLPRI; + } + + if opts.is_edge() { + kind |= EPOLLET; + } + + if opts.is_oneshot() { + kind |= EPOLLONESHOT; + } + + if opts.is_level() { + kind &= !EPOLLET; + } + + kind as u32 +} + +impl AsRawFd for Selector { + fn as_raw_fd(&self) -> RawFd { + self.epfd + } +} + +impl Drop for Selector { + fn drop(&mut self) { + unsafe { + let _ = libc::close(self.epfd); + } + } +} + +pub struct Events { + events: Vec<libc::epoll_event>, +} + +impl Events { + pub fn with_capacity(u: usize) -> Events { + Events { + events: Vec::with_capacity(u) + } + } + + #[inline] + pub fn len(&self) -> usize { + self.events.len() + } + + #[inline] + pub fn capacity(&self) -> usize { + self.events.capacity() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.events.is_empty() + } + + #[inline] + pub fn get(&self, idx: usize) -> Option<Event> { + self.events.get(idx).map(|event| { + let epoll = event.events as c_int; + let mut kind = Ready::empty(); + + if (epoll & EPOLLIN) != 0 { + kind = kind | Ready::readable(); + } + + if (epoll & EPOLLPRI) != 0 { + kind = kind | Ready::readable() | UnixReady::priority(); + } + + if (epoll & EPOLLOUT) != 0 { + kind = kind | Ready::writable(); + } + + // EPOLLHUP - Usually means a socket error happened + if (epoll & EPOLLERR) != 0 { + kind = kind | UnixReady::error(); + } + + if (epoll & EPOLLHUP) != 0 { + kind = kind | UnixReady::hup(); + } + + let token = self.events[idx].u64; + + Event::new(kind, Token(token as usize)) + }) + } + + pub fn push_event(&mut self, event: Event) { + self.events.push(libc::epoll_event { + events: ioevent_to_epoll(event.readiness(), PollOpt::empty()), + u64: usize::from(event.token()) as u64 + }); + } + + pub fn clear(&mut self) { + unsafe { self.events.set_len(0); } + } +} + +const NANOS_PER_MILLI: u32 = 1_000_000; +const MILLIS_PER_SEC: u64 = 1_000; + +/// Convert a `Duration` to milliseconds, rounding up and saturating at +/// `u64::MAX`. +/// +/// The saturating is fine because `u64::MAX` milliseconds are still many +/// million years. +pub fn millis(duration: Duration) -> u64 { + // Round up. + let millis = (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI; + duration.as_secs().saturating_mul(MILLIS_PER_SEC).saturating_add(millis as u64) +} diff --git a/third_party/rust/mio/src/sys/unix/eventedfd.rs b/third_party/rust/mio/src/sys/unix/eventedfd.rs new file mode 100644 index 0000000000..72586f6652 --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/eventedfd.rs @@ -0,0 +1,107 @@ +use {io, poll, Ready, Poll, PollOpt, Token}; +use event::Evented; +use std::os::unix::io::RawFd; + +/* + * + * ===== EventedFd ===== + * + */ + +#[derive(Debug)] + +/// Adapter for [`RawFd`] providing an [`Evented`] implementation. +/// +/// `EventedFd` enables registering any type with an FD with [`Poll`]. +/// +/// While only implementations for TCP and UDP are provided, Mio supports +/// registering any FD that can be registered with the underlying OS selector. +/// `EventedFd` provides the necessary bridge. +/// +/// Note that `EventedFd` takes a `&RawFd`. This is because `EventedFd` **does +/// not** take ownership of the FD. Specifically, it will not manage any +/// lifecycle related operations, such as closing the FD on drop. It is expected +/// that the `EventedFd` is constructed right before a call to +/// [`Poll::register`]. See the examples for more detail. +/// +/// # Examples +/// +/// Basic usage +/// +/// ``` +/// # use std::error::Error; +/// # fn try_main() -> Result<(), Box<Error>> { +/// use mio::{Ready, Poll, PollOpt, Token}; +/// use mio::unix::EventedFd; +/// +/// use std::os::unix::io::AsRawFd; +/// use std::net::TcpListener; +/// +/// // Bind a std listener +/// let listener = TcpListener::bind("127.0.0.1:0")?; +/// +/// let poll = Poll::new()?; +/// +/// // Register the listener +/// poll.register(&EventedFd(&listener.as_raw_fd()), +/// Token(0), Ready::readable(), PollOpt::edge())?; +/// # Ok(()) +/// # } +/// # +/// # fn main() { +/// # try_main().unwrap(); +/// # } +/// ``` +/// +/// Implementing [`Evented`] for a custom type backed by a [`RawFd`]. +/// +/// ``` +/// use mio::{Ready, Poll, PollOpt, Token}; +/// use mio::event::Evented; +/// use mio::unix::EventedFd; +/// +/// use std::os::unix::io::RawFd; +/// use std::io; +/// +/// pub struct MyIo { +/// fd: RawFd, +/// } +/// +/// impl Evented for MyIo { +/// fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) +/// -> io::Result<()> +/// { +/// EventedFd(&self.fd).register(poll, token, interest, opts) +/// } +/// +/// fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) +/// -> io::Result<()> +/// { +/// EventedFd(&self.fd).reregister(poll, token, interest, opts) +/// } +/// +/// fn deregister(&self, poll: &Poll) -> io::Result<()> { +/// EventedFd(&self.fd).deregister(poll) +/// } +/// } +/// ``` +/// +/// [`RawFd`]: https://doc.rust-lang.org/std/os/unix/io/type.RawFd.html +/// [`Evented`]: ../event/trait.Evented.html +/// [`Poll`]: ../struct.Poll.html +/// [`Poll::register`]: ../struct.Poll.html#method.register +pub struct EventedFd<'a>(pub &'a RawFd); + +impl<'a> Evented for EventedFd<'a> { + fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + poll::selector(poll).register(*self.0, token, interest, opts) + } + + fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + poll::selector(poll).reregister(*self.0, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + poll::selector(poll).deregister(*self.0) + } +} diff --git a/third_party/rust/mio/src/sys/unix/io.rs b/third_party/rust/mio/src/sys/unix/io.rs new file mode 100644 index 0000000000..47a3a70d1f --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/io.rs @@ -0,0 +1,107 @@ +use std::fs::File; +use std::io::{Read, Write}; +use std::os::unix::io::{IntoRawFd, AsRawFd, FromRawFd, RawFd}; + +use libc; + +use {io, Ready, Poll, PollOpt, Token}; +use event::Evented; +use unix::EventedFd; +use sys::unix::cvt; + +pub fn set_nonblock(fd: libc::c_int) -> io::Result<()> { + unsafe { + let flags = libc::fcntl(fd, libc::F_GETFL); + cvt(libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK)).map(|_|()) + } +} + +pub fn set_cloexec(fd: libc::c_int) -> io::Result<()> { + unsafe { + let flags = libc::fcntl(fd, libc::F_GETFD); + cvt(libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC)).map(|_| ()) + } +} + +/* + * + * ===== Basic IO type ===== + * + */ + +/// Manages a FD +#[derive(Debug)] +pub struct Io { + fd: File, +} + +impl Io { + /// Try to clone the FD + pub fn try_clone(&self) -> io::Result<Io> { + Ok(Io { fd: self.fd.try_clone()? }) + } +} + +impl FromRawFd for Io { + unsafe fn from_raw_fd(fd: RawFd) -> Io { + Io { fd: File::from_raw_fd(fd) } + } +} + +impl IntoRawFd for Io { + fn into_raw_fd(self) -> RawFd { + self.fd.into_raw_fd() + } +} + +impl AsRawFd for Io { + fn as_raw_fd(&self) -> RawFd { + self.fd.as_raw_fd() + } +} + +impl Evented for Io { + fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + EventedFd(&self.as_raw_fd()).register(poll, token, interest, opts) + } + + fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + EventedFd(&self.as_raw_fd()).reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + EventedFd(&self.as_raw_fd()).deregister(poll) + } +} + +impl Read for Io { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + (&self.fd).read(dst) + } +} + +impl<'a> Read for &'a Io { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + (&self.fd).read(dst) + } +} + +impl Write for Io { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + (&self.fd).write(src) + } + + fn flush(&mut self) -> io::Result<()> { + (&self.fd).flush() + } +} + +impl<'a> Write for &'a Io { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + (&self.fd).write(src) + } + + fn flush(&mut self) -> io::Result<()> { + (&self.fd).flush() + } +} diff --git a/third_party/rust/mio/src/sys/unix/kqueue.rs b/third_party/rust/mio/src/sys/unix/kqueue.rs new file mode 100644 index 0000000000..59c70e1e18 --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/kqueue.rs @@ -0,0 +1,360 @@ +use std::{cmp, fmt, ptr}; +#[cfg(not(target_os = "netbsd"))] +use std::os::raw::{c_int, c_short}; +use std::os::unix::io::AsRawFd; +use std::os::unix::io::RawFd; +use std::collections::HashMap; +use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; +use std::time::Duration; + +use libc::{self, time_t}; + +use {io, Ready, PollOpt, Token}; +use event_imp::{self as event, Event}; +use sys::unix::{cvt, UnixReady}; +use sys::unix::io::set_cloexec; + +/// Each Selector has a globally unique(ish) ID associated with it. This ID +/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first +/// registered with the `Selector`. If a type that is previously associated with +/// a `Selector` attempts to register itself with a different `Selector`, the +/// operation will return with an error. This matches windows behavior. +static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT; + +#[cfg(not(target_os = "netbsd"))] +type Filter = c_short; +#[cfg(not(target_os = "netbsd"))] +type UData = *mut ::libc::c_void; +#[cfg(not(target_os = "netbsd"))] +type Count = c_int; + +#[cfg(target_os = "netbsd")] +type Filter = u32; +#[cfg(target_os = "netbsd")] +type UData = ::libc::intptr_t; +#[cfg(target_os = "netbsd")] +type Count = usize; + +macro_rules! kevent { + ($id: expr, $filter: expr, $flags: expr, $data: expr) => { + libc::kevent { + ident: $id as ::libc::uintptr_t, + filter: $filter as Filter, + flags: $flags, + fflags: 0, + data: 0, + udata: $data as UData, + } + } +} + +pub struct Selector { + id: usize, + kq: RawFd, +} + +impl Selector { + pub fn new() -> io::Result<Selector> { + // offset by 1 to avoid choosing 0 as the id of a selector + let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1; + let kq = unsafe { cvt(libc::kqueue())? }; + drop(set_cloexec(kq)); + + Ok(Selector { + id, + kq, + }) + } + + pub fn id(&self) -> usize { + self.id + } + + pub fn select(&self, evts: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool> { + let timeout = timeout.map(|to| { + libc::timespec { + tv_sec: cmp::min(to.as_secs(), time_t::max_value() as u64) as time_t, + // `Duration::subsec_nanos` is guaranteed to be less than one + // billion (the number of nanoseconds in a second), making the + // cast to i32 safe. The cast itself is needed for platforms + // where C's long is only 32 bits. + tv_nsec: libc::c_long::from(to.subsec_nanos() as i32), + } + }); + let timeout = timeout.as_ref().map(|s| s as *const _).unwrap_or(ptr::null_mut()); + + evts.clear(); + unsafe { + let cnt = cvt(libc::kevent(self.kq, + ptr::null(), + 0, + evts.sys_events.0.as_mut_ptr(), + evts.sys_events.0.capacity() as Count, + timeout))?; + evts.sys_events.0.set_len(cnt as usize); + Ok(evts.coalesce(awakener)) + } + } + + pub fn register(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> { + trace!("registering; token={:?}; interests={:?}", token, interests); + + let flags = if opts.contains(PollOpt::edge()) { libc::EV_CLEAR } else { 0 } | + if opts.contains(PollOpt::oneshot()) { libc::EV_ONESHOT } else { 0 } | + libc::EV_RECEIPT; + + unsafe { + let r = if interests.contains(Ready::readable()) { libc::EV_ADD } else { libc::EV_DELETE }; + let w = if interests.contains(Ready::writable()) { libc::EV_ADD } else { libc::EV_DELETE }; + let mut changes = [ + kevent!(fd, libc::EVFILT_READ, flags | r, usize::from(token)), + kevent!(fd, libc::EVFILT_WRITE, flags | w, usize::from(token)), + ]; + + cvt(libc::kevent(self.kq, + changes.as_ptr(), + changes.len() as Count, + changes.as_mut_ptr(), + changes.len() as Count, + ::std::ptr::null()))?; + + for change in changes.iter() { + debug_assert_eq!(change.flags & libc::EV_ERROR, libc::EV_ERROR); + + // Test to see if an error happened + if change.data == 0 { + continue + } + + // Older versions of OSX (10.11 and 10.10 have been witnessed) + // can return EPIPE when registering a pipe file descriptor + // where the other end has already disappeared. For example code + // that creates a pipe, closes a file descriptor, and then + // registers the other end will see an EPIPE returned from + // `register`. + // + // It also turns out that kevent will still report events on the + // file descriptor, telling us that it's readable/hup at least + // after we've done this registration. As a result we just + // ignore `EPIPE` here instead of propagating it. + // + // More info can be found at carllerche/mio#582 + if change.data as i32 == libc::EPIPE && + change.filter == libc::EVFILT_WRITE as Filter { + continue + } + + // ignore ENOENT error for EV_DELETE + let orig_flags = if change.filter == libc::EVFILT_READ as Filter { r } else { w }; + if change.data as i32 == libc::ENOENT && orig_flags & libc::EV_DELETE != 0 { + continue + } + + return Err(::std::io::Error::from_raw_os_error(change.data as i32)); + } + Ok(()) + } + } + + pub fn reregister(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> { + // Just need to call register here since EV_ADD is a mod if already + // registered + self.register(fd, token, interests, opts) + } + + pub fn deregister(&self, fd: RawFd) -> io::Result<()> { + unsafe { + // EV_RECEIPT is a nice way to apply changes and get back per-event results while not + // draining the actual changes. + let filter = libc::EV_DELETE | libc::EV_RECEIPT; +#[cfg(not(target_os = "netbsd"))] + let mut changes = [ + kevent!(fd, libc::EVFILT_READ, filter, ptr::null_mut()), + kevent!(fd, libc::EVFILT_WRITE, filter, ptr::null_mut()), + ]; + +#[cfg(target_os = "netbsd")] + let mut changes = [ + kevent!(fd, libc::EVFILT_READ, filter, 0), + kevent!(fd, libc::EVFILT_WRITE, filter, 0), + ]; + + cvt(libc::kevent(self.kq, + changes.as_ptr(), + changes.len() as Count, + changes.as_mut_ptr(), + changes.len() as Count, + ::std::ptr::null())).map(|_| ())?; + + if changes[0].data as i32 == libc::ENOENT && changes[1].data as i32 == libc::ENOENT { + return Err(::std::io::Error::from_raw_os_error(changes[0].data as i32)); + } + for change in changes.iter() { + debug_assert_eq!(libc::EV_ERROR & change.flags, libc::EV_ERROR); + if change.data != 0 && change.data as i32 != libc::ENOENT { + return Err(::std::io::Error::from_raw_os_error(changes[0].data as i32)); + } + } + Ok(()) + } + } +} + +impl fmt::Debug for Selector { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Selector") + .field("id", &self.id) + .field("kq", &self.kq) + .finish() + } +} + +impl AsRawFd for Selector { + fn as_raw_fd(&self) -> RawFd { + self.kq + } +} + +impl Drop for Selector { + fn drop(&mut self) { + unsafe { + let _ = libc::close(self.kq); + } + } +} + +pub struct Events { + sys_events: KeventList, + events: Vec<Event>, + event_map: HashMap<Token, usize>, +} + +struct KeventList(Vec<libc::kevent>); + +unsafe impl Send for KeventList {} +unsafe impl Sync for KeventList {} + +impl Events { + pub fn with_capacity(cap: usize) -> Events { + Events { + sys_events: KeventList(Vec::with_capacity(cap)), + events: Vec::with_capacity(cap), + event_map: HashMap::with_capacity(cap) + } + } + + #[inline] + pub fn len(&self) -> usize { + self.events.len() + } + + #[inline] + pub fn capacity(&self) -> usize { + self.events.capacity() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.events.is_empty() + } + + pub fn get(&self, idx: usize) -> Option<Event> { + self.events.get(idx).cloned() + } + + fn coalesce(&mut self, awakener: Token) -> bool { + let mut ret = false; + self.events.clear(); + self.event_map.clear(); + + for e in self.sys_events.0.iter() { + let token = Token(e.udata as usize); + let len = self.events.len(); + + if token == awakener { + // TODO: Should this return an error if event is an error. It + // is not critical as spurious wakeups are permitted. + ret = true; + continue; + } + + let idx = *self.event_map.entry(token) + .or_insert(len); + + if idx == len { + // New entry, insert the default + self.events.push(Event::new(Ready::empty(), token)); + + } + + if e.flags & libc::EV_ERROR != 0 { + event::kind_mut(&mut self.events[idx]).insert(*UnixReady::error()); + } + + if e.filter == libc::EVFILT_READ as Filter { + event::kind_mut(&mut self.events[idx]).insert(Ready::readable()); + } else if e.filter == libc::EVFILT_WRITE as Filter { + event::kind_mut(&mut self.events[idx]).insert(Ready::writable()); + } +#[cfg(any(target_os = "dragonfly", + target_os = "freebsd", target_os = "ios", target_os = "macos"))] + { + if e.filter == libc::EVFILT_AIO { + event::kind_mut(&mut self.events[idx]).insert(UnixReady::aio()); + } + } +#[cfg(any(target_os = "freebsd"))] + { + if e.filter == libc::EVFILT_LIO { + event::kind_mut(&mut self.events[idx]).insert(UnixReady::lio()); + } + } + } + + ret + } + + pub fn push_event(&mut self, event: Event) { + self.events.push(event); + } + + pub fn clear(&mut self) { + self.sys_events.0.truncate(0); + self.events.truncate(0); + self.event_map.clear(); + } +} + +impl fmt::Debug for Events { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Events") + .field("len", &self.sys_events.0.len()) + .finish() + } +} + +#[test] +fn does_not_register_rw() { + use {Poll, Ready, PollOpt, Token}; + use unix::EventedFd; + + let kq = unsafe { libc::kqueue() }; + let kqf = EventedFd(&kq); + let poll = Poll::new().unwrap(); + + // registering kqueue fd will fail if write is requested (On anything but some versions of OS + // X) + poll.register(&kqf, Token(1234), Ready::readable(), + PollOpt::edge() | PollOpt::oneshot()).unwrap(); +} + +#[cfg(any(target_os = "dragonfly", + target_os = "freebsd", target_os = "ios", target_os = "macos"))] +#[test] +fn test_coalesce_aio() { + let mut events = Events::with_capacity(1); + events.sys_events.0.push(kevent!(0x1234, libc::EVFILT_AIO, 0, 42)); + events.coalesce(Token(0)); + assert!(events.events[0].readiness() == UnixReady::aio().into()); + assert!(events.events[0].token() == Token(42)); +} diff --git a/third_party/rust/mio/src/sys/unix/mod.rs b/third_party/rust/mio/src/sys/unix/mod.rs new file mode 100644 index 0000000000..5bb83070d2 --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/mod.rs @@ -0,0 +1,95 @@ +use libc::{self, c_int}; + +#[macro_use] +pub mod dlsym; + +#[cfg(any(target_os = "linux", target_os = "android", target_os = "solaris"))] +mod epoll; + +#[cfg(any(target_os = "linux", target_os = "android", target_os = "solaris"))] +pub use self::epoll::{Events, Selector}; + +#[cfg(any(target_os = "bitrig", target_os = "dragonfly", + target_os = "freebsd", target_os = "ios", target_os = "macos", + target_os = "netbsd", target_os = "openbsd"))] +mod kqueue; + +#[cfg(any(target_os = "bitrig", target_os = "dragonfly", + target_os = "freebsd", target_os = "ios", target_os = "macos", + target_os = "netbsd", target_os = "openbsd"))] +pub use self::kqueue::{Events, Selector}; + +mod awakener; +mod eventedfd; +mod io; +mod ready; +mod tcp; +mod udp; +mod uio; + +#[cfg(feature = "with-deprecated")] +mod uds; + +pub use self::awakener::Awakener; +pub use self::eventedfd::EventedFd; +pub use self::io::{Io, set_nonblock}; +pub use self::ready::{UnixReady, READY_ALL}; +pub use self::tcp::{TcpStream, TcpListener}; +pub use self::udp::UdpSocket; + +#[cfg(feature = "with-deprecated")] +pub use self::uds::UnixSocket; + +pub use iovec::IoVec; + +use std::os::unix::io::FromRawFd; + +pub fn pipe() -> ::io::Result<(Io, Io)> { + // Use pipe2 for atomically setting O_CLOEXEC if we can, but otherwise + // just fall back to using `pipe`. + dlsym!(fn pipe2(*mut c_int, c_int) -> c_int); + + let mut pipes = [0; 2]; + unsafe { + match pipe2.get() { + Some(pipe2_fn) => { + let flags = libc::O_NONBLOCK | libc::O_CLOEXEC; + cvt(pipe2_fn(pipes.as_mut_ptr(), flags))?; + Ok((Io::from_raw_fd(pipes[0]), Io::from_raw_fd(pipes[1]))) + } + None => { + cvt(libc::pipe(pipes.as_mut_ptr()))?; + // Ensure the pipe are closed if any of the system calls below + // fail. + let r = Io::from_raw_fd(pipes[0]); + let w = Io::from_raw_fd(pipes[1]); + cvt(libc::fcntl(pipes[0], libc::F_SETFD, libc::FD_CLOEXEC))?; + cvt(libc::fcntl(pipes[1], libc::F_SETFD, libc::FD_CLOEXEC))?; + cvt(libc::fcntl(pipes[0], libc::F_SETFL, libc::O_NONBLOCK))?; + cvt(libc::fcntl(pipes[1], libc::F_SETFL, libc::O_NONBLOCK))?; + Ok((r, w)) + } + } + } +} + +trait IsMinusOne { + fn is_minus_one(&self) -> bool; +} + +impl IsMinusOne for i32 { + fn is_minus_one(&self) -> bool { *self == -1 } +} +impl IsMinusOne for isize { + fn is_minus_one(&self) -> bool { *self == -1 } +} + +fn cvt<T: IsMinusOne>(t: T) -> ::io::Result<T> { + use std::io; + + if t.is_minus_one() { + Err(io::Error::last_os_error()) + } else { + Ok(t) + } +} diff --git a/third_party/rust/mio/src/sys/unix/ready.rs b/third_party/rust/mio/src/sys/unix/ready.rs new file mode 100644 index 0000000000..1780ceae9f --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/ready.rs @@ -0,0 +1,499 @@ +use event_imp::{Ready, ready_as_usize, ready_from_usize}; + +use std::ops; +use std::fmt; + +/// Unix specific extensions to `Ready` +/// +/// Provides additional readiness event kinds that are available on unix +/// platforms. Unix platforms are able to provide readiness events for +/// additional socket events, such as HUP and error. +/// +/// HUP events occur when the remote end of a socket hangs up. In the TCP case, +/// this occurs when the remote end of a TCP socket shuts down writes. +/// +/// Error events occur when the socket enters an error state. In this case, the +/// socket will also receive a readable or writable event. Reading or writing to +/// the socket will result in an error. +/// +/// Conversion traits are implemented between `Ready` and `UnixReady`. See the +/// examples. +/// +/// For high level documentation on polling and readiness, see [`Poll`]. +/// +/// # Examples +/// +/// Most of the time, all that is needed is using bit operations +/// +/// ``` +/// use mio::Ready; +/// use mio::unix::UnixReady; +/// +/// let ready = Ready::readable() | UnixReady::hup(); +/// +/// assert!(ready.is_readable()); +/// assert!(UnixReady::from(ready).is_hup()); +/// ``` +/// +/// Basic conversion between ready types. +/// +/// ``` +/// use mio::Ready; +/// use mio::unix::UnixReady; +/// +/// // Start with a portable ready +/// let ready = Ready::readable(); +/// +/// // Convert to a unix ready, adding HUP +/// let mut unix_ready = UnixReady::from(ready) | UnixReady::hup(); +/// +/// unix_ready.insert(UnixReady::error()); +/// +/// // `unix_ready` maintains readable interest +/// assert!(unix_ready.is_readable()); +/// assert!(unix_ready.is_hup()); +/// assert!(unix_ready.is_error()); +/// +/// // Convert back to `Ready` +/// let ready = Ready::from(unix_ready); +/// +/// // Readable is maintained +/// assert!(ready.is_readable()); +/// ``` +/// +/// Registering readable and error interest on a socket +/// +/// ``` +/// # use std::error::Error; +/// # fn try_main() -> Result<(), Box<Error>> { +/// use mio::{Ready, Poll, PollOpt, Token}; +/// use mio::net::TcpStream; +/// use mio::unix::UnixReady; +/// +/// let addr = "216.58.193.68:80".parse()?; +/// let socket = TcpStream::connect(&addr)?; +/// +/// let poll = Poll::new()?; +/// +/// poll.register(&socket, +/// Token(0), +/// Ready::readable() | UnixReady::error(), +/// PollOpt::edge())?; +/// # Ok(()) +/// # } +/// # +/// # fn main() { +/// # try_main().unwrap(); +/// # } +/// ``` +/// +/// [`Poll`]: ../struct.Poll.html +/// [readiness]: struct.Poll.html#readiness-operations +#[derive(Copy, PartialEq, Eq, Clone, PartialOrd, Ord)] +pub struct UnixReady(Ready); + +const ERROR: usize = 0b00_0100; +const HUP: usize = 0b00_1000; + +#[cfg(any(target_os = "dragonfly", + target_os = "freebsd", target_os = "ios", target_os = "macos"))] +const AIO: usize = 0b01_0000; + +#[cfg(not(any(target_os = "dragonfly", + target_os = "freebsd", target_os = "ios", target_os = "macos")))] +const AIO: usize = 0b00_0000; + +#[cfg(any(target_os = "freebsd"))] +const LIO: usize = 0b10_0000; + +#[cfg(not(any(target_os = "freebsd")))] +const LIO: usize = 0b00_0000; + +#[cfg(any(target_os = "linux", target_os = "android", target_os = "solaris"))] +const PRI: usize = 0b100_0000; + +#[cfg(not(any(target_os = "linux", target_os = "android", target_os = "solaris")))] +const PRI: usize = 0; + +// Export to support `Ready::all` +pub const READY_ALL: usize = ERROR | HUP | AIO | LIO | PRI; + +#[test] +fn test_ready_all() { + let readable = Ready::readable().as_usize(); + let writable = Ready::writable().as_usize(); + + assert_eq!( + READY_ALL | readable | writable, + ERROR + HUP + AIO + LIO + PRI + readable + writable + ); + + // Issue #896. + #[cfg(any(target_os = "linux", target_os = "android", target_os = "solaris"))] + assert!(!Ready::from(UnixReady::priority()).is_writable()); +} + +impl UnixReady { + /// Returns a `Ready` representing AIO completion readiness + /// + /// See [`Poll`] for more documentation on polling. + /// + /// # Examples + /// + /// ``` + /// use mio::unix::UnixReady; + /// + /// let ready = UnixReady::aio(); + /// + /// assert!(ready.is_aio()); + /// ``` + /// + /// [`Poll`]: ../struct.Poll.html + #[inline] + #[cfg(any(target_os = "dragonfly", + target_os = "freebsd", target_os = "ios", target_os = "macos"))] + pub fn aio() -> UnixReady { + UnixReady(ready_from_usize(AIO)) + } + + #[cfg(not(any(target_os = "dragonfly", + target_os = "freebsd", target_os = "ios", target_os = "macos")))] + #[deprecated(since = "0.6.12", note = "this function is now platform specific")] + #[doc(hidden)] + pub fn aio() -> UnixReady { + UnixReady(Ready::empty()) + } + + /// Returns a `Ready` representing error readiness. + /// + /// **Note that only readable and writable readiness is guaranteed to be + /// supported on all platforms**. This means that `error` readiness + /// should be treated as a hint. For more details, see [readiness] in the + /// poll documentation. + /// + /// See [`Poll`] for more documentation on polling. + /// + /// # Examples + /// + /// ``` + /// use mio::unix::UnixReady; + /// + /// let ready = UnixReady::error(); + /// + /// assert!(ready.is_error()); + /// ``` + /// + /// [`Poll`]: ../struct.Poll.html + /// [readiness]: ../struct.Poll.html#readiness-operations + #[inline] + pub fn error() -> UnixReady { + UnixReady(ready_from_usize(ERROR)) + } + + /// Returns a `Ready` representing HUP readiness. + /// + /// A HUP (or hang-up) signifies that a stream socket **peer** closed the + /// connection, or shut down the writing half of the connection. + /// + /// **Note that only readable and writable readiness is guaranteed to be + /// supported on all platforms**. This means that `hup` readiness + /// should be treated as a hint. For more details, see [readiness] in the + /// poll documentation. It is also unclear if HUP readiness will remain in 0.7. See + /// [here][issue-941]. + /// + /// See [`Poll`] for more documentation on polling. + /// + /// # Examples + /// + /// ``` + /// use mio::unix::UnixReady; + /// + /// let ready = UnixReady::hup(); + /// + /// assert!(ready.is_hup()); + /// ``` + /// + /// [`Poll`]: ../struct.Poll.html + /// [readiness]: ../struct.Poll.html#readiness-operations + /// [issue-941]: https://github.com/tokio-rs/mio/issues/941 + #[inline] + pub fn hup() -> UnixReady { + UnixReady(ready_from_usize(HUP)) + } + + /// Returns a `Ready` representing LIO completion readiness + /// + /// See [`Poll`] for more documentation on polling. + /// + /// # Examples + /// + /// ``` + /// use mio::unix::UnixReady; + /// + /// let ready = UnixReady::lio(); + /// + /// assert!(ready.is_lio()); + /// ``` + /// + /// [`Poll`]: struct.Poll.html + #[inline] + #[cfg(any(target_os = "freebsd"))] + pub fn lio() -> UnixReady { + UnixReady(ready_from_usize(LIO)) + } + + /// Returns a `Ready` representing priority (`EPOLLPRI`) readiness + /// + /// See [`Poll`] for more documentation on polling. + /// + /// # Examples + /// + /// ``` + /// use mio::unix::UnixReady; + /// + /// let ready = UnixReady::priority(); + /// + /// assert!(ready.is_priority()); + /// ``` + /// + /// [`Poll`]: struct.Poll.html + #[inline] + #[cfg(any(target_os = "linux", + target_os = "android", target_os = "solaris"))] + pub fn priority() -> UnixReady { + UnixReady(ready_from_usize(PRI)) + } + + /// Returns true if `Ready` contains AIO readiness + /// + /// See [`Poll`] for more documentation on polling. + /// + /// # Examples + /// + /// ``` + /// use mio::unix::UnixReady; + /// + /// let ready = UnixReady::aio(); + /// + /// assert!(ready.is_aio()); + /// ``` + /// + /// [`Poll`]: ../struct.Poll.html + #[inline] + #[cfg(any(target_os = "dragonfly", + target_os = "freebsd", target_os = "ios", target_os = "macos"))] + pub fn is_aio(&self) -> bool { + self.contains(ready_from_usize(AIO)) + } + + #[deprecated(since = "0.6.12", note = "this function is now platform specific")] + #[cfg(feature = "with-deprecated")] + #[cfg(not(any(target_os = "dragonfly", + target_os = "freebsd", target_os = "ios", target_os = "macos")))] + #[doc(hidden)] + pub fn is_aio(&self) -> bool { + false + } + + /// Returns true if the value includes error readiness + /// + /// **Note that only readable and writable readiness is guaranteed to be + /// supported on all platforms**. This means that `error` readiness should + /// be treated as a hint. For more details, see [readiness] in the poll + /// documentation. + /// + /// See [`Poll`] for more documentation on polling. + /// + /// # Examples + /// + /// ``` + /// use mio::unix::UnixReady; + /// + /// let ready = UnixReady::error(); + /// + /// assert!(ready.is_error()); + /// ``` + /// + /// [`Poll`]: ../struct.Poll.html + /// [readiness]: ../struct.Poll.html#readiness-operations + #[inline] + pub fn is_error(&self) -> bool { + self.contains(ready_from_usize(ERROR)) + } + + /// Returns true if the value includes HUP readiness + /// + /// A HUP (or hang-up) signifies that a stream socket **peer** closed the + /// connection, or shut down the writing half of the connection. + /// + /// **Note that only readable and writable readiness is guaranteed to be + /// supported on all platforms**. This means that `hup` readiness + /// should be treated as a hint. For more details, see [readiness] in the + /// poll documentation. + /// + /// See [`Poll`] for more documentation on polling. + /// + /// # Examples + /// + /// ``` + /// use mio::unix::UnixReady; + /// + /// let ready = UnixReady::hup(); + /// + /// assert!(ready.is_hup()); + /// ``` + /// + /// [`Poll`]: ../struct.Poll.html + /// [readiness]: ../struct.Poll.html#readiness-operations + #[inline] + pub fn is_hup(&self) -> bool { + self.contains(ready_from_usize(HUP)) + } + + /// Returns true if `Ready` contains LIO readiness + /// + /// See [`Poll`] for more documentation on polling. + /// + /// # Examples + /// + /// ``` + /// use mio::unix::UnixReady; + /// + /// let ready = UnixReady::lio(); + /// + /// assert!(ready.is_lio()); + /// ``` + #[inline] + #[cfg(any(target_os = "freebsd"))] + pub fn is_lio(&self) -> bool { + self.contains(ready_from_usize(LIO)) + } + + /// Returns true if `Ready` contains priority (`EPOLLPRI`) readiness + /// + /// See [`Poll`] for more documentation on polling. + /// + /// # Examples + /// + /// ``` + /// use mio::unix::UnixReady; + /// + /// let ready = UnixReady::priority(); + /// + /// assert!(ready.is_priority()); + /// ``` + /// + /// [`Poll`]: struct.Poll.html + #[inline] + #[cfg(any(target_os = "linux", + target_os = "android", target_os = "solaris"))] + pub fn is_priority(&self) -> bool { + self.contains(ready_from_usize(PRI)) + } +} + +impl From<Ready> for UnixReady { + fn from(src: Ready) -> UnixReady { + UnixReady(src) + } +} + +impl From<UnixReady> for Ready { + fn from(src: UnixReady) -> Ready { + src.0 + } +} + +impl ops::Deref for UnixReady { + type Target = Ready; + + fn deref(&self) -> &Ready { + &self.0 + } +} + +impl ops::DerefMut for UnixReady { + fn deref_mut(&mut self) -> &mut Ready { + &mut self.0 + } +} + +impl ops::BitOr for UnixReady { + type Output = UnixReady; + + #[inline] + fn bitor(self, other: UnixReady) -> UnixReady { + (self.0 | other.0).into() + } +} + +impl ops::BitXor for UnixReady { + type Output = UnixReady; + + #[inline] + fn bitxor(self, other: UnixReady) -> UnixReady { + (self.0 ^ other.0).into() + } +} + +impl ops::BitAnd for UnixReady { + type Output = UnixReady; + + #[inline] + fn bitand(self, other: UnixReady) -> UnixReady { + (self.0 & other.0).into() + } +} + +impl ops::Sub for UnixReady { + type Output = UnixReady; + + #[inline] + fn sub(self, other: UnixReady) -> UnixReady { + ready_from_usize(ready_as_usize(self.0) & !ready_as_usize(other.0)).into() + } +} + +#[deprecated(since = "0.6.10", note = "removed")] +#[cfg(feature = "with-deprecated")] +#[doc(hidden)] +impl ops::Not for UnixReady { + type Output = UnixReady; + + #[inline] + fn not(self) -> UnixReady { + (!self.0).into() + } +} + +impl fmt::Debug for UnixReady { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let mut one = false; + let flags = [ + (UnixReady(Ready::readable()), "Readable"), + (UnixReady(Ready::writable()), "Writable"), + (UnixReady::error(), "Error"), + (UnixReady::hup(), "Hup"), + #[allow(deprecated)] + (UnixReady::aio(), "Aio"), + #[cfg(any(target_os = "linux", + target_os = "android", target_os = "solaris"))] + (UnixReady::priority(), "Priority"), + ]; + + for &(flag, msg) in &flags { + if self.contains(flag) { + if one { write!(fmt, " | ")? } + write!(fmt, "{}", msg)?; + + one = true + } + } + + if !one { + fmt.write_str("(empty)")?; + } + + Ok(()) + } +} diff --git a/third_party/rust/mio/src/sys/unix/tcp.rs b/third_party/rust/mio/src/sys/unix/tcp.rs new file mode 100644 index 0000000000..79c18c74fd --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/tcp.rs @@ -0,0 +1,286 @@ +use std::fmt; +use std::io::{Read, Write}; +use std::net::{self, SocketAddr}; +use std::os::unix::io::{RawFd, FromRawFd, IntoRawFd, AsRawFd}; +use std::time::Duration; + +use libc; +use net2::TcpStreamExt; +use iovec::IoVec; + +use {io, Ready, Poll, PollOpt, Token}; +use event::Evented; + +use sys::unix::eventedfd::EventedFd; +use sys::unix::io::set_nonblock; +use sys::unix::uio::VecIo; + +pub struct TcpStream { + inner: net::TcpStream, +} + +pub struct TcpListener { + inner: net::TcpListener, +} + +impl TcpStream { + pub fn connect(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> { + set_nonblock(stream.as_raw_fd())?; + + match stream.connect(addr) { + Ok(..) => {} + Err(ref e) if e.raw_os_error() == Some(libc::EINPROGRESS) => {} + Err(e) => return Err(e), + } + + Ok(TcpStream { + inner: stream, + }) + } + + pub fn from_stream(stream: net::TcpStream) -> TcpStream { + TcpStream { + inner: stream, + } + } + + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.inner.peer_addr() + } + + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.inner.local_addr() + } + + pub fn try_clone(&self) -> io::Result<TcpStream> { + self.inner.try_clone().map(|s| { + TcpStream { + inner: s, + } + }) + } + + pub fn shutdown(&self, how: net::Shutdown) -> io::Result<()> { + self.inner.shutdown(how) + } + + pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { + self.inner.set_nodelay(nodelay) + } + + pub fn nodelay(&self) -> io::Result<bool> { + self.inner.nodelay() + } + + pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> { + self.inner.set_recv_buffer_size(size) + } + + pub fn recv_buffer_size(&self) -> io::Result<usize> { + self.inner.recv_buffer_size() + } + + pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> { + self.inner.set_send_buffer_size(size) + } + + pub fn send_buffer_size(&self) -> io::Result<usize> { + self.inner.send_buffer_size() + } + + pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> { + self.inner.set_keepalive(keepalive) + } + + pub fn keepalive(&self) -> io::Result<Option<Duration>> { + self.inner.keepalive() + } + + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.inner.set_ttl(ttl) + } + + pub fn ttl(&self) -> io::Result<u32> { + self.inner.ttl() + } + + pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> { + self.inner.set_only_v6(only_v6) + } + + pub fn only_v6(&self) -> io::Result<bool> { + self.inner.only_v6() + } + + pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> { + self.inner.set_linger(dur) + } + + pub fn linger(&self) -> io::Result<Option<Duration>> { + self.inner.linger() + } + + pub fn take_error(&self) -> io::Result<Option<io::Error>> { + self.inner.take_error() + } + + pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> { + self.inner.peek(buf) + } + + pub fn readv(&self, bufs: &mut [&mut IoVec]) -> io::Result<usize> { + self.inner.readv(bufs) + } + + pub fn writev(&self, bufs: &[&IoVec]) -> io::Result<usize> { + self.inner.writev(bufs) + } +} + +impl<'a> Read for &'a TcpStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + (&self.inner).read(buf) + } +} + +impl<'a> Write for &'a TcpStream { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + (&self.inner).write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + (&self.inner).flush() + } +} + +impl Evented for TcpStream { + fn register(&self, poll: &Poll, token: Token, + interest: Ready, opts: PollOpt) -> io::Result<()> { + EventedFd(&self.as_raw_fd()).register(poll, token, interest, opts) + } + + fn reregister(&self, poll: &Poll, token: Token, + interest: Ready, opts: PollOpt) -> io::Result<()> { + EventedFd(&self.as_raw_fd()).reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + EventedFd(&self.as_raw_fd()).deregister(poll) + } +} + +impl fmt::Debug for TcpStream { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.inner, f) + } +} + +impl FromRawFd for TcpStream { + unsafe fn from_raw_fd(fd: RawFd) -> TcpStream { + TcpStream { + inner: net::TcpStream::from_raw_fd(fd), + } + } +} + +impl IntoRawFd for TcpStream { + fn into_raw_fd(self) -> RawFd { + self.inner.into_raw_fd() + } +} + +impl AsRawFd for TcpStream { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +impl TcpListener { + pub fn new(inner: net::TcpListener) -> io::Result<TcpListener> { + set_nonblock(inner.as_raw_fd())?; + Ok(TcpListener { + inner, + }) + } + + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.inner.local_addr() + } + + pub fn try_clone(&self) -> io::Result<TcpListener> { + self.inner.try_clone().map(|s| { + TcpListener { + inner: s, + } + }) + } + + pub fn accept(&self) -> io::Result<(net::TcpStream, SocketAddr)> { + self.inner.accept() + } + + #[allow(deprecated)] + pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> { + self.inner.set_only_v6(only_v6) + } + + #[allow(deprecated)] + pub fn only_v6(&self) -> io::Result<bool> { + self.inner.only_v6() + } + + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.inner.set_ttl(ttl) + } + + pub fn ttl(&self) -> io::Result<u32> { + self.inner.ttl() + } + + pub fn take_error(&self) -> io::Result<Option<io::Error>> { + self.inner.take_error() + } +} + +impl Evented for TcpListener { + fn register(&self, poll: &Poll, token: Token, + interest: Ready, opts: PollOpt) -> io::Result<()> { + EventedFd(&self.as_raw_fd()).register(poll, token, interest, opts) + } + + fn reregister(&self, poll: &Poll, token: Token, + interest: Ready, opts: PollOpt) -> io::Result<()> { + EventedFd(&self.as_raw_fd()).reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + EventedFd(&self.as_raw_fd()).deregister(poll) + } +} + +impl fmt::Debug for TcpListener { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.inner, f) + } +} + +impl FromRawFd for TcpListener { + unsafe fn from_raw_fd(fd: RawFd) -> TcpListener { + TcpListener { + inner: net::TcpListener::from_raw_fd(fd), + } + } +} + +impl IntoRawFd for TcpListener { + fn into_raw_fd(self) -> RawFd { + self.inner.into_raw_fd() + } +} + +impl AsRawFd for TcpListener { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + diff --git a/third_party/rust/mio/src/sys/unix/udp.rs b/third_party/rust/mio/src/sys/unix/udp.rs new file mode 100644 index 0000000000..c77a9d6380 --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/udp.rs @@ -0,0 +1,181 @@ +use {io, Ready, Poll, PollOpt, Token}; +use event::Evented; +use unix::EventedFd; +use sys::unix::uio::VecIo; +use std::fmt; +use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::os::unix::io::{RawFd, IntoRawFd, AsRawFd, FromRawFd}; + +#[allow(unused_imports)] // only here for Rust 1.8 +use net2::UdpSocketExt; +use iovec::IoVec; + +pub struct UdpSocket { + io: net::UdpSocket, +} + +impl UdpSocket { + pub fn new(socket: net::UdpSocket) -> io::Result<UdpSocket> { + socket.set_nonblocking(true)?; + Ok(UdpSocket { + io: socket, + }) + } + + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.io.local_addr() + } + + pub fn try_clone(&self) -> io::Result<UdpSocket> { + self.io.try_clone().map(|io| { + UdpSocket { + io, + } + }) + } + + pub fn send_to(&self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> { + self.io.send_to(buf, target) + } + + pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.io.recv_from(buf) + } + + pub fn send(&self, buf: &[u8]) -> io::Result<usize> { + self.io.send(buf) + } + + pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { + self.io.recv(buf) + } + + pub fn connect(&self, addr: SocketAddr) + -> io::Result<()> { + self.io.connect(addr) + } + + pub fn broadcast(&self) -> io::Result<bool> { + self.io.broadcast() + } + + pub fn set_broadcast(&self, on: bool) -> io::Result<()> { + self.io.set_broadcast(on) + } + + pub fn multicast_loop_v4(&self) -> io::Result<bool> { + self.io.multicast_loop_v4() + } + + pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> { + self.io.set_multicast_loop_v4(on) + } + + pub fn multicast_ttl_v4(&self) -> io::Result<u32> { + self.io.multicast_ttl_v4() + } + + pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { + self.io.set_multicast_ttl_v4(ttl) + } + + pub fn multicast_loop_v6(&self) -> io::Result<bool> { + self.io.multicast_loop_v6() + } + + pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> { + self.io.set_multicast_loop_v6(on) + } + + pub fn ttl(&self) -> io::Result<u32> { + self.io.ttl() + } + + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.io.set_ttl(ttl) + } + + pub fn join_multicast_v4(&self, + multiaddr: &Ipv4Addr, + interface: &Ipv4Addr) -> io::Result<()> { + self.io.join_multicast_v4(multiaddr, interface) + } + + pub fn join_multicast_v6(&self, + multiaddr: &Ipv6Addr, + interface: u32) -> io::Result<()> { + self.io.join_multicast_v6(multiaddr, interface) + } + + pub fn leave_multicast_v4(&self, + multiaddr: &Ipv4Addr, + interface: &Ipv4Addr) -> io::Result<()> { + self.io.leave_multicast_v4(multiaddr, interface) + } + + pub fn leave_multicast_v6(&self, + multiaddr: &Ipv6Addr, + interface: u32) -> io::Result<()> { + self.io.leave_multicast_v6(multiaddr, interface) + } + + pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> { + self.io.set_only_v6(only_v6) + } + + pub fn only_v6(&self) -> io::Result<bool> { + self.io.only_v6() + } + + pub fn take_error(&self) -> io::Result<Option<io::Error>> { + self.io.take_error() + } + + pub fn readv(&self, bufs: &mut [&mut IoVec]) -> io::Result<usize> { + self.io.readv(bufs) + } + + pub fn writev(&self, bufs: &[&IoVec]) -> io::Result<usize> { + self.io.writev(bufs) + } +} + +impl Evented for UdpSocket { + fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + EventedFd(&self.as_raw_fd()).register(poll, token, interest, opts) + } + + fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + EventedFd(&self.as_raw_fd()).reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + EventedFd(&self.as_raw_fd()).deregister(poll) + } +} + +impl fmt::Debug for UdpSocket { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.io, f) + } +} + +impl FromRawFd for UdpSocket { + unsafe fn from_raw_fd(fd: RawFd) -> UdpSocket { + UdpSocket { + io: net::UdpSocket::from_raw_fd(fd), + } + } +} + +impl IntoRawFd for UdpSocket { + fn into_raw_fd(self) -> RawFd { + self.io.into_raw_fd() + } +} + +impl AsRawFd for UdpSocket { + fn as_raw_fd(&self) -> RawFd { + self.io.as_raw_fd() + } +} diff --git a/third_party/rust/mio/src/sys/unix/uds.rs b/third_party/rust/mio/src/sys/unix/uds.rs new file mode 100644 index 0000000000..1bf8c5d260 --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/uds.rs @@ -0,0 +1,262 @@ +use std::io::{Read, Write}; +use std::mem; +use std::net::Shutdown; +use std::os::unix::prelude::*; +use std::path::Path; +use std::ptr; + +use libc; + +use {io, Ready, Poll, PollOpt, Token}; +use event::Evented; +use sys::unix::{cvt, Io}; +use sys::unix::io::{set_nonblock, set_cloexec}; + +trait MyInto<T> { + fn my_into(self) -> T; +} + +impl MyInto<u32> for usize { + fn my_into(self) -> u32 { self as u32 } +} + +impl MyInto<usize> for usize { + fn my_into(self) -> usize { self } +} + +unsafe fn sockaddr_un(path: &Path) + -> io::Result<(libc::sockaddr_un, libc::socklen_t)> { + let mut addr: libc::sockaddr_un = mem::zeroed(); + addr.sun_family = libc::AF_UNIX as libc::sa_family_t; + + let bytes = path.as_os_str().as_bytes(); + + if bytes.len() >= addr.sun_path.len() { + return Err(io::Error::new(io::ErrorKind::InvalidInput, + "path must be shorter than SUN_LEN")) + } + for (dst, src) in addr.sun_path.iter_mut().zip(bytes.iter()) { + *dst = *src as libc::c_char; + } + // null byte for pathname addresses is already there because we zeroed the + // struct + + let mut len = sun_path_offset() + bytes.len(); + match bytes.get(0) { + Some(&0) | None => {} + Some(_) => len += 1, + } + Ok((addr, len as libc::socklen_t)) +} + +fn sun_path_offset() -> usize { + unsafe { + // Work with an actual instance of the type since using a null pointer is UB + let addr: libc::sockaddr_un = mem::uninitialized(); + let base = &addr as *const _ as usize; + let path = &addr.sun_path as *const _ as usize; + path - base + } +} + +#[derive(Debug)] +pub struct UnixSocket { + io: Io, +} + +impl UnixSocket { + /// Returns a new, unbound, non-blocking Unix domain socket + pub fn stream() -> io::Result<UnixSocket> { + #[cfg(target_os = "linux")] + use libc::{SOCK_CLOEXEC, SOCK_NONBLOCK}; + #[cfg(not(target_os = "linux"))] + const SOCK_CLOEXEC: libc::c_int = 0; + #[cfg(not(target_os = "linux"))] + const SOCK_NONBLOCK: libc::c_int = 0; + + unsafe { + if cfg!(target_os = "linux") { + let flags = libc::SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK; + match cvt(libc::socket(libc::AF_UNIX, flags, 0)) { + Ok(fd) => return Ok(UnixSocket::from_raw_fd(fd)), + Err(ref e) if e.raw_os_error() == Some(libc::EINVAL) => {} + Err(e) => return Err(e), + } + } + + let fd = cvt(libc::socket(libc::AF_UNIX, libc::SOCK_STREAM, 0))?; + let fd = UnixSocket::from_raw_fd(fd); + set_cloexec(fd.as_raw_fd())?; + set_nonblock(fd.as_raw_fd())?; + Ok(fd) + } + } + + /// Connect the socket to the specified address + pub fn connect<P: AsRef<Path> + ?Sized>(&self, addr: &P) -> io::Result<()> { + unsafe { + let (addr, len) = sockaddr_un(addr.as_ref())?; + cvt(libc::connect(self.as_raw_fd(), + &addr as *const _ as *const _, + len))?; + Ok(()) + } + } + + /// Listen for incoming requests + pub fn listen(&self, backlog: usize) -> io::Result<()> { + unsafe { + cvt(libc::listen(self.as_raw_fd(), backlog as i32))?; + Ok(()) + } + } + + pub fn accept(&self) -> io::Result<UnixSocket> { + unsafe { + let fd = cvt(libc::accept(self.as_raw_fd(), + ptr::null_mut(), + ptr::null_mut()))?; + let fd = Io::from_raw_fd(fd); + set_cloexec(fd.as_raw_fd())?; + set_nonblock(fd.as_raw_fd())?; + Ok(UnixSocket { io: fd }) + } + } + + /// Bind the socket to the specified address + pub fn bind<P: AsRef<Path> + ?Sized>(&self, addr: &P) -> io::Result<()> { + unsafe { + let (addr, len) = sockaddr_un(addr.as_ref())?; + cvt(libc::bind(self.as_raw_fd(), + &addr as *const _ as *const _, + len))?; + Ok(()) + } + } + + pub fn try_clone(&self) -> io::Result<UnixSocket> { + Ok(UnixSocket { io: self.io.try_clone()? }) + } + + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + let how = match how { + Shutdown::Read => libc::SHUT_RD, + Shutdown::Write => libc::SHUT_WR, + Shutdown::Both => libc::SHUT_RDWR, + }; + unsafe { + cvt(libc::shutdown(self.as_raw_fd(), how))?; + Ok(()) + } + } + + pub fn read_recv_fd(&mut self, buf: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> { + unsafe { + let mut iov = libc::iovec { + iov_base: buf.as_mut_ptr() as *mut _, + iov_len: buf.len(), + }; + struct Cmsg { + hdr: libc::cmsghdr, + data: [libc::c_int; 1], + } + let mut cmsg: Cmsg = mem::zeroed(); + let mut msg: libc::msghdr = mem::zeroed(); + msg.msg_iov = &mut iov; + msg.msg_iovlen = 1; + msg.msg_control = &mut cmsg as *mut _ as *mut _; + msg.msg_controllen = mem::size_of_val(&cmsg).my_into(); + let bytes = cvt(libc::recvmsg(self.as_raw_fd(), &mut msg, 0))?; + + const SCM_RIGHTS: libc::c_int = 1; + + let fd = if cmsg.hdr.cmsg_level == libc::SOL_SOCKET && + cmsg.hdr.cmsg_type == SCM_RIGHTS { + Some(cmsg.data[0]) + } else { + None + }; + Ok((bytes as usize, fd)) + } + } + + pub fn write_send_fd(&mut self, buf: &[u8], fd: RawFd) -> io::Result<usize> { + unsafe { + let mut iov = libc::iovec { + iov_base: buf.as_ptr() as *mut _, + iov_len: buf.len(), + }; + struct Cmsg { + hdr: libc::cmsghdr, + data: [libc::c_int; 1], + } + let mut cmsg: Cmsg = mem::zeroed(); + cmsg.hdr.cmsg_len = mem::size_of_val(&cmsg).my_into(); + cmsg.hdr.cmsg_level = libc::SOL_SOCKET; + cmsg.hdr.cmsg_type = 1; // SCM_RIGHTS + cmsg.data[0] = fd; + let mut msg: libc::msghdr = mem::zeroed(); + msg.msg_iov = &mut iov; + msg.msg_iovlen = 1; + msg.msg_control = &mut cmsg as *mut _ as *mut _; + msg.msg_controllen = mem::size_of_val(&cmsg).my_into(); + let bytes = cvt(libc::sendmsg(self.as_raw_fd(), &msg, 0))?; + Ok(bytes as usize) + } + } +} + +impl Read for UnixSocket { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.io.read(buf) + } +} + +impl Write for UnixSocket { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.io.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.io.flush() + } +} + +impl Evented for UnixSocket { + fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + self.io.register(poll, token, interest, opts) + } + + fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { + self.io.reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + self.io.deregister(poll) + } +} + + +impl From<Io> for UnixSocket { + fn from(io: Io) -> UnixSocket { + UnixSocket { io } + } +} + +impl FromRawFd for UnixSocket { + unsafe fn from_raw_fd(fd: RawFd) -> UnixSocket { + UnixSocket { io: Io::from_raw_fd(fd) } + } +} + +impl IntoRawFd for UnixSocket { + fn into_raw_fd(self) -> RawFd { + self.io.into_raw_fd() + } +} + +impl AsRawFd for UnixSocket { + fn as_raw_fd(&self) -> RawFd { + self.io.as_raw_fd() + } +} diff --git a/third_party/rust/mio/src/sys/unix/uio.rs b/third_party/rust/mio/src/sys/unix/uio.rs new file mode 100644 index 0000000000..e38cd4983b --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/uio.rs @@ -0,0 +1,44 @@ +use std::cmp; +use std::io; +use std::os::unix::io::AsRawFd; +use libc; +use iovec::IoVec; +use iovec::unix as iovec; + +pub trait VecIo { + fn readv(&self, bufs: &mut [&mut IoVec]) -> io::Result<usize>; + + fn writev(&self, bufs: &[&IoVec]) -> io::Result<usize>; +} + +impl<T: AsRawFd> VecIo for T { + fn readv(&self, bufs: &mut [&mut IoVec]) -> io::Result<usize> { + unsafe { + let slice = iovec::as_os_slice_mut(bufs); + let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len()); + let rc = libc::readv(self.as_raw_fd(), + slice.as_ptr(), + len as libc::c_int); + if rc < 0 { + Err(io::Error::last_os_error()) + } else { + Ok(rc as usize) + } + } + } + + fn writev(&self, bufs: &[&IoVec]) -> io::Result<usize> { + unsafe { + let slice = iovec::as_os_slice(bufs); + let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len()); + let rc = libc::writev(self.as_raw_fd(), + slice.as_ptr(), + len as libc::c_int); + if rc < 0 { + Err(io::Error::last_os_error()) + } else { + Ok(rc as usize) + } + } + } +}
\ No newline at end of file |