summaryrefslogtreecommitdiffstats
path: root/third_party/rust/mio/src/poll.rs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--third_party/rust/mio/src/poll.rs2783
1 files changed, 2783 insertions, 0 deletions
diff --git a/third_party/rust/mio/src/poll.rs b/third_party/rust/mio/src/poll.rs
new file mode 100644
index 0000000000..7985d456cd
--- /dev/null
+++ b/third_party/rust/mio/src/poll.rs
@@ -0,0 +1,2783 @@
+use {sys, Token};
+use event_imp::{self as event, Ready, Event, Evented, PollOpt};
+use std::{fmt, io, ptr, usize};
+use std::cell::UnsafeCell;
+use std::{mem, ops, isize};
+#[cfg(all(unix, not(target_os = "fuchsia")))]
+use std::os::unix::io::AsRawFd;
+#[cfg(all(unix, not(target_os = "fuchsia")))]
+use std::os::unix::io::RawFd;
+use std::process;
+use std::sync::{Arc, Mutex, Condvar};
+use std::sync::atomic::{AtomicUsize, AtomicPtr, AtomicBool};
+use std::sync::atomic::Ordering::{self, Acquire, Release, AcqRel, Relaxed, SeqCst};
+use std::time::{Duration, Instant};
+
+// Poll is backed by two readiness queues. The first is a system readiness queue
+// represented by `sys::Selector`. The system readiness queue handles events
+// provided by the system, such as TCP and UDP. The second readiness queue is
+// implemented in user space by `ReadinessQueue`. It provides a way to implement
+// purely user space `Evented` types.
+//
+// `ReadinessQueue` is backed by a MPSC queue that supports reuse of linked
+// list nodes. This significantly reduces the number of required allocations.
+// Each `Registration` / `SetReadiness` pair allocates a single readiness node
+// that is used for the lifetime of the registration.
+//
+// The readiness node also includes a single atomic variable, `state` that
+// tracks most of the state associated with the registration. This includes the
+// current readiness, interest, poll options, and internal state. When the node
+// state is mutated, it is queued in the MPSC channel. A call to
+// `ReadinessQueue::poll` will dequeue and process nodes. The node state can
+// still be mutated while it is queued in the channel for processing.
+// Intermediate state values do not matter as long as the final state is
+// included in the call to `poll`. This is the eventually consistent nature of
+// the readiness queue.
+//
+// The readiness node is ref counted using the `ref_count` field. On creation,
+// the ref_count is initialized to 3: one `Registration` handle, one
+// `SetReadiness` handle, and one for the readiness queue. Since the readiness queue
+// doesn't *always* hold a handle to the node, we don't use the Arc type for
+// managing ref counts (this is to avoid constantly incrementing and
+// decrementing the ref count when pushing & popping from the queue). When the
+// `Registration` handle is dropped, the `dropped` flag is set on the node, then
+// the node is pushed into the registration queue. When Poll::poll pops the
+// node, it sees the drop flag is set, and decrements it's ref count.
+//
+// The MPSC queue is a modified version of the intrusive MPSC node based queue
+// described by 1024cores [1].
+//
+// The first modification is that two markers are used instead of a single
+// `stub`. The second marker is a `sleep_marker` which is used to signal to
+// producers that the consumer is going to sleep. This sleep_marker is only used
+// when the queue is empty, implying that the only node in the queue is
+// `end_marker`.
+//
+// The second modification is an `until` argument passed to the dequeue
+// function. When `poll` encounters a level-triggered node, the node will be
+// immediately pushed back into the queue. In order to avoid an infinite loop,
+// `poll` before pushing the node, the pointer is saved off and then passed
+// again as the `until` argument. If the next node to pop is `until`, then
+// `Dequeue::Empty` is returned.
+//
+// [1] http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
+
+
+/// Polls for readiness events on all registered values.
+///
+/// `Poll` allows a program to monitor a large number of `Evented` types,
+/// waiting until one or more become "ready" for some class of operations; e.g.
+/// reading and writing. An `Evented` type is considered ready if it is possible
+/// to immediately perform a corresponding operation; e.g. [`read`] or
+/// [`write`].
+///
+/// To use `Poll`, an `Evented` type must first be registered with the `Poll`
+/// instance using the [`register`] method, supplying readiness interest. The
+/// readiness interest tells `Poll` which specific operations on the handle to
+/// monitor for readiness. A `Token` is also passed to the [`register`]
+/// function. When `Poll` returns a readiness event, it will include this token.
+/// This associates the event with the `Evented` handle that generated the
+/// event.
+///
+/// [`read`]: tcp/struct.TcpStream.html#method.read
+/// [`write`]: tcp/struct.TcpStream.html#method.write
+/// [`register`]: #method.register
+///
+/// # Examples
+///
+/// A basic example -- establishing a `TcpStream` connection.
+///
+/// ```
+/// # use std::error::Error;
+/// # fn try_main() -> Result<(), Box<Error>> {
+/// use mio::{Events, Poll, Ready, PollOpt, Token};
+/// use mio::net::TcpStream;
+///
+/// use std::net::{TcpListener, SocketAddr};
+///
+/// // Bind a server socket to connect to.
+/// let addr: SocketAddr = "127.0.0.1:0".parse()?;
+/// let server = TcpListener::bind(&addr)?;
+///
+/// // Construct a new `Poll` handle as well as the `Events` we'll store into
+/// let poll = Poll::new()?;
+/// let mut events = Events::with_capacity(1024);
+///
+/// // Connect the stream
+/// let stream = TcpStream::connect(&server.local_addr()?)?;
+///
+/// // Register the stream with `Poll`
+/// poll.register(&stream, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
+///
+/// // Wait for the socket to become ready. This has to happens in a loop to
+/// // handle spurious wakeups.
+/// loop {
+/// poll.poll(&mut events, None)?;
+///
+/// for event in &events {
+/// if event.token() == Token(0) && event.readiness().is_writable() {
+/// // The socket connected (probably, it could still be a spurious
+/// // wakeup)
+/// return Ok(());
+/// }
+/// }
+/// }
+/// # Ok(())
+/// # }
+/// #
+/// # fn main() {
+/// # try_main().unwrap();
+/// # }
+/// ```
+///
+/// # Edge-triggered and level-triggered
+///
+/// An [`Evented`] registration may request edge-triggered events or
+/// level-triggered events. This is done by setting `register`'s
+/// [`PollOpt`] argument to either [`edge`] or [`level`].
+///
+/// The difference between the two can be described as follows. Supposed that
+/// this scenario happens:
+///
+/// 1. A [`TcpStream`] is registered with `Poll`.
+/// 2. The socket receives 2kb of data.
+/// 3. A call to [`Poll::poll`] returns the token associated with the socket
+/// indicating readable readiness.
+/// 4. 1kb is read from the socket.
+/// 5. Another call to [`Poll::poll`] is made.
+///
+/// If when the socket was registered with `Poll`, edge triggered events were
+/// requested, then the call to [`Poll::poll`] done in step **5** will
+/// (probably) hang despite there being another 1kb still present in the socket
+/// read buffer. The reason for this is that edge-triggered mode delivers events
+/// only when changes occur on the monitored [`Evented`]. So, in step *5* the
+/// caller might end up waiting for some data that is already present inside the
+/// socket buffer.
+///
+/// With edge-triggered events, operations **must** be performed on the
+/// `Evented` type until [`WouldBlock`] is returned. In other words, after
+/// receiving an event indicating readiness for a certain operation, one should
+/// assume that [`Poll::poll`] may never return another event for the same token
+/// and readiness until the operation returns [`WouldBlock`].
+///
+/// By contrast, when level-triggered notifications was requested, each call to
+/// [`Poll::poll`] will return an event for the socket as long as data remains
+/// in the socket buffer. Generally, level-triggered events should be avoided if
+/// high performance is a concern.
+///
+/// Since even with edge-triggered events, multiple events can be generated upon
+/// receipt of multiple chunks of data, the caller has the option to set the
+/// [`oneshot`] flag. This tells `Poll` to disable the associated [`Evented`]
+/// after the event is returned from [`Poll::poll`]. The subsequent calls to
+/// [`Poll::poll`] will no longer include events for [`Evented`] handles that
+/// are disabled even if the readiness state changes. The handle can be
+/// re-enabled by calling [`reregister`]. When handles are disabled, internal
+/// resources used to monitor the handle are maintained until the handle is
+/// dropped or deregistered. This makes re-registering the handle a fast
+/// operation.
+///
+/// For example, in the following scenario:
+///
+/// 1. A [`TcpStream`] is registered with `Poll`.
+/// 2. The socket receives 2kb of data.
+/// 3. A call to [`Poll::poll`] returns the token associated with the socket
+/// indicating readable readiness.
+/// 4. 2kb is read from the socket.
+/// 5. Another call to read is issued and [`WouldBlock`] is returned
+/// 6. The socket receives another 2kb of data.
+/// 7. Another call to [`Poll::poll`] is made.
+///
+/// Assuming the socket was registered with `Poll` with the [`edge`] and
+/// [`oneshot`] options, then the call to [`Poll::poll`] in step 7 would block. This
+/// is because, [`oneshot`] tells `Poll` to disable events for the socket after
+/// returning an event.
+///
+/// In order to receive the event for the data received in step 6, the socket
+/// would need to be reregistered using [`reregister`].
+///
+/// [`PollOpt`]: struct.PollOpt.html
+/// [`edge`]: struct.PollOpt.html#method.edge
+/// [`level`]: struct.PollOpt.html#method.level
+/// [`Poll::poll`]: struct.Poll.html#method.poll
+/// [`WouldBlock`]: https://doc.rust-lang.org/std/io/enum.ErrorKind.html#variant.WouldBlock
+/// [`Evented`]: event/trait.Evented.html
+/// [`TcpStream`]: tcp/struct.TcpStream.html
+/// [`reregister`]: #method.reregister
+/// [`oneshot`]: struct.PollOpt.html#method.oneshot
+///
+/// # Portability
+///
+/// Using `Poll` provides a portable interface across supported platforms as
+/// long as the caller takes the following into consideration:
+///
+/// ### Spurious events
+///
+/// [`Poll::poll`] may return readiness events even if the associated
+/// [`Evented`] handle is not actually ready. Given the same code, this may
+/// happen more on some platforms than others. It is important to never assume
+/// that, just because a readiness notification was received, that the
+/// associated operation will succeed as well.
+///
+/// If operation fails with [`WouldBlock`], then the caller should not treat
+/// this as an error, but instead should wait until another readiness event is
+/// received.
+///
+/// ### Draining readiness
+///
+/// When using edge-triggered mode, once a readiness event is received, the
+/// corresponding operation must be performed repeatedly until it returns
+/// [`WouldBlock`]. Unless this is done, there is no guarantee that another
+/// readiness event will be delivered, even if further data is received for the
+/// [`Evented`] handle.
+///
+/// For example, in the first scenario described above, after step 5, even if
+/// the socket receives more data there is no guarantee that another readiness
+/// event will be delivered.
+///
+/// ### Readiness operations
+///
+/// The only readiness operations that are guaranteed to be present on all
+/// supported platforms are [`readable`] and [`writable`]. All other readiness
+/// operations may have false negatives and as such should be considered
+/// **hints**. This means that if a socket is registered with [`readable`],
+/// [`error`], and [`hup`] interest, and either an error or hup is received, a
+/// readiness event will be generated for the socket, but it **may** only
+/// include `readable` readiness. Also note that, given the potential for
+/// spurious events, receiving a readiness event with `hup` or `error` doesn't
+/// actually mean that a `read` on the socket will return a result matching the
+/// readiness event.
+///
+/// In other words, portable programs that explicitly check for [`hup`] or
+/// [`error`] readiness should be doing so as an **optimization** and always be
+/// able to handle an error or HUP situation when performing the actual read
+/// operation.
+///
+/// [`readable`]: struct.Ready.html#method.readable
+/// [`writable`]: struct.Ready.html#method.writable
+/// [`error`]: unix/struct.UnixReady.html#method.error
+/// [`hup`]: unix/struct.UnixReady.html#method.hup
+///
+/// ### Registering handles
+///
+/// Unless otherwise noted, it should be assumed that types implementing
+/// [`Evented`] will never become ready unless they are registered with `Poll`.
+///
+/// For example:
+///
+/// ```
+/// # use std::error::Error;
+/// # fn try_main() -> Result<(), Box<Error>> {
+/// use mio::{Poll, Ready, PollOpt, Token};
+/// use mio::net::TcpStream;
+/// use std::time::Duration;
+/// use std::thread;
+///
+/// let sock = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
+///
+/// thread::sleep(Duration::from_secs(1));
+///
+/// let poll = Poll::new()?;
+///
+/// // The connect is not guaranteed to have started until it is registered at
+/// // this point
+/// poll.register(&sock, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
+/// # Ok(())
+/// # }
+/// #
+/// # fn main() {
+/// # try_main().unwrap();
+/// # }
+/// ```
+///
+/// # Implementation notes
+///
+/// `Poll` is backed by the selector provided by the operating system.
+///
+/// | OS | Selector |
+/// |------------|-----------|
+/// | Linux | [epoll] |
+/// | OS X, iOS | [kqueue] |
+/// | Windows | [IOCP] |
+/// | FreeBSD | [kqueue] |
+/// | Android | [epoll] |
+///
+/// On all supported platforms, socket operations are handled by using the
+/// system selector. Platform specific extensions (e.g. [`EventedFd`]) allow
+/// accessing other features provided by individual system selectors. For
+/// example, Linux's [`signalfd`] feature can be used by registering the FD with
+/// `Poll` via [`EventedFd`].
+///
+/// On all platforms except windows, a call to [`Poll::poll`] is mostly just a
+/// direct call to the system selector. However, [IOCP] uses a completion model
+/// instead of a readiness model. In this case, `Poll` must adapt the completion
+/// model Mio's API. While non-trivial, the bridge layer is still quite
+/// efficient. The most expensive part being calls to `read` and `write` require
+/// data to be copied into an intermediate buffer before it is passed to the
+/// kernel.
+///
+/// Notifications generated by [`SetReadiness`] are handled by an internal
+/// readiness queue. A single call to [`Poll::poll`] will collect events from
+/// both from the system selector and the internal readiness queue.
+///
+/// [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html
+/// [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
+/// [IOCP]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx
+/// [`signalfd`]: http://man7.org/linux/man-pages/man2/signalfd.2.html
+/// [`EventedFd`]: unix/struct.EventedFd.html
+/// [`SetReadiness`]: struct.SetReadiness.html
+/// [`Poll::poll`]: struct.Poll.html#method.poll
+pub struct Poll {
+ // Platform specific IO selector
+ selector: sys::Selector,
+
+ // Custom readiness queue
+ readiness_queue: ReadinessQueue,
+
+ // Use an atomic to first check if a full lock will be required. This is a
+ // fast-path check for single threaded cases avoiding the extra syscall
+ lock_state: AtomicUsize,
+
+ // Sequences concurrent calls to `Poll::poll`
+ lock: Mutex<()>,
+
+ // Wakeup the next waiter
+ condvar: Condvar,
+}
+
+/// Handle to a user space `Poll` registration.
+///
+/// `Registration` allows implementing [`Evented`] for types that cannot work
+/// with the [system selector]. A `Registration` is always paired with a
+/// `SetReadiness`, which allows updating the registration's readiness state.
+/// When [`set_readiness`] is called and the `Registration` is associated with a
+/// [`Poll`] instance, a readiness event will be created and eventually returned
+/// by [`poll`].
+///
+/// A `Registration` / `SetReadiness` pair is created by calling
+/// [`Registration::new2`]. At this point, the registration is not being
+/// monitored by a [`Poll`] instance, so calls to `set_readiness` will not
+/// result in any readiness notifications.
+///
+/// `Registration` implements [`Evented`], so it can be used with [`Poll`] using
+/// the same [`register`], [`reregister`], and [`deregister`] functions used
+/// with TCP, UDP, etc... types. Once registered with [`Poll`], readiness state
+/// changes result in readiness events being dispatched to the [`Poll`] instance
+/// with which `Registration` is registered.
+///
+/// **Note**, before using `Registration` be sure to read the
+/// [`set_readiness`] documentation and the [portability] notes. The
+/// guarantees offered by `Registration` may be weaker than expected.
+///
+/// For high level documentation, see [`Poll`].
+///
+/// # Examples
+///
+/// ```
+/// use mio::{Ready, Registration, Poll, PollOpt, Token};
+/// use mio::event::Evented;
+///
+/// use std::io;
+/// use std::time::Instant;
+/// use std::thread;
+///
+/// pub struct Deadline {
+/// when: Instant,
+/// registration: Registration,
+/// }
+///
+/// impl Deadline {
+/// pub fn new(when: Instant) -> Deadline {
+/// let (registration, set_readiness) = Registration::new2();
+///
+/// thread::spawn(move || {
+/// let now = Instant::now();
+///
+/// if now < when {
+/// thread::sleep(when - now);
+/// }
+///
+/// set_readiness.set_readiness(Ready::readable());
+/// });
+///
+/// Deadline {
+/// when: when,
+/// registration: registration,
+/// }
+/// }
+///
+/// pub fn is_elapsed(&self) -> bool {
+/// Instant::now() >= self.when
+/// }
+/// }
+///
+/// impl Evented for Deadline {
+/// fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
+/// -> io::Result<()>
+/// {
+/// self.registration.register(poll, token, interest, opts)
+/// }
+///
+/// fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
+/// -> io::Result<()>
+/// {
+/// self.registration.reregister(poll, token, interest, opts)
+/// }
+///
+/// fn deregister(&self, poll: &Poll) -> io::Result<()> {
+/// poll.deregister(&self.registration)
+/// }
+/// }
+/// ```
+///
+/// [system selector]: struct.Poll.html#implementation-notes
+/// [`Poll`]: struct.Poll.html
+/// [`Registration::new2`]: struct.Registration.html#method.new2
+/// [`Evented`]: event/trait.Evented.html
+/// [`set_readiness`]: struct.SetReadiness.html#method.set_readiness
+/// [`register`]: struct.Poll.html#method.register
+/// [`reregister`]: struct.Poll.html#method.reregister
+/// [`deregister`]: struct.Poll.html#method.deregister
+/// [portability]: struct.Poll.html#portability
+pub struct Registration {
+ inner: RegistrationInner,
+}
+
+unsafe impl Send for Registration {}
+unsafe impl Sync for Registration {}
+
+/// Updates the readiness state of the associated `Registration`.
+///
+/// See [`Registration`] for more documentation on using `SetReadiness` and
+/// [`Poll`] for high level polling documentation.
+///
+/// [`Poll`]: struct.Poll.html
+/// [`Registration`]: struct.Registration.html
+#[derive(Clone)]
+pub struct SetReadiness {
+ inner: RegistrationInner,
+}
+
+unsafe impl Send for SetReadiness {}
+unsafe impl Sync for SetReadiness {}
+
+/// Used to associate an IO type with a Selector
+#[derive(Debug)]
+pub struct SelectorId {
+ id: AtomicUsize,
+}
+
+struct RegistrationInner {
+ // Unsafe pointer to the registration's node. The node is ref counted. This
+ // cannot "simply" be tracked by an Arc because `Poll::poll` has an implicit
+ // handle though it isn't stored anywhere. In other words, `Poll::poll`
+ // needs to decrement the ref count before the node is freed.
+ node: *mut ReadinessNode,
+}
+
+#[derive(Clone)]
+struct ReadinessQueue {
+ inner: Arc<ReadinessQueueInner>,
+}
+
+unsafe impl Send for ReadinessQueue {}
+unsafe impl Sync for ReadinessQueue {}
+
+struct ReadinessQueueInner {
+ // Used to wake up `Poll` when readiness is set in another thread.
+ awakener: sys::Awakener,
+
+ // Head of the MPSC queue used to signal readiness to `Poll::poll`.
+ head_readiness: AtomicPtr<ReadinessNode>,
+
+ // Tail of the readiness queue.
+ //
+ // Only accessed by Poll::poll. Coordination will be handled by the poll fn
+ tail_readiness: UnsafeCell<*mut ReadinessNode>,
+
+ // Fake readiness node used to punctuate the end of the readiness queue.
+ // Before attempting to read from the queue, this node is inserted in order
+ // to partition the queue between nodes that are "owned" by the dequeue end
+ // and nodes that will be pushed on by producers.
+ end_marker: Box<ReadinessNode>,
+
+ // Similar to `end_marker`, but this node signals to producers that `Poll`
+ // has gone to sleep and must be woken up.
+ sleep_marker: Box<ReadinessNode>,
+
+ // Similar to `end_marker`, but the node signals that the queue is closed.
+ // This happens when `ReadyQueue` is dropped and signals to producers that
+ // the nodes should no longer be pushed into the queue.
+ closed_marker: Box<ReadinessNode>,
+}
+
+/// Node shared by a `Registration` / `SetReadiness` pair as well as the node
+/// queued into the MPSC channel.
+struct ReadinessNode {
+ // Node state, see struct docs for `ReadinessState`
+ //
+ // This variable is the primary point of coordination between all the
+ // various threads concurrently accessing the node.
+ state: AtomicState,
+
+ // The registration token cannot fit into the `state` variable, so it is
+ // broken out here. In order to atomically update both the state and token
+ // we have to jump through a few hoops.
+ //
+ // First, `state` includes `token_read_pos` and `token_write_pos`. These can
+ // either be 0, 1, or 2 which represent a token slot. `token_write_pos` is
+ // the token slot that contains the most up to date registration token.
+ // `token_read_pos` is the token slot that `poll` is currently reading from.
+ //
+ // When a call to `update` includes a different token than the one currently
+ // associated with the registration (token_write_pos), first an unused token
+ // slot is found. The unused slot is the one not represented by
+ // `token_read_pos` OR `token_write_pos`. The new token is written to this
+ // slot, then `state` is updated with the new `token_write_pos` value. This
+ // requires that there is only a *single* concurrent call to `update`.
+ //
+ // When `poll` reads a node state, it checks that `token_read_pos` matches
+ // `token_write_pos`. If they do not match, then it atomically updates
+ // `state` such that `token_read_pos` is set to `token_write_pos`. It will
+ // then read the token at the newly updated `token_read_pos`.
+ token_0: UnsafeCell<Token>,
+ token_1: UnsafeCell<Token>,
+ token_2: UnsafeCell<Token>,
+
+ // Used when the node is queued in the readiness linked list. Accessing
+ // this field requires winning the "queue" lock
+ next_readiness: AtomicPtr<ReadinessNode>,
+
+ // Ensures that there is only one concurrent call to `update`.
+ //
+ // Each call to `update` will attempt to swap `update_lock` from `false` to
+ // `true`. If the CAS succeeds, the thread has obtained the update lock. If
+ // the CAS fails, then the `update` call returns immediately and the update
+ // is discarded.
+ update_lock: AtomicBool,
+
+ // Pointer to Arc<ReadinessQueueInner>
+ readiness_queue: AtomicPtr<()>,
+
+ // Tracks the number of `ReadyRef` pointers
+ ref_count: AtomicUsize,
+}
+
+/// Stores the ReadinessNode state in an AtomicUsize. This wrapper around the
+/// atomic variable handles encoding / decoding `ReadinessState` values.
+struct AtomicState {
+ inner: AtomicUsize,
+}
+
+const MASK_2: usize = 4 - 1;
+const MASK_4: usize = 16 - 1;
+const QUEUED_MASK: usize = 1 << QUEUED_SHIFT;
+const DROPPED_MASK: usize = 1 << DROPPED_SHIFT;
+
+const READINESS_SHIFT: usize = 0;
+const INTEREST_SHIFT: usize = 4;
+const POLL_OPT_SHIFT: usize = 8;
+const TOKEN_RD_SHIFT: usize = 12;
+const TOKEN_WR_SHIFT: usize = 14;
+const QUEUED_SHIFT: usize = 16;
+const DROPPED_SHIFT: usize = 17;
+
+/// Tracks all state for a single `ReadinessNode`. The state is packed into a
+/// `usize` variable from low to high bit as follows:
+///
+/// 4 bits: Registration current readiness
+/// 4 bits: Registration interest
+/// 4 bits: Poll options
+/// 2 bits: Token position currently being read from by `poll`
+/// 2 bits: Token position last written to by `update`
+/// 1 bit: Queued flag, set when node is being pushed into MPSC queue.
+/// 1 bit: Dropped flag, set when all `Registration` handles have been dropped.
+#[derive(Debug, Copy, Clone, Eq, PartialEq)]
+struct ReadinessState(usize);
+
+/// Returned by `dequeue_node`. Represents the different states as described by
+/// the queue documentation on 1024cores.net.
+enum Dequeue {
+ Data(*mut ReadinessNode),
+ Empty,
+ Inconsistent,
+}
+
+const AWAKEN: Token = Token(usize::MAX);
+const MAX_REFCOUNT: usize = (isize::MAX) as usize;
+
+/*
+ *
+ * ===== Poll =====
+ *
+ */
+
+impl Poll {
+ /// Return a new `Poll` handle.
+ ///
+ /// This function will make a syscall to the operating system to create the
+ /// system selector. If this syscall fails, `Poll::new` will return with the
+ /// error.
+ ///
+ /// See [struct] level docs for more details.
+ ///
+ /// [struct]: struct.Poll.html
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// use mio::{Poll, Events};
+ /// use std::time::Duration;
+ ///
+ /// let poll = match Poll::new() {
+ /// Ok(poll) => poll,
+ /// Err(e) => panic!("failed to create Poll instance; err={:?}", e),
+ /// };
+ ///
+ /// // Create a structure to receive polled events
+ /// let mut events = Events::with_capacity(1024);
+ ///
+ /// // Wait for events, but none will be received because no `Evented`
+ /// // handles have been registered with this `Poll` instance.
+ /// let n = poll.poll(&mut events, Some(Duration::from_millis(500)))?;
+ /// assert_eq!(n, 0);
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ pub fn new() -> io::Result<Poll> {
+ is_send::<Poll>();
+ is_sync::<Poll>();
+
+ let poll = Poll {
+ selector: sys::Selector::new()?,
+ readiness_queue: ReadinessQueue::new()?,
+ lock_state: AtomicUsize::new(0),
+ lock: Mutex::new(()),
+ condvar: Condvar::new(),
+ };
+
+ // Register the notification wakeup FD with the IO poller
+ poll.readiness_queue.inner.awakener.register(&poll, AWAKEN, Ready::readable(), PollOpt::edge())?;
+
+ Ok(poll)
+ }
+
+ /// Register an `Evented` handle with the `Poll` instance.
+ ///
+ /// Once registered, the `Poll` instance will monitor the `Evented` handle
+ /// for readiness state changes. When it notices a state change, it will
+ /// return a readiness event for the handle the next time [`poll`] is
+ /// called.
+ ///
+ /// See the [`struct`] docs for a high level overview.
+ ///
+ /// # Arguments
+ ///
+ /// `handle: &E: Evented`: This is the handle that the `Poll` instance
+ /// should monitor for readiness state changes.
+ ///
+ /// `token: Token`: The caller picks a token to associate with the socket.
+ /// When [`poll`] returns an event for the handle, this token is included.
+ /// This allows the caller to map the event to its handle. The token
+ /// associated with the `Evented` handle can be changed at any time by
+ /// calling [`reregister`].
+ ///
+ /// `token` cannot be `Token(usize::MAX)` as it is reserved for internal
+ /// usage.
+ ///
+ /// See documentation on [`Token`] for an example showing how to pick
+ /// [`Token`] values.
+ ///
+ /// `interest: Ready`: Specifies which operations `Poll` should monitor for
+ /// readiness. `Poll` will only return readiness events for operations
+ /// specified by this argument.
+ ///
+ /// If a socket is registered with readable interest and the socket becomes
+ /// writable, no event will be returned from [`poll`].
+ ///
+ /// The readiness interest for an `Evented` handle can be changed at any
+ /// time by calling [`reregister`].
+ ///
+ /// `opts: PollOpt`: Specifies the registration options. The most common
+ /// options being [`level`] for level-triggered events, [`edge`] for
+ /// edge-triggered events, and [`oneshot`].
+ ///
+ /// The registration options for an `Evented` handle can be changed at any
+ /// time by calling [`reregister`].
+ ///
+ /// # Notes
+ ///
+ /// Unless otherwise specified, the caller should assume that once an
+ /// `Evented` handle is registered with a `Poll` instance, it is bound to
+ /// that `Poll` instance for the lifetime of the `Evented` handle. This
+ /// remains true even if the `Evented` handle is deregistered from the poll
+ /// instance using [`deregister`].
+ ///
+ /// This function is **thread safe**. It can be called concurrently from
+ /// multiple threads.
+ ///
+ /// [`struct`]: #
+ /// [`reregister`]: #method.reregister
+ /// [`deregister`]: #method.deregister
+ /// [`poll`]: #method.poll
+ /// [`level`]: struct.PollOpt.html#method.level
+ /// [`edge`]: struct.PollOpt.html#method.edge
+ /// [`oneshot`]: struct.PollOpt.html#method.oneshot
+ /// [`Token`]: struct.Token.html
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// use mio::{Events, Poll, Ready, PollOpt, Token};
+ /// use mio::net::TcpStream;
+ /// use std::time::{Duration, Instant};
+ ///
+ /// let poll = Poll::new()?;
+ /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
+ ///
+ /// // Register the socket with `poll`
+ /// poll.register(&socket, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
+ ///
+ /// let mut events = Events::with_capacity(1024);
+ /// let start = Instant::now();
+ /// let timeout = Duration::from_millis(500);
+ ///
+ /// loop {
+ /// let elapsed = start.elapsed();
+ ///
+ /// if elapsed >= timeout {
+ /// // Connection timed out
+ /// return Ok(());
+ /// }
+ ///
+ /// let remaining = timeout - elapsed;
+ /// poll.poll(&mut events, Some(remaining))?;
+ ///
+ /// for event in &events {
+ /// if event.token() == Token(0) {
+ /// // Something (probably) happened on the socket.
+ /// return Ok(());
+ /// }
+ /// }
+ /// }
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ pub fn register<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>
+ where E: Evented
+ {
+ validate_args(token)?;
+
+ /*
+ * Undefined behavior:
+ * - Reusing a token with a different `Evented` without deregistering
+ * (or closing) the original `Evented`.
+ */
+ trace!("registering with poller");
+
+ // Register interests for this socket
+ handle.register(self, token, interest, opts)?;
+
+ Ok(())
+ }
+
+ /// Re-register an `Evented` handle with the `Poll` instance.
+ ///
+ /// Re-registering an `Evented` handle allows changing the details of the
+ /// registration. Specifically, it allows updating the associated `token`,
+ /// `interest`, and `opts` specified in previous `register` and `reregister`
+ /// calls.
+ ///
+ /// The `reregister` arguments fully override the previous values. In other
+ /// words, if a socket is registered with [`readable`] interest and the call
+ /// to `reregister` specifies [`writable`], then read interest is no longer
+ /// requested for the handle.
+ ///
+ /// The `Evented` handle must have previously been registered with this
+ /// instance of `Poll` otherwise the call to `reregister` will return with
+ /// an error.
+ ///
+ /// `token` cannot be `Token(usize::MAX)` as it is reserved for internal
+ /// usage.
+ ///
+ /// See the [`register`] documentation for details about the function
+ /// arguments and see the [`struct`] docs for a high level overview of
+ /// polling.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// use mio::{Poll, Ready, PollOpt, Token};
+ /// use mio::net::TcpStream;
+ ///
+ /// let poll = Poll::new()?;
+ /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
+ ///
+ /// // Register the socket with `poll`, requesting readable
+ /// poll.register(&socket, Token(0), Ready::readable(), PollOpt::edge())?;
+ ///
+ /// // Reregister the socket specifying a different token and write interest
+ /// // instead. `PollOpt::edge()` must be specified even though that value
+ /// // is not being changed.
+ /// poll.reregister(&socket, Token(2), Ready::writable(), PollOpt::edge())?;
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ ///
+ /// [`struct`]: #
+ /// [`register`]: #method.register
+ /// [`readable`]: struct.Ready.html#method.readable
+ /// [`writable`]: struct.Ready.html#method.writable
+ pub fn reregister<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>
+ where E: Evented
+ {
+ validate_args(token)?;
+
+ trace!("registering with poller");
+
+ // Register interests for this socket
+ handle.reregister(self, token, interest, opts)?;
+
+ Ok(())
+ }
+
+ /// Deregister an `Evented` handle with the `Poll` instance.
+ ///
+ /// When an `Evented` handle is deregistered, the `Poll` instance will
+ /// no longer monitor it for readiness state changes. Unlike disabling
+ /// handles with oneshot, deregistering clears up any internal resources
+ /// needed to track the handle.
+ ///
+ /// A handle can be passed back to `register` after it has been
+ /// deregistered; however, it must be passed back to the **same** `Poll`
+ /// instance.
+ ///
+ /// `Evented` handles are automatically deregistered when they are dropped.
+ /// It is common to never need to explicitly call `deregister`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// use mio::{Events, Poll, Ready, PollOpt, Token};
+ /// use mio::net::TcpStream;
+ /// use std::time::Duration;
+ ///
+ /// let poll = Poll::new()?;
+ /// let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
+ ///
+ /// // Register the socket with `poll`
+ /// poll.register(&socket, Token(0), Ready::readable(), PollOpt::edge())?;
+ ///
+ /// poll.deregister(&socket)?;
+ ///
+ /// let mut events = Events::with_capacity(1024);
+ ///
+ /// // Set a timeout because this poll should never receive any events.
+ /// let n = poll.poll(&mut events, Some(Duration::from_secs(1)))?;
+ /// assert_eq!(0, n);
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ pub fn deregister<E: ?Sized>(&self, handle: &E) -> io::Result<()>
+ where E: Evented
+ {
+ trace!("deregistering handle with poller");
+
+ // Deregister interests for this socket
+ handle.deregister(self)?;
+
+ Ok(())
+ }
+
+ /// Wait for readiness events
+ ///
+ /// Blocks the current thread and waits for readiness events for any of the
+ /// `Evented` handles that have been registered with this `Poll` instance.
+ /// The function will block until either at least one readiness event has
+ /// been received or `timeout` has elapsed. A `timeout` of `None` means that
+ /// `poll` will block until a readiness event has been received.
+ ///
+ /// The supplied `events` will be cleared and newly received readiness events
+ /// will be pushed onto the end. At most `events.capacity()` events will be
+ /// returned. If there are further pending readiness events, they will be
+ /// returned on the next call to `poll`.
+ ///
+ /// A single call to `poll` may result in multiple readiness events being
+ /// returned for a single `Evented` handle. For example, if a TCP socket
+ /// becomes both readable and writable, it may be possible for a single
+ /// readiness event to be returned with both [`readable`] and [`writable`]
+ /// readiness **OR** two separate events may be returned, one with
+ /// [`readable`] set and one with [`writable`] set.
+ ///
+ /// Note that the `timeout` will be rounded up to the system clock
+ /// granularity (usually 1ms), and kernel scheduling delays mean that
+ /// the blocking interval may be overrun by a small amount.
+ ///
+ /// `poll` returns the number of readiness events that have been pushed into
+ /// `events` or `Err` when an error has been encountered with the system
+ /// selector. The value returned is deprecated and will be removed in 0.7.0.
+ /// Accessing the events by index is also deprecated. Events can be
+ /// inserted by other events triggering, thus making sequential access
+ /// problematic. Use the iterator API instead. See [`iter`].
+ ///
+ /// See the [struct] level documentation for a higher level discussion of
+ /// polling.
+ ///
+ /// [`readable`]: struct.Ready.html#method.readable
+ /// [`writable`]: struct.Ready.html#method.writable
+ /// [struct]: #
+ /// [`iter`]: struct.Events.html#method.iter
+ ///
+ /// # Examples
+ ///
+ /// A basic example -- establishing a `TcpStream` connection.
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// use mio::{Events, Poll, Ready, PollOpt, Token};
+ /// use mio::net::TcpStream;
+ ///
+ /// use std::net::{TcpListener, SocketAddr};
+ /// use std::thread;
+ ///
+ /// // Bind a server socket to connect to.
+ /// let addr: SocketAddr = "127.0.0.1:0".parse()?;
+ /// let server = TcpListener::bind(&addr)?;
+ /// let addr = server.local_addr()?.clone();
+ ///
+ /// // Spawn a thread to accept the socket
+ /// thread::spawn(move || {
+ /// let _ = server.accept();
+ /// });
+ ///
+ /// // Construct a new `Poll` handle as well as the `Events` we'll store into
+ /// let poll = Poll::new()?;
+ /// let mut events = Events::with_capacity(1024);
+ ///
+ /// // Connect the stream
+ /// let stream = TcpStream::connect(&addr)?;
+ ///
+ /// // Register the stream with `Poll`
+ /// poll.register(&stream, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
+ ///
+ /// // Wait for the socket to become ready. This has to happens in a loop to
+ /// // handle spurious wakeups.
+ /// loop {
+ /// poll.poll(&mut events, None)?;
+ ///
+ /// for event in &events {
+ /// if event.token() == Token(0) && event.readiness().is_writable() {
+ /// // The socket connected (probably, it could still be a spurious
+ /// // wakeup)
+ /// return Ok(());
+ /// }
+ /// }
+ /// }
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ ///
+ /// [struct]: #
+ pub fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
+ self.poll1(events, timeout, false)
+ }
+
+ /// Like `poll`, but may be interrupted by a signal
+ ///
+ /// If `poll` is inturrupted while blocking, it will transparently retry the syscall. If you
+ /// want to handle signals yourself, however, use `poll_interruptible`.
+ pub fn poll_interruptible(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
+ self.poll1(events, timeout, true)
+ }
+
+ fn poll1(&self, events: &mut Events, mut timeout: Option<Duration>, interruptible: bool) -> io::Result<usize> {
+ let zero = Some(Duration::from_millis(0));
+
+ // At a high level, the synchronization strategy is to acquire access to
+ // the critical section by transitioning the atomic from unlocked ->
+ // locked. If the attempt fails, the thread will wait on the condition
+ // variable.
+ //
+ // # Some more detail
+ //
+ // The `lock_state` atomic usize combines:
+ //
+ // - locked flag, stored in the least significant bit
+ // - number of waiting threads, stored in the rest of the bits.
+ //
+ // When a thread transitions the locked flag from 0 -> 1, it has
+ // obtained access to the critical section.
+ //
+ // When entering `poll`, a compare-and-swap from 0 -> 1 is attempted.
+ // This is a fast path for the case when there are no concurrent calls
+ // to poll, which is very common.
+ //
+ // On failure, the mutex is locked, and the thread attempts to increment
+ // the number of waiting threads component of `lock_state`. If this is
+ // successfully done while the locked flag is set, then the thread can
+ // wait on the condition variable.
+ //
+ // When a thread exits the critical section, it unsets the locked flag.
+ // If there are any waiters, which is atomically determined while
+ // unsetting the locked flag, then the condvar is notified.
+
+ let mut curr = self.lock_state.compare_and_swap(0, 1, SeqCst);
+
+ if 0 != curr {
+ // Enter slower path
+ let mut lock = self.lock.lock().unwrap();
+ let mut inc = false;
+
+ loop {
+ if curr & 1 == 0 {
+ // The lock is currently free, attempt to grab it
+ let mut next = curr | 1;
+
+ if inc {
+ // The waiter count has previously been incremented, so
+ // decrement it here
+ next -= 2;
+ }
+
+ let actual = self.lock_state.compare_and_swap(curr, next, SeqCst);
+
+ if actual != curr {
+ curr = actual;
+ continue;
+ }
+
+ // Lock acquired, break from the loop
+ break;
+ }
+
+ if timeout == zero {
+ if inc {
+ self.lock_state.fetch_sub(2, SeqCst);
+ }
+
+ return Ok(0);
+ }
+
+ // The lock is currently held, so wait for it to become
+ // free. If the waiter count hasn't been incremented yet, do
+ // so now
+ if !inc {
+ let next = curr.checked_add(2).expect("overflow");
+ let actual = self.lock_state.compare_and_swap(curr, next, SeqCst);
+
+ if actual != curr {
+ curr = actual;
+ continue;
+ }
+
+ // Track that the waiter count has been incremented for
+ // this thread and fall through to the condvar waiting
+ inc = true;
+ }
+
+ lock = match timeout {
+ Some(to) => {
+ let now = Instant::now();
+
+ // Wait to be notified
+ let (l, _) = self.condvar.wait_timeout(lock, to).unwrap();
+
+ // See how much time was elapsed in the wait
+ let elapsed = now.elapsed();
+
+ // Update `timeout` to reflect how much time is left to
+ // wait.
+ if elapsed >= to {
+ timeout = zero;
+ } else {
+ // Update the timeout
+ timeout = Some(to - elapsed);
+ }
+
+ l
+ }
+ None => {
+ self.condvar.wait(lock).unwrap()
+ }
+ };
+
+ // Reload the state
+ curr = self.lock_state.load(SeqCst);
+
+ // Try to lock again...
+ }
+ }
+
+ let ret = self.poll2(events, timeout, interruptible);
+
+ // Release the lock
+ if 1 != self.lock_state.fetch_and(!1, Release) {
+ // Acquire the mutex
+ let _lock = self.lock.lock().unwrap();
+
+ // There is at least one waiting thread, so notify one
+ self.condvar.notify_one();
+ }
+
+ ret
+ }
+
+ #[inline]
+ #[cfg_attr(feature = "cargo-clippy", allow(clippy::if_same_then_else))]
+ fn poll2(&self, events: &mut Events, mut timeout: Option<Duration>, interruptible: bool) -> io::Result<usize> {
+ // Compute the timeout value passed to the system selector. If the
+ // readiness queue has pending nodes, we still want to poll the system
+ // selector for new events, but we don't want to block the thread to
+ // wait for new events.
+ if timeout == Some(Duration::from_millis(0)) {
+ // If blocking is not requested, then there is no need to prepare
+ // the queue for sleep
+ //
+ // The sleep_marker should be removed by readiness_queue.poll().
+ } else if self.readiness_queue.prepare_for_sleep() {
+ // The readiness queue is empty. The call to `prepare_for_sleep`
+ // inserts `sleep_marker` into the queue. This signals to any
+ // threads setting readiness that the `Poll::poll` is going to
+ // sleep, so the awakener should be used.
+ } else {
+ // The readiness queue is not empty, so do not block the thread.
+ timeout = Some(Duration::from_millis(0));
+ }
+
+ loop {
+ let now = Instant::now();
+ // First get selector events
+ let res = self.selector.select(&mut events.inner, AWAKEN, timeout);
+ match res {
+ Ok(true) => {
+ // Some awakeners require reading from a FD.
+ self.readiness_queue.inner.awakener.cleanup();
+ break;
+ }
+ Ok(false) => break,
+ Err(ref e) if e.kind() == io::ErrorKind::Interrupted && !interruptible => {
+ // Interrupted by a signal; update timeout if necessary and retry
+ if let Some(to) = timeout {
+ let elapsed = now.elapsed();
+ if elapsed >= to {
+ break;
+ } else {
+ timeout = Some(to - elapsed);
+ }
+ }
+ }
+ Err(e) => return Err(e),
+ }
+ }
+
+ // Poll custom event queue
+ self.readiness_queue.poll(&mut events.inner);
+
+ // Return number of polled events
+ Ok(events.inner.len())
+ }
+}
+
+fn validate_args(token: Token) -> io::Result<()> {
+ if token == AWAKEN {
+ return Err(io::Error::new(io::ErrorKind::Other, "invalid token"));
+ }
+
+ Ok(())
+}
+
+impl fmt::Debug for Poll {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Poll")
+ .finish()
+ }
+}
+
+#[cfg(all(unix, not(target_os = "fuchsia")))]
+impl AsRawFd for Poll {
+ fn as_raw_fd(&self) -> RawFd {
+ self.selector.as_raw_fd()
+ }
+}
+
+/// A collection of readiness events.
+///
+/// `Events` is passed as an argument to [`Poll::poll`] and will be used to
+/// receive any new readiness events received since the last poll. Usually, a
+/// single `Events` instance is created at the same time as a [`Poll`] and
+/// reused on each call to [`Poll::poll`].
+///
+/// See [`Poll`] for more documentation on polling.
+///
+/// # Examples
+///
+/// ```
+/// # use std::error::Error;
+/// # fn try_main() -> Result<(), Box<Error>> {
+/// use mio::{Events, Poll};
+/// use std::time::Duration;
+///
+/// let mut events = Events::with_capacity(1024);
+/// let poll = Poll::new()?;
+///
+/// assert_eq!(0, events.len());
+///
+/// // Register `Evented` handles with `poll`
+///
+/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
+///
+/// for event in &events {
+/// println!("event={:?}", event);
+/// }
+/// # Ok(())
+/// # }
+/// #
+/// # fn main() {
+/// # try_main().unwrap();
+/// # }
+/// ```
+///
+/// [`Poll::poll`]: struct.Poll.html#method.poll
+/// [`Poll`]: struct.Poll.html
+pub struct Events {
+ inner: sys::Events,
+}
+
+/// [`Events`] iterator.
+///
+/// This struct is created by the [`iter`] method on [`Events`].
+///
+/// # Examples
+///
+/// ```
+/// # use std::error::Error;
+/// # fn try_main() -> Result<(), Box<Error>> {
+/// use mio::{Events, Poll};
+/// use std::time::Duration;
+///
+/// let mut events = Events::with_capacity(1024);
+/// let poll = Poll::new()?;
+///
+/// // Register handles with `poll`
+///
+/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
+///
+/// for event in events.iter() {
+/// println!("event={:?}", event);
+/// }
+/// # Ok(())
+/// # }
+/// #
+/// # fn main() {
+/// # try_main().unwrap();
+/// # }
+/// ```
+///
+/// [`Events`]: struct.Events.html
+/// [`iter`]: struct.Events.html#method.iter
+#[derive(Debug, Clone)]
+pub struct Iter<'a> {
+ inner: &'a Events,
+ pos: usize,
+}
+
+/// Owned [`Events`] iterator.
+///
+/// This struct is created by the `into_iter` method on [`Events`].
+///
+/// # Examples
+///
+/// ```
+/// # use std::error::Error;
+/// # fn try_main() -> Result<(), Box<Error>> {
+/// use mio::{Events, Poll};
+/// use std::time::Duration;
+///
+/// let mut events = Events::with_capacity(1024);
+/// let poll = Poll::new()?;
+///
+/// // Register handles with `poll`
+///
+/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
+///
+/// for event in events {
+/// println!("event={:?}", event);
+/// }
+/// # Ok(())
+/// # }
+/// #
+/// # fn main() {
+/// # try_main().unwrap();
+/// # }
+/// ```
+/// [`Events`]: struct.Events.html
+#[derive(Debug)]
+pub struct IntoIter {
+ inner: Events,
+ pos: usize,
+}
+
+impl Events {
+ /// Return a new `Events` capable of holding up to `capacity` events.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use mio::Events;
+ ///
+ /// let events = Events::with_capacity(1024);
+ ///
+ /// assert_eq!(1024, events.capacity());
+ /// ```
+ pub fn with_capacity(capacity: usize) -> Events {
+ Events {
+ inner: sys::Events::with_capacity(capacity),
+ }
+ }
+
+ #[deprecated(since="0.6.10", note="Index access removed in favor of iterator only API.")]
+ #[doc(hidden)]
+ pub fn get(&self, idx: usize) -> Option<Event> {
+ self.inner.get(idx)
+ }
+
+ #[doc(hidden)]
+ #[deprecated(since="0.6.10", note="Index access removed in favor of iterator only API.")]
+ pub fn len(&self) -> usize {
+ self.inner.len()
+ }
+
+ /// Returns the number of `Event` values that `self` can hold.
+ ///
+ /// ```
+ /// use mio::Events;
+ ///
+ /// let events = Events::with_capacity(1024);
+ ///
+ /// assert_eq!(1024, events.capacity());
+ /// ```
+ pub fn capacity(&self) -> usize {
+ self.inner.capacity()
+ }
+
+ /// Returns `true` if `self` contains no `Event` values.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use mio::Events;
+ ///
+ /// let events = Events::with_capacity(1024);
+ ///
+ /// assert!(events.is_empty());
+ /// ```
+ pub fn is_empty(&self) -> bool {
+ self.inner.is_empty()
+ }
+
+ /// Returns an iterator over the `Event` values.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// use mio::{Events, Poll};
+ /// use std::time::Duration;
+ ///
+ /// let mut events = Events::with_capacity(1024);
+ /// let poll = Poll::new()?;
+ ///
+ /// // Register handles with `poll`
+ ///
+ /// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
+ ///
+ /// for event in events.iter() {
+ /// println!("event={:?}", event);
+ /// }
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ pub fn iter(&self) -> Iter {
+ Iter {
+ inner: self,
+ pos: 0
+ }
+ }
+
+ /// Clearing all `Event` values from container explicitly.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// use mio::{Events, Poll};
+ /// use std::time::Duration;
+ ///
+ /// let mut events = Events::with_capacity(1024);
+ /// let poll = Poll::new()?;
+ ///
+ /// // Register handles with `poll`
+ /// for _ in 0..2 {
+ /// events.clear();
+ /// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
+ ///
+ /// for event in events.iter() {
+ /// println!("event={:?}", event);
+ /// }
+ /// }
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ pub fn clear(&mut self) {
+ self.inner.clear();
+ }
+}
+
+impl<'a> IntoIterator for &'a Events {
+ type Item = Event;
+ type IntoIter = Iter<'a>;
+
+ fn into_iter(self) -> Self::IntoIter {
+ self.iter()
+ }
+}
+
+impl<'a> Iterator for Iter<'a> {
+ type Item = Event;
+
+ fn next(&mut self) -> Option<Event> {
+ let ret = self.inner.inner.get(self.pos);
+ self.pos += 1;
+ ret
+ }
+}
+
+impl IntoIterator for Events {
+ type Item = Event;
+ type IntoIter = IntoIter;
+
+ fn into_iter(self) -> Self::IntoIter {
+ IntoIter {
+ inner: self,
+ pos: 0,
+ }
+ }
+}
+
+impl Iterator for IntoIter {
+ type Item = Event;
+
+ fn next(&mut self) -> Option<Event> {
+ let ret = self.inner.inner.get(self.pos);
+ self.pos += 1;
+ ret
+ }
+}
+
+impl fmt::Debug for Events {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Events")
+ .field("capacity", &self.capacity())
+ .finish()
+ }
+}
+
+// ===== Accessors for internal usage =====
+
+pub fn selector(poll: &Poll) -> &sys::Selector {
+ &poll.selector
+}
+
+/*
+ *
+ * ===== Registration =====
+ *
+ */
+
+// TODO: get rid of this, windows depends on it for now
+#[allow(dead_code)]
+pub fn new_registration(poll: &Poll, token: Token, ready: Ready, opt: PollOpt)
+ -> (Registration, SetReadiness)
+{
+ Registration::new_priv(poll, token, ready, opt)
+}
+
+impl Registration {
+ /// Create and return a new `Registration` and the associated
+ /// `SetReadiness`.
+ ///
+ /// See [struct] documentation for more detail and [`Poll`]
+ /// for high level documentation on polling.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// use mio::{Events, Ready, Registration, Poll, PollOpt, Token};
+ /// use std::thread;
+ ///
+ /// let (registration, set_readiness) = Registration::new2();
+ ///
+ /// thread::spawn(move || {
+ /// use std::time::Duration;
+ /// thread::sleep(Duration::from_millis(500));
+ ///
+ /// set_readiness.set_readiness(Ready::readable());
+ /// });
+ ///
+ /// let poll = Poll::new()?;
+ /// poll.register(&registration, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
+ ///
+ /// let mut events = Events::with_capacity(256);
+ ///
+ /// loop {
+ /// poll.poll(&mut events, None);
+ ///
+ /// for event in &events {
+ /// if event.token() == Token(0) && event.readiness().is_readable() {
+ /// return Ok(());
+ /// }
+ /// }
+ /// }
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ /// [struct]: #
+ /// [`Poll`]: struct.Poll.html
+ pub fn new2() -> (Registration, SetReadiness) {
+ // Allocate the registration node. The new node will have `ref_count`
+ // set to 2: one SetReadiness, one Registration.
+ let node = Box::into_raw(Box::new(ReadinessNode::new(
+ ptr::null_mut(), Token(0), Ready::empty(), PollOpt::empty(), 2)));
+
+ let registration = Registration {
+ inner: RegistrationInner {
+ node,
+ },
+ };
+
+ let set_readiness = SetReadiness {
+ inner: RegistrationInner {
+ node,
+ },
+ };
+
+ (registration, set_readiness)
+ }
+
+ #[deprecated(since = "0.6.5", note = "use `new2` instead")]
+ #[cfg(feature = "with-deprecated")]
+ #[doc(hidden)]
+ pub fn new(poll: &Poll, token: Token, interest: Ready, opt: PollOpt)
+ -> (Registration, SetReadiness)
+ {
+ Registration::new_priv(poll, token, interest, opt)
+ }
+
+ // TODO: Get rid of this (windows depends on it for now)
+ fn new_priv(poll: &Poll, token: Token, interest: Ready, opt: PollOpt)
+ -> (Registration, SetReadiness)
+ {
+ is_send::<Registration>();
+ is_sync::<Registration>();
+ is_send::<SetReadiness>();
+ is_sync::<SetReadiness>();
+
+ // Clone handle to the readiness queue, this bumps the ref count
+ let queue = poll.readiness_queue.inner.clone();
+
+ // Convert to a *mut () pointer
+ let queue: *mut () = unsafe { mem::transmute(queue) };
+
+ // Allocate the registration node. The new node will have `ref_count`
+ // set to 3: one SetReadiness, one Registration, and one Poll handle.
+ let node = Box::into_raw(Box::new(ReadinessNode::new(
+ queue, token, interest, opt, 3)));
+
+ let registration = Registration {
+ inner: RegistrationInner {
+ node,
+ },
+ };
+
+ let set_readiness = SetReadiness {
+ inner: RegistrationInner {
+ node,
+ },
+ };
+
+ (registration, set_readiness)
+ }
+
+ #[deprecated(since = "0.6.5", note = "use `Evented` impl")]
+ #[cfg(feature = "with-deprecated")]
+ #[doc(hidden)]
+ pub fn update(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ self.inner.update(poll, token, interest, opts)
+ }
+
+ #[deprecated(since = "0.6.5", note = "use `Poll::deregister` instead")]
+ #[cfg(feature = "with-deprecated")]
+ #[doc(hidden)]
+ pub fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ self.inner.update(poll, Token(0), Ready::empty(), PollOpt::empty())
+ }
+}
+
+impl Evented for Registration {
+ fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ self.inner.update(poll, token, interest, opts)
+ }
+
+ fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
+ self.inner.update(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &Poll) -> io::Result<()> {
+ self.inner.update(poll, Token(0), Ready::empty(), PollOpt::empty())
+ }
+}
+
+impl Drop for Registration {
+ fn drop(&mut self) {
+ // `flag_as_dropped` toggles the `dropped` flag and notifies
+ // `Poll::poll` to release its handle (which is just decrementing
+ // the ref count).
+ if self.inner.state.flag_as_dropped() {
+ // Can't do anything if the queuing fails
+ let _ = self.inner.enqueue_with_wakeup();
+ }
+ }
+}
+
+impl fmt::Debug for Registration {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Registration")
+ .finish()
+ }
+}
+
+impl SetReadiness {
+ /// Returns the registration's current readiness.
+ ///
+ /// # Note
+ ///
+ /// There is no guarantee that `readiness` establishes any sort of memory
+ /// ordering. Any concurrent data access must be synchronized using another
+ /// strategy.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// use mio::{Registration, Ready};
+ ///
+ /// let (registration, set_readiness) = Registration::new2();
+ ///
+ /// assert!(set_readiness.readiness().is_empty());
+ ///
+ /// set_readiness.set_readiness(Ready::readable())?;
+ /// assert!(set_readiness.readiness().is_readable());
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ pub fn readiness(&self) -> Ready {
+ self.inner.readiness()
+ }
+
+ /// Set the registration's readiness
+ ///
+ /// If the associated `Registration` is registered with a [`Poll`] instance
+ /// and has requested readiness events that include `ready`, then a future
+ /// call to [`Poll::poll`] will receive a readiness event representing the
+ /// readiness state change.
+ ///
+ /// # Note
+ ///
+ /// There is no guarantee that `readiness` establishes any sort of memory
+ /// ordering. Any concurrent data access must be synchronized using another
+ /// strategy.
+ ///
+ /// There is also no guarantee as to when the readiness event will be
+ /// delivered to poll. A best attempt will be made to make the delivery in a
+ /// "timely" fashion. For example, the following is **not** guaranteed to
+ /// work:
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// use mio::{Events, Registration, Ready, Poll, PollOpt, Token};
+ ///
+ /// let poll = Poll::new()?;
+ /// let (registration, set_readiness) = Registration::new2();
+ ///
+ /// poll.register(&registration,
+ /// Token(0),
+ /// Ready::readable(),
+ /// PollOpt::edge())?;
+ ///
+ /// // Set the readiness, then immediately poll to try to get the readiness
+ /// // event
+ /// set_readiness.set_readiness(Ready::readable())?;
+ ///
+ /// let mut events = Events::with_capacity(1024);
+ /// poll.poll(&mut events, None)?;
+ ///
+ /// // There is NO guarantee that the following will work. It is possible
+ /// // that the readiness event will be delivered at a later time.
+ /// let event = events.get(0).unwrap();
+ /// assert_eq!(event.token(), Token(0));
+ /// assert!(event.readiness().is_readable());
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ ///
+ /// # Examples
+ ///
+ /// A simple example, for a more elaborate example, see the [`Evented`]
+ /// documentation.
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// # fn try_main() -> Result<(), Box<Error>> {
+ /// use mio::{Registration, Ready};
+ ///
+ /// let (registration, set_readiness) = Registration::new2();
+ ///
+ /// assert!(set_readiness.readiness().is_empty());
+ ///
+ /// set_readiness.set_readiness(Ready::readable())?;
+ /// assert!(set_readiness.readiness().is_readable());
+ /// # Ok(())
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # try_main().unwrap();
+ /// # }
+ /// ```
+ ///
+ /// [`Registration`]: struct.Registration.html
+ /// [`Evented`]: event/trait.Evented.html#examples
+ /// [`Poll`]: struct.Poll.html
+ /// [`Poll::poll`]: struct.Poll.html#method.poll
+ pub fn set_readiness(&self, ready: Ready) -> io::Result<()> {
+ self.inner.set_readiness(ready)
+ }
+}
+
+impl fmt::Debug for SetReadiness {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("SetReadiness")
+ .finish()
+ }
+}
+
+impl RegistrationInner {
+ /// Get the registration's readiness.
+ fn readiness(&self) -> Ready {
+ self.state.load(Relaxed).readiness()
+ }
+
+ /// Set the registration's readiness.
+ ///
+ /// This function can be called concurrently by an arbitrary number of
+ /// SetReadiness handles.
+ fn set_readiness(&self, ready: Ready) -> io::Result<()> {
+ // Load the current atomic state.
+ let mut state = self.state.load(Acquire);
+ let mut next;
+
+ loop {
+ next = state;
+
+ if state.is_dropped() {
+ // Node is dropped, no more notifications
+ return Ok(());
+ }
+
+ // Update the readiness
+ next.set_readiness(ready);
+
+ // If the readiness is not blank, try to obtain permission to
+ // push the node into the readiness queue.
+ if !next.effective_readiness().is_empty() {
+ next.set_queued();
+ }
+
+ let actual = self.state.compare_and_swap(state, next, AcqRel);
+
+ if state == actual {
+ break;
+ }
+
+ state = actual;
+ }
+
+ if !state.is_queued() && next.is_queued() {
+ // We toggled the queued flag, making us responsible for queuing the
+ // node in the MPSC readiness queue.
+ self.enqueue_with_wakeup()?;
+ }
+
+ Ok(())
+ }
+
+ /// Update the registration details associated with the node
+ fn update(&self, poll: &Poll, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> {
+ // First, ensure poll instances match
+ //
+ // Load the queue pointer, `Relaxed` is sufficient here as only the
+ // pointer is being operated on. The actual memory is guaranteed to be
+ // visible the `poll: &Poll` ref passed as an argument to the function.
+ let mut queue = self.readiness_queue.load(Relaxed);
+ let other: &*mut () = unsafe {
+ &*(&poll.readiness_queue.inner as *const _ as *const *mut ())
+ };
+ let other = *other;
+
+ debug_assert!(mem::size_of::<Arc<ReadinessQueueInner>>() == mem::size_of::<*mut ()>());
+
+ if queue.is_null() {
+ // Attempt to set the queue pointer. `Release` ordering synchronizes
+ // with `Acquire` in `ensure_with_wakeup`.
+ let actual = self.readiness_queue.compare_and_swap(
+ queue, other, Release);
+
+ if actual.is_null() {
+ // The CAS succeeded, this means that the node's ref count
+ // should be incremented to reflect that the `poll` function
+ // effectively owns the node as well.
+ //
+ // `Relaxed` ordering used for the same reason as in
+ // RegistrationInner::clone
+ self.ref_count.fetch_add(1, Relaxed);
+
+ // Note that the `queue` reference stored in our
+ // `readiness_queue` field is intended to be a strong reference,
+ // so now that we've successfully claimed the reference we bump
+ // the refcount here.
+ //
+ // Down below in `release_node` when we deallocate this
+ // `RegistrationInner` is where we'll transmute this back to an
+ // arc and decrement the reference count.
+ mem::forget(poll.readiness_queue.clone());
+ } else {
+ // The CAS failed, another thread set the queue pointer, so ensure
+ // that the pointer and `other` match
+ if actual != other {
+ return Err(io::Error::new(io::ErrorKind::Other, "registration handle associated with another `Poll` instance"));
+ }
+ }
+
+ queue = other;
+ } else if queue != other {
+ return Err(io::Error::new(io::ErrorKind::Other, "registration handle associated with another `Poll` instance"));
+ }
+
+ unsafe {
+ let actual = &poll.readiness_queue.inner as *const _ as *const usize;
+ debug_assert_eq!(queue as usize, *actual);
+ }
+
+ // The `update_lock` atomic is used as a flag ensuring only a single
+ // thread concurrently enters the `update` critical section. Any
+ // concurrent calls to update are discarded. If coordinated updates are
+ // required, the Mio user is responsible for handling that.
+ //
+ // Acquire / Release ordering is used on `update_lock` to ensure that
+ // data access to the `token_*` variables are scoped to the critical
+ // section.
+
+ // Acquire the update lock.
+ if self.update_lock.compare_and_swap(false, true, Acquire) {
+ // The lock is already held. Discard the update
+ return Ok(());
+ }
+
+ // Relaxed ordering is acceptable here as the only memory that needs to
+ // be visible as part of the update are the `token_*` variables, and
+ // ordering has already been handled by the `update_lock` access.
+ let mut state = self.state.load(Relaxed);
+ let mut next;
+
+ // Read the current token, again this memory has been ordered by the
+ // acquire on `update_lock`.
+ let curr_token_pos = state.token_write_pos();
+ let curr_token = unsafe { self::token(self, curr_token_pos) };
+
+ let mut next_token_pos = curr_token_pos;
+
+ // If the `update` call is changing the token, then compute the next
+ // available token slot and write the token there.
+ //
+ // Note that this computation is happening *outside* of the
+ // compare-and-swap loop. The update lock ensures that only a single
+ // thread could be mutating the write_token_position, so the
+ // `next_token_pos` will never need to be recomputed even if
+ // `token_read_pos` concurrently changes. This is because
+ // `token_read_pos` can ONLY concurrently change to the current value of
+ // `token_write_pos`, so `next_token_pos` will always remain valid.
+ if token != curr_token {
+ next_token_pos = state.next_token_pos();
+
+ // Update the token
+ match next_token_pos {
+ 0 => unsafe { *self.token_0.get() = token },
+ 1 => unsafe { *self.token_1.get() = token },
+ 2 => unsafe { *self.token_2.get() = token },
+ _ => unreachable!(),
+ }
+ }
+
+ // Now enter the compare-and-swap loop
+ loop {
+ next = state;
+
+ // The node is only dropped once all `Registration` handles are
+ // dropped. Only `Registration` can call `update`.
+ debug_assert!(!state.is_dropped());
+
+ // Update the write token position, this will also release the token
+ // to Poll::poll.
+ next.set_token_write_pos(next_token_pos);
+
+ // Update readiness and poll opts
+ next.set_interest(interest);
+ next.set_poll_opt(opt);
+
+ // If there is effective readiness, the node will need to be queued
+ // for processing. This exact behavior is still TBD, so we are
+ // conservative for now and always fire.
+ //
+ // See https://github.com/carllerche/mio/issues/535.
+ if !next.effective_readiness().is_empty() {
+ next.set_queued();
+ }
+
+ // compare-and-swap the state values. Only `Release` is needed here.
+ // The `Release` ensures that `Poll::poll` will see the token
+ // update and the update function doesn't care about any other
+ // memory visibility.
+ let actual = self.state.compare_and_swap(state, next, Release);
+
+ if actual == state {
+ break;
+ }
+
+ // CAS failed, but `curr_token_pos` should not have changed given
+ // that we still hold the update lock.
+ debug_assert_eq!(curr_token_pos, actual.token_write_pos());
+
+ state = actual;
+ }
+
+ // Release the lock
+ self.update_lock.store(false, Release);
+
+ if !state.is_queued() && next.is_queued() {
+ // We are responsible for enqueing the node.
+ enqueue_with_wakeup(queue, self)?;
+ }
+
+ Ok(())
+ }
+}
+
+impl ops::Deref for RegistrationInner {
+ type Target = ReadinessNode;
+
+ fn deref(&self) -> &ReadinessNode {
+ unsafe { &*self.node }
+ }
+}
+
+impl Clone for RegistrationInner {
+ fn clone(&self) -> RegistrationInner {
+ // Using a relaxed ordering is alright here, as knowledge of the
+ // original reference prevents other threads from erroneously deleting
+ // the object.
+ //
+ // As explained in the [Boost documentation][1], Increasing the
+ // reference counter can always be done with memory_order_relaxed: New
+ // references to an object can only be formed from an existing
+ // reference, and passing an existing reference from one thread to
+ // another must already provide any required synchronization.
+ //
+ // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
+ let old_size = self.ref_count.fetch_add(1, Relaxed);
+
+ // However we need to guard against massive refcounts in case someone
+ // is `mem::forget`ing Arcs. If we don't do this the count can overflow
+ // and users will use-after free. We racily saturate to `isize::MAX` on
+ // the assumption that there aren't ~2 billion threads incrementing
+ // the reference count at once. This branch will never be taken in
+ // any realistic program.
+ //
+ // We abort because such a program is incredibly degenerate, and we
+ // don't care to support it.
+ if old_size & !MAX_REFCOUNT != 0 {
+ process::abort();
+ }
+
+ RegistrationInner {
+ node: self.node,
+ }
+ }
+}
+
+impl Drop for RegistrationInner {
+ fn drop(&mut self) {
+ // Only handles releasing from `Registration` and `SetReadiness`
+ // handles. Poll has to call this itself.
+ release_node(self.node);
+ }
+}
+
+/*
+ *
+ * ===== ReadinessQueue =====
+ *
+ */
+
+impl ReadinessQueue {
+ /// Create a new `ReadinessQueue`.
+ fn new() -> io::Result<ReadinessQueue> {
+ is_send::<Self>();
+ is_sync::<Self>();
+
+ let end_marker = Box::new(ReadinessNode::marker());
+ let sleep_marker = Box::new(ReadinessNode::marker());
+ let closed_marker = Box::new(ReadinessNode::marker());
+
+ let ptr = &*end_marker as *const _ as *mut _;
+
+ Ok(ReadinessQueue {
+ inner: Arc::new(ReadinessQueueInner {
+ awakener: sys::Awakener::new()?,
+ head_readiness: AtomicPtr::new(ptr),
+ tail_readiness: UnsafeCell::new(ptr),
+ end_marker,
+ sleep_marker,
+ closed_marker,
+ })
+ })
+ }
+
+ /// Poll the queue for new events
+ fn poll(&self, dst: &mut sys::Events) {
+ // `until` is set with the first node that gets re-enqueued due to being
+ // set to have level-triggered notifications. This prevents an infinite
+ // loop where `Poll::poll` will keep dequeuing nodes it enqueues.
+ let mut until = ptr::null_mut();
+
+ if dst.len() == dst.capacity() {
+ // If `dst` is already full, the readiness queue won't be drained.
+ // This might result in `sleep_marker` staying in the queue and
+ // unecessary pipe writes occuring.
+ self.inner.clear_sleep_marker();
+ }
+
+ 'outer:
+ while dst.len() < dst.capacity() {
+ // Dequeue a node. If the queue is in an inconsistent state, then
+ // stop polling. `Poll::poll` will be called again shortly and enter
+ // a syscall, which should be enough to enable the other thread to
+ // finish the queuing process.
+ let ptr = match unsafe { self.inner.dequeue_node(until) } {
+ Dequeue::Empty | Dequeue::Inconsistent => break,
+ Dequeue::Data(ptr) => ptr,
+ };
+
+ let node = unsafe { &*ptr };
+
+ // Read the node state with Acquire ordering. This allows reading
+ // the token variables.
+ let mut state = node.state.load(Acquire);
+ let mut next;
+ let mut readiness;
+ let mut opt;
+
+ loop {
+ // Build up any changes to the readiness node's state and
+ // attempt the CAS at the end
+ next = state;
+
+ // Given that the node was just read from the queue, the
+ // `queued` flag should still be set.
+ debug_assert!(state.is_queued());
+
+ // The dropped flag means we need to release the node and
+ // perform no further processing on it.
+ if state.is_dropped() {
+ // Release the node and continue
+ release_node(ptr);
+ continue 'outer;
+ }
+
+ // Process the node
+ readiness = state.effective_readiness();
+ opt = state.poll_opt();
+
+ if opt.is_edge() {
+ // Mark the node as dequeued
+ next.set_dequeued();
+
+ if opt.is_oneshot() && !readiness.is_empty() {
+ next.disarm();
+ }
+ } else if readiness.is_empty() {
+ next.set_dequeued();
+ }
+
+ // Ensure `token_read_pos` is set to `token_write_pos` so that
+ // we read the most up to date token value.
+ next.update_token_read_pos();
+
+ if state == next {
+ break;
+ }
+
+ let actual = node.state.compare_and_swap(state, next, AcqRel);
+
+ if actual == state {
+ break;
+ }
+
+ state = actual;
+ }
+
+ // If the queued flag is still set, then the node must be requeued.
+ // This typically happens when using level-triggered notifications.
+ if next.is_queued() {
+ if until.is_null() {
+ // We never want to see the node again
+ until = ptr;
+ }
+
+ // Requeue the node
+ self.inner.enqueue_node(node);
+ }
+
+ if !readiness.is_empty() {
+ // Get the token
+ let token = unsafe { token(node, next.token_read_pos()) };
+
+ // Push the event
+ dst.push_event(Event::new(readiness, token));
+ }
+ }
+ }
+
+ /// Prepare the queue for the `Poll::poll` thread to block in the system
+ /// selector. This involves changing `head_readiness` to `sleep_marker`.
+ /// Returns true if successful and `poll` can block.
+ fn prepare_for_sleep(&self) -> bool {
+ let end_marker = self.inner.end_marker();
+ let sleep_marker = self.inner.sleep_marker();
+
+ let tail = unsafe { *self.inner.tail_readiness.get() };
+
+ // If the tail is currently set to the sleep_marker, then check if the
+ // head is as well. If it is, then the queue is currently ready to
+ // sleep. If it is not, then the queue is not empty and there should be
+ // no sleeping.
+ if tail == sleep_marker {
+ return self.inner.head_readiness.load(Acquire) == sleep_marker;
+ }
+
+ // If the tail is not currently set to `end_marker`, then the queue is
+ // not empty.
+ if tail != end_marker {
+ return false;
+ }
+
+ // The sleep marker is *not* currently in the readiness queue.
+ //
+ // The sleep marker is only inserted in this function. It is also only
+ // inserted in the tail position. This is guaranteed by first checking
+ // that the end marker is in the tail position, pushing the sleep marker
+ // after the end marker, then removing the end marker.
+ //
+ // Before inserting a node into the queue, the next pointer has to be
+ // set to null. Again, this is only safe to do when the node is not
+ // currently in the queue, but we already have ensured this.
+ self.inner.sleep_marker.next_readiness.store(ptr::null_mut(), Relaxed);
+
+ let actual = self.inner.head_readiness.compare_and_swap(
+ end_marker, sleep_marker, AcqRel);
+
+ debug_assert!(actual != sleep_marker);
+
+ if actual != end_marker {
+ // The readiness queue is not empty
+ return false;
+ }
+
+ // The current tail should be pointing to `end_marker`
+ debug_assert!(unsafe { *self.inner.tail_readiness.get() == end_marker });
+ // The `end_marker` next pointer should be null
+ debug_assert!(self.inner.end_marker.next_readiness.load(Relaxed).is_null());
+
+ // Update tail pointer.
+ unsafe { *self.inner.tail_readiness.get() = sleep_marker; }
+ true
+ }
+}
+
+impl Drop for ReadinessQueue {
+ fn drop(&mut self) {
+ // Close the queue by enqueuing the closed node
+ self.inner.enqueue_node(&*self.inner.closed_marker);
+
+ loop {
+ // Free any nodes that happen to be left in the readiness queue
+ let ptr = match unsafe { self.inner.dequeue_node(ptr::null_mut()) } {
+ Dequeue::Empty => break,
+ Dequeue::Inconsistent => {
+ // This really shouldn't be possible as all other handles to
+ // `ReadinessQueueInner` are dropped, but handle this by
+ // spinning I guess?
+ continue;
+ }
+ Dequeue::Data(ptr) => ptr,
+ };
+
+ let node = unsafe { &*ptr };
+
+ let state = node.state.load(Acquire);
+
+ debug_assert!(state.is_queued());
+
+ release_node(ptr);
+ }
+ }
+}
+
+impl ReadinessQueueInner {
+ fn wakeup(&self) -> io::Result<()> {
+ self.awakener.wakeup()
+ }
+
+ /// Prepend the given node to the head of the readiness queue. This is done
+ /// with relaxed ordering. Returns true if `Poll` needs to be woken up.
+ fn enqueue_node_with_wakeup(&self, node: &ReadinessNode) -> io::Result<()> {
+ if self.enqueue_node(node) {
+ self.wakeup()?;
+ }
+
+ Ok(())
+ }
+
+ /// Push the node into the readiness queue
+ fn enqueue_node(&self, node: &ReadinessNode) -> bool {
+ // This is the 1024cores.net intrusive MPSC queue [1] "push" function.
+ let node_ptr = node as *const _ as *mut _;
+
+ // Relaxed used as the ordering is "released" when swapping
+ // `head_readiness`
+ node.next_readiness.store(ptr::null_mut(), Relaxed);
+
+ unsafe {
+ let mut prev = self.head_readiness.load(Acquire);
+
+ loop {
+ if prev == self.closed_marker() {
+ debug_assert!(node_ptr != self.closed_marker());
+ // debug_assert!(node_ptr != self.end_marker());
+ debug_assert!(node_ptr != self.sleep_marker());
+
+ if node_ptr != self.end_marker() {
+ // The readiness queue is shutdown, but the enqueue flag was
+ // set. This means that we are responsible for decrementing
+ // the ready queue's ref count
+ debug_assert!(node.ref_count.load(Relaxed) >= 2);
+ release_node(node_ptr);
+ }
+
+ return false;
+ }
+
+ let act = self.head_readiness.compare_and_swap(prev, node_ptr, AcqRel);
+
+ if prev == act {
+ break;
+ }
+
+ prev = act;
+ }
+
+ debug_assert!((*prev).next_readiness.load(Relaxed).is_null());
+
+ (*prev).next_readiness.store(node_ptr, Release);
+
+ prev == self.sleep_marker()
+ }
+ }
+
+ fn clear_sleep_marker(&self) {
+ let end_marker = self.end_marker();
+ let sleep_marker = self.sleep_marker();
+
+ unsafe {
+ let tail = *self.tail_readiness.get();
+
+ if tail != self.sleep_marker() {
+ return;
+ }
+
+ // The empty markeer is *not* currently in the readiness queue
+ // (since the sleep markeris).
+ self.end_marker.next_readiness.store(ptr::null_mut(), Relaxed);
+
+ let actual = self.head_readiness.compare_and_swap(
+ sleep_marker, end_marker, AcqRel);
+
+ debug_assert!(actual != end_marker);
+
+ if actual != sleep_marker {
+ // The readiness queue is not empty, we cannot remove the sleep
+ // markeer
+ return;
+ }
+
+ // Update the tail pointer.
+ *self.tail_readiness.get() = end_marker;
+ }
+ }
+
+ /// Must only be called in `poll` or `drop`
+ unsafe fn dequeue_node(&self, until: *mut ReadinessNode) -> Dequeue {
+ // This is the 1024cores.net intrusive MPSC queue [1] "pop" function
+ // with the modifications mentioned at the top of the file.
+ let mut tail = *self.tail_readiness.get();
+ let mut next = (*tail).next_readiness.load(Acquire);
+
+ if tail == self.end_marker() || tail == self.sleep_marker() || tail == self.closed_marker() {
+ if next.is_null() {
+ // Make sure the sleep marker is removed (as we are no longer
+ // sleeping
+ self.clear_sleep_marker();
+
+ return Dequeue::Empty;
+ }
+
+ *self.tail_readiness.get() = next;
+ tail = next;
+ next = (*next).next_readiness.load(Acquire);
+ }
+
+ // Only need to check `until` at this point. `until` is either null,
+ // which will never match tail OR it is a node that was pushed by
+ // the current thread. This means that either:
+ //
+ // 1) The queue is inconsistent, which is handled explicitly
+ // 2) We encounter `until` at this point in dequeue
+ // 3) we will pop a different node
+ if tail == until {
+ return Dequeue::Empty;
+ }
+
+ if !next.is_null() {
+ *self.tail_readiness.get() = next;
+ return Dequeue::Data(tail);
+ }
+
+ if self.head_readiness.load(Acquire) != tail {
+ return Dequeue::Inconsistent;
+ }
+
+ // Push the stub node
+ self.enqueue_node(&*self.end_marker);
+
+ next = (*tail).next_readiness.load(Acquire);
+
+ if !next.is_null() {
+ *self.tail_readiness.get() = next;
+ return Dequeue::Data(tail);
+ }
+
+ Dequeue::Inconsistent
+ }
+
+ fn end_marker(&self) -> *mut ReadinessNode {
+ &*self.end_marker as *const ReadinessNode as *mut ReadinessNode
+ }
+
+ fn sleep_marker(&self) -> *mut ReadinessNode {
+ &*self.sleep_marker as *const ReadinessNode as *mut ReadinessNode
+ }
+
+ fn closed_marker(&self) -> *mut ReadinessNode {
+ &*self.closed_marker as *const ReadinessNode as *mut ReadinessNode
+ }
+}
+
+impl ReadinessNode {
+ /// Return a new `ReadinessNode`, initialized with a ref_count of 3.
+ fn new(queue: *mut (),
+ token: Token,
+ interest: Ready,
+ opt: PollOpt,
+ ref_count: usize) -> ReadinessNode
+ {
+ ReadinessNode {
+ state: AtomicState::new(interest, opt),
+ // Only the first token is set, the others are initialized to 0
+ token_0: UnsafeCell::new(token),
+ token_1: UnsafeCell::new(Token(0)),
+ token_2: UnsafeCell::new(Token(0)),
+ next_readiness: AtomicPtr::new(ptr::null_mut()),
+ update_lock: AtomicBool::new(false),
+ readiness_queue: AtomicPtr::new(queue),
+ ref_count: AtomicUsize::new(ref_count),
+ }
+ }
+
+ fn marker() -> ReadinessNode {
+ ReadinessNode {
+ state: AtomicState::new(Ready::empty(), PollOpt::empty()),
+ token_0: UnsafeCell::new(Token(0)),
+ token_1: UnsafeCell::new(Token(0)),
+ token_2: UnsafeCell::new(Token(0)),
+ next_readiness: AtomicPtr::new(ptr::null_mut()),
+ update_lock: AtomicBool::new(false),
+ readiness_queue: AtomicPtr::new(ptr::null_mut()),
+ ref_count: AtomicUsize::new(0),
+ }
+ }
+
+ fn enqueue_with_wakeup(&self) -> io::Result<()> {
+ let queue = self.readiness_queue.load(Acquire);
+
+ if queue.is_null() {
+ // Not associated with a queue, nothing to do
+ return Ok(());
+ }
+
+ enqueue_with_wakeup(queue, self)
+ }
+}
+
+fn enqueue_with_wakeup(queue: *mut (), node: &ReadinessNode) -> io::Result<()> {
+ debug_assert!(!queue.is_null());
+ // This is ugly... but we don't want to bump the ref count.
+ let queue: &Arc<ReadinessQueueInner> = unsafe {
+ &*(&queue as *const *mut () as *const Arc<ReadinessQueueInner>)
+ };
+ queue.enqueue_node_with_wakeup(node)
+}
+
+unsafe fn token(node: &ReadinessNode, pos: usize) -> Token {
+ match pos {
+ 0 => *node.token_0.get(),
+ 1 => *node.token_1.get(),
+ 2 => *node.token_2.get(),
+ _ => unreachable!(),
+ }
+}
+
+fn release_node(ptr: *mut ReadinessNode) {
+ unsafe {
+ // `AcqRel` synchronizes with other `release_node` functions and ensures
+ // that the drop happens after any reads / writes on other threads.
+ if (*ptr).ref_count.fetch_sub(1, AcqRel) != 1 {
+ return;
+ }
+
+ let node = Box::from_raw(ptr);
+
+ // Decrement the readiness_queue Arc
+ let queue = node.readiness_queue.load(Acquire);
+
+ if queue.is_null() {
+ return;
+ }
+
+ let _: Arc<ReadinessQueueInner> = mem::transmute(queue);
+ }
+}
+
+impl AtomicState {
+ fn new(interest: Ready, opt: PollOpt) -> AtomicState {
+ let state = ReadinessState::new(interest, opt);
+
+ AtomicState {
+ inner: AtomicUsize::new(state.into()),
+ }
+ }
+
+ /// Loads the current `ReadinessState`
+ fn load(&self, order: Ordering) -> ReadinessState {
+ self.inner.load(order).into()
+ }
+
+ /// Stores a state if the current state is the same as `current`.
+ fn compare_and_swap(&self, current: ReadinessState, new: ReadinessState, order: Ordering) -> ReadinessState {
+ self.inner.compare_and_swap(current.into(), new.into(), order).into()
+ }
+
+ // Returns `true` if the node should be queued
+ fn flag_as_dropped(&self) -> bool {
+ let prev: ReadinessState = self.inner.fetch_or(DROPPED_MASK | QUEUED_MASK, Release).into();
+ // The flag should not have been previously set
+ debug_assert!(!prev.is_dropped());
+
+ !prev.is_queued()
+ }
+}
+
+impl ReadinessState {
+ // Create a `ReadinessState` initialized with the provided arguments
+ #[inline]
+ fn new(interest: Ready, opt: PollOpt) -> ReadinessState {
+ let interest = event::ready_as_usize(interest);
+ let opt = event::opt_as_usize(opt);
+
+ debug_assert!(interest <= MASK_4);
+ debug_assert!(opt <= MASK_4);
+
+ let mut val = interest << INTEREST_SHIFT;
+ val |= opt << POLL_OPT_SHIFT;
+
+ ReadinessState(val)
+ }
+
+ #[inline]
+ fn get(self, mask: usize, shift: usize) -> usize{
+ (self.0 >> shift) & mask
+ }
+
+ #[inline]
+ fn set(&mut self, val: usize, mask: usize, shift: usize) {
+ self.0 = (self.0 & !(mask << shift)) | (val << shift)
+ }
+
+ /// Get the readiness
+ #[inline]
+ fn readiness(self) -> Ready {
+ let v = self.get(MASK_4, READINESS_SHIFT);
+ event::ready_from_usize(v)
+ }
+
+ #[inline]
+ fn effective_readiness(self) -> Ready {
+ self.readiness() & self.interest()
+ }
+
+ /// Set the readiness
+ #[inline]
+ fn set_readiness(&mut self, v: Ready) {
+ self.set(event::ready_as_usize(v), MASK_4, READINESS_SHIFT);
+ }
+
+ /// Get the interest
+ #[inline]
+ fn interest(self) -> Ready {
+ let v = self.get(MASK_4, INTEREST_SHIFT);
+ event::ready_from_usize(v)
+ }
+
+ /// Set the interest
+ #[inline]
+ fn set_interest(&mut self, v: Ready) {
+ self.set(event::ready_as_usize(v), MASK_4, INTEREST_SHIFT);
+ }
+
+ #[inline]
+ fn disarm(&mut self) {
+ self.set_interest(Ready::empty());
+ }
+
+ /// Get the poll options
+ #[inline]
+ fn poll_opt(self) -> PollOpt {
+ let v = self.get(MASK_4, POLL_OPT_SHIFT);
+ event::opt_from_usize(v)
+ }
+
+ /// Set the poll options
+ #[inline]
+ fn set_poll_opt(&mut self, v: PollOpt) {
+ self.set(event::opt_as_usize(v), MASK_4, POLL_OPT_SHIFT);
+ }
+
+ #[inline]
+ fn is_queued(self) -> bool {
+ self.0 & QUEUED_MASK == QUEUED_MASK
+ }
+
+ /// Set the queued flag
+ #[inline]
+ fn set_queued(&mut self) {
+ // Dropped nodes should never be queued
+ debug_assert!(!self.is_dropped());
+ self.0 |= QUEUED_MASK;
+ }
+
+ #[inline]
+ fn set_dequeued(&mut self) {
+ debug_assert!(self.is_queued());
+ self.0 &= !QUEUED_MASK
+ }
+
+ #[inline]
+ fn is_dropped(self) -> bool {
+ self.0 & DROPPED_MASK == DROPPED_MASK
+ }
+
+ #[inline]
+ fn token_read_pos(self) -> usize {
+ self.get(MASK_2, TOKEN_RD_SHIFT)
+ }
+
+ #[inline]
+ fn token_write_pos(self) -> usize {
+ self.get(MASK_2, TOKEN_WR_SHIFT)
+ }
+
+ #[inline]
+ fn next_token_pos(self) -> usize {
+ let rd = self.token_read_pos();
+ let wr = self.token_write_pos();
+
+ match wr {
+ 0 => {
+ match rd {
+ 1 => 2,
+ 2 => 1,
+ 0 => 1,
+ _ => unreachable!(),
+ }
+ }
+ 1 => {
+ match rd {
+ 0 => 2,
+ 2 => 0,
+ 1 => 2,
+ _ => unreachable!(),
+ }
+ }
+ 2 => {
+ match rd {
+ 0 => 1,
+ 1 => 0,
+ 2 => 0,
+ _ => unreachable!(),
+ }
+ }
+ _ => unreachable!(),
+ }
+ }
+
+ #[inline]
+ fn set_token_write_pos(&mut self, val: usize) {
+ self.set(val, MASK_2, TOKEN_WR_SHIFT);
+ }
+
+ #[inline]
+ fn update_token_read_pos(&mut self) {
+ let val = self.token_write_pos();
+ self.set(val, MASK_2, TOKEN_RD_SHIFT);
+ }
+}
+
+impl From<ReadinessState> for usize {
+ fn from(src: ReadinessState) -> usize {
+ src.0
+ }
+}
+
+impl From<usize> for ReadinessState {
+ fn from(src: usize) -> ReadinessState {
+ ReadinessState(src)
+ }
+}
+
+fn is_send<T: Send>() {}
+fn is_sync<T: Sync>() {}
+
+impl SelectorId {
+ pub fn new() -> SelectorId {
+ SelectorId {
+ id: AtomicUsize::new(0),
+ }
+ }
+
+ pub fn associate_selector(&self, poll: &Poll) -> io::Result<()> {
+ let selector_id = self.id.load(Ordering::SeqCst);
+
+ if selector_id != 0 && selector_id != poll.selector.id() {
+ Err(io::Error::new(io::ErrorKind::Other, "socket already registered"))
+ } else {
+ self.id.store(poll.selector.id(), Ordering::SeqCst);
+ Ok(())
+ }
+ }
+}
+
+impl Clone for SelectorId {
+ fn clone(&self) -> SelectorId {
+ SelectorId {
+ id: AtomicUsize::new(self.id.load(Ordering::SeqCst)),
+ }
+ }
+}
+
+#[test]
+#[cfg(all(unix, not(target_os = "fuchsia")))]
+pub fn as_raw_fd() {
+ let poll = Poll::new().unwrap();
+ assert!(poll.as_raw_fd() > 0);
+}