summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-reactor/src
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
commit43a97878ce14b72f0981164f87f2e35e14151312 (patch)
tree620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/tokio-reactor/src
parentInitial commit. (diff)
downloadfirefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz
firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-reactor/src')
-rw-r--r--third_party/rust/tokio-reactor/src/atomic_task.rs297
-rw-r--r--third_party/rust/tokio-reactor/src/background.rs217
-rw-r--r--third_party/rust/tokio-reactor/src/lib.rs773
-rw-r--r--third_party/rust/tokio-reactor/src/poll_evented.rs664
-rw-r--r--third_party/rust/tokio-reactor/src/registration.rs569
5 files changed, 2520 insertions, 0 deletions
diff --git a/third_party/rust/tokio-reactor/src/atomic_task.rs b/third_party/rust/tokio-reactor/src/atomic_task.rs
new file mode 100644
index 0000000000..b48dca402c
--- /dev/null
+++ b/third_party/rust/tokio-reactor/src/atomic_task.rs
@@ -0,0 +1,297 @@
+#![allow(dead_code)]
+
+use super::Task;
+
+use std::fmt;
+use std::cell::UnsafeCell;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::{Acquire, Release, AcqRel};
+
+/// A synchronization primitive for task notification.
+///
+/// `AtomicTask` will coordinate concurrent notifications with the consumer
+/// potentially "updating" the underlying task to notify. This is useful in
+/// scenarios where a computation completes in another thread and wants to
+/// notify the consumer, but the consumer is in the process of being migrated to
+/// a new logical task.
+///
+/// Consumers should call `register` before checking the result of a computation
+/// and producers should call `notify` after producing the computation (this
+/// differs from the usual `thread::park` pattern). It is also permitted for
+/// `notify` to be called **before** `register`. This results in a no-op.
+///
+/// A single `AtomicTask` may be reused for any number of calls to `register` or
+/// `notify`.
+///
+/// `AtomicTask` does not provide any memory ordering guarantees, as such the
+/// user should use caution and use other synchronization primitives to guard
+/// the result of the underlying computation.
+pub(crate) struct AtomicTask {
+ state: AtomicUsize,
+ task: UnsafeCell<Option<Task>>,
+}
+
+// `AtomicTask` is a multi-consumer, single-producer transfer cell. The cell
+// stores a `Task` value produced by calls to `register` and many threads can
+// race to take the task (to notify it) by calling `notify.
+//
+// If a new `Task` instance is produced by calling `register` before an existing
+// one is consumed, then the existing one is overwritten.
+//
+// While `AtomicTask` is single-producer, the implementation ensures memory
+// safety. In the event of concurrent calls to `register`, there will be a
+// single winner whose task will get stored in the cell. The losers will not
+// have their tasks notified. As such, callers should ensure to add
+// synchronization to calls to `register`.
+//
+// The implementation uses a single `AtomicUsize` value to coordinate access to
+// the `Task` cell. There are two bits that are operated on independently. These
+// are represented by `REGISTERING` and `NOTIFYING`.
+//
+// The `REGISTERING` bit is set when a producer enters the critical section. The
+// `NOTIFYING` bit is set when a consumer enters the critical section. Neither
+// bit being set is represented by `WAITING`.
+//
+// A thread obtains an exclusive lock on the task cell by transitioning the
+// state from `WAITING` to `REGISTERING` or `NOTIFYING`, depending on the
+// operation the thread wishes to perform. When this transition is made, it is
+// guaranteed that no other thread will access the task cell.
+//
+// # Registering
+//
+// On a call to `register`, an attempt to transition the state from WAITING to
+// REGISTERING is made. On success, the caller obtains a lock on the task cell.
+//
+// If the lock is obtained, then the thread sets the task cell to the task
+// provided as an argument. Then it attempts to transition the state back from
+// `REGISTERING` -> `WAITING`.
+//
+// If this transition is successful, then the registering process is complete
+// and the next call to `notify` will observe the task.
+//
+// If the transition fails, then there was a concurrent call to `notify` that
+// was unable to access the task cell (due to the registering thread holding the
+// lock). To handle this, the registering thread removes the task it just set
+// from the cell and calls `notify` on it. This call to notify represents the
+// attempt to notify by the other thread (that set the `NOTIFYING` bit). The
+// state is then transitioned from `REGISTERING | NOTIFYING` back to `WAITING`.
+// This transition must succeed because, at this point, the state cannot be
+// transitioned by another thread.
+//
+// # Notifying
+//
+// On a call to `notify`, an attempt to transition the state from `WAITING` to
+// `NOTIFYING` is made. On success, the caller obtains a lock on the task cell.
+//
+// If the lock is obtained, then the thread takes ownership of the current value
+// in the task cell, and calls `notify` on it. The state is then transitioned
+// back to `WAITING`. This transition must succeed as, at this point, the state
+// cannot be transitioned by another thread.
+//
+// If the thread is unable to obtain the lock, the `NOTIFYING` bit is still.
+// This is because it has either been set by the current thread but the previous
+// value included the `REGISTERING` bit **or** a concurrent thread is in the
+// `NOTIFYING` critical section. Either way, no action must be taken.
+//
+// If the current thread is the only concurrent call to `notify` and another
+// thread is in the `register` critical section, when the other thread **exits**
+// the `register` critical section, it will observe the `NOTIFYING` bit and
+// handle the notify itself.
+//
+// If another thread is in the `notify` critical section, then it will handle
+// notifying the task.
+//
+// # A potential race (is safely handled).
+//
+// Imagine the following situation:
+//
+// * Thread A obtains the `notify` lock and notifies a task.
+//
+// * Before thread A releases the `notify` lock, the notified task is scheduled.
+//
+// * Thread B attempts to notify the task. In theory this should result in the
+// task being notified, but it cannot because thread A still holds the notify
+// lock.
+//
+// This case is handled by requiring users of `AtomicTask` to call `register`
+// **before** attempting to observe the application state change that resulted
+// in the task being notified. The notifiers also change the application state
+// before calling notify.
+//
+// Because of this, the task will do one of two things.
+//
+// 1) Observe the application state change that Thread B is notifying on. In
+// this case, it is OK for Thread B's notification to be lost.
+//
+// 2) Call register before attempting to observe the application state. Since
+// Thread A still holds the `notify` lock, the call to `register` will result
+// in the task notifying itself and get scheduled again.
+
+/// Idle state
+const WAITING: usize = 0;
+
+/// A new task value is being registered with the `AtomicTask` cell.
+const REGISTERING: usize = 0b01;
+
+/// The task currently registered with the `AtomicTask` cell is being notified.
+const NOTIFYING: usize = 0b10;
+
+impl AtomicTask {
+ /// Create an `AtomicTask` initialized with the given `Task`
+ pub fn new() -> AtomicTask {
+ // Make sure that task is Sync
+ trait AssertSync: Sync {}
+ impl AssertSync for Task {}
+
+ AtomicTask {
+ state: AtomicUsize::new(WAITING),
+ task: UnsafeCell::new(None),
+ }
+ }
+
+ /// Registers the provided task to be notified on calls to `notify`.
+ ///
+ /// The new task will take place of any previous tasks that were registered
+ /// by previous calls to `register`. Any calls to `notify` that happen after
+ /// a call to `register` (as defined by the memory ordering rules), will
+ /// notify the `register` caller's task.
+ ///
+ /// It is safe to call `register` with multiple other threads concurrently
+ /// calling `notify`. This will result in the `register` caller's current
+ /// task being notified once.
+ ///
+ /// This function is safe to call concurrently, but this is generally a bad
+ /// idea. Concurrent calls to `register` will attempt to register different
+ /// tasks to be notified. One of the callers will win and have its task set,
+ /// but there is no guarantee as to which caller will succeed.
+ pub fn register_task(&self, task: Task) {
+ match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) {
+ WAITING => {
+ unsafe {
+ // Locked acquired, update the waker cell
+ *self.task.get() = Some(task.clone());
+
+ // Release the lock. If the state transitioned to include
+ // the `NOTIFYING` bit, this means that a notify has been
+ // called concurrently, so we have to remove the task and
+ // notify it.`
+ //
+ // Start by assuming that the state is `REGISTERING` as this
+ // is what we jut set it to.
+ let mut curr = REGISTERING;
+
+ // If a task has to be notified, it will be set here.
+ let mut notify: Option<Task> = None;
+
+ loop {
+ let res = self.state.compare_exchange(
+ curr, WAITING, AcqRel, Acquire);
+
+ match res {
+ Ok(_) => {
+ // The atomic exchange was successful, now
+ // notify the task (if set) and return.
+ if let Some(task) = notify {
+ task.notify();
+ }
+
+ return;
+ }
+ Err(actual) => {
+ // This branch can only be reached if a
+ // concurrent thread called `notify`. In this
+ // case, `actual` **must** be `REGISTERING |
+ // `NOTIFYING`.
+ debug_assert_eq!(actual, REGISTERING | NOTIFYING);
+
+ // Take the task to notify once the atomic operation has
+ // completed.
+ notify = (*self.task.get()).take();
+
+ // Update `curr` for the next iteration of the
+ // loop
+ curr = actual;
+ }
+ }
+ }
+ }
+ }
+ NOTIFYING => {
+ // Currently in the process of notifying the task, i.e.,
+ // `notify` is currently being called on the old task handle.
+ // So, we call notify on the new task handle
+ task.notify();
+ }
+ state => {
+ // In this case, a concurrent thread is holding the
+ // "registering" lock. This probably indicates a bug in the
+ // caller's code as racing to call `register` doesn't make much
+ // sense.
+ //
+ // We just want to maintain memory safety. It is ok to drop the
+ // call to `register`.
+ debug_assert!(
+ state == REGISTERING ||
+ state == REGISTERING | NOTIFYING);
+ }
+ }
+ }
+
+ /// Attempts to take the `Task` value out of the `AtomicTask` with the
+ /// intention that the caller will notify the task.
+ pub fn take_to_notify(&self) -> Option<Task> {
+ // AcqRel ordering is used in order to acquire the value of the `task`
+ // cell as well as to establish a `release` ordering with whatever
+ // memory the `AtomicTask` is associated with.
+ match self.state.fetch_or(NOTIFYING, AcqRel) {
+ WAITING => {
+ // The notifying lock has been acquired.
+ let task = unsafe { (*self.task.get()).take() };
+
+ // Release the lock
+ self.state.fetch_and(!NOTIFYING, Release);
+
+ task
+ }
+ state => {
+ // There is a concurrent thread currently updating the
+ // associated task.
+ //
+ // Nothing more to do as the `NOTIFYING` bit has been set. It
+ // doesn't matter if there are concurrent registering threads or
+ // not.
+ //
+ debug_assert!(
+ state == REGISTERING ||
+ state == REGISTERING | NOTIFYING ||
+ state == NOTIFYING);
+
+ None
+ }
+ }
+ }
+
+ /// Notifies the task that last called `register`.
+ ///
+ /// If `register` has not been called yet, then this does nothing.
+ pub fn notify(&self) {
+ if let Some(task) = self.take_to_notify() {
+ task.notify();
+ }
+ }
+}
+
+impl Default for AtomicTask {
+ fn default() -> Self {
+ AtomicTask::new()
+ }
+}
+
+impl fmt::Debug for AtomicTask {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "AtomicTask")
+ }
+}
+
+unsafe impl Send for AtomicTask {}
+unsafe impl Sync for AtomicTask {}
diff --git a/third_party/rust/tokio-reactor/src/background.rs b/third_party/rust/tokio-reactor/src/background.rs
new file mode 100644
index 0000000000..88f78a8bcb
--- /dev/null
+++ b/third_party/rust/tokio-reactor/src/background.rs
@@ -0,0 +1,217 @@
+use {Reactor, Handle, Task};
+use atomic_task::AtomicTask;
+
+use futures::{Future, Async, Poll, task};
+
+use std::io;
+use std::thread;
+use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+
+/// Handle to the reactor running on a background thread.
+///
+/// Instances are created by calling [`Reactor::background`].
+///
+/// [`Reactor::background`]: struct.Reactor.html#method.background
+#[derive(Debug)]
+pub struct Background {
+ /// When `None`, the reactor thread will run until the process terminates.
+ inner: Option<Inner>,
+}
+
+/// Future that resolves when the reactor thread has shutdown.
+#[derive(Debug)]
+pub struct Shutdown {
+ inner: Inner,
+}
+
+/// Actual Background handle.
+#[derive(Debug)]
+struct Inner {
+ /// Handle to the reactor
+ handle: Handle,
+
+ /// Shared state between the background handle and the reactor thread.
+ shared: Arc<Shared>,
+}
+
+#[derive(Debug)]
+struct Shared {
+ /// Signal the reactor thread to shutdown.
+ shutdown: AtomicUsize,
+
+ /// Task to notify when the reactor thread enters a shutdown state.
+ shutdown_task: AtomicTask,
+}
+
+/// Notifies the reactor thread to shutdown once the reactor becomes idle.
+const SHUTDOWN_IDLE: usize = 1;
+
+/// Notifies the reactor thread to shutdown immediately.
+const SHUTDOWN_NOW: usize = 2;
+
+/// The reactor is currently shutdown.
+const SHUTDOWN: usize = 3;
+
+// ===== impl Background =====
+
+impl Background {
+ /// Launch a reactor in the background and return a handle to the thread.
+ pub(crate) fn new(reactor: Reactor) -> io::Result<Background> {
+ // Grab a handle to the reactor
+ let handle = reactor.handle().clone();
+
+ // Create the state shared between the background handle and the reactor
+ // thread.
+ let shared = Arc::new(Shared {
+ shutdown: AtomicUsize::new(0),
+ shutdown_task: AtomicTask::new(),
+ });
+
+ // For the reactor thread
+ let shared2 = shared.clone();
+
+ // Start the reactor thread
+ thread::Builder::new()
+ .spawn(move || run(reactor, shared2))?;
+
+ Ok(Background {
+ inner: Some(Inner {
+ handle,
+ shared,
+ }),
+ })
+ }
+
+ /// Returns a reference to the reactor handle.
+ pub fn handle(&self) -> &Handle {
+ &self.inner.as_ref().unwrap().handle
+ }
+
+ /// Shutdown the reactor on idle.
+ ///
+ /// Returns a future that completes once the reactor thread has shutdown.
+ pub fn shutdown_on_idle(mut self) -> Shutdown {
+ let inner = self.inner.take().unwrap();
+ inner.shutdown_on_idle();
+
+ Shutdown { inner }
+ }
+
+ /// Shutdown the reactor immediately
+ ///
+ /// Returns a future that completes once the reactor thread has shutdown.
+ pub fn shutdown_now(mut self) -> Shutdown {
+ let inner = self.inner.take().unwrap();
+ inner.shutdown_now();
+
+ Shutdown { inner }
+ }
+
+ /// Run the reactor on its thread until the process terminates.
+ pub fn forget(mut self) {
+ drop(self.inner.take());
+ }
+}
+
+impl Drop for Background {
+ fn drop(&mut self) {
+ let inner = match self.inner.take() {
+ Some(i) => i,
+ None => return,
+ };
+
+ inner.shutdown_now();
+
+ let shutdown = Shutdown { inner };
+ let _ = shutdown.wait();
+ }
+}
+
+// ===== impl Shutdown =====
+
+impl Future for Shutdown {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ let task = Task::Futures1(task::current());
+ self.inner.shared.shutdown_task.register_task(task);
+
+ if !self.inner.is_shutdown() {
+ return Ok(Async::NotReady);
+ }
+
+ Ok(().into())
+ }
+}
+
+// ===== impl Inner =====
+
+impl Inner {
+ /// Returns true if the reactor thread is shutdown.
+ fn is_shutdown(&self) -> bool {
+ self.shared.shutdown.load(SeqCst) == SHUTDOWN
+ }
+
+ /// Notify the reactor thread to shutdown once the reactor transitions to an
+ /// idle state.
+ fn shutdown_on_idle(&self) {
+ self.shared.shutdown
+ .compare_and_swap(0, SHUTDOWN_IDLE, SeqCst);
+ self.handle.wakeup();
+ }
+
+ /// Notify the reactor thread to shutdown immediately.
+ fn shutdown_now(&self) {
+ let mut curr = self.shared.shutdown.load(SeqCst);
+
+ loop {
+ if curr >= SHUTDOWN_NOW {
+ return;
+ }
+
+ let act = self.shared.shutdown
+ .compare_and_swap(curr, SHUTDOWN_NOW, SeqCst);
+
+ if act == curr {
+ self.handle.wakeup();
+ return;
+ }
+
+ curr = act;
+ }
+ }
+}
+
+// ===== impl Reactor thread =====
+
+fn run(mut reactor: Reactor, shared: Arc<Shared>) {
+ debug!("starting background reactor");
+ loop {
+ let shutdown = shared.shutdown.load(SeqCst);
+
+ if shutdown == SHUTDOWN_NOW {
+ debug!("shutting background reactor down NOW");
+ break;
+ }
+
+ if shutdown == SHUTDOWN_IDLE && reactor.is_idle() {
+ debug!("shutting background reactor on idle");
+ break;
+ }
+
+ reactor.turn(None).unwrap();
+ }
+
+ drop(reactor);
+
+ // Transition the state to shutdown
+ shared.shutdown.store(SHUTDOWN, SeqCst);
+
+ // Notify any waiters
+ shared.shutdown_task.notify();
+
+ debug!("background reactor has shutdown");
+}
diff --git a/third_party/rust/tokio-reactor/src/lib.rs b/third_party/rust/tokio-reactor/src/lib.rs
new file mode 100644
index 0000000000..381d77af03
--- /dev/null
+++ b/third_party/rust/tokio-reactor/src/lib.rs
@@ -0,0 +1,773 @@
+//! Event loop that drives Tokio I/O resources.
+//!
+//! The reactor is the engine that drives asynchronous I/O resources (like TCP and
+//! UDP sockets). It is backed by [`mio`] and acts as a bridge between [`mio`] and
+//! [`futures`].
+//!
+//! The crate provides:
+//!
+//! * [`Reactor`] is the main type of this crate. It performs the event loop logic.
+//!
+//! * [`Handle`] provides a reference to a reactor instance.
+//!
+//! * [`Registration`] and [`PollEvented`] allow third parties to implement I/O
+//! resources that are driven by the reactor.
+//!
+//! Application authors will not use this crate directly. Instead, they will use the
+//! `tokio` crate. Library authors should only depend on `tokio-reactor` if they
+//! are building a custom I/O resource.
+//!
+//! For more details, see [reactor module] documentation in the Tokio crate.
+//!
+//! [`mio`]: http://github.com/carllerche/mio
+//! [`futures`]: http://github.com/rust-lang-nursery/futures-rs
+//! [`Reactor`]: struct.Reactor.html
+//! [`Handle`]: struct.Handle.html
+//! [`Registration`]: struct.Registration.html
+//! [`PollEvented`]: struct.PollEvented.html
+//! [reactor module]: https://docs.rs/tokio/0.1/tokio/reactor/index.html
+
+#![doc(html_root_url = "https://docs.rs/tokio-reactor/0.1.3")]
+#![deny(missing_docs, warnings, missing_debug_implementations)]
+
+#[macro_use]
+extern crate futures;
+#[macro_use]
+extern crate log;
+extern crate mio;
+extern crate slab;
+extern crate tokio_executor;
+extern crate tokio_io;
+
+#[cfg(feature = "unstable-futures")]
+extern crate futures2;
+
+mod atomic_task;
+pub(crate) mod background;
+mod poll_evented;
+mod registration;
+
+// ===== Public re-exports =====
+
+pub use self::background::{Background, Shutdown};
+pub use self::registration::Registration;
+pub use self::poll_evented::PollEvented;
+
+// ===== Private imports =====
+
+use atomic_task::AtomicTask;
+
+use tokio_executor::Enter;
+use tokio_executor::park::{Park, Unpark};
+
+use std::{fmt, usize};
+use std::io;
+use std::mem;
+use std::cell::RefCell;
+use std::sync::atomic::Ordering::{Relaxed, SeqCst};
+use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT};
+use std::sync::{Arc, Weak, RwLock};
+use std::time::{Duration, Instant};
+
+use log::Level;
+use mio::event::Evented;
+use slab::Slab;
+
+/// The core reactor, or event loop.
+///
+/// The event loop is the main source of blocking in an application which drives
+/// all other I/O events and notifications happening. Each event loop can have
+/// multiple handles pointing to it, each of which can then be used to create
+/// various I/O objects to interact with the event loop in interesting ways.
+pub struct Reactor {
+ /// Reuse the `mio::Events` value across calls to poll.
+ events: mio::Events,
+
+ /// State shared between the reactor and the handles.
+ inner: Arc<Inner>,
+
+ _wakeup_registration: mio::Registration,
+}
+
+/// A reference to a reactor.
+///
+/// A `Handle` is used for associating I/O objects with an event loop
+/// explicitly. Typically though you won't end up using a `Handle` that often
+/// and will instead use the default reactor for the execution context.
+///
+/// By default, most components bind lazily to reactors.
+/// To get this behavior when manually passing a `Handle`, use `default()`.
+#[derive(Clone)]
+pub struct Handle {
+ inner: Option<HandlePriv>,
+}
+
+/// Like `Handle`, but never `None`.
+#[derive(Clone)]
+struct HandlePriv {
+ inner: Weak<Inner>,
+}
+
+/// Return value from the `turn` method on `Reactor`.
+///
+/// Currently this value doesn't actually provide any functionality, but it may
+/// in the future give insight into what happened during `turn`.
+#[derive(Debug)]
+pub struct Turn {
+ _priv: (),
+}
+
+/// Error returned from `Handle::set_fallback`.
+#[derive(Clone, Debug)]
+pub struct SetFallbackError(());
+
+#[deprecated(since = "0.1.2", note = "use SetFallbackError instead")]
+#[doc(hidden)]
+pub type SetDefaultError = SetFallbackError;
+
+#[test]
+fn test_handle_size() {
+ use std::mem;
+ assert_eq!(mem::size_of::<Handle>(), mem::size_of::<HandlePriv>());
+}
+
+struct Inner {
+ /// The underlying system event queue.
+ io: mio::Poll,
+
+ /// ABA guard counter
+ next_aba_guard: AtomicUsize,
+
+ /// Dispatch slabs for I/O and futures events
+ io_dispatch: RwLock<Slab<ScheduledIo>>,
+
+ /// Used to wake up the reactor from a call to `turn`
+ wakeup: mio::SetReadiness
+}
+
+struct ScheduledIo {
+ aba_guard: usize,
+ readiness: AtomicUsize,
+ reader: AtomicTask,
+ writer: AtomicTask,
+}
+
+#[derive(Debug, Eq, PartialEq, Clone, Copy)]
+pub(crate) enum Direction {
+ Read,
+ Write,
+}
+
+/// The global fallback reactor.
+static HANDLE_FALLBACK: AtomicUsize = ATOMIC_USIZE_INIT;
+
+/// Tracks the reactor for the current execution context.
+thread_local!(static CURRENT_REACTOR: RefCell<Option<HandlePriv>> = RefCell::new(None));
+
+const TOKEN_SHIFT: usize = 22;
+
+// Kind of arbitrary, but this reserves some token space for later usage.
+const MAX_SOURCES: usize = (1 << TOKEN_SHIFT) - 1;
+const TOKEN_WAKEUP: mio::Token = mio::Token(MAX_SOURCES);
+
+fn _assert_kinds() {
+ fn _assert<T: Send + Sync>() {}
+
+ _assert::<Handle>();
+}
+
+/// A wakeup handle for a task, which may be either a futures 0.1 or 0.2 task
+#[derive(Debug, Clone)]
+pub(crate) enum Task {
+ Futures1(futures::task::Task),
+ #[cfg(feature = "unstable-futures")]
+ Futures2(futures2::task::Waker),
+}
+
+// ===== impl Reactor =====
+
+/// Set the default reactor for the duration of the closure
+///
+/// # Panics
+///
+/// This function panics if there already is a default reactor set.
+pub fn with_default<F, R>(handle: &Handle, enter: &mut Enter, f: F) -> R
+where F: FnOnce(&mut Enter) -> R
+{
+ // Ensure that the executor is removed from the thread-local context
+ // when leaving the scope. This handles cases that involve panicking.
+ struct Reset;
+
+ impl Drop for Reset {
+ fn drop(&mut self) {
+ CURRENT_REACTOR.with(|current| {
+ let mut current = current.borrow_mut();
+ *current = None;
+ });
+ }
+ }
+
+ // This ensures the value for the current reactor gets reset even if there
+ // is a panic.
+ let _r = Reset;
+
+ CURRENT_REACTOR.with(|current| {
+ {
+ let mut current = current.borrow_mut();
+
+ assert!(current.is_none(), "default Tokio reactor already set \
+ for execution context");
+
+ let handle = match handle.as_priv() {
+ Some(handle) => handle,
+ None => {
+ panic!("`handle` does not reference a reactor");
+ }
+ };
+
+ *current = Some(handle.clone());
+ }
+
+ f(enter)
+ })
+}
+
+impl Reactor {
+ /// Creates a new event loop, returning any error that happened during the
+ /// creation.
+ pub fn new() -> io::Result<Reactor> {
+ let io = mio::Poll::new()?;
+ let wakeup_pair = mio::Registration::new2();
+
+ io.register(&wakeup_pair.0,
+ TOKEN_WAKEUP,
+ mio::Ready::readable(),
+ mio::PollOpt::level())?;
+
+ Ok(Reactor {
+ events: mio::Events::with_capacity(1024),
+ _wakeup_registration: wakeup_pair.0,
+ inner: Arc::new(Inner {
+ io: io,
+ next_aba_guard: AtomicUsize::new(0),
+ io_dispatch: RwLock::new(Slab::with_capacity(1)),
+ wakeup: wakeup_pair.1,
+ }),
+ })
+ }
+
+ /// Returns a handle to this event loop which can be sent across threads
+ /// and can be used as a proxy to the event loop itself.
+ ///
+ /// Handles are cloneable and clones always refer to the same event loop.
+ /// This handle is typically passed into functions that create I/O objects
+ /// to bind them to this event loop.
+ pub fn handle(&self) -> Handle {
+ Handle {
+ inner: Some(HandlePriv {
+ inner: Arc::downgrade(&self.inner),
+ }),
+ }
+ }
+
+ /// Configures the fallback handle to be returned from `Handle::default`.
+ ///
+ /// The `Handle::default()` function will by default lazily spin up a global
+ /// thread and run a reactor on this global thread. This behavior is not
+ /// always desirable in all applications, however, and sometimes a different
+ /// fallback reactor is desired.
+ ///
+ /// This function will attempt to globally alter the return value of
+ /// `Handle::default()` to return the `handle` specified rather than a
+ /// lazily initialized global thread. If successful then all future calls to
+ /// `Handle::default()` which would otherwise fall back to the global thread
+ /// will instead return a clone of the handle specified.
+ ///
+ /// # Errors
+ ///
+ /// This function may not always succeed in configuring the fallback handle.
+ /// If this function was previously called (or perhaps concurrently called
+ /// on many threads) only the *first* invocation of this function will
+ /// succeed. All other invocations will return an error.
+ ///
+ /// Additionally if the global reactor thread has already been initialized
+ /// then this function will also return an error. (aka if `Handle::default`
+ /// has been called previously in this program).
+ pub fn set_fallback(&self) -> Result<(), SetFallbackError> {
+ set_fallback(self.handle().into_priv().unwrap())
+ }
+
+ /// Performs one iteration of the event loop, blocking on waiting for events
+ /// for at most `max_wait` (forever if `None`).
+ ///
+ /// This method is the primary method of running this reactor and processing
+ /// I/O events that occur. This method executes one iteration of an event
+ /// loop, blocking at most once waiting for events to happen.
+ ///
+ /// If a `max_wait` is specified then the method should block no longer than
+ /// the duration specified, but this shouldn't be used as a super-precise
+ /// timer but rather a "ballpark approximation"
+ ///
+ /// # Return value
+ ///
+ /// This function returns an instance of `Turn`
+ ///
+ /// `Turn` as of today has no extra information with it and can be safely
+ /// discarded. In the future `Turn` may contain information about what
+ /// happened while this reactor blocked.
+ ///
+ /// # Errors
+ ///
+ /// This function may also return any I/O error which occurs when polling
+ /// for readiness of I/O objects with the OS. This is quite unlikely to
+ /// arise and typically mean that things have gone horribly wrong at that
+ /// point. Currently this is primarily only known to happen for internal
+ /// bugs to `tokio` itself.
+ pub fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<Turn> {
+ self.poll(max_wait)?;
+ Ok(Turn { _priv: () })
+ }
+
+ /// Returns true if the reactor is currently idle.
+ ///
+ /// Idle is defined as all tasks that have been spawned have completed,
+ /// either successfully or with an error.
+ pub fn is_idle(&self) -> bool {
+ self.inner.io_dispatch
+ .read().unwrap()
+ .is_empty()
+ }
+
+ /// Run this reactor on a background thread.
+ ///
+ /// This function takes ownership, spawns a new thread, and moves the
+ /// reactor to this new thread. It then runs the reactor, driving all
+ /// associated I/O resources, until the `Background` handle is dropped or
+ /// explicitly shutdown.
+ pub fn background(self) -> io::Result<Background> {
+ Background::new(self)
+ }
+
+ fn poll(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
+ // Block waiting for an event to happen, peeling out how many events
+ // happened.
+ match self.inner.io.poll(&mut self.events, max_wait) {
+ Ok(_) => {}
+ Err(e) => return Err(e),
+ }
+
+ let start = if log_enabled!(Level::Debug) {
+ Some(Instant::now())
+ } else {
+ None
+ };
+
+ // Process all the events that came in, dispatching appropriately
+ let mut events = 0;
+ for event in self.events.iter() {
+ events += 1;
+ let token = event.token();
+ trace!("event {:?} {:?}", event.readiness(), event.token());
+
+ if token == TOKEN_WAKEUP {
+ self.inner.wakeup.set_readiness(mio::Ready::empty()).unwrap();
+ } else {
+ self.dispatch(token, event.readiness());
+ }
+ }
+
+ if let Some(start) = start {
+ let dur = start.elapsed();
+ debug!("loop process - {} events, {}.{:03}s",
+ events,
+ dur.as_secs(),
+ dur.subsec_nanos() / 1_000_000);
+ }
+
+ Ok(())
+ }
+
+ fn dispatch(&self, token: mio::Token, ready: mio::Ready) {
+ let aba_guard = token.0 & !MAX_SOURCES;
+ let token = token.0 & MAX_SOURCES;
+
+ let mut rd = None;
+ let mut wr = None;
+
+ // Create a scope to ensure that notifying the tasks stays out of the
+ // lock's critical section.
+ {
+ let io_dispatch = self.inner.io_dispatch.read().unwrap();
+
+ let io = match io_dispatch.get(token) {
+ Some(io) => io,
+ None => return,
+ };
+
+ if aba_guard != io.aba_guard {
+ return;
+ }
+
+ io.readiness.fetch_or(ready.as_usize(), Relaxed);
+
+ if ready.is_writable() || platform::is_hup(&ready) {
+ wr = io.writer.take_to_notify();
+ }
+
+ if !(ready & (!mio::Ready::writable())).is_empty() {
+ rd = io.reader.take_to_notify();
+ }
+ }
+
+ if let Some(task) = rd {
+ task.notify();
+ }
+
+ if let Some(task) = wr {
+ task.notify();
+ }
+ }
+}
+
+impl Park for Reactor {
+ type Unpark = Handle;
+ type Error = io::Error;
+
+ fn unpark(&self) -> Self::Unpark {
+ self.handle()
+ }
+
+ fn park(&mut self) -> io::Result<()> {
+ self.turn(None)?;
+ Ok(())
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> io::Result<()> {
+ self.turn(Some(duration))?;
+ Ok(())
+ }
+}
+
+impl fmt::Debug for Reactor {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "Reactor")
+ }
+}
+
+// ===== impl Handle =====
+
+impl Handle {
+ /// Returns a handle to the current reactor.
+ pub fn current() -> Handle {
+ // TODO: Should this panic on error?
+ HandlePriv::try_current()
+ .map(|handle| Handle {
+ inner: Some(handle),
+ })
+ .unwrap_or(Handle {
+ inner: Some(HandlePriv {
+ inner: Weak::new(),
+ })
+ })
+ }
+
+ fn as_priv(&self) -> Option<&HandlePriv> {
+ self.inner.as_ref()
+ }
+
+ fn into_priv(self) -> Option<HandlePriv> {
+ self.inner
+ }
+
+ fn wakeup(&self) {
+ if let Some(handle) = self.as_priv() {
+ handle.wakeup();
+ }
+ }
+}
+
+impl Unpark for Handle {
+ fn unpark(&self) {
+ if let Some(ref h) = self.inner {
+ h.wakeup();
+ }
+ }
+}
+
+impl Default for Handle {
+ /// Returns a "default" handle, i.e., a handle that lazily binds to a reactor.
+ fn default() -> Handle {
+ Handle { inner: None }
+ }
+}
+
+impl fmt::Debug for Handle {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "Handle")
+ }
+}
+
+fn set_fallback(handle: HandlePriv) -> Result<(), SetFallbackError> {
+ unsafe {
+ let val = handle.into_usize();
+ match HANDLE_FALLBACK.compare_exchange(0, val, SeqCst, SeqCst) {
+ Ok(_) => Ok(()),
+ Err(_) => {
+ drop(HandlePriv::from_usize(val));
+ Err(SetFallbackError(()))
+ }
+ }
+ }
+}
+
+// ===== impl HandlePriv =====
+
+impl HandlePriv {
+ /// Try to get a handle to the current reactor.
+ ///
+ /// Returns `Err` if no handle is found.
+ pub(crate) fn try_current() -> io::Result<HandlePriv> {
+ CURRENT_REACTOR.with(|current| {
+ match *current.borrow() {
+ Some(ref handle) => Ok(handle.clone()),
+ None => HandlePriv::fallback(),
+ }
+ })
+ }
+
+ /// Returns a handle to the fallback reactor.
+ fn fallback() -> io::Result<HandlePriv> {
+ let mut fallback = HANDLE_FALLBACK.load(SeqCst);
+
+ // If the fallback hasn't been previously initialized then let's spin
+ // up a helper thread and try to initialize with that. If we can't
+ // actually create a helper thread then we'll just return a "defunct"
+ // handle which will return errors when I/O objects are attempted to be
+ // associated.
+ if fallback == 0 {
+ let reactor = match Reactor::new() {
+ Ok(reactor) => reactor,
+ Err(_) => return Err(io::Error::new(io::ErrorKind::Other,
+ "failed to create reactor")),
+ };
+
+ // If we successfully set ourselves as the actual fallback then we
+ // want to `forget` the helper thread to ensure that it persists
+ // globally. If we fail to set ourselves as the fallback that means
+ // that someone was racing with this call to `Handle::default`.
+ // They ended up winning so we'll destroy our helper thread (which
+ // shuts down the thread) and reload the fallback.
+ if set_fallback(reactor.handle().into_priv().unwrap()).is_ok() {
+ let ret = reactor.handle().into_priv().unwrap();
+
+ match reactor.background() {
+ Ok(bg) => bg.forget(),
+ // The global handle is fubar, but y'all probably got bigger
+ // problems if a thread can't spawn.
+ Err(_) => {}
+ }
+
+ return Ok(ret);
+ }
+
+ fallback = HANDLE_FALLBACK.load(SeqCst);
+ }
+
+ // At this point our fallback handle global was configured so we use
+ // its value to reify a handle, clone it, and then forget our reified
+ // handle as we don't actually have an owning reference to it.
+ assert!(fallback != 0);
+
+ let ret = unsafe {
+ let handle = HandlePriv::from_usize(fallback);
+ let ret = handle.clone();
+
+ // This prevents `handle` from being dropped and having the ref
+ // count decremented.
+ drop(handle.into_usize());
+
+ ret
+ };
+
+ Ok(ret)
+ }
+
+ /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
+ /// makes the next call to `turn` return immediately.
+ ///
+ /// This method is intended to be used in situations where a notification
+ /// needs to otherwise be sent to the main reactor. If the reactor is
+ /// currently blocked inside of `turn` then it will wake up and soon return
+ /// after this method has been called. If the reactor is not currently
+ /// blocked in `turn`, then the next call to `turn` will not block and
+ /// return immediately.
+ fn wakeup(&self) {
+ if let Some(inner) = self.inner() {
+ inner.wakeup.set_readiness(mio::Ready::readable()).unwrap();
+ }
+ }
+
+ fn into_usize(self) -> usize {
+ unsafe {
+ mem::transmute::<Weak<Inner>, usize>(self.inner)
+ }
+ }
+
+ unsafe fn from_usize(val: usize) -> HandlePriv {
+ let inner = mem::transmute::<usize, Weak<Inner>>(val);;
+ HandlePriv { inner }
+ }
+
+ fn inner(&self) -> Option<Arc<Inner>> {
+ self.inner.upgrade()
+ }
+}
+
+impl fmt::Debug for HandlePriv {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "HandlePriv")
+ }
+}
+
+// ===== impl Inner =====
+
+impl Inner {
+ /// Register an I/O resource with the reactor.
+ ///
+ /// The registration token is returned.
+ fn add_source(&self, source: &Evented)
+ -> io::Result<usize>
+ {
+ // Get an ABA guard value
+ let aba_guard = self.next_aba_guard.fetch_add(1 << TOKEN_SHIFT, Relaxed);
+
+ let mut io_dispatch = self.io_dispatch.write().unwrap();
+
+ if io_dispatch.len() == MAX_SOURCES {
+ return Err(io::Error::new(io::ErrorKind::Other, "reactor at max \
+ registered I/O resources"));
+ }
+
+ // Acquire a write lock
+ let key = io_dispatch.insert(ScheduledIo {
+ aba_guard,
+ readiness: AtomicUsize::new(0),
+ reader: AtomicTask::new(),
+ writer: AtomicTask::new(),
+ });
+
+ try!(self.io.register(source,
+ mio::Token(aba_guard | key),
+ mio::Ready::all(),
+ mio::PollOpt::edge()));
+
+ Ok(key)
+ }
+
+ /// Deregisters an I/O resource from the reactor.
+ fn deregister_source(&self, source: &Evented) -> io::Result<()> {
+ self.io.deregister(source)
+ }
+
+ fn drop_source(&self, token: usize) {
+ debug!("dropping I/O source: {}", token);
+ self.io_dispatch.write().unwrap().remove(token);
+ }
+
+ /// Registers interest in the I/O resource associated with `token`.
+ fn register(&self, token: usize, dir: Direction, t: Task) {
+ debug!("scheduling direction for: {}", token);
+ let io_dispatch = self.io_dispatch.read().unwrap();
+ let sched = io_dispatch.get(token).unwrap();
+
+ let (task, ready) = match dir {
+ Direction::Read => (&sched.reader, !mio::Ready::writable()),
+ Direction::Write => (&sched.writer, mio::Ready::writable()),
+ };
+
+ task.register_task(t);
+
+ if sched.readiness.load(SeqCst) & ready.as_usize() != 0 {
+ task.notify();
+ }
+ }
+}
+
+impl Drop for Inner {
+ fn drop(&mut self) {
+ // When a reactor is dropped it needs to wake up all blocked tasks as
+ // they'll never receive a notification, and all connected I/O objects
+ // will start returning errors pretty quickly.
+ let io = self.io_dispatch.read().unwrap();
+ for (_, io) in io.iter() {
+ io.writer.notify();
+ io.reader.notify();
+ }
+ }
+}
+
+impl Direction {
+ fn mask(&self) -> mio::Ready {
+ match *self {
+ Direction::Read => {
+ // Everything except writable is signaled through read.
+ mio::Ready::all() - mio::Ready::writable()
+ }
+ Direction::Write => mio::Ready::writable() | platform::hup(),
+ }
+ }
+}
+
+impl Task {
+ fn notify(&self) {
+ match *self {
+ Task::Futures1(ref task) => task.notify(),
+
+ #[cfg(feature = "unstable-futures")]
+ Task::Futures2(ref waker) => waker.wake(),
+ }
+ }
+}
+
+#[cfg(unix)]
+mod platform {
+ use mio::Ready;
+ use mio::unix::UnixReady;
+
+ pub fn hup() -> Ready {
+ UnixReady::hup().into()
+ }
+
+ pub fn is_hup(ready: &Ready) -> bool {
+ UnixReady::from(*ready).is_hup()
+ }
+}
+
+#[cfg(windows)]
+mod platform {
+ use mio::Ready;
+
+ pub fn hup() -> Ready {
+ Ready::empty()
+ }
+
+ pub fn is_hup(_: &Ready) -> bool {
+ false
+ }
+}
+
+#[cfg(feature = "unstable-futures")]
+fn lift_async<T>(old: futures::Async<T>) -> futures2::Async<T> {
+ match old {
+ futures::Async::Ready(x) => futures2::Async::Ready(x),
+ futures::Async::NotReady => futures2::Async::Pending,
+ }
+}
+
+#[cfg(feature = "unstable-futures")]
+fn lower_async<T>(new: futures2::Async<T>) -> futures::Async<T> {
+ match new {
+ futures2::Async::Ready(x) => futures::Async::Ready(x),
+ futures2::Async::Pending => futures::Async::NotReady,
+ }
+}
diff --git a/third_party/rust/tokio-reactor/src/poll_evented.rs b/third_party/rust/tokio-reactor/src/poll_evented.rs
new file mode 100644
index 0000000000..99dd8aa467
--- /dev/null
+++ b/third_party/rust/tokio-reactor/src/poll_evented.rs
@@ -0,0 +1,664 @@
+use {Handle, Registration};
+
+use futures::{task, Async, Poll};
+use mio;
+use mio::event::Evented;
+use tokio_io::{AsyncRead, AsyncWrite};
+
+#[cfg(feature = "unstable-futures")]
+use futures2;
+
+use std::fmt;
+use std::io::{self, Read, Write};
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::Relaxed;
+
+/// Associates an I/O resource that implements the [`std::Read`] and / or
+/// [`std::Write`] traits with the reactor that drives it.
+///
+/// `PollEvented` uses [`Registration`] internally to take a type that
+/// implements [`mio::Evented`] as well as [`std::Read`] and or [`std::Write`]
+/// and associate it with a reactor that will drive it.
+///
+/// Once the [`mio::Evented`] type is wrapped by `PollEvented`, it can be
+/// used from within the future's execution model. As such, the `PollEvented`
+/// type provides [`AsyncRead`] and [`AsyncWrite`] implementations using the
+/// underlying I/O resource as well as readiness events provided by the reactor.
+///
+/// **Note**: While `PollEvented` is `Sync` (if the underlying I/O type is
+/// `Sync`), the caller must ensure that there are at most two tasks that use a
+/// `PollEvented` instance concurrently. One for reading and one for writing.
+/// While violating this requirement is "safe" from a Rust memory model point of
+/// view, it will result in unexpected behavior in the form of lost
+/// notifications and tasks hanging.
+///
+/// ## Readiness events
+///
+/// Besides just providing [`AsyncRead`] and [`AsyncWrite`] implementations,
+/// this type also supports access to the underlying readiness event stream.
+/// While similar in function to what [`Registration`] provides, the semantics
+/// are a bit different.
+///
+/// Two functions are provided to access the readiness events:
+/// [`poll_read_ready`] and [`poll_write_ready`]. These functions return the
+/// current readiness state of the `PollEvented` instance. If
+/// [`poll_read_ready`] indicates read readiness, immediately calling
+/// [`poll_read_ready`] again will also indicate read readiness.
+///
+/// When the operation is attempted and is unable to succeed due to the I/O
+/// resource not being ready, the caller must call [`clear_read_ready`] or
+/// [`clear_write_ready`]. This clears the readiness state until a new readiness
+/// event is received.
+///
+/// This allows the caller to implement additional functions. For example,
+/// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and
+/// [`clear_read_ready`].
+///
+/// ```rust,ignore
+/// pub fn poll_accept(&mut self) -> Poll<(net::TcpStream, SocketAddr), io::Error> {
+/// let ready = Ready::readable();
+///
+/// try_ready!(self.poll_evented.poll_read_ready(ready));
+///
+/// match self.poll_evented.get_ref().accept_std() {
+/// Ok(pair) => Ok(Async::Ready(pair)),
+/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+/// self.poll_evented.clear_read_ready(ready);
+/// Ok(Async::NotReady)
+/// }
+/// Err(e) => Err(e),
+/// }
+/// }
+/// ```
+///
+/// ## Platform-specific events
+///
+/// `PollEvented` also allows receiving platform-specific `mio::Ready` events.
+/// These events are included as part of the read readiness event stream. The
+/// write readiness event stream is only for `Ready::writable()` events.
+///
+/// [`std::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
+/// [`std::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
+/// [`AsyncRead`]: ../io/trait.AsyncRead.html
+/// [`AsyncWrite`]: ../io/trait.AsyncWrite.html
+/// [`mio::Evented`]: https://docs.rs/mio/0.6/mio/trait.Evented.html
+/// [`Registration`]: struct.Registration.html
+/// [`TcpListener`]: ../net/struct.TcpListener.html
+/// [`clear_read_ready`]: #method.clear_read_ready
+/// [`clear_read_ready`]: #method.clear_read_ready
+/// [`poll_read_ready`]: #method.poll_read_ready
+/// [`poll_write_ready`]: #method.poll_write_ready
+pub struct PollEvented<E: Evented> {
+ io: Option<E>,
+ inner: Inner,
+}
+
+struct Inner {
+ registration: Registration,
+
+ /// Currently visible read readiness
+ read_readiness: AtomicUsize,
+
+ /// Currently visible write readiness
+ write_readiness: AtomicUsize,
+}
+
+// ===== impl PollEvented =====
+
+macro_rules! poll_ready {
+ ($me:expr, $mask:expr, $cache:ident, $take:ident, $poll:expr) => {{
+ $me.register()?;
+
+ // Load cached & encoded readiness.
+ let mut cached = $me.inner.$cache.load(Relaxed);
+ let mask = $mask | ::platform::hup();
+
+ // See if the current readiness matches any bits.
+ let mut ret = mio::Ready::from_usize(cached) & $mask;
+
+ if ret.is_empty() {
+ // Readiness does not match, consume the registration's readiness
+ // stream. This happens in a loop to ensure that the stream gets
+ // drained.
+ loop {
+ let ready = try_ready!($poll);
+ cached |= ready.as_usize();
+
+ // Update the cache store
+ $me.inner.$cache.store(cached, Relaxed);
+
+ ret |= ready & mask;
+
+ if !ret.is_empty() {
+ return Ok(ret.into());
+ }
+ }
+ } else {
+ // Check what's new with the registration stream. This will not
+ // request to be notified
+ if let Some(ready) = $me.inner.registration.$take()? {
+ cached |= ready.as_usize();
+ $me.inner.$cache.store(cached, Relaxed);
+ }
+
+ Ok(mio::Ready::from_usize(cached).into())
+ }
+ }}
+}
+
+impl<E> PollEvented<E>
+where E: Evented
+{
+ /// Creates a new `PollEvented` associated with the default reactor.
+ pub fn new(io: E) -> PollEvented<E> {
+ PollEvented {
+ io: Some(io),
+ inner: Inner {
+ registration: Registration::new(),
+ read_readiness: AtomicUsize::new(0),
+ write_readiness: AtomicUsize::new(0),
+ }
+ }
+ }
+
+ /// Creates a new `PollEvented` associated with the specified reactor.
+ pub fn new_with_handle(io: E, handle: &Handle) -> io::Result<Self> {
+ let ret = PollEvented::new(io);
+
+ if let Some(handle) = handle.as_priv() {
+ ret.inner.registration
+ .register_with_priv(ret.io.as_ref().unwrap(), handle)?;
+ }
+
+ Ok(ret)
+ }
+
+ /// Returns a shared reference to the underlying I/O object this readiness
+ /// stream is wrapping.
+ pub fn get_ref(&self) -> &E {
+ self.io.as_ref().unwrap()
+ }
+
+ /// Returns a mutable reference to the underlying I/O object this readiness
+ /// stream is wrapping.
+ pub fn get_mut(&mut self) -> &mut E {
+ self.io.as_mut().unwrap()
+ }
+
+ /// Consumes self, returning the inner I/O object
+ ///
+ /// This function will deregister the I/O resource from the reactor before
+ /// returning. If the deregistration operation fails, an error is returned.
+ ///
+ /// Note that deregistering does not guarantee that the I/O resource can be
+ /// registered with a different reactor. Some I/O resource types can only be
+ /// associated with a single reactor instance for their lifetime.
+ pub fn into_inner(mut self) -> io::Result<E> {
+ let io = self.io.take().unwrap();
+ self.inner.registration.deregister(&io)?;
+ Ok(io)
+ }
+
+ /// Check the I/O resource's read readiness state.
+ ///
+ /// The mask argument allows specifying what readiness to notify on. This
+ /// can be any value, including platform specific readiness, **except**
+ /// `writable`. HUP is always implicitly included on platforms that support
+ /// it.
+ ///
+ /// If the resource is not ready for a read then `Async::NotReady` is
+ /// returned and the current task is notified once a new event is received.
+ ///
+ /// The I/O resource will remain in a read-ready state until readiness is
+ /// cleared by calling [`clear_read_ready`].
+ ///
+ /// [`clear_read_ready`]: #method.clear_read_ready
+ ///
+ /// # Panics
+ ///
+ /// This function panics if:
+ ///
+ /// * `ready` includes writable.
+ /// * called from outside of a task context.
+ pub fn poll_read_ready(&self, mask: mio::Ready) -> Poll<mio::Ready, io::Error> {
+ assert!(!mask.is_writable(), "cannot poll for write readiness");
+ poll_ready!(
+ self, mask, read_readiness, take_read_ready,
+ self.inner.registration.poll_read_ready()
+ )
+ }
+
+ /// Like `poll_read_ready` but compatible with futures 0.2.
+ #[cfg(feature = "unstable-futures")]
+ pub fn poll_read_ready2(&self, cx: &mut futures2::task::Context, mask: mio::Ready)
+ -> futures2::Poll<mio::Ready, io::Error>
+ {
+ assert!(!mask.is_writable(), "cannot poll for write readiness");
+ let mut res = || poll_ready!(
+ self, mask, read_readiness, take_read_ready,
+ self.inner.registration.poll_read_ready2(cx).map(::lower_async)
+ );
+ res().map(::lift_async)
+ }
+
+ /// Clears the I/O resource's read readiness state and registers the current
+ /// task to be notified once a read readiness event is received.
+ ///
+ /// After calling this function, `poll_read_ready` will return `NotReady`
+ /// until a new read readiness event has been received.
+ ///
+ /// The `mask` argument specifies the readiness bits to clear. This may not
+ /// include `writable` or `hup`.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if:
+ ///
+ /// * `ready` includes writable or HUP
+ /// * called from outside of a task context.
+ pub fn clear_read_ready(&self, ready: mio::Ready) -> io::Result<()> {
+ // Cannot clear write readiness
+ assert!(!ready.is_writable(), "cannot clear write readiness");
+ assert!(!::platform::is_hup(&ready), "cannot clear HUP readiness");
+
+ self.inner.read_readiness.fetch_and(!ready.as_usize(), Relaxed);
+
+ if self.poll_read_ready(ready)?.is_ready() {
+ // Notify the current task
+ task::current().notify();
+ }
+
+ Ok(())
+ }
+
+ /// Like `clear_read_ready` but compatible with futures 0.2.
+ #[cfg(feature = "unstable-futures")]
+ pub fn clear_read_ready2(&self, cx: &mut futures2::task::Context, ready: mio::Ready)
+ -> io::Result<()>
+ {
+ // Cannot clear write readiness
+ assert!(!ready.is_writable(), "cannot clear write readiness");
+ assert!(!::platform::is_hup(&ready), "cannot clear HUP readiness");
+
+ self.inner.read_readiness.fetch_and(!ready.as_usize(), Relaxed);
+
+ if self.poll_read_ready2(cx, ready)?.is_ready() {
+ // Notify the current task
+ cx.waker().wake()
+ }
+
+ Ok(())
+ }
+
+ /// Check the I/O resource's write readiness state.
+ ///
+ /// This always checks for writable readiness and also checks for HUP
+ /// readiness on platforms that support it.
+ ///
+ /// If the resource is not ready for a write then `Async::NotReady` is
+ /// returned and the current task is notified once a new event is received.
+ ///
+ /// The I/O resource will remain in a write-ready state until readiness is
+ /// cleared by calling [`clear_write_ready`].
+ ///
+ /// [`clear_write_ready`]: #method.clear_write_ready
+ ///
+ /// # Panics
+ ///
+ /// This function panics if:
+ ///
+ /// * `ready` contains bits besides `writable` and `hup`.
+ /// * called from outside of a task context.
+ pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> {
+ poll_ready!(
+ self,
+ mio::Ready::writable(),
+ write_readiness,
+ take_write_ready,
+ self.inner.registration.poll_write_ready()
+ )
+ }
+
+ /// Like `poll_write_ready` but compatible with futures 0.2.
+ #[cfg(feature = "unstable-futures")]
+ pub fn poll_write_ready2(&self, cx: &mut futures2::task::Context)
+ -> futures2::Poll<mio::Ready, io::Error>
+ {
+ let mut res = || poll_ready!(
+ self,
+ mio::Ready::writable(),
+ write_readiness,
+ take_write_ready,
+ self.inner.registration.poll_write_ready2(cx).map(::lower_async)
+ );
+ res().map(::lift_async)
+ }
+
+
+ /// Resets the I/O resource's write readiness state and registers the current
+ /// task to be notified once a write readiness event is received.
+ ///
+ /// This only clears writable readiness. HUP (on platforms that support HUP)
+ /// cannot be cleared as it is a final state.
+ ///
+ /// After calling this function, `poll_write_ready(Ready::writable())` will
+ /// return `NotReady` until a new write readiness event has been received.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if called from outside of a task context.
+ pub fn clear_write_ready(&self) -> io::Result<()> {
+ let ready = mio::Ready::writable();
+
+ self.inner.write_readiness.fetch_and(!ready.as_usize(), Relaxed);
+
+ if self.poll_write_ready()?.is_ready() {
+ // Notify the current task
+ task::current().notify();
+ }
+
+ Ok(())
+ }
+
+ /// Like `clear_write_ready`, but compatible with futures 0.2.
+ #[cfg(feature = "unstable-futures")]
+ pub fn clear_write_ready2(&self, cx: &mut futures2::task::Context) -> io::Result<()> {
+ let ready = mio::Ready::writable();
+
+ self.inner.write_readiness.fetch_and(!ready.as_usize(), Relaxed);
+
+ if self.poll_write_ready2(cx)?.is_ready() {
+ // Notify the current task
+ cx.waker().wake()
+ }
+
+ Ok(())
+ }
+
+ /// Ensure that the I/O resource is registered with the reactor.
+ fn register(&self) -> io::Result<()> {
+ self.inner.registration.register(self.io.as_ref().unwrap())?;
+ Ok(())
+ }
+}
+
+// ===== Read / Write impls =====
+
+impl<E> Read for PollEvented<E>
+where E: Evented + Read,
+{
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ if let Async::NotReady = self.poll_read_ready(mio::Ready::readable())? {
+ return Err(io::ErrorKind::WouldBlock.into())
+ }
+
+ let r = self.get_mut().read(buf);
+
+ if is_wouldblock(&r) {
+ self.clear_read_ready(mio::Ready::readable())?;
+ }
+
+ return r
+ }
+}
+
+#[cfg(feature = "unstable-futures")]
+impl<E> futures2::io::AsyncRead for PollEvented<E>
+ where E: Evented, E: Read,
+{
+ fn poll_read(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8])
+ -> futures2::Poll<usize, io::Error>
+ {
+ if let futures2::Async::Pending = self.poll_read_ready2(cx, mio::Ready::readable())? {
+ return Ok(futures2::Async::Pending);
+ }
+
+ match self.get_mut().read(buf) {
+ Ok(n) => Ok(futures2::Async::Ready(n)),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.clear_read_ready2(cx, mio::Ready::readable())?;
+ Ok(futures2::Async::Pending)
+ }
+ Err(e) => Err(e),
+ }
+ }
+}
+
+impl<E> Write for PollEvented<E>
+where E: Evented + Write,
+{
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ if let Async::NotReady = self.poll_write_ready()? {
+ return Err(io::ErrorKind::WouldBlock.into())
+ }
+
+ let r = self.get_mut().write(buf);
+
+ if is_wouldblock(&r) {
+ self.clear_write_ready()?;
+ }
+
+ return r
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ if let Async::NotReady = self.poll_write_ready()? {
+ return Err(io::ErrorKind::WouldBlock.into())
+ }
+
+ let r = self.get_mut().flush();
+
+ if is_wouldblock(&r) {
+ self.clear_write_ready()?;
+ }
+
+ return r
+ }
+}
+
+#[cfg(feature = "unstable-futures")]
+impl<E> futures2::io::AsyncWrite for PollEvented<E>
+ where E: Evented, E: Write,
+{
+ fn poll_write(&mut self, cx: &mut futures2::task::Context, buf: &[u8])
+ -> futures2::Poll<usize, io::Error>
+ {
+ if let futures2::Async::Pending = self.poll_write_ready2(cx)? {
+ return Ok(futures2::Async::Pending);
+ }
+
+ match self.get_mut().write(buf) {
+ Ok(n) => Ok(futures2::Async::Ready(n)),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.clear_write_ready2(cx)?;
+ Ok(futures2::Async::Pending)
+ }
+ Err(e) => Err(e),
+ }
+ }
+
+ fn poll_flush(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> {
+ if let futures2::Async::Pending = self.poll_write_ready2(cx)? {
+ return Ok(futures2::Async::Pending);
+ }
+
+ match self.get_mut().flush() {
+ Ok(_) => Ok(futures2::Async::Ready(())),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.clear_write_ready2(cx)?;
+ Ok(futures2::Async::Pending)
+ }
+ Err(e) => Err(e),
+ }
+ }
+
+ fn poll_close(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> {
+ futures2::io::AsyncWrite::poll_flush(self, cx)
+ }
+}
+
+
+impl<E> AsyncRead for PollEvented<E>
+where E: Evented + Read,
+{
+}
+
+impl<E> AsyncWrite for PollEvented<E>
+where E: Evented + Write,
+{
+ fn shutdown(&mut self) -> Poll<(), io::Error> {
+ Ok(().into())
+ }
+}
+
+// ===== &'a Read / &'a Write impls =====
+
+impl<'a, E> Read for &'a PollEvented<E>
+where E: Evented, &'a E: Read,
+{
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ if let Async::NotReady = self.poll_read_ready(mio::Ready::readable())? {
+ return Err(io::ErrorKind::WouldBlock.into())
+ }
+
+ let r = self.get_ref().read(buf);
+
+ if is_wouldblock(&r) {
+ self.clear_read_ready(mio::Ready::readable())?;
+ }
+
+ return r
+ }
+}
+
+#[cfg(feature = "unstable-futures")]
+impl<'a, E> futures2::io::AsyncRead for &'a PollEvented<E>
+ where E: Evented, &'a E: Read,
+{
+ fn poll_read(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8])
+ -> futures2::Poll<usize, io::Error>
+ {
+ if let futures2::Async::Pending = self.poll_read_ready2(cx, mio::Ready::readable())? {
+ return Ok(futures2::Async::Pending);
+ }
+
+ match self.get_ref().read(buf) {
+ Ok(n) => Ok(futures2::Async::Ready(n)),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.clear_read_ready2(cx, mio::Ready::readable())?;
+ Ok(futures2::Async::Pending)
+ }
+ Err(e) => Err(e),
+ }
+ }
+}
+
+impl<'a, E> Write for &'a PollEvented<E>
+where E: Evented, &'a E: Write,
+{
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ if let Async::NotReady = self.poll_write_ready()? {
+ return Err(io::ErrorKind::WouldBlock.into())
+ }
+
+ let r = self.get_ref().write(buf);
+
+ if is_wouldblock(&r) {
+ self.clear_write_ready()?;
+ }
+
+ return r
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ if let Async::NotReady = self.poll_write_ready()? {
+ return Err(io::ErrorKind::WouldBlock.into())
+ }
+
+ let r = self.get_ref().flush();
+
+ if is_wouldblock(&r) {
+ self.clear_write_ready()?;
+ }
+
+ return r
+ }
+}
+
+#[cfg(feature = "unstable-futures")]
+impl<'a, E> futures2::io::AsyncWrite for &'a PollEvented<E>
+ where E: Evented, &'a E: Write,
+{
+ fn poll_write(&mut self, cx: &mut futures2::task::Context, buf: &[u8])
+ -> futures2::Poll<usize, io::Error>
+ {
+ if let futures2::Async::Pending = self.poll_write_ready2(cx)? {
+ return Ok(futures2::Async::Pending);
+ }
+
+ match self.get_ref().write(buf) {
+ Ok(n) => Ok(futures2::Async::Ready(n)),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.clear_write_ready2(cx)?;
+ Ok(futures2::Async::Pending)
+ }
+ Err(e) => Err(e),
+ }
+ }
+
+ fn poll_flush(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> {
+ if let futures2::Async::Pending = self.poll_write_ready2(cx)? {
+ return Ok(futures2::Async::Pending);
+ }
+
+ match self.get_ref().flush() {
+ Ok(_) => Ok(futures2::Async::Ready(())),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.clear_write_ready2(cx)?;
+ Ok(futures2::Async::Pending)
+ }
+ Err(e) => Err(e),
+ }
+ }
+
+ fn poll_close(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> {
+ futures2::io::AsyncWrite::poll_flush(self, cx)
+ }
+}
+
+impl<'a, E> AsyncRead for &'a PollEvented<E>
+where E: Evented, &'a E: Read,
+{
+}
+
+impl<'a, E> AsyncWrite for &'a PollEvented<E>
+where E: Evented, &'a E: Write,
+{
+ fn shutdown(&mut self) -> Poll<(), io::Error> {
+ Ok(().into())
+ }
+}
+
+fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
+ match *r {
+ Ok(_) => false,
+ Err(ref e) => e.kind() == io::ErrorKind::WouldBlock,
+ }
+}
+
+impl<E: Evented + fmt::Debug> fmt::Debug for PollEvented<E> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("PollEvented")
+ .field("io", &self.io)
+ .finish()
+ }
+}
+
+impl<E: Evented> Drop for PollEvented<E> {
+ fn drop(&mut self) {
+ if let Some(io) = self.io.take() {
+ // Ignore errors
+ let _ = self.inner.registration.deregister(&io);
+ }
+ }
+}
diff --git a/third_party/rust/tokio-reactor/src/registration.rs b/third_party/rust/tokio-reactor/src/registration.rs
new file mode 100644
index 0000000000..278b57680c
--- /dev/null
+++ b/third_party/rust/tokio-reactor/src/registration.rs
@@ -0,0 +1,569 @@
+use {Handle, HandlePriv, Direction, Task};
+
+use futures::{Async, Poll, task};
+use mio::{self, Evented};
+
+#[cfg(feature = "unstable-futures")]
+use futures2;
+
+use std::{io, ptr, usize};
+use std::cell::UnsafeCell;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+
+/// Associates an I/O resource with the reactor instance that drives it.
+///
+/// A registration represents an I/O resource registered with a Reactor such
+/// that it will receive task notifications on readiness. This is the lowest
+/// level API for integrating with a reactor.
+///
+/// The association between an I/O resource is made by calling [`register`].
+/// Once the association is established, it remains established until the
+/// registration instance is dropped. Subsequent calls to [`register`] are
+/// no-ops.
+///
+/// A registration instance represents two separate readiness streams. One for
+/// the read readiness and one for write readiness. These streams are
+/// independent and can be consumed from separate tasks.
+///
+/// **Note**: while `Registration` is `Sync`, the caller must ensure that there
+/// are at most two tasks that use a registration instance concurrently. One
+/// task for [`poll_read_ready`] and one task for [`poll_write_ready`]. While
+/// violating this requirement is "safe" from a Rust memory safety point of
+/// view, it will result in unexpected behavior in the form of lost
+/// notifications and tasks hanging.
+///
+/// ## Platform-specific events
+///
+/// `Registration` also allows receiving platform-specific `mio::Ready` events.
+/// These events are included as part of the read readiness event stream. The
+/// write readiness event stream is only for `Ready::writable()` events.
+///
+/// [`register`]: #method.register
+/// [`poll_read_ready`]: #method.poll_read_ready`]
+/// [`poll_write_ready`]: #method.poll_write_ready`]
+#[derive(Debug)]
+pub struct Registration {
+ /// Stores the handle. Once set, the value is not changed.
+ ///
+ /// Setting this requires acquiring the lock from state.
+ inner: UnsafeCell<Option<Inner>>,
+
+ /// Tracks the state of the registration.
+ ///
+ /// The least significant 2 bits are used to track the lifecycle of the
+ /// registration. The rest of the `state` variable is a pointer to tasks
+ /// that must be notified once the lock is released.
+ state: AtomicUsize,
+}
+
+#[derive(Debug)]
+struct Inner {
+ handle: HandlePriv,
+ token: usize,
+}
+
+/// Tasks waiting on readiness notifications.
+#[derive(Debug)]
+struct Node {
+ direction: Direction,
+ task: Task,
+ next: *mut Node,
+}
+
+/// Initial state. The handle is not set and the registration is idle.
+const INIT: usize = 0;
+
+/// A thread locked the state and will associate a handle.
+const LOCKED: usize = 1;
+
+/// A handle has been associated with the registration.
+const READY: usize = 2;
+
+/// Masks the lifecycle state
+const LIFECYCLE_MASK: usize = 0b11;
+
+/// A fake token used to identify error situations
+const ERROR: usize = usize::MAX;
+
+// ===== impl Registration =====
+
+impl Registration {
+ /// Create a new `Registration`.
+ ///
+ /// This registration is not associated with a Reactor instance. Call
+ /// `register` to establish the association.
+ pub fn new() -> Registration {
+ Registration {
+ inner: UnsafeCell::new(None),
+ state: AtomicUsize::new(INIT),
+ }
+ }
+
+ /// Register the I/O resource with the default reactor.
+ ///
+ /// This function is safe to call concurrently and repeatedly. However, only
+ /// the first call will establish the registration. Subsequent calls will be
+ /// no-ops.
+ ///
+ /// # Return
+ ///
+ /// If the registration happened successfully, `Ok(true)` is returned.
+ ///
+ /// If an I/O resource has previously been successfully registered,
+ /// `Ok(false)` is returned.
+ ///
+ /// If an error is encountered during registration, `Err` is returned.
+ pub fn register<T>(&self, io: &T) -> io::Result<bool>
+ where T: Evented,
+ {
+ self.register2(io, || HandlePriv::try_current())
+ }
+
+ /// Deregister the I/O resource from the reactor it is associated with.
+ ///
+ /// This function must be called before the I/O resource associated with the
+ /// registration is dropped.
+ ///
+ /// Note that deregistering does not guarantee that the I/O resource can be
+ /// registered with a different reactor. Some I/O resource types can only be
+ /// associated with a single reactor instance for their lifetime.
+ ///
+ /// # Return
+ ///
+ /// If the deregistration was successful, `Ok` is returned. Any calls to
+ /// `Reactor::turn` that happen after a successful call to `deregister` will
+ /// no longer result in notifications getting sent for this registration.
+ ///
+ /// `Err` is returned if an error is encountered.
+ pub fn deregister<T>(&mut self, io: &T) -> io::Result<()>
+ where T: Evented,
+ {
+ // The state does not need to be checked and coordination is not
+ // necessary as this function takes `&mut self`. This guarantees a
+ // single thread is accessing the instance.
+ if let Some(inner) = unsafe { (*self.inner.get()).as_ref() } {
+ inner.deregister(io)?;
+ }
+
+ Ok(())
+ }
+
+ /// Register the I/O resource with the specified reactor.
+ ///
+ /// This function is safe to call concurrently and repeatedly. However, only
+ /// the first call will establish the registration. Subsequent calls will be
+ /// no-ops.
+ ///
+ /// If the registration happened successfully, `Ok(true)` is returned.
+ ///
+ /// If an I/O resource has previously been successfully registered,
+ /// `Ok(false)` is returned.
+ ///
+ /// If an error is encountered during registration, `Err` is returned.
+ pub fn register_with<T>(&self, io: &T, handle: &Handle) -> io::Result<bool>
+ where T: Evented,
+ {
+ self.register2(io, || {
+ match handle.as_priv() {
+ Some(handle) => Ok(handle.clone()),
+ None => HandlePriv::try_current(),
+ }
+ })
+ }
+
+ pub(crate) fn register_with_priv<T>(&self, io: &T, handle: &HandlePriv) -> io::Result<bool>
+ where T: Evented,
+ {
+ self.register2(io, || Ok(handle.clone()))
+ }
+
+ fn register2<T, F>(&self, io: &T, f: F) -> io::Result<bool>
+ where T: Evented,
+ F: Fn() -> io::Result<HandlePriv>,
+ {
+ let mut state = self.state.load(SeqCst);
+
+ loop {
+ match state {
+ INIT => {
+ // Registration is currently not associated with a handle.
+ // Get a handle then attempt to lock the state.
+ let handle = f()?;
+
+ let actual = self.state.compare_and_swap(INIT, LOCKED, SeqCst);
+
+ if actual != state {
+ state = actual;
+ continue;
+ }
+
+ // Create the actual registration
+ let (inner, res) = Inner::new(io, handle);
+
+ unsafe { *self.inner.get() = Some(inner); }
+
+ // Transition out of the locked state. This acquires the
+ // current value, potentially having a list of tasks that
+ // are pending readiness notifications.
+ let actual = self.state.swap(READY, SeqCst);
+
+ // Consume the stack of nodes
+
+ let mut read = false;
+ let mut write = false;
+ let mut ptr = (actual & !LIFECYCLE_MASK) as *mut Node;
+
+ let inner = unsafe { (*self.inner.get()).as_ref().unwrap() };
+
+ while !ptr.is_null() {
+ let node = unsafe { Box::from_raw(ptr) };
+ let node = *node;
+ let Node {
+ direction,
+ task,
+ next,
+ } = node;
+
+ let flag = match direction {
+ Direction::Read => &mut read,
+ Direction::Write => &mut write,
+ };
+
+ if !*flag {
+ *flag = true;
+
+ inner.register(direction, task);
+ }
+
+ ptr = next;
+ }
+
+ return res.map(|_| true);
+ }
+ _ => return Ok(false),
+ }
+ }
+ }
+
+ /// Poll for events on the I/O resource's read readiness stream.
+ ///
+ /// If the I/O resource receives a new read readiness event since the last
+ /// call to `poll_read_ready`, it is returned. If it has not, the current
+ /// task is notified once a new event is received.
+ ///
+ /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
+ /// the function will always return `Ready(HUP)`. This should be treated as
+ /// the end of the readiness stream.
+ ///
+ /// Ensure that [`register`] has been called first.
+ ///
+ /// # Return value
+ ///
+ /// There are several possible return values:
+ ///
+ /// * `Ok(Async::Ready(readiness))` means that the I/O resource has received
+ /// a new readiness event. The readiness value is included.
+ ///
+ /// * `Ok(NotReady)` means that no new readiness events have been received
+ /// since the last call to `poll_read_ready`.
+ ///
+ /// * `Err(err)` means that the registration has encountered an error. This
+ /// error either represents a permanent internal error **or** the fact
+ /// that [`register`] was not called first.
+ ///
+ /// [`register`]: #method.register
+ /// [edge-triggered]: https://docs.rs/mio/0.6/mio/struct.Poll.html#edge-triggered-and-level-triggered
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if called from outside of a task context.
+ pub fn poll_read_ready(&self) -> Poll<mio::Ready, io::Error> {
+ self.poll_ready(Direction::Read, true, || Task::Futures1(task::current()))
+ .map(|v| match v {
+ Some(v) => Async::Ready(v),
+ _ => Async::NotReady,
+ })
+ }
+
+ /// Like `poll_ready_ready`, but compatible with futures 0.2
+ #[cfg(feature = "unstable-futures")]
+ pub fn poll_read_ready2(&self, cx: &mut futures2::task::Context)
+ -> futures2::Poll<mio::Ready, io::Error>
+ {
+ use futures2::Async as Async2;
+ self.poll_ready(Direction::Read, true, || Task::Futures2(cx.waker().clone()))
+ .map(|v| match v {
+ Some(v) => Async2::Ready(v),
+ _ => Async2::Pending,
+ })
+ }
+
+ /// Consume any pending read readiness event.
+ ///
+ /// This function is identical to [`poll_read_ready`] **except** that it
+ /// will not notify the current task when a new event is received. As such,
+ /// it is safe to call this function from outside of a task context.
+ ///
+ /// [`poll_read_ready`]: #method.poll_read_ready
+ pub fn take_read_ready(&self) -> io::Result<Option<mio::Ready>> {
+ self.poll_ready(Direction::Read, false, || panic!())
+
+ }
+
+ /// Poll for events on the I/O resource's write readiness stream.
+ ///
+ /// If the I/O resource receives a new write readiness event since the last
+ /// call to `poll_write_ready`, it is returned. If it has not, the current
+ /// task is notified once a new event is received.
+ ///
+ /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
+ /// the function will always return `Ready(HUP)`. This should be treated as
+ /// the end of the readiness stream.
+ ///
+ /// Ensure that [`register`] has been called first.
+ ///
+ /// # Return value
+ ///
+ /// There are several possible return values:
+ ///
+ /// * `Ok(Async::Ready(readiness))` means that the I/O resource has received
+ /// a new readiness event. The readiness value is included.
+ ///
+ /// * `Ok(NotReady)` means that no new readiness events have been received
+ /// since the last call to `poll_write_ready`.
+ ///
+ /// * `Err(err)` means that the registration has encountered an error. This
+ /// error either represents a permanent internal error **or** the fact
+ /// that [`register`] was not called first.
+ ///
+ /// [`register`]: #method.register
+ /// [edge-triggered]: https://docs.rs/mio/0.6/mio/struct.Poll.html#edge-triggered-and-level-triggered
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if called from outside of a task context.
+ pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> {
+ self.poll_ready(Direction::Write, true, || Task::Futures1(task::current()))
+ .map(|v| match v {
+ Some(v) => Async::Ready(v),
+ _ => Async::NotReady,
+ })
+ }
+
+ /// Like `poll_write_ready`, but compatible with futures 0.2
+ #[cfg(feature = "unstable-futures")]
+ pub fn poll_write_ready2(&self, cx: &mut futures2::task::Context)
+ -> futures2::Poll<mio::Ready, io::Error>
+ {
+ use futures2::Async as Async2;
+ self.poll_ready(Direction::Write, true, || Task::Futures2(cx.waker().clone()))
+ .map(|v| match v {
+ Some(v) => Async2::Ready(v),
+ _ => Async2::Pending,
+ })
+ }
+
+ /// Consume any pending write readiness event.
+ ///
+ /// This function is identical to [`poll_write_ready`] **except** that it
+ /// will not notify the current task when a new event is received. As such,
+ /// it is safe to call this function from outside of a task context.
+ ///
+ /// [`poll_write_ready`]: #method.poll_write_ready
+ pub fn take_write_ready(&self) -> io::Result<Option<mio::Ready>> {
+ self.poll_ready(Direction::Write, false, || unreachable!())
+ }
+
+ fn poll_ready<F>(&self, direction: Direction, notify: bool, task: F)
+ -> io::Result<Option<mio::Ready>>
+ where F: Fn() -> Task
+ {
+ let mut state = self.state.load(SeqCst);
+
+ // Cache the node pointer
+ let mut node = None;
+
+ loop {
+ match state {
+ INIT => {
+ return Err(io::Error::new(io::ErrorKind::Other, "must call `register`
+ before poll_read_ready"));
+ }
+ READY => {
+ let inner = unsafe { (*self.inner.get()).as_ref().unwrap() };
+ return inner.poll_ready(direction, notify, task);
+ }
+ LOCKED => {
+ if !notify {
+ // Skip the notification tracking junk.
+ return Ok(None);
+ }
+
+ let next_ptr = (state & !LIFECYCLE_MASK) as *mut Node;
+
+ let task = task();
+
+ // Get the node
+ let mut n = node.take().unwrap_or_else(|| {
+ Box::new(Node {
+ direction,
+ task: task,
+ next: ptr::null_mut(),
+ })
+ });
+
+ n.next = next_ptr;
+
+ let node_ptr = Box::into_raw(n);
+ let next = node_ptr as usize | (state & LIFECYCLE_MASK);
+
+ let actual = self.state.compare_and_swap(state, next, SeqCst);
+
+ if actual != state {
+ // Back out of the node boxing
+ let n = unsafe { Box::from_raw(node_ptr) };
+
+ // Save this for next loop
+ node = Some(n);
+
+ state = actual;
+ continue;
+ }
+
+ return Ok(None);
+ }
+ _ => unreachable!(),
+ }
+ }
+ }
+}
+
+unsafe impl Send for Registration {}
+unsafe impl Sync for Registration {}
+
+// ===== impl Inner =====
+
+impl Inner {
+ fn new<T>(io: &T, handle: HandlePriv) -> (Self, io::Result<()>)
+ where T: Evented,
+ {
+ let mut res = Ok(());
+
+ let token = match handle.inner() {
+ Some(inner) => match inner.add_source(io) {
+ Ok(token) => token,
+ Err(e) => {
+ res = Err(e);
+ ERROR
+ }
+ },
+ None => {
+ res = Err(io::Error::new(io::ErrorKind::Other, "event loop gone"));
+ ERROR
+ }
+ };
+
+ let inner = Inner {
+ handle,
+ token,
+ };
+
+ (inner, res)
+ }
+
+ fn register(&self, direction: Direction, task: Task) {
+ if self.token == ERROR {
+ task.notify();
+ return;
+ }
+
+ let inner = match self.handle.inner() {
+ Some(inner) => inner,
+ None => {
+ task.notify();
+ return;
+ }
+ };
+
+ inner.register(self.token, direction, task);
+ }
+
+ fn deregister<E: Evented>(&self, io: &E) -> io::Result<()> {
+ if self.token == ERROR {
+ return Err(io::Error::new(io::ErrorKind::Other, "failed to associate with reactor"));
+ }
+
+ let inner = match self.handle.inner() {
+ Some(inner) => inner,
+ None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
+ };
+
+ inner.deregister_source(io)
+ }
+
+ fn poll_ready<F>(&self, direction: Direction, notify: bool, task: F)
+ -> io::Result<Option<mio::Ready>>
+ where F: FnOnce() -> Task
+ {
+ if self.token == ERROR {
+ return Err(io::Error::new(io::ErrorKind::Other, "failed to associate with reactor"));
+ }
+
+ let inner = match self.handle.inner() {
+ Some(inner) => inner,
+ None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
+ };
+
+ let mask = direction.mask();
+ let mask_no_hup = (mask - ::platform::hup()).as_usize();
+
+ let io_dispatch = inner.io_dispatch.read().unwrap();
+ let sched = &io_dispatch[self.token];
+
+ // This consumes the current readiness state **except** for HUP. HUP is
+ // excluded because a) it is a final state and never transitions out of
+ // HUP and b) both the read AND the write directions need to be able to
+ // observe this state.
+ //
+ // If HUP were to be cleared when `direction` is `Read`, then when
+ // `poll_ready` is called again with a _`direction` of `Write`, the HUP
+ // state would not be visible.
+ let mut ready = mask & mio::Ready::from_usize(
+ sched.readiness.fetch_and(!mask_no_hup, SeqCst));
+
+ if ready.is_empty() && notify {
+ let task = task();
+ // Update the task info
+ match direction {
+ Direction::Read => sched.reader.register_task(task),
+ Direction::Write => sched.writer.register_task(task),
+ }
+
+ // Try again
+ ready = mask & mio::Ready::from_usize(
+ sched.readiness.fetch_and(!mask_no_hup, SeqCst));
+ }
+
+ if ready.is_empty() {
+ Ok(None)
+ } else {
+ Ok(Some(ready))
+ }
+ }
+}
+
+impl Drop for Inner {
+ fn drop(&mut self) {
+ if self.token == ERROR {
+ return;
+ }
+
+ let inner = match self.handle.inner() {
+ Some(inner) => inner,
+ None => return,
+ };
+
+ inner.drop_source(self.token);
+ }
+}