//! # Notes //! //! The current implementation is somewhat limited. The `Waker` is not //! implemented, as at the time of writing there is no way to support to wake-up //! a thread from calling `poll_oneoff`. //! //! Furthermore the (re/de)register functions also don't work while concurrently //! polling as both registering and polling requires a lock on the //! `subscriptions`. //! //! Finally `Selector::try_clone`, required by `Registry::try_clone`, doesn't //! work. However this could be implemented by use of an `Arc`. //! //! In summary, this only (barely) works using a single thread. use std::cmp::min; use std::io; #[cfg(all(feature = "net", debug_assertions))] use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; #[cfg(feature = "net")] use crate::{Interest, Token}; cfg_net! { pub(crate) mod tcp { use std::io; use std::net::{self, SocketAddr}; pub(crate) fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> { let (stream, addr) = listener.accept()?; stream.set_nonblocking(true)?; Ok((stream, addr)) } } } /// Unique id for use as `SelectorId`. #[cfg(all(debug_assertions, feature = "net"))] static NEXT_ID: AtomicUsize = AtomicUsize::new(1); pub(crate) struct Selector { #[cfg(all(debug_assertions, feature = "net"))] id: usize, /// Subscriptions (reads events) we're interested in. subscriptions: Arc>>, } impl Selector { pub(crate) fn new() -> io::Result { Ok(Selector { #[cfg(all(debug_assertions, feature = "net"))] id: NEXT_ID.fetch_add(1, Ordering::Relaxed), subscriptions: Arc::new(Mutex::new(Vec::new())), }) } #[cfg(all(debug_assertions, feature = "net"))] pub(crate) fn id(&self) -> usize { self.id } pub(crate) fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { events.clear(); let mut subscriptions = self.subscriptions.lock().unwrap(); // If we want to a use a timeout in the `wasi_poll_oneoff()` function // we need another subscription to the list. if let Some(timeout) = timeout { subscriptions.push(timeout_subscription(timeout)); } // `poll_oneoff` needs the same number of events as subscriptions. let length = subscriptions.len(); events.reserve(length); debug_assert!(events.capacity() >= length); #[cfg(debug_assertions)] if length == 0 { warn!( "calling mio::Poll::poll with empty subscriptions, this likely not what you want" ); } let res = unsafe { wasi::poll_oneoff(subscriptions.as_ptr(), events.as_mut_ptr(), length) }; // Remove the timeout subscription we possibly added above. if timeout.is_some() { let timeout_sub = subscriptions.pop(); debug_assert_eq!( timeout_sub.unwrap().u.tag, wasi::EVENTTYPE_CLOCK.raw(), "failed to remove timeout subscription" ); } drop(subscriptions); // Unlock. match res { Ok(n_events) => { // Safety: `poll_oneoff` initialises the `events` for us. unsafe { events.set_len(n_events) }; // Remove the timeout event. if timeout.is_some() { if let Some(index) = events.iter().position(is_timeout_event) { events.swap_remove(index); } } check_errors(&events) } Err(err) => Err(io_err(err)), } } pub(crate) fn try_clone(&self) -> io::Result { Ok(Selector { #[cfg(all(debug_assertions, feature = "net"))] id: self.id, subscriptions: self.subscriptions.clone(), }) } #[cfg(feature = "net")] pub(crate) fn register( &self, fd: wasi::Fd, token: Token, interests: Interest, ) -> io::Result<()> { let mut subscriptions = self.subscriptions.lock().unwrap(); if interests.is_writable() { let subscription = wasi::Subscription { userdata: token.0 as wasi::Userdata, u: wasi::SubscriptionU { tag: wasi::EVENTTYPE_FD_WRITE.raw(), u: wasi::SubscriptionUU { fd_write: wasi::SubscriptionFdReadwrite { file_descriptor: fd, }, }, }, }; subscriptions.push(subscription); } if interests.is_readable() { let subscription = wasi::Subscription { userdata: token.0 as wasi::Userdata, u: wasi::SubscriptionU { tag: wasi::EVENTTYPE_FD_READ.raw(), u: wasi::SubscriptionUU { fd_read: wasi::SubscriptionFdReadwrite { file_descriptor: fd, }, }, }, }; subscriptions.push(subscription); } Ok(()) } #[cfg(feature = "net")] pub(crate) fn reregister( &self, fd: wasi::Fd, token: Token, interests: Interest, ) -> io::Result<()> { self.deregister(fd) .and_then(|()| self.register(fd, token, interests)) } #[cfg(feature = "net")] pub(crate) fn deregister(&self, fd: wasi::Fd) -> io::Result<()> { let mut subscriptions = self.subscriptions.lock().unwrap(); let predicate = |subscription: &wasi::Subscription| { // Safety: `subscription.u.tag` defines the type of the union in // `subscription.u.u`. match subscription.u.tag { t if t == wasi::EVENTTYPE_FD_WRITE.raw() => unsafe { subscription.u.u.fd_write.file_descriptor == fd }, t if t == wasi::EVENTTYPE_FD_READ.raw() => unsafe { subscription.u.u.fd_read.file_descriptor == fd }, _ => false, } }; let mut ret = Err(io::ErrorKind::NotFound.into()); while let Some(index) = subscriptions.iter().position(predicate) { subscriptions.swap_remove(index); ret = Ok(()) } ret } } /// Token used to a add a timeout subscription, also used in removing it again. const TIMEOUT_TOKEN: wasi::Userdata = wasi::Userdata::max_value(); /// Returns a `wasi::Subscription` for `timeout`. fn timeout_subscription(timeout: Duration) -> wasi::Subscription { wasi::Subscription { userdata: TIMEOUT_TOKEN, u: wasi::SubscriptionU { tag: wasi::EVENTTYPE_CLOCK.raw(), u: wasi::SubscriptionUU { clock: wasi::SubscriptionClock { id: wasi::CLOCKID_MONOTONIC, // Timestamp is in nanoseconds. timeout: min(wasi::Timestamp::MAX as u128, timeout.as_nanos()) as wasi::Timestamp, // Give the implementation another millisecond to coalesce // events. precision: Duration::from_millis(1).as_nanos() as wasi::Timestamp, // Zero means the `timeout` is considered relative to the // current time. flags: 0, }, }, }, } } fn is_timeout_event(event: &wasi::Event) -> bool { event.type_ == wasi::EVENTTYPE_CLOCK && event.userdata == TIMEOUT_TOKEN } /// Check all events for possible errors, it returns the first error found. fn check_errors(events: &[Event]) -> io::Result<()> { for event in events { if event.error != wasi::ERRNO_SUCCESS { return Err(io_err(event.error)); } } Ok(()) } /// Convert `wasi::Errno` into an `io::Error`. fn io_err(errno: wasi::Errno) -> io::Error { // TODO: check if this is valid. io::Error::from_raw_os_error(errno.raw() as i32) } pub(crate) type Events = Vec; pub(crate) type Event = wasi::Event; pub(crate) mod event { use std::fmt; use crate::sys::Event; use crate::Token; pub(crate) fn token(event: &Event) -> Token { Token(event.userdata as usize) } pub(crate) fn is_readable(event: &Event) -> bool { event.type_ == wasi::EVENTTYPE_FD_READ } pub(crate) fn is_writable(event: &Event) -> bool { event.type_ == wasi::EVENTTYPE_FD_WRITE } pub(crate) fn is_error(_: &Event) -> bool { // Not supported? It could be that `wasi::Event.error` could be used for // this, but the docs say `error that occurred while processing the // subscription request`, so it's checked in `Select::select` already. false } pub(crate) fn is_read_closed(event: &Event) -> bool { event.type_ == wasi::EVENTTYPE_FD_READ // Safety: checked the type of the union above. && (event.fd_readwrite.flags & wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP) != 0 } pub(crate) fn is_write_closed(event: &Event) -> bool { event.type_ == wasi::EVENTTYPE_FD_WRITE // Safety: checked the type of the union above. && (event.fd_readwrite.flags & wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP) != 0 } pub(crate) fn is_priority(_: &Event) -> bool { // Not supported. false } pub(crate) fn is_aio(_: &Event) -> bool { // Not supported. false } pub(crate) fn is_lio(_: &Event) -> bool { // Not supported. false } pub(crate) fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result { debug_detail!( TypeDetails(wasi::Eventtype), PartialEq::eq, wasi::EVENTTYPE_CLOCK, wasi::EVENTTYPE_FD_READ, wasi::EVENTTYPE_FD_WRITE, ); #[allow(clippy::trivially_copy_pass_by_ref)] fn check_flag(got: &wasi::Eventrwflags, want: &wasi::Eventrwflags) -> bool { (got & want) != 0 } debug_detail!( EventrwflagsDetails(wasi::Eventrwflags), check_flag, wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP, ); struct EventFdReadwriteDetails(wasi::EventFdReadwrite); impl fmt::Debug for EventFdReadwriteDetails { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("EventFdReadwrite") .field("nbytes", &self.0.nbytes) .field("flags", &self.0.flags) .finish() } } f.debug_struct("Event") .field("userdata", &event.userdata) .field("error", &event.error) .field("type", &TypeDetails(event.type_)) .field("fd_readwrite", &EventFdReadwriteDetails(event.fd_readwrite)) .finish() } } cfg_os_poll! { cfg_io_source! { pub(crate) struct IoSourceState; impl IoSourceState { pub(crate) fn new() -> IoSourceState { IoSourceState } pub(crate) fn do_io(&self, f: F, io: &T) -> io::Result where F: FnOnce(&T) -> io::Result, { // We don't hold state, so we can just call the function and // return. f(io) } } } }