From 26a029d407be480d791972afb5975cf62c9360a6 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 02:47:55 +0200 Subject: Adding upstream version 124.0.1. Signed-off-by: Daniel Baumann --- .../rust/mio-0.6.23/src/sys/fuchsia/awakener.rs | 73 ++++ .../rust/mio-0.6.23/src/sys/fuchsia/eventedfd.rs | 263 ++++++++++++ .../rust/mio-0.6.23/src/sys/fuchsia/handles.rs | 78 ++++ third_party/rust/mio-0.6.23/src/sys/fuchsia/mod.rs | 177 ++++++++ third_party/rust/mio-0.6.23/src/sys/fuchsia/net.rs | 444 +++++++++++++++++++++ .../rust/mio-0.6.23/src/sys/fuchsia/ready.rs | 181 +++++++++ .../rust/mio-0.6.23/src/sys/fuchsia/selector.rs | 353 ++++++++++++++++ 7 files changed, 1569 insertions(+) create mode 100644 third_party/rust/mio-0.6.23/src/sys/fuchsia/awakener.rs create mode 100644 third_party/rust/mio-0.6.23/src/sys/fuchsia/eventedfd.rs create mode 100644 third_party/rust/mio-0.6.23/src/sys/fuchsia/handles.rs create mode 100644 third_party/rust/mio-0.6.23/src/sys/fuchsia/mod.rs create mode 100644 third_party/rust/mio-0.6.23/src/sys/fuchsia/net.rs create mode 100644 third_party/rust/mio-0.6.23/src/sys/fuchsia/ready.rs create mode 100644 third_party/rust/mio-0.6.23/src/sys/fuchsia/selector.rs (limited to 'third_party/rust/mio-0.6.23/src/sys/fuchsia') diff --git a/third_party/rust/mio-0.6.23/src/sys/fuchsia/awakener.rs b/third_party/rust/mio-0.6.23/src/sys/fuchsia/awakener.rs new file mode 100644 index 0000000000..19bc762429 --- /dev/null +++ b/third_party/rust/mio-0.6.23/src/sys/fuchsia/awakener.rs @@ -0,0 +1,73 @@ +use {io, poll, Evented, Ready, Poll, PollOpt, Token}; +use zircon; +use std::sync::{Arc, Mutex, Weak}; + +pub struct Awakener { + /// Token and weak reference to the port on which Awakener was registered. + /// + /// When `Awakener::wakeup` is called, these are used to send a wakeup message to the port. + inner: Mutex)>>, +} + +impl Awakener { + /// Create a new `Awakener`. + pub fn new() -> io::Result { + Ok(Awakener { + inner: Mutex::new(None) + }) + } + + /// Send a wakeup signal to the `Selector` on which the `Awakener` was registered. + pub fn wakeup(&self) -> io::Result<()> { + let inner_locked = self.inner.lock().unwrap(); + let &(token, ref weak_port) = + inner_locked.as_ref().expect("Called wakeup on unregistered awakener."); + + let port = weak_port.upgrade().expect("Tried to wakeup a closed port."); + + let status = 0; // arbitrary + let packet = zircon::Packet::from_user_packet( + token.0 as u64, status, zircon::UserPacket::from_u8_array([0; 32])); + + Ok(port.queue(&packet)?) + } + + pub fn cleanup(&self) {} +} + +impl Evented for Awakener { + fn register(&self, + poll: &Poll, + token: Token, + _events: Ready, + _opts: PollOpt) -> io::Result<()> + { + let mut inner_locked = self.inner.lock().unwrap(); + if inner_locked.is_some() { + panic!("Called register on already-registered Awakener."); + } + *inner_locked = Some((token, Arc::downgrade(poll::selector(poll).port()))); + + Ok(()) + } + + fn reregister(&self, + poll: &Poll, + token: Token, + _events: Ready, + _opts: PollOpt) -> io::Result<()> + { + let mut inner_locked = self.inner.lock().unwrap(); + *inner_locked = Some((token, Arc::downgrade(poll::selector(poll).port()))); + + Ok(()) + } + + fn deregister(&self, _poll: &Poll) -> io::Result<()> + { + let mut inner_locked = self.inner.lock().unwrap(); + *inner_locked = None; + + Ok(()) + } +} \ No newline at end of file diff --git a/third_party/rust/mio-0.6.23/src/sys/fuchsia/eventedfd.rs b/third_party/rust/mio-0.6.23/src/sys/fuchsia/eventedfd.rs new file mode 100644 index 0000000000..e23d0c4a1e --- /dev/null +++ b/third_party/rust/mio-0.6.23/src/sys/fuchsia/eventedfd.rs @@ -0,0 +1,263 @@ +use {io, poll, Evented, Ready, Poll, PollOpt, Token}; +use libc; +use zircon; +use zircon::AsHandleRef; +use sys::fuchsia::{DontDrop, poll_opts_to_wait_async, sys}; +use std::mem; +use std::os::unix::io::RawFd; +use std::sync::{Arc, Mutex}; + +/// Properties of an `EventedFd`'s current registration +#[derive(Debug)] +pub struct EventedFdRegistration { + token: Token, + handle: DontDrop, + rereg_signals: Option<(zircon::Signals, zircon::WaitAsyncOpts)>, +} + +impl EventedFdRegistration { + unsafe fn new(token: Token, + raw_handle: sys::zx_handle_t, + rereg_signals: Option<(zircon::Signals, zircon::WaitAsyncOpts)>, + ) -> Self + { + EventedFdRegistration { + token: token, + handle: DontDrop::new(zircon::Handle::from_raw(raw_handle)), + rereg_signals: rereg_signals + } + } + + pub fn rereg_signals(&self) -> Option<(zircon::Signals, zircon::WaitAsyncOpts)> { + self.rereg_signals + } +} + +/// An event-ed file descriptor. The file descriptor is owned by this structure. +#[derive(Debug)] +pub struct EventedFdInner { + /// Properties of the current registration. + registration: Mutex>, + + /// Owned file descriptor. + /// + /// `fd` is closed on `Drop`, so modifying `fd` is a memory-unsafe operation. + fd: RawFd, + + /// Owned `fdio_t` pointer. + fdio: *const sys::fdio_t, +} + +impl EventedFdInner { + pub fn rereg_for_level(&self, port: &zircon::Port) { + let registration_opt = self.registration.lock().unwrap(); + if let Some(ref registration) = *registration_opt { + if let Some((rereg_signals, rereg_opts)) = registration.rereg_signals { + let _res = + registration + .handle.inner_ref() + .wait_async_handle( + port, + registration.token.0 as u64, + rereg_signals, + rereg_opts); + } + } + } + + pub fn registration(&self) -> &Mutex> { + &self.registration + } + + pub fn fdio(&self) -> &sys::fdio_t { + unsafe { &*self.fdio } + } +} + +impl Drop for EventedFdInner { + fn drop(&mut self) { + unsafe { + sys::__fdio_release(self.fdio); + let _ = libc::close(self.fd); + } + } +} + +// `EventedInner` must be manually declared `Send + Sync` because it contains a `RawFd` and a +// `*const sys::fdio_t`. These are only used to make thread-safe system calls, so accessing +// them is entirely thread-safe. +// +// Note: one minor exception to this are the calls to `libc::close` and `__fdio_release`, which +// happen on `Drop`. These accesses are safe because `drop` can only be called at most once from +// a single thread, and after it is called no other functions can be called on the `EventedFdInner`. +unsafe impl Sync for EventedFdInner {} +unsafe impl Send for EventedFdInner {} + +#[derive(Clone, Debug)] +pub struct EventedFd { + pub inner: Arc +} + +impl EventedFd { + pub unsafe fn new(fd: RawFd) -> Self { + let fdio = sys::__fdio_fd_to_io(fd); + assert!(fdio != ::std::ptr::null(), "FileDescriptor given to EventedFd must be valid."); + + EventedFd { + inner: Arc::new(EventedFdInner { + registration: Mutex::new(None), + fd: fd, + fdio: fdio, + }) + } + } + + fn handle_and_signals_for_events(&self, interest: Ready, opts: PollOpt) + -> (sys::zx_handle_t, zircon::Signals) + { + let epoll_events = ioevent_to_epoll(interest, opts); + + unsafe { + let mut raw_handle: sys::zx_handle_t = mem::uninitialized(); + let mut signals: sys::zx_signals_t = mem::uninitialized(); + sys::__fdio_wait_begin(self.inner.fdio, epoll_events, &mut raw_handle, &mut signals); + + (raw_handle, signals) + } + } + + fn register_with_lock( + &self, + registration: &mut Option, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt) -> io::Result<()> + { + if registration.is_some() { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + "Called register on an already registered file descriptor.")); + } + + let (raw_handle, signals) = self.handle_and_signals_for_events(interest, opts); + + let needs_rereg = opts.is_level() && !opts.is_oneshot(); + + // If we need to reregister, then each registration should be `oneshot` + let opts = opts | if needs_rereg { PollOpt::oneshot() } else { PollOpt::empty() }; + + let rereg_signals = if needs_rereg { + Some((signals, poll_opts_to_wait_async(opts))) + } else { + None + }; + + *registration = Some( + unsafe { EventedFdRegistration::new(token, raw_handle, rereg_signals) } + ); + + // We don't have ownership of the handle, so we can't drop it + let handle = DontDrop::new(unsafe { zircon::Handle::from_raw(raw_handle) }); + + let registered = poll::selector(poll) + .register_fd(handle.inner_ref(), self, token, signals, opts); + + if registered.is_err() { + *registration = None; + } + + registered + } + + fn deregister_with_lock( + &self, + registration: &mut Option, + poll: &Poll) -> io::Result<()> + { + let old_registration = if let Some(old_reg) = registration.take() { + old_reg + } else { + return Err(io::Error::new( + io::ErrorKind::NotFound, + "Called rereregister on an unregistered file descriptor.")) + }; + + poll::selector(poll) + .deregister_fd(old_registration.handle.inner_ref(), old_registration.token) + } +} + +impl Evented for EventedFd { + fn register(&self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt) -> io::Result<()> + { + self.register_with_lock( + &mut *self.inner.registration.lock().unwrap(), + poll, + token, + interest, + opts) + } + + fn reregister(&self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt) -> io::Result<()> + { + // Take out the registration lock + let mut registration_lock = self.inner.registration.lock().unwrap(); + + // Deregister + self.deregister_with_lock(&mut *registration_lock, poll)?; + + self.register_with_lock( + &mut *registration_lock, + poll, + token, + interest, + opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + let mut registration_lock = self.inner.registration.lock().unwrap(); + self.deregister_with_lock(&mut *registration_lock, poll) + } +} + +fn ioevent_to_epoll(interest: Ready, opts: PollOpt) -> u32 { + use event_imp::ready_from_usize; + const HUP: usize = 0b01000; + + let mut kind = 0; + + if interest.is_readable() { + kind |= libc::EPOLLIN; + } + + if interest.is_writable() { + kind |= libc::EPOLLOUT; + } + + if interest.contains(ready_from_usize(HUP)) { + kind |= libc::EPOLLRDHUP; + } + + if opts.is_edge() { + kind |= libc::EPOLLET; + } + + if opts.is_oneshot() { + kind |= libc::EPOLLONESHOT; + } + + if opts.is_level() { + kind &= !libc::EPOLLET; + } + + kind as u32 +} diff --git a/third_party/rust/mio-0.6.23/src/sys/fuchsia/handles.rs b/third_party/rust/mio-0.6.23/src/sys/fuchsia/handles.rs new file mode 100644 index 0000000000..ae6f07f6d9 --- /dev/null +++ b/third_party/rust/mio-0.6.23/src/sys/fuchsia/handles.rs @@ -0,0 +1,78 @@ +use {io, poll, Evented, Ready, Poll, PollOpt, Token}; +use zircon_sys::zx_handle_t; +use std::sync::Mutex; + +/// Wrapper for registering a `HandleBase` type with mio. +#[derive(Debug)] +pub struct EventedHandle { + /// The handle to be registered. + handle: zx_handle_t, + + /// The current `Token` with which the handle is registered with mio. + token: Mutex>, +} + +impl EventedHandle { + /// Create a new `EventedHandle` which can be registered with mio + /// in order to receive event notifications. + /// + /// The underlying handle must not be dropped while the + /// `EventedHandle` still exists. + pub unsafe fn new(handle: zx_handle_t) -> Self { + EventedHandle { + handle: handle, + token: Mutex::new(None), + } + } + + /// Get the underlying handle being registered. + pub fn get_handle(&self) -> zx_handle_t { + self.handle + } +} + +impl Evented for EventedHandle { + fn register(&self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt) -> io::Result<()> + { + let mut this_token = self.token.lock().unwrap(); + { + poll::selector(poll).register_handle(self.handle, token, interest, opts)?; + *this_token = Some(token); + } + Ok(()) + } + + fn reregister(&self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt) -> io::Result<()> + { + let mut this_token = self.token.lock().unwrap(); + { + poll::selector(poll).deregister_handle(self.handle, token)?; + *this_token = None; + poll::selector(poll).register_handle(self.handle, token, interest, opts)?; + *this_token = Some(token); + } + Ok(()) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + let mut this_token = self.token.lock().unwrap(); + let token = if let Some(token) = *this_token { token } else { + return Err(io::Error::new( + io::ErrorKind::NotFound, + "Attempted to deregister an unregistered handle.")) + }; + { + poll::selector(poll).deregister_handle(self.handle, token)?; + *this_token = None; + } + Ok(()) + } +} diff --git a/third_party/rust/mio-0.6.23/src/sys/fuchsia/mod.rs b/third_party/rust/mio-0.6.23/src/sys/fuchsia/mod.rs new file mode 100644 index 0000000000..10728fc8dc --- /dev/null +++ b/third_party/rust/mio-0.6.23/src/sys/fuchsia/mod.rs @@ -0,0 +1,177 @@ +use {io, Ready, PollOpt}; +use libc; +use zircon; +use std::mem; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::ops::{Deref, DerefMut}; +use std::os::unix::io::RawFd; + +mod awakener; +mod handles; +mod eventedfd; +mod net; +mod ready; +mod selector; + +use self::eventedfd::{EventedFd, EventedFdInner}; +use self::ready::assert_fuchsia_ready_repr; + +pub use self::awakener::Awakener; +pub use self::handles::EventedHandle; +pub use self::net::{TcpListener, TcpStream, UdpSocket}; +pub use self::selector::{Events, Selector}; +pub use self::ready::{FuchsiaReady, zx_signals_t}; + +// Set non-blocking (workaround since the std version doesn't work in fuchsia) +// TODO: fix the std version and replace this +pub fn set_nonblock(fd: RawFd) -> io::Result<()> { + cvt(unsafe { libc::fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK) }).map(|_| ()) +} + +/// Workaround until fuchsia's recv_from is fixed +unsafe fn recv_from(fd: RawFd, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + let flags = 0; + + let n = cvt( + libc::recv(fd, + buf.as_mut_ptr() as *mut libc::c_void, + buf.len(), + flags) + )?; + + // random address-- we don't use it + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + Ok((n as usize, addr)) +} + +mod sys { + #![allow(non_camel_case_types)] + use std::os::unix::io::RawFd; + pub use zircon_sys::{zx_handle_t, zx_signals_t}; + + // 17 fn pointers we don't need for mio :) + pub type fdio_ops_t = [usize; 17]; + + pub type atomic_int_fast32_t = usize; // TODO: https://github.com/rust-lang/libc/issues/631 + + #[repr(C)] + pub struct fdio_t { + pub ops: *const fdio_ops_t, + pub magic: u32, + pub refcount: atomic_int_fast32_t, + pub dupcount: u32, + pub flags: u32, + } + + #[link(name="fdio")] + extern { + pub fn __fdio_fd_to_io(fd: RawFd) -> *const fdio_t; + pub fn __fdio_release(io: *const fdio_t); + + pub fn __fdio_wait_begin( + io: *const fdio_t, + events: u32, + handle_out: &mut zx_handle_t, + signals_out: &mut zx_signals_t, + ); + pub fn __fdio_wait_end( + io: *const fdio_t, + signals: zx_signals_t, + events_out: &mut u32, + ); + } +} + +fn epoll_event_to_ready(epoll: u32) -> Ready { + let epoll = epoll as i32; // casts the bits directly + let mut kind = Ready::empty(); + + if (epoll & libc::EPOLLIN) != 0 || (epoll & libc::EPOLLPRI) != 0 { + kind = kind | Ready::readable(); + } + + if (epoll & libc::EPOLLOUT) != 0 { + kind = kind | Ready::writable(); + } + + kind + + /* TODO: support? + // EPOLLHUP - Usually means a socket error happened + if (epoll & libc::EPOLLERR) != 0 { + kind = kind | UnixReady::error(); + } + + if (epoll & libc::EPOLLRDHUP) != 0 || (epoll & libc::EPOLLHUP) != 0 { + kind = kind | UnixReady::hup(); + } + */ +} + +fn poll_opts_to_wait_async(poll_opts: PollOpt) -> zircon::WaitAsyncOpts { + if poll_opts.is_oneshot() { + zircon::WaitAsyncOpts::Once + } else { + zircon::WaitAsyncOpts::Repeating + } +} + +trait IsMinusOne { + fn is_minus_one(&self) -> bool; +} + +impl IsMinusOne for i32 { + fn is_minus_one(&self) -> bool { *self == -1 } +} + +impl IsMinusOne for isize { + fn is_minus_one(&self) -> bool { *self == -1 } +} + +fn cvt(t: T) -> ::io::Result { + use std::io; + + if t.is_minus_one() { + Err(io::Error::last_os_error()) + } else { + Ok(t) + } +} + +/// Utility type to prevent the type inside of it from being dropped. +#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +struct DontDrop(Option); + +impl DontDrop { + fn new(t: T) -> DontDrop { + DontDrop(Some(t)) + } + + fn inner_ref(&self) -> &T { + self.0.as_ref().unwrap() + } + + fn inner_mut(&mut self) -> &mut T { + self.0.as_mut().unwrap() + } +} + +impl Deref for DontDrop { + type Target = T; + fn deref(&self) -> &Self::Target { + self.inner_ref() + } +} + +impl DerefMut for DontDrop { + fn deref_mut(&mut self) -> &mut Self::Target { + self.inner_mut() + } +} + +impl Drop for DontDrop { + fn drop(&mut self) { + let inner = self.0.take(); + mem::forget(inner); + } +} diff --git a/third_party/rust/mio-0.6.23/src/sys/fuchsia/net.rs b/third_party/rust/mio-0.6.23/src/sys/fuchsia/net.rs new file mode 100644 index 0000000000..d43ad27bb5 --- /dev/null +++ b/third_party/rust/mio-0.6.23/src/sys/fuchsia/net.rs @@ -0,0 +1,444 @@ +use {io, Evented, Ready, Poll, PollOpt, Token}; +use iovec::IoVec; +use iovec::unix as iovec; +use libc; +use net2::TcpStreamExt; +#[allow(unused_imports)] // only here for Rust 1.8 +use net2::UdpSocketExt; +use sys::fuchsia::{recv_from, set_nonblock, EventedFd, DontDrop}; +use std::cmp; +use std::io::{Read, Write}; +use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::os::unix::io::AsRawFd; +use std::time::Duration; + +#[derive(Debug)] +pub struct TcpStream { + io: DontDrop, + evented_fd: EventedFd, +} + +impl TcpStream { + pub fn connect(stream: net::TcpStream, addr: &SocketAddr) -> io::Result { + try!(set_nonblock(stream.as_raw_fd())); + + let connected = stream.connect(addr); + match connected { + Ok(..) => {} + Err(ref e) if e.raw_os_error() == Some(libc::EINPROGRESS) => {} + Err(e) => return Err(e), + } + + let evented_fd = unsafe { EventedFd::new(stream.as_raw_fd()) }; + + return Ok(TcpStream { + io: DontDrop::new(stream), + evented_fd: evented_fd, + }) + } + + pub fn from_stream(stream: net::TcpStream) -> TcpStream { + let evented_fd = unsafe { EventedFd::new(stream.as_raw_fd()) }; + + TcpStream { + io: DontDrop::new(stream), + evented_fd: evented_fd, + } + } + + pub fn peer_addr(&self) -> io::Result { + self.io.peer_addr() + } + + pub fn local_addr(&self) -> io::Result { + self.io.local_addr() + } + + pub fn try_clone(&self) -> io::Result { + self.io.try_clone().map(|s| { + let evented_fd = unsafe { EventedFd::new(s.as_raw_fd()) }; + TcpStream { + io: DontDrop::new(s), + evented_fd: evented_fd, + } + }) + } + + pub fn shutdown(&self, how: net::Shutdown) -> io::Result<()> { + self.io.shutdown(how) + } + + pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { + self.io.set_nodelay(nodelay) + } + + pub fn nodelay(&self) -> io::Result { + self.io.nodelay() + } + + pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> { + self.io.set_recv_buffer_size(size) + } + + pub fn recv_buffer_size(&self) -> io::Result { + self.io.recv_buffer_size() + } + + pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> { + self.io.set_send_buffer_size(size) + } + + pub fn send_buffer_size(&self) -> io::Result { + self.io.send_buffer_size() + } + + pub fn set_keepalive(&self, keepalive: Option) -> io::Result<()> { + self.io.set_keepalive(keepalive) + } + + pub fn keepalive(&self) -> io::Result> { + self.io.keepalive() + } + + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.io.set_ttl(ttl) + } + + pub fn ttl(&self) -> io::Result { + self.io.ttl() + } + + pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> { + self.io.set_only_v6(only_v6) + } + + pub fn only_v6(&self) -> io::Result { + self.io.only_v6() + } + + pub fn set_linger(&self, dur: Option) -> io::Result<()> { + self.io.set_linger(dur) + } + + pub fn linger(&self) -> io::Result> { + self.io.linger() + } + + pub fn take_error(&self) -> io::Result> { + self.io.take_error() + } + + pub fn peek(&self, buf: &mut [u8]) -> io::Result { + self.io.peek(buf) + } + + pub fn readv(&self, bufs: &mut [&mut IoVec]) -> io::Result { + unsafe { + let slice = iovec::as_os_slice_mut(bufs); + let len = cmp::min(::max_value() as usize, slice.len()); + let rc = libc::readv(self.io.as_raw_fd(), + slice.as_ptr(), + len as libc::c_int); + if rc < 0 { + Err(io::Error::last_os_error()) + } else { + Ok(rc as usize) + } + } + } + + pub fn writev(&self, bufs: &[&IoVec]) -> io::Result { + unsafe { + let slice = iovec::as_os_slice(bufs); + let len = cmp::min(::max_value() as usize, slice.len()); + let rc = libc::writev(self.io.as_raw_fd(), + slice.as_ptr(), + len as libc::c_int); + if rc < 0 { + Err(io::Error::last_os_error()) + } else { + Ok(rc as usize) + } + } + } +} + +impl<'a> Read for &'a TcpStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.io.inner_ref().read(buf) + } +} + +impl<'a> Write for &'a TcpStream { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.io.inner_ref().write(buf) + } + fn flush(&mut self) -> io::Result<()> { + self.io.inner_ref().flush() + } +} + +impl Evented for TcpStream { + fn register(&self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt) -> io::Result<()> + { + self.evented_fd.register(poll, token, interest, opts) + } + + fn reregister(&self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt) -> io::Result<()> + { + self.evented_fd.reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + self.evented_fd.deregister(poll) + } +} + +#[derive(Debug)] +pub struct TcpListener { + io: DontDrop, + evented_fd: EventedFd, +} + +impl TcpListener { + pub fn new(inner: net::TcpListener) -> io::Result { + set_nonblock(inner.as_raw_fd())?; + + let evented_fd = unsafe { EventedFd::new(inner.as_raw_fd()) }; + + Ok(TcpListener { + io: DontDrop::new(inner), + evented_fd: evented_fd, + }) + } + + pub fn local_addr(&self) -> io::Result { + self.io.local_addr() + } + + pub fn try_clone(&self) -> io::Result { + self.io.try_clone().map(|io| { + let evented_fd = unsafe { EventedFd::new(io.as_raw_fd()) }; + TcpListener { + io: DontDrop::new(io), + evented_fd: evented_fd, + } + }) + } + + pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { + self.io.accept().and_then(|(s, a)| { + set_nonblock(s.as_raw_fd())?; + let evented_fd = unsafe { EventedFd::new(s.as_raw_fd()) }; + return Ok((TcpStream { + io: DontDrop::new(s), + evented_fd: evented_fd, + }, a)) + }) + } + + #[allow(deprecated)] + pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> { + self.io.set_only_v6(only_v6) + } + + #[allow(deprecated)] + pub fn only_v6(&self) -> io::Result { + self.io.only_v6() + } + + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.io.set_ttl(ttl) + } + + pub fn ttl(&self) -> io::Result { + self.io.ttl() + } + + pub fn take_error(&self) -> io::Result> { + self.io.take_error() + } +} + +impl Evented for TcpListener { + fn register(&self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt) -> io::Result<()> + { + self.evented_fd.register(poll, token, interest, opts) + } + + fn reregister(&self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt) -> io::Result<()> + { + self.evented_fd.reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + self.evented_fd.deregister(poll) + } +} + +#[derive(Debug)] +pub struct UdpSocket { + io: DontDrop, + evented_fd: EventedFd, +} + +impl UdpSocket { + pub fn new(socket: net::UdpSocket) -> io::Result { + set_nonblock(socket.as_raw_fd())?; + + let evented_fd = unsafe { EventedFd::new(socket.as_raw_fd()) }; + + Ok(UdpSocket { + io: DontDrop::new(socket), + evented_fd: evented_fd, + }) + } + + pub fn local_addr(&self) -> io::Result { + self.io.local_addr() + } + + pub fn try_clone(&self) -> io::Result { + self.io.try_clone().and_then(|io| { + UdpSocket::new(io) + }) + } + + pub fn send_to(&self, buf: &[u8], target: &SocketAddr) -> io::Result { + self.io.send_to(buf, target) + } + + pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + unsafe { recv_from(self.io.as_raw_fd(), buf) } + } + + pub fn send(&self, buf: &[u8]) -> io::Result { + self.io.send(buf) + } + + pub fn recv(&self, buf: &mut [u8]) -> io::Result { + self.io.recv(buf) + } + + pub fn connect(&self, addr: SocketAddr) + -> io::Result<()> { + self.io.connect(addr) + } + + pub fn broadcast(&self) -> io::Result { + self.io.broadcast() + } + + pub fn set_broadcast(&self, on: bool) -> io::Result<()> { + self.io.set_broadcast(on) + } + + pub fn multicast_loop_v4(&self) -> io::Result { + self.io.multicast_loop_v4() + } + + pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> { + self.io.set_multicast_loop_v4(on) + } + + pub fn multicast_ttl_v4(&self) -> io::Result { + self.io.multicast_ttl_v4() + } + + pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { + self.io.set_multicast_ttl_v4(ttl) + } + + pub fn multicast_loop_v6(&self) -> io::Result { + self.io.multicast_loop_v6() + } + + pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> { + self.io.set_multicast_loop_v6(on) + } + + pub fn ttl(&self) -> io::Result { + self.io.ttl() + } + + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.io.set_ttl(ttl) + } + + pub fn join_multicast_v4(&self, + multiaddr: &Ipv4Addr, + interface: &Ipv4Addr) -> io::Result<()> { + self.io.join_multicast_v4(multiaddr, interface) + } + + pub fn join_multicast_v6(&self, + multiaddr: &Ipv6Addr, + interface: u32) -> io::Result<()> { + self.io.join_multicast_v6(multiaddr, interface) + } + + pub fn leave_multicast_v4(&self, + multiaddr: &Ipv4Addr, + interface: &Ipv4Addr) -> io::Result<()> { + self.io.leave_multicast_v4(multiaddr, interface) + } + + pub fn leave_multicast_v6(&self, + multiaddr: &Ipv6Addr, + interface: u32) -> io::Result<()> { + self.io.leave_multicast_v6(multiaddr, interface) + } + + pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> { + self.io.set_only_v6(only_v6) + } + + pub fn only_v6(&self) -> io::Result { + self.io.only_v6() + } + + + pub fn take_error(&self) -> io::Result> { + self.io.take_error() + } +} + +impl Evented for UdpSocket { + fn register(&self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt) -> io::Result<()> + { + self.evented_fd.register(poll, token, interest, opts) + } + + fn reregister(&self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt) -> io::Result<()> + { + self.evented_fd.reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> io::Result<()> { + self.evented_fd.deregister(poll) + } +} diff --git a/third_party/rust/mio-0.6.23/src/sys/fuchsia/ready.rs b/third_party/rust/mio-0.6.23/src/sys/fuchsia/ready.rs new file mode 100644 index 0000000000..97854f8c07 --- /dev/null +++ b/third_party/rust/mio-0.6.23/src/sys/fuchsia/ready.rs @@ -0,0 +1,181 @@ +use event_imp::{Ready, ready_as_usize, ready_from_usize}; +pub use zircon_sys::{ + zx_signals_t, + ZX_OBJECT_READABLE, + ZX_OBJECT_WRITABLE, +}; +use std::ops; + +// The following impls are valid because Fuchsia and mio both represent +// "readable" as `1 << 0` and "writable" as `1 << 2`. +// We define this assertion here and call it from `Selector::new`, +// since `Selector:;new` is guaranteed to be called during a standard mio runtime, +// unlike the functions in this file. +#[inline] +pub fn assert_fuchsia_ready_repr() { + debug_assert!( + ZX_OBJECT_READABLE.bits() as usize == ready_as_usize(Ready::readable()), + "Zircon ZX_OBJECT_READABLE should have the same repr as Ready::readable()" + ); + debug_assert!( + ZX_OBJECT_WRITABLE.bits() as usize == ready_as_usize(Ready::writable()), + "Zircon ZX_OBJECT_WRITABLE should have the same repr as Ready::writable()" + ); +} + +/// Fuchsia specific extensions to `Ready` +/// +/// Provides additional readiness event kinds that are available on Fuchsia. +/// +/// Conversion traits are implemented between `Ready` and `FuchsiaReady`. +/// +/// For high level documentation on polling and readiness, see [`Poll`]. +/// +/// [`Poll`]: struct.Poll.html +#[derive(Debug, Copy, PartialEq, Eq, Clone, PartialOrd, Ord)] +pub struct FuchsiaReady(Ready); + +impl FuchsiaReady { + /// Returns the `FuchsiaReady` as raw zircon signals. + /// This function is just a more explicit, non-generic version of + /// `FuchsiaReady::into`. + #[inline] + pub fn into_zx_signals(self) -> zx_signals_t { + zx_signals_t::from_bits_truncate(ready_as_usize(self.0) as u32) + } +} + +impl Into for FuchsiaReady { + #[inline] + fn into(self) -> zx_signals_t { + self.into_zx_signals() + } +} + +impl From for FuchsiaReady { + #[inline] + fn from(src: zx_signals_t) -> Self { + FuchsiaReady(src.into()) + } +} + +impl From for Ready { + #[inline] + fn from(src: zx_signals_t) -> Self { + ready_from_usize(src.bits() as usize) + } +} + +impl From for FuchsiaReady { + #[inline] + fn from(src: Ready) -> FuchsiaReady { + FuchsiaReady(src) + } +} + +impl From for Ready { + #[inline] + fn from(src: FuchsiaReady) -> Ready { + src.0 + } +} + +impl ops::Deref for FuchsiaReady { + type Target = Ready; + + #[inline] + fn deref(&self) -> &Ready { + &self.0 + } +} + +impl ops::DerefMut for FuchsiaReady { + #[inline] + fn deref_mut(&mut self) -> &mut Ready { + &mut self.0 + } +} + +impl ops::BitOr for FuchsiaReady { + type Output = FuchsiaReady; + + #[inline] + fn bitor(self, other: FuchsiaReady) -> FuchsiaReady { + (self.0 | other.0).into() + } +} + +impl ops::BitXor for FuchsiaReady { + type Output = FuchsiaReady; + + #[inline] + fn bitxor(self, other: FuchsiaReady) -> FuchsiaReady { + (self.0 ^ other.0).into() + } +} + +impl ops::BitAnd for FuchsiaReady { + type Output = FuchsiaReady; + + #[inline] + fn bitand(self, other: FuchsiaReady) -> FuchsiaReady { + (self.0 & other.0).into() + } +} + +impl ops::Sub for FuchsiaReady { + type Output = FuchsiaReady; + + #[inline] + fn sub(self, other: FuchsiaReady) -> FuchsiaReady { + (self.0 & !other.0).into() + } +} + +#[deprecated(since = "0.6.10", note = "removed")] +#[cfg(feature = "with-deprecated")] +#[doc(hidden)] +impl ops::Not for FuchsiaReady { + type Output = FuchsiaReady; + + #[inline] + fn not(self) -> FuchsiaReady { + (!self.0).into() + } +} + +impl ops::BitOr for FuchsiaReady { + type Output = FuchsiaReady; + + #[inline] + fn bitor(self, other: zx_signals_t) -> FuchsiaReady { + self | FuchsiaReady::from(other) + } +} + +impl ops::BitXor for FuchsiaReady { + type Output = FuchsiaReady; + + #[inline] + fn bitxor(self, other: zx_signals_t) -> FuchsiaReady { + self ^ FuchsiaReady::from(other) + } +} + +impl ops::BitAnd for FuchsiaReady { + type Output = FuchsiaReady; + + #[inline] + fn bitand(self, other: zx_signals_t) -> FuchsiaReady { + self & FuchsiaReady::from(other) + } +} + +impl ops::Sub for FuchsiaReady { + type Output = FuchsiaReady; + + #[inline] + fn sub(self, other: zx_signals_t) -> FuchsiaReady { + self - FuchsiaReady::from(other) + } +} diff --git a/third_party/rust/mio-0.6.23/src/sys/fuchsia/selector.rs b/third_party/rust/mio-0.6.23/src/sys/fuchsia/selector.rs new file mode 100644 index 0000000000..27226ac5ff --- /dev/null +++ b/third_party/rust/mio-0.6.23/src/sys/fuchsia/selector.rs @@ -0,0 +1,353 @@ +use {io, Event, PollOpt, Ready, Token}; +use sys::fuchsia::{ + assert_fuchsia_ready_repr, + epoll_event_to_ready, + poll_opts_to_wait_async, + EventedFd, + EventedFdInner, + FuchsiaReady, +}; +use zircon; +use zircon::AsHandleRef; +use zircon_sys::zx_handle_t; +use std::collections::hash_map; +use std::fmt; +use std::mem; +use std::sync::atomic::{AtomicBool, AtomicUsize, ATOMIC_USIZE_INIT, Ordering}; +use std::sync::{Arc, Mutex, Weak}; +use std::time::Duration; +use sys; + +/// The kind of registration-- file descriptor or handle. +/// +/// The last bit of a token is set to indicate the type of the registration. +#[derive(Copy, Clone, Eq, PartialEq)] +enum RegType { + Fd, + Handle, +} + +fn key_from_token_and_type(token: Token, reg_type: RegType) -> io::Result { + let key = token.0 as u64; + let msb = 1u64 << 63; + if (key & msb) != 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Most-significant bit of token must remain unset.")); + } + + Ok(match reg_type { + RegType::Fd => key, + RegType::Handle => key | msb, + }) +} + +fn token_and_type_from_key(key: u64) -> (Token, RegType) { + let msb = 1u64 << 63; + ( + Token((key & !msb) as usize), + if (key & msb) == 0 { + RegType::Fd + } else { + RegType::Handle + } + ) +} + +/// Each Selector has a globally unique(ish) ID associated with it. This ID +/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first +/// registered with the `Selector`. If a type that is previously associated with +/// a `Selector` attempts to register itself with a different `Selector`, the +/// operation will return with an error. This matches windows behavior. +static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT; + +pub struct Selector { + id: usize, + + /// Zircon object on which the handles have been registered, and on which events occur + port: Arc, + + /// Whether or not `tokens_to_rereg` contains any elements. This is a best-effort attempt + /// used to prevent having to lock `tokens_to_rereg` when it is empty. + has_tokens_to_rereg: AtomicBool, + + /// List of `Token`s corresponding to registrations that need to be reregistered before the + /// next `port::wait`. This is necessary to provide level-triggered behavior for + /// `Async::repeating` registrations. + /// + /// When a level-triggered `Async::repeating` event is seen, its token is added to this list so + /// that it will be reregistered before the next `port::wait` call, making `port::wait` return + /// immediately if the signal was high during the reregistration. + /// + /// Note: when used at the same time, the `tokens_to_rereg` lock should be taken out _before_ + /// `token_to_fd`. + tokens_to_rereg: Mutex>, + + /// Map from tokens to weak references to `EventedFdInner`-- a structure describing a + /// file handle, its associated `fdio` object, and its current registration. + token_to_fd: Mutex>>, +} + +impl Selector { + pub fn new() -> io::Result { + // Assertion from fuchsia/ready.rs to make sure that FuchsiaReady's representation is + // compatible with Ready. + assert_fuchsia_ready_repr(); + + let port = Arc::new( + zircon::Port::create(zircon::PortOpts::Default)? + ); + + // offset by 1 to avoid choosing 0 as the id of a selector + let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1; + + let has_tokens_to_rereg = AtomicBool::new(false); + let tokens_to_rereg = Mutex::new(Vec::new()); + let token_to_fd = Mutex::new(hash_map::HashMap::new()); + + Ok(Selector { + id: id, + port: port, + has_tokens_to_rereg: has_tokens_to_rereg, + tokens_to_rereg: tokens_to_rereg, + token_to_fd: token_to_fd, + }) + } + + pub fn id(&self) -> usize { + self.id + } + + /// Returns a reference to the underlying port `Arc`. + pub fn port(&self) -> &Arc { &self.port } + + /// Reregisters all registrations pointed to by the `tokens_to_rereg` list + /// if `has_tokens_to_rereg`. + fn reregister_handles(&self) -> io::Result<()> { + // We use `Ordering::Acquire` to make sure that we see all `tokens_to_rereg` + // written before the store using `Ordering::Release`. + if self.has_tokens_to_rereg.load(Ordering::Acquire) { + let mut tokens = self.tokens_to_rereg.lock().unwrap(); + let token_to_fd = self.token_to_fd.lock().unwrap(); + for token in tokens.drain(0..) { + if let Some(eventedfd) = token_to_fd.get(&token) + .and_then(|h| h.upgrade()) { + eventedfd.rereg_for_level(&self.port); + } + } + self.has_tokens_to_rereg.store(false, Ordering::Release); + } + Ok(()) + } + + pub fn select(&self, + evts: &mut Events, + _awakener: Token, + timeout: Option) -> io::Result + { + evts.clear(); + + self.reregister_handles()?; + + let deadline = match timeout { + Some(duration) => { + let nanos = duration.as_secs().saturating_mul(1_000_000_000) + .saturating_add(duration.subsec_nanos() as u64); + + zircon::deadline_after(nanos) + } + None => zircon::ZX_TIME_INFINITE, + }; + + let packet = match self.port.wait(deadline) { + Ok(packet) => packet, + Err(zircon::Status::ErrTimedOut) => return Ok(false), + Err(e) => Err(e)?, + }; + + let observed_signals = match packet.contents() { + zircon::PacketContents::SignalOne(signal_packet) => { + signal_packet.observed() + } + zircon::PacketContents::SignalRep(signal_packet) => { + signal_packet.observed() + } + zircon::PacketContents::User(_user_packet) => { + // User packets are only ever sent by an Awakener + return Ok(true); + } + }; + + let key = packet.key(); + let (token, reg_type) = token_and_type_from_key(key); + + match reg_type { + RegType::Handle => { + // We can return immediately-- no lookup or registration necessary. + evts.events.push(Event::new(Ready::from(observed_signals), token)); + Ok(false) + }, + RegType::Fd => { + // Convert the signals to epoll events using __fdio_wait_end, + // and add to reregistration list if necessary. + let events: u32; + { + let handle = if let Some(handle) = + self.token_to_fd.lock().unwrap() + .get(&token) + .and_then(|h| h.upgrade()) { + handle + } else { + // This handle is apparently in the process of removal. + // It has been removed from the list, but port_cancel has not been called. + return Ok(false); + }; + + events = unsafe { + let mut events: u32 = mem::uninitialized(); + sys::fuchsia::sys::__fdio_wait_end(handle.fdio(), observed_signals, &mut events); + events + }; + + // If necessary, queue to be reregistered before next port_await + let needs_to_rereg = { + let registration_lock = handle.registration().lock().unwrap(); + + registration_lock + .as_ref() + .and_then(|r| r.rereg_signals()) + .is_some() + }; + + if needs_to_rereg { + let mut tokens_to_rereg_lock = self.tokens_to_rereg.lock().unwrap(); + tokens_to_rereg_lock.push(token); + // We use `Ordering::Release` to make sure that we see all `tokens_to_rereg` + // written before the store. + self.has_tokens_to_rereg.store(true, Ordering::Release); + } + } + + evts.events.push(Event::new(epoll_event_to_ready(events), token)); + Ok(false) + }, + } + } + + /// Register event interests for the given IO handle with the OS + pub fn register_fd(&self, + handle: &zircon::Handle, + fd: &EventedFd, + token: Token, + signals: zircon::Signals, + poll_opts: PollOpt) -> io::Result<()> + { + { + let mut token_to_fd = self.token_to_fd.lock().unwrap(); + match token_to_fd.entry(token) { + hash_map::Entry::Occupied(_) => + return Err(io::Error::new(io::ErrorKind::AlreadyExists, + "Attempted to register a filedescriptor on an existing token.")), + hash_map::Entry::Vacant(slot) => slot.insert(Arc::downgrade(&fd.inner)), + }; + } + + let wait_async_opts = poll_opts_to_wait_async(poll_opts); + + let wait_res = handle.wait_async_handle(&self.port, token.0 as u64, signals, wait_async_opts); + + if wait_res.is_err() { + self.token_to_fd.lock().unwrap().remove(&token); + } + + Ok(wait_res?) + } + + /// Deregister event interests for the given IO handle with the OS + pub fn deregister_fd(&self, handle: &zircon::Handle, token: Token) -> io::Result<()> { + self.token_to_fd.lock().unwrap().remove(&token); + + // We ignore NotFound errors since oneshots are automatically deregistered, + // but mio will attempt to deregister them manually. + self.port.cancel(&*handle, token.0 as u64) + .map_err(io::Error::from) + .or_else(|e| if e.kind() == io::ErrorKind::NotFound { + Ok(()) + } else { + Err(e) + }) + } + + pub fn register_handle(&self, + handle: zx_handle_t, + token: Token, + interests: Ready, + poll_opts: PollOpt) -> io::Result<()> + { + if poll_opts.is_level() && !poll_opts.is_oneshot() { + return Err(io::Error::new(io::ErrorKind::InvalidInput, + "Repeated level-triggered events are not supported on Fuchsia handles.")); + } + + let temp_handle = unsafe { zircon::Handle::from_raw(handle) }; + + let res = temp_handle.wait_async_handle( + &self.port, + key_from_token_and_type(token, RegType::Handle)?, + FuchsiaReady::from(interests).into_zx_signals(), + poll_opts_to_wait_async(poll_opts)); + + mem::forget(temp_handle); + + Ok(res?) + } + + + pub fn deregister_handle(&self, handle: zx_handle_t, token: Token) -> io::Result<()> + { + let temp_handle = unsafe { zircon::Handle::from_raw(handle) }; + let res = self.port.cancel(&temp_handle, key_from_token_and_type(token, RegType::Handle)?); + + mem::forget(temp_handle); + + Ok(res?) + } +} + +pub struct Events { + events: Vec +} + +impl Events { + pub fn with_capacity(_u: usize) -> Events { + // The Fuchsia selector only handles one event at a time, + // so we ignore the default capacity and set it to one. + Events { events: Vec::with_capacity(1) } + } + pub fn len(&self) -> usize { + self.events.len() + } + pub fn capacity(&self) -> usize { + self.events.capacity() + } + pub fn is_empty(&self) -> bool { + self.events.is_empty() + } + pub fn get(&self, idx: usize) -> Option { + self.events.get(idx).map(|e| *e) + } + pub fn push_event(&mut self, event: Event) { + self.events.push(event) + } + pub fn clear(&mut self) { + self.events.events.drain(0..); + } +} + +impl fmt::Debug for Events { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Events") + .field("len", &self.len()) + .finish() + } +} -- cgit v1.2.3