diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/tokio-reactor/src | |
parent | Initial commit. (diff) | |
download | firefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-reactor/src')
-rw-r--r-- | third_party/rust/tokio-reactor/src/atomic_task.rs | 297 | ||||
-rw-r--r-- | third_party/rust/tokio-reactor/src/background.rs | 217 | ||||
-rw-r--r-- | third_party/rust/tokio-reactor/src/lib.rs | 773 | ||||
-rw-r--r-- | third_party/rust/tokio-reactor/src/poll_evented.rs | 664 | ||||
-rw-r--r-- | third_party/rust/tokio-reactor/src/registration.rs | 569 |
5 files changed, 2520 insertions, 0 deletions
diff --git a/third_party/rust/tokio-reactor/src/atomic_task.rs b/third_party/rust/tokio-reactor/src/atomic_task.rs new file mode 100644 index 0000000000..b48dca402c --- /dev/null +++ b/third_party/rust/tokio-reactor/src/atomic_task.rs @@ -0,0 +1,297 @@ +#![allow(dead_code)] + +use super::Task; + +use std::fmt; +use std::cell::UnsafeCell; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{Acquire, Release, AcqRel}; + +/// A synchronization primitive for task notification. +/// +/// `AtomicTask` will coordinate concurrent notifications with the consumer +/// potentially "updating" the underlying task to notify. This is useful in +/// scenarios where a computation completes in another thread and wants to +/// notify the consumer, but the consumer is in the process of being migrated to +/// a new logical task. +/// +/// Consumers should call `register` before checking the result of a computation +/// and producers should call `notify` after producing the computation (this +/// differs from the usual `thread::park` pattern). It is also permitted for +/// `notify` to be called **before** `register`. This results in a no-op. +/// +/// A single `AtomicTask` may be reused for any number of calls to `register` or +/// `notify`. +/// +/// `AtomicTask` does not provide any memory ordering guarantees, as such the +/// user should use caution and use other synchronization primitives to guard +/// the result of the underlying computation. +pub(crate) struct AtomicTask { + state: AtomicUsize, + task: UnsafeCell<Option<Task>>, +} + +// `AtomicTask` is a multi-consumer, single-producer transfer cell. The cell +// stores a `Task` value produced by calls to `register` and many threads can +// race to take the task (to notify it) by calling `notify. +// +// If a new `Task` instance is produced by calling `register` before an existing +// one is consumed, then the existing one is overwritten. +// +// While `AtomicTask` is single-producer, the implementation ensures memory +// safety. In the event of concurrent calls to `register`, there will be a +// single winner whose task will get stored in the cell. The losers will not +// have their tasks notified. As such, callers should ensure to add +// synchronization to calls to `register`. +// +// The implementation uses a single `AtomicUsize` value to coordinate access to +// the `Task` cell. There are two bits that are operated on independently. These +// are represented by `REGISTERING` and `NOTIFYING`. +// +// The `REGISTERING` bit is set when a producer enters the critical section. The +// `NOTIFYING` bit is set when a consumer enters the critical section. Neither +// bit being set is represented by `WAITING`. +// +// A thread obtains an exclusive lock on the task cell by transitioning the +// state from `WAITING` to `REGISTERING` or `NOTIFYING`, depending on the +// operation the thread wishes to perform. When this transition is made, it is +// guaranteed that no other thread will access the task cell. +// +// # Registering +// +// On a call to `register`, an attempt to transition the state from WAITING to +// REGISTERING is made. On success, the caller obtains a lock on the task cell. +// +// If the lock is obtained, then the thread sets the task cell to the task +// provided as an argument. Then it attempts to transition the state back from +// `REGISTERING` -> `WAITING`. +// +// If this transition is successful, then the registering process is complete +// and the next call to `notify` will observe the task. +// +// If the transition fails, then there was a concurrent call to `notify` that +// was unable to access the task cell (due to the registering thread holding the +// lock). To handle this, the registering thread removes the task it just set +// from the cell and calls `notify` on it. This call to notify represents the +// attempt to notify by the other thread (that set the `NOTIFYING` bit). The +// state is then transitioned from `REGISTERING | NOTIFYING` back to `WAITING`. +// This transition must succeed because, at this point, the state cannot be +// transitioned by another thread. +// +// # Notifying +// +// On a call to `notify`, an attempt to transition the state from `WAITING` to +// `NOTIFYING` is made. On success, the caller obtains a lock on the task cell. +// +// If the lock is obtained, then the thread takes ownership of the current value +// in the task cell, and calls `notify` on it. The state is then transitioned +// back to `WAITING`. This transition must succeed as, at this point, the state +// cannot be transitioned by another thread. +// +// If the thread is unable to obtain the lock, the `NOTIFYING` bit is still. +// This is because it has either been set by the current thread but the previous +// value included the `REGISTERING` bit **or** a concurrent thread is in the +// `NOTIFYING` critical section. Either way, no action must be taken. +// +// If the current thread is the only concurrent call to `notify` and another +// thread is in the `register` critical section, when the other thread **exits** +// the `register` critical section, it will observe the `NOTIFYING` bit and +// handle the notify itself. +// +// If another thread is in the `notify` critical section, then it will handle +// notifying the task. +// +// # A potential race (is safely handled). +// +// Imagine the following situation: +// +// * Thread A obtains the `notify` lock and notifies a task. +// +// * Before thread A releases the `notify` lock, the notified task is scheduled. +// +// * Thread B attempts to notify the task. In theory this should result in the +// task being notified, but it cannot because thread A still holds the notify +// lock. +// +// This case is handled by requiring users of `AtomicTask` to call `register` +// **before** attempting to observe the application state change that resulted +// in the task being notified. The notifiers also change the application state +// before calling notify. +// +// Because of this, the task will do one of two things. +// +// 1) Observe the application state change that Thread B is notifying on. In +// this case, it is OK for Thread B's notification to be lost. +// +// 2) Call register before attempting to observe the application state. Since +// Thread A still holds the `notify` lock, the call to `register` will result +// in the task notifying itself and get scheduled again. + +/// Idle state +const WAITING: usize = 0; + +/// A new task value is being registered with the `AtomicTask` cell. +const REGISTERING: usize = 0b01; + +/// The task currently registered with the `AtomicTask` cell is being notified. +const NOTIFYING: usize = 0b10; + +impl AtomicTask { + /// Create an `AtomicTask` initialized with the given `Task` + pub fn new() -> AtomicTask { + // Make sure that task is Sync + trait AssertSync: Sync {} + impl AssertSync for Task {} + + AtomicTask { + state: AtomicUsize::new(WAITING), + task: UnsafeCell::new(None), + } + } + + /// Registers the provided task to be notified on calls to `notify`. + /// + /// The new task will take place of any previous tasks that were registered + /// by previous calls to `register`. Any calls to `notify` that happen after + /// a call to `register` (as defined by the memory ordering rules), will + /// notify the `register` caller's task. + /// + /// It is safe to call `register` with multiple other threads concurrently + /// calling `notify`. This will result in the `register` caller's current + /// task being notified once. + /// + /// This function is safe to call concurrently, but this is generally a bad + /// idea. Concurrent calls to `register` will attempt to register different + /// tasks to be notified. One of the callers will win and have its task set, + /// but there is no guarantee as to which caller will succeed. + pub fn register_task(&self, task: Task) { + match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) { + WAITING => { + unsafe { + // Locked acquired, update the waker cell + *self.task.get() = Some(task.clone()); + + // Release the lock. If the state transitioned to include + // the `NOTIFYING` bit, this means that a notify has been + // called concurrently, so we have to remove the task and + // notify it.` + // + // Start by assuming that the state is `REGISTERING` as this + // is what we jut set it to. + let mut curr = REGISTERING; + + // If a task has to be notified, it will be set here. + let mut notify: Option<Task> = None; + + loop { + let res = self.state.compare_exchange( + curr, WAITING, AcqRel, Acquire); + + match res { + Ok(_) => { + // The atomic exchange was successful, now + // notify the task (if set) and return. + if let Some(task) = notify { + task.notify(); + } + + return; + } + Err(actual) => { + // This branch can only be reached if a + // concurrent thread called `notify`. In this + // case, `actual` **must** be `REGISTERING | + // `NOTIFYING`. + debug_assert_eq!(actual, REGISTERING | NOTIFYING); + + // Take the task to notify once the atomic operation has + // completed. + notify = (*self.task.get()).take(); + + // Update `curr` for the next iteration of the + // loop + curr = actual; + } + } + } + } + } + NOTIFYING => { + // Currently in the process of notifying the task, i.e., + // `notify` is currently being called on the old task handle. + // So, we call notify on the new task handle + task.notify(); + } + state => { + // In this case, a concurrent thread is holding the + // "registering" lock. This probably indicates a bug in the + // caller's code as racing to call `register` doesn't make much + // sense. + // + // We just want to maintain memory safety. It is ok to drop the + // call to `register`. + debug_assert!( + state == REGISTERING || + state == REGISTERING | NOTIFYING); + } + } + } + + /// Attempts to take the `Task` value out of the `AtomicTask` with the + /// intention that the caller will notify the task. + pub fn take_to_notify(&self) -> Option<Task> { + // AcqRel ordering is used in order to acquire the value of the `task` + // cell as well as to establish a `release` ordering with whatever + // memory the `AtomicTask` is associated with. + match self.state.fetch_or(NOTIFYING, AcqRel) { + WAITING => { + // The notifying lock has been acquired. + let task = unsafe { (*self.task.get()).take() }; + + // Release the lock + self.state.fetch_and(!NOTIFYING, Release); + + task + } + state => { + // There is a concurrent thread currently updating the + // associated task. + // + // Nothing more to do as the `NOTIFYING` bit has been set. It + // doesn't matter if there are concurrent registering threads or + // not. + // + debug_assert!( + state == REGISTERING || + state == REGISTERING | NOTIFYING || + state == NOTIFYING); + + None + } + } + } + + /// Notifies the task that last called `register`. + /// + /// If `register` has not been called yet, then this does nothing. + pub fn notify(&self) { + if let Some(task) = self.take_to_notify() { + task.notify(); + } + } +} + +impl Default for AtomicTask { + fn default() -> Self { + AtomicTask::new() + } +} + +impl fmt::Debug for AtomicTask { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "AtomicTask") + } +} + +unsafe impl Send for AtomicTask {} +unsafe impl Sync for AtomicTask {} diff --git a/third_party/rust/tokio-reactor/src/background.rs b/third_party/rust/tokio-reactor/src/background.rs new file mode 100644 index 0000000000..88f78a8bcb --- /dev/null +++ b/third_party/rust/tokio-reactor/src/background.rs @@ -0,0 +1,217 @@ +use {Reactor, Handle, Task}; +use atomic_task::AtomicTask; + +use futures::{Future, Async, Poll, task}; + +use std::io; +use std::thread; +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; + +/// Handle to the reactor running on a background thread. +/// +/// Instances are created by calling [`Reactor::background`]. +/// +/// [`Reactor::background`]: struct.Reactor.html#method.background +#[derive(Debug)] +pub struct Background { + /// When `None`, the reactor thread will run until the process terminates. + inner: Option<Inner>, +} + +/// Future that resolves when the reactor thread has shutdown. +#[derive(Debug)] +pub struct Shutdown { + inner: Inner, +} + +/// Actual Background handle. +#[derive(Debug)] +struct Inner { + /// Handle to the reactor + handle: Handle, + + /// Shared state between the background handle and the reactor thread. + shared: Arc<Shared>, +} + +#[derive(Debug)] +struct Shared { + /// Signal the reactor thread to shutdown. + shutdown: AtomicUsize, + + /// Task to notify when the reactor thread enters a shutdown state. + shutdown_task: AtomicTask, +} + +/// Notifies the reactor thread to shutdown once the reactor becomes idle. +const SHUTDOWN_IDLE: usize = 1; + +/// Notifies the reactor thread to shutdown immediately. +const SHUTDOWN_NOW: usize = 2; + +/// The reactor is currently shutdown. +const SHUTDOWN: usize = 3; + +// ===== impl Background ===== + +impl Background { + /// Launch a reactor in the background and return a handle to the thread. + pub(crate) fn new(reactor: Reactor) -> io::Result<Background> { + // Grab a handle to the reactor + let handle = reactor.handle().clone(); + + // Create the state shared between the background handle and the reactor + // thread. + let shared = Arc::new(Shared { + shutdown: AtomicUsize::new(0), + shutdown_task: AtomicTask::new(), + }); + + // For the reactor thread + let shared2 = shared.clone(); + + // Start the reactor thread + thread::Builder::new() + .spawn(move || run(reactor, shared2))?; + + Ok(Background { + inner: Some(Inner { + handle, + shared, + }), + }) + } + + /// Returns a reference to the reactor handle. + pub fn handle(&self) -> &Handle { + &self.inner.as_ref().unwrap().handle + } + + /// Shutdown the reactor on idle. + /// + /// Returns a future that completes once the reactor thread has shutdown. + pub fn shutdown_on_idle(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + inner.shutdown_on_idle(); + + Shutdown { inner } + } + + /// Shutdown the reactor immediately + /// + /// Returns a future that completes once the reactor thread has shutdown. + pub fn shutdown_now(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + inner.shutdown_now(); + + Shutdown { inner } + } + + /// Run the reactor on its thread until the process terminates. + pub fn forget(mut self) { + drop(self.inner.take()); + } +} + +impl Drop for Background { + fn drop(&mut self) { + let inner = match self.inner.take() { + Some(i) => i, + None => return, + }; + + inner.shutdown_now(); + + let shutdown = Shutdown { inner }; + let _ = shutdown.wait(); + } +} + +// ===== impl Shutdown ===== + +impl Future for Shutdown { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + let task = Task::Futures1(task::current()); + self.inner.shared.shutdown_task.register_task(task); + + if !self.inner.is_shutdown() { + return Ok(Async::NotReady); + } + + Ok(().into()) + } +} + +// ===== impl Inner ===== + +impl Inner { + /// Returns true if the reactor thread is shutdown. + fn is_shutdown(&self) -> bool { + self.shared.shutdown.load(SeqCst) == SHUTDOWN + } + + /// Notify the reactor thread to shutdown once the reactor transitions to an + /// idle state. + fn shutdown_on_idle(&self) { + self.shared.shutdown + .compare_and_swap(0, SHUTDOWN_IDLE, SeqCst); + self.handle.wakeup(); + } + + /// Notify the reactor thread to shutdown immediately. + fn shutdown_now(&self) { + let mut curr = self.shared.shutdown.load(SeqCst); + + loop { + if curr >= SHUTDOWN_NOW { + return; + } + + let act = self.shared.shutdown + .compare_and_swap(curr, SHUTDOWN_NOW, SeqCst); + + if act == curr { + self.handle.wakeup(); + return; + } + + curr = act; + } + } +} + +// ===== impl Reactor thread ===== + +fn run(mut reactor: Reactor, shared: Arc<Shared>) { + debug!("starting background reactor"); + loop { + let shutdown = shared.shutdown.load(SeqCst); + + if shutdown == SHUTDOWN_NOW { + debug!("shutting background reactor down NOW"); + break; + } + + if shutdown == SHUTDOWN_IDLE && reactor.is_idle() { + debug!("shutting background reactor on idle"); + break; + } + + reactor.turn(None).unwrap(); + } + + drop(reactor); + + // Transition the state to shutdown + shared.shutdown.store(SHUTDOWN, SeqCst); + + // Notify any waiters + shared.shutdown_task.notify(); + + debug!("background reactor has shutdown"); +} diff --git a/third_party/rust/tokio-reactor/src/lib.rs b/third_party/rust/tokio-reactor/src/lib.rs new file mode 100644 index 0000000000..381d77af03 --- /dev/null +++ b/third_party/rust/tokio-reactor/src/lib.rs @@ -0,0 +1,773 @@ +//! Event loop that drives Tokio I/O resources. +//! +//! The reactor is the engine that drives asynchronous I/O resources (like TCP and +//! UDP sockets). It is backed by [`mio`] and acts as a bridge between [`mio`] and +//! [`futures`]. +//! +//! The crate provides: +//! +//! * [`Reactor`] is the main type of this crate. It performs the event loop logic. +//! +//! * [`Handle`] provides a reference to a reactor instance. +//! +//! * [`Registration`] and [`PollEvented`] allow third parties to implement I/O +//! resources that are driven by the reactor. +//! +//! Application authors will not use this crate directly. Instead, they will use the +//! `tokio` crate. Library authors should only depend on `tokio-reactor` if they +//! are building a custom I/O resource. +//! +//! For more details, see [reactor module] documentation in the Tokio crate. +//! +//! [`mio`]: http://github.com/carllerche/mio +//! [`futures`]: http://github.com/rust-lang-nursery/futures-rs +//! [`Reactor`]: struct.Reactor.html +//! [`Handle`]: struct.Handle.html +//! [`Registration`]: struct.Registration.html +//! [`PollEvented`]: struct.PollEvented.html +//! [reactor module]: https://docs.rs/tokio/0.1/tokio/reactor/index.html + +#![doc(html_root_url = "https://docs.rs/tokio-reactor/0.1.3")] +#![deny(missing_docs, warnings, missing_debug_implementations)] + +#[macro_use] +extern crate futures; +#[macro_use] +extern crate log; +extern crate mio; +extern crate slab; +extern crate tokio_executor; +extern crate tokio_io; + +#[cfg(feature = "unstable-futures")] +extern crate futures2; + +mod atomic_task; +pub(crate) mod background; +mod poll_evented; +mod registration; + +// ===== Public re-exports ===== + +pub use self::background::{Background, Shutdown}; +pub use self::registration::Registration; +pub use self::poll_evented::PollEvented; + +// ===== Private imports ===== + +use atomic_task::AtomicTask; + +use tokio_executor::Enter; +use tokio_executor::park::{Park, Unpark}; + +use std::{fmt, usize}; +use std::io; +use std::mem; +use std::cell::RefCell; +use std::sync::atomic::Ordering::{Relaxed, SeqCst}; +use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT}; +use std::sync::{Arc, Weak, RwLock}; +use std::time::{Duration, Instant}; + +use log::Level; +use mio::event::Evented; +use slab::Slab; + +/// The core reactor, or event loop. +/// +/// The event loop is the main source of blocking in an application which drives +/// all other I/O events and notifications happening. Each event loop can have +/// multiple handles pointing to it, each of which can then be used to create +/// various I/O objects to interact with the event loop in interesting ways. +pub struct Reactor { + /// Reuse the `mio::Events` value across calls to poll. + events: mio::Events, + + /// State shared between the reactor and the handles. + inner: Arc<Inner>, + + _wakeup_registration: mio::Registration, +} + +/// A reference to a reactor. +/// +/// A `Handle` is used for associating I/O objects with an event loop +/// explicitly. Typically though you won't end up using a `Handle` that often +/// and will instead use the default reactor for the execution context. +/// +/// By default, most components bind lazily to reactors. +/// To get this behavior when manually passing a `Handle`, use `default()`. +#[derive(Clone)] +pub struct Handle { + inner: Option<HandlePriv>, +} + +/// Like `Handle`, but never `None`. +#[derive(Clone)] +struct HandlePriv { + inner: Weak<Inner>, +} + +/// Return value from the `turn` method on `Reactor`. +/// +/// Currently this value doesn't actually provide any functionality, but it may +/// in the future give insight into what happened during `turn`. +#[derive(Debug)] +pub struct Turn { + _priv: (), +} + +/// Error returned from `Handle::set_fallback`. +#[derive(Clone, Debug)] +pub struct SetFallbackError(()); + +#[deprecated(since = "0.1.2", note = "use SetFallbackError instead")] +#[doc(hidden)] +pub type SetDefaultError = SetFallbackError; + +#[test] +fn test_handle_size() { + use std::mem; + assert_eq!(mem::size_of::<Handle>(), mem::size_of::<HandlePriv>()); +} + +struct Inner { + /// The underlying system event queue. + io: mio::Poll, + + /// ABA guard counter + next_aba_guard: AtomicUsize, + + /// Dispatch slabs for I/O and futures events + io_dispatch: RwLock<Slab<ScheduledIo>>, + + /// Used to wake up the reactor from a call to `turn` + wakeup: mio::SetReadiness +} + +struct ScheduledIo { + aba_guard: usize, + readiness: AtomicUsize, + reader: AtomicTask, + writer: AtomicTask, +} + +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +pub(crate) enum Direction { + Read, + Write, +} + +/// The global fallback reactor. +static HANDLE_FALLBACK: AtomicUsize = ATOMIC_USIZE_INIT; + +/// Tracks the reactor for the current execution context. +thread_local!(static CURRENT_REACTOR: RefCell<Option<HandlePriv>> = RefCell::new(None)); + +const TOKEN_SHIFT: usize = 22; + +// Kind of arbitrary, but this reserves some token space for later usage. +const MAX_SOURCES: usize = (1 << TOKEN_SHIFT) - 1; +const TOKEN_WAKEUP: mio::Token = mio::Token(MAX_SOURCES); + +fn _assert_kinds() { + fn _assert<T: Send + Sync>() {} + + _assert::<Handle>(); +} + +/// A wakeup handle for a task, which may be either a futures 0.1 or 0.2 task +#[derive(Debug, Clone)] +pub(crate) enum Task { + Futures1(futures::task::Task), + #[cfg(feature = "unstable-futures")] + Futures2(futures2::task::Waker), +} + +// ===== impl Reactor ===== + +/// Set the default reactor for the duration of the closure +/// +/// # Panics +/// +/// This function panics if there already is a default reactor set. +pub fn with_default<F, R>(handle: &Handle, enter: &mut Enter, f: F) -> R +where F: FnOnce(&mut Enter) -> R +{ + // Ensure that the executor is removed from the thread-local context + // when leaving the scope. This handles cases that involve panicking. + struct Reset; + + impl Drop for Reset { + fn drop(&mut self) { + CURRENT_REACTOR.with(|current| { + let mut current = current.borrow_mut(); + *current = None; + }); + } + } + + // This ensures the value for the current reactor gets reset even if there + // is a panic. + let _r = Reset; + + CURRENT_REACTOR.with(|current| { + { + let mut current = current.borrow_mut(); + + assert!(current.is_none(), "default Tokio reactor already set \ + for execution context"); + + let handle = match handle.as_priv() { + Some(handle) => handle, + None => { + panic!("`handle` does not reference a reactor"); + } + }; + + *current = Some(handle.clone()); + } + + f(enter) + }) +} + +impl Reactor { + /// Creates a new event loop, returning any error that happened during the + /// creation. + pub fn new() -> io::Result<Reactor> { + let io = mio::Poll::new()?; + let wakeup_pair = mio::Registration::new2(); + + io.register(&wakeup_pair.0, + TOKEN_WAKEUP, + mio::Ready::readable(), + mio::PollOpt::level())?; + + Ok(Reactor { + events: mio::Events::with_capacity(1024), + _wakeup_registration: wakeup_pair.0, + inner: Arc::new(Inner { + io: io, + next_aba_guard: AtomicUsize::new(0), + io_dispatch: RwLock::new(Slab::with_capacity(1)), + wakeup: wakeup_pair.1, + }), + }) + } + + /// Returns a handle to this event loop which can be sent across threads + /// and can be used as a proxy to the event loop itself. + /// + /// Handles are cloneable and clones always refer to the same event loop. + /// This handle is typically passed into functions that create I/O objects + /// to bind them to this event loop. + pub fn handle(&self) -> Handle { + Handle { + inner: Some(HandlePriv { + inner: Arc::downgrade(&self.inner), + }), + } + } + + /// Configures the fallback handle to be returned from `Handle::default`. + /// + /// The `Handle::default()` function will by default lazily spin up a global + /// thread and run a reactor on this global thread. This behavior is not + /// always desirable in all applications, however, and sometimes a different + /// fallback reactor is desired. + /// + /// This function will attempt to globally alter the return value of + /// `Handle::default()` to return the `handle` specified rather than a + /// lazily initialized global thread. If successful then all future calls to + /// `Handle::default()` which would otherwise fall back to the global thread + /// will instead return a clone of the handle specified. + /// + /// # Errors + /// + /// This function may not always succeed in configuring the fallback handle. + /// If this function was previously called (or perhaps concurrently called + /// on many threads) only the *first* invocation of this function will + /// succeed. All other invocations will return an error. + /// + /// Additionally if the global reactor thread has already been initialized + /// then this function will also return an error. (aka if `Handle::default` + /// has been called previously in this program). + pub fn set_fallback(&self) -> Result<(), SetFallbackError> { + set_fallback(self.handle().into_priv().unwrap()) + } + + /// Performs one iteration of the event loop, blocking on waiting for events + /// for at most `max_wait` (forever if `None`). + /// + /// This method is the primary method of running this reactor and processing + /// I/O events that occur. This method executes one iteration of an event + /// loop, blocking at most once waiting for events to happen. + /// + /// If a `max_wait` is specified then the method should block no longer than + /// the duration specified, but this shouldn't be used as a super-precise + /// timer but rather a "ballpark approximation" + /// + /// # Return value + /// + /// This function returns an instance of `Turn` + /// + /// `Turn` as of today has no extra information with it and can be safely + /// discarded. In the future `Turn` may contain information about what + /// happened while this reactor blocked. + /// + /// # Errors + /// + /// This function may also return any I/O error which occurs when polling + /// for readiness of I/O objects with the OS. This is quite unlikely to + /// arise and typically mean that things have gone horribly wrong at that + /// point. Currently this is primarily only known to happen for internal + /// bugs to `tokio` itself. + pub fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<Turn> { + self.poll(max_wait)?; + Ok(Turn { _priv: () }) + } + + /// Returns true if the reactor is currently idle. + /// + /// Idle is defined as all tasks that have been spawned have completed, + /// either successfully or with an error. + pub fn is_idle(&self) -> bool { + self.inner.io_dispatch + .read().unwrap() + .is_empty() + } + + /// Run this reactor on a background thread. + /// + /// This function takes ownership, spawns a new thread, and moves the + /// reactor to this new thread. It then runs the reactor, driving all + /// associated I/O resources, until the `Background` handle is dropped or + /// explicitly shutdown. + pub fn background(self) -> io::Result<Background> { + Background::new(self) + } + + fn poll(&mut self, max_wait: Option<Duration>) -> io::Result<()> { + // Block waiting for an event to happen, peeling out how many events + // happened. + match self.inner.io.poll(&mut self.events, max_wait) { + Ok(_) => {} + Err(e) => return Err(e), + } + + let start = if log_enabled!(Level::Debug) { + Some(Instant::now()) + } else { + None + }; + + // Process all the events that came in, dispatching appropriately + let mut events = 0; + for event in self.events.iter() { + events += 1; + let token = event.token(); + trace!("event {:?} {:?}", event.readiness(), event.token()); + + if token == TOKEN_WAKEUP { + self.inner.wakeup.set_readiness(mio::Ready::empty()).unwrap(); + } else { + self.dispatch(token, event.readiness()); + } + } + + if let Some(start) = start { + let dur = start.elapsed(); + debug!("loop process - {} events, {}.{:03}s", + events, + dur.as_secs(), + dur.subsec_nanos() / 1_000_000); + } + + Ok(()) + } + + fn dispatch(&self, token: mio::Token, ready: mio::Ready) { + let aba_guard = token.0 & !MAX_SOURCES; + let token = token.0 & MAX_SOURCES; + + let mut rd = None; + let mut wr = None; + + // Create a scope to ensure that notifying the tasks stays out of the + // lock's critical section. + { + let io_dispatch = self.inner.io_dispatch.read().unwrap(); + + let io = match io_dispatch.get(token) { + Some(io) => io, + None => return, + }; + + if aba_guard != io.aba_guard { + return; + } + + io.readiness.fetch_or(ready.as_usize(), Relaxed); + + if ready.is_writable() || platform::is_hup(&ready) { + wr = io.writer.take_to_notify(); + } + + if !(ready & (!mio::Ready::writable())).is_empty() { + rd = io.reader.take_to_notify(); + } + } + + if let Some(task) = rd { + task.notify(); + } + + if let Some(task) = wr { + task.notify(); + } + } +} + +impl Park for Reactor { + type Unpark = Handle; + type Error = io::Error; + + fn unpark(&self) -> Self::Unpark { + self.handle() + } + + fn park(&mut self) -> io::Result<()> { + self.turn(None)?; + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> io::Result<()> { + self.turn(Some(duration))?; + Ok(()) + } +} + +impl fmt::Debug for Reactor { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Reactor") + } +} + +// ===== impl Handle ===== + +impl Handle { + /// Returns a handle to the current reactor. + pub fn current() -> Handle { + // TODO: Should this panic on error? + HandlePriv::try_current() + .map(|handle| Handle { + inner: Some(handle), + }) + .unwrap_or(Handle { + inner: Some(HandlePriv { + inner: Weak::new(), + }) + }) + } + + fn as_priv(&self) -> Option<&HandlePriv> { + self.inner.as_ref() + } + + fn into_priv(self) -> Option<HandlePriv> { + self.inner + } + + fn wakeup(&self) { + if let Some(handle) = self.as_priv() { + handle.wakeup(); + } + } +} + +impl Unpark for Handle { + fn unpark(&self) { + if let Some(ref h) = self.inner { + h.wakeup(); + } + } +} + +impl Default for Handle { + /// Returns a "default" handle, i.e., a handle that lazily binds to a reactor. + fn default() -> Handle { + Handle { inner: None } + } +} + +impl fmt::Debug for Handle { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Handle") + } +} + +fn set_fallback(handle: HandlePriv) -> Result<(), SetFallbackError> { + unsafe { + let val = handle.into_usize(); + match HANDLE_FALLBACK.compare_exchange(0, val, SeqCst, SeqCst) { + Ok(_) => Ok(()), + Err(_) => { + drop(HandlePriv::from_usize(val)); + Err(SetFallbackError(())) + } + } + } +} + +// ===== impl HandlePriv ===== + +impl HandlePriv { + /// Try to get a handle to the current reactor. + /// + /// Returns `Err` if no handle is found. + pub(crate) fn try_current() -> io::Result<HandlePriv> { + CURRENT_REACTOR.with(|current| { + match *current.borrow() { + Some(ref handle) => Ok(handle.clone()), + None => HandlePriv::fallback(), + } + }) + } + + /// Returns a handle to the fallback reactor. + fn fallback() -> io::Result<HandlePriv> { + let mut fallback = HANDLE_FALLBACK.load(SeqCst); + + // If the fallback hasn't been previously initialized then let's spin + // up a helper thread and try to initialize with that. If we can't + // actually create a helper thread then we'll just return a "defunct" + // handle which will return errors when I/O objects are attempted to be + // associated. + if fallback == 0 { + let reactor = match Reactor::new() { + Ok(reactor) => reactor, + Err(_) => return Err(io::Error::new(io::ErrorKind::Other, + "failed to create reactor")), + }; + + // If we successfully set ourselves as the actual fallback then we + // want to `forget` the helper thread to ensure that it persists + // globally. If we fail to set ourselves as the fallback that means + // that someone was racing with this call to `Handle::default`. + // They ended up winning so we'll destroy our helper thread (which + // shuts down the thread) and reload the fallback. + if set_fallback(reactor.handle().into_priv().unwrap()).is_ok() { + let ret = reactor.handle().into_priv().unwrap(); + + match reactor.background() { + Ok(bg) => bg.forget(), + // The global handle is fubar, but y'all probably got bigger + // problems if a thread can't spawn. + Err(_) => {} + } + + return Ok(ret); + } + + fallback = HANDLE_FALLBACK.load(SeqCst); + } + + // At this point our fallback handle global was configured so we use + // its value to reify a handle, clone it, and then forget our reified + // handle as we don't actually have an owning reference to it. + assert!(fallback != 0); + + let ret = unsafe { + let handle = HandlePriv::from_usize(fallback); + let ret = handle.clone(); + + // This prevents `handle` from being dropped and having the ref + // count decremented. + drop(handle.into_usize()); + + ret + }; + + Ok(ret) + } + + /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise + /// makes the next call to `turn` return immediately. + /// + /// This method is intended to be used in situations where a notification + /// needs to otherwise be sent to the main reactor. If the reactor is + /// currently blocked inside of `turn` then it will wake up and soon return + /// after this method has been called. If the reactor is not currently + /// blocked in `turn`, then the next call to `turn` will not block and + /// return immediately. + fn wakeup(&self) { + if let Some(inner) = self.inner() { + inner.wakeup.set_readiness(mio::Ready::readable()).unwrap(); + } + } + + fn into_usize(self) -> usize { + unsafe { + mem::transmute::<Weak<Inner>, usize>(self.inner) + } + } + + unsafe fn from_usize(val: usize) -> HandlePriv { + let inner = mem::transmute::<usize, Weak<Inner>>(val);; + HandlePriv { inner } + } + + fn inner(&self) -> Option<Arc<Inner>> { + self.inner.upgrade() + } +} + +impl fmt::Debug for HandlePriv { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "HandlePriv") + } +} + +// ===== impl Inner ===== + +impl Inner { + /// Register an I/O resource with the reactor. + /// + /// The registration token is returned. + fn add_source(&self, source: &Evented) + -> io::Result<usize> + { + // Get an ABA guard value + let aba_guard = self.next_aba_guard.fetch_add(1 << TOKEN_SHIFT, Relaxed); + + let mut io_dispatch = self.io_dispatch.write().unwrap(); + + if io_dispatch.len() == MAX_SOURCES { + return Err(io::Error::new(io::ErrorKind::Other, "reactor at max \ + registered I/O resources")); + } + + // Acquire a write lock + let key = io_dispatch.insert(ScheduledIo { + aba_guard, + readiness: AtomicUsize::new(0), + reader: AtomicTask::new(), + writer: AtomicTask::new(), + }); + + try!(self.io.register(source, + mio::Token(aba_guard | key), + mio::Ready::all(), + mio::PollOpt::edge())); + + Ok(key) + } + + /// Deregisters an I/O resource from the reactor. + fn deregister_source(&self, source: &Evented) -> io::Result<()> { + self.io.deregister(source) + } + + fn drop_source(&self, token: usize) { + debug!("dropping I/O source: {}", token); + self.io_dispatch.write().unwrap().remove(token); + } + + /// Registers interest in the I/O resource associated with `token`. + fn register(&self, token: usize, dir: Direction, t: Task) { + debug!("scheduling direction for: {}", token); + let io_dispatch = self.io_dispatch.read().unwrap(); + let sched = io_dispatch.get(token).unwrap(); + + let (task, ready) = match dir { + Direction::Read => (&sched.reader, !mio::Ready::writable()), + Direction::Write => (&sched.writer, mio::Ready::writable()), + }; + + task.register_task(t); + + if sched.readiness.load(SeqCst) & ready.as_usize() != 0 { + task.notify(); + } + } +} + +impl Drop for Inner { + fn drop(&mut self) { + // When a reactor is dropped it needs to wake up all blocked tasks as + // they'll never receive a notification, and all connected I/O objects + // will start returning errors pretty quickly. + let io = self.io_dispatch.read().unwrap(); + for (_, io) in io.iter() { + io.writer.notify(); + io.reader.notify(); + } + } +} + +impl Direction { + fn mask(&self) -> mio::Ready { + match *self { + Direction::Read => { + // Everything except writable is signaled through read. + mio::Ready::all() - mio::Ready::writable() + } + Direction::Write => mio::Ready::writable() | platform::hup(), + } + } +} + +impl Task { + fn notify(&self) { + match *self { + Task::Futures1(ref task) => task.notify(), + + #[cfg(feature = "unstable-futures")] + Task::Futures2(ref waker) => waker.wake(), + } + } +} + +#[cfg(unix)] +mod platform { + use mio::Ready; + use mio::unix::UnixReady; + + pub fn hup() -> Ready { + UnixReady::hup().into() + } + + pub fn is_hup(ready: &Ready) -> bool { + UnixReady::from(*ready).is_hup() + } +} + +#[cfg(windows)] +mod platform { + use mio::Ready; + + pub fn hup() -> Ready { + Ready::empty() + } + + pub fn is_hup(_: &Ready) -> bool { + false + } +} + +#[cfg(feature = "unstable-futures")] +fn lift_async<T>(old: futures::Async<T>) -> futures2::Async<T> { + match old { + futures::Async::Ready(x) => futures2::Async::Ready(x), + futures::Async::NotReady => futures2::Async::Pending, + } +} + +#[cfg(feature = "unstable-futures")] +fn lower_async<T>(new: futures2::Async<T>) -> futures::Async<T> { + match new { + futures2::Async::Ready(x) => futures::Async::Ready(x), + futures2::Async::Pending => futures::Async::NotReady, + } +} diff --git a/third_party/rust/tokio-reactor/src/poll_evented.rs b/third_party/rust/tokio-reactor/src/poll_evented.rs new file mode 100644 index 0000000000..99dd8aa467 --- /dev/null +++ b/third_party/rust/tokio-reactor/src/poll_evented.rs @@ -0,0 +1,664 @@ +use {Handle, Registration}; + +use futures::{task, Async, Poll}; +use mio; +use mio::event::Evented; +use tokio_io::{AsyncRead, AsyncWrite}; + +#[cfg(feature = "unstable-futures")] +use futures2; + +use std::fmt; +use std::io::{self, Read, Write}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; + +/// Associates an I/O resource that implements the [`std::Read`] and / or +/// [`std::Write`] traits with the reactor that drives it. +/// +/// `PollEvented` uses [`Registration`] internally to take a type that +/// implements [`mio::Evented`] as well as [`std::Read`] and or [`std::Write`] +/// and associate it with a reactor that will drive it. +/// +/// Once the [`mio::Evented`] type is wrapped by `PollEvented`, it can be +/// used from within the future's execution model. As such, the `PollEvented` +/// type provides [`AsyncRead`] and [`AsyncWrite`] implementations using the +/// underlying I/O resource as well as readiness events provided by the reactor. +/// +/// **Note**: While `PollEvented` is `Sync` (if the underlying I/O type is +/// `Sync`), the caller must ensure that there are at most two tasks that use a +/// `PollEvented` instance concurrently. One for reading and one for writing. +/// While violating this requirement is "safe" from a Rust memory model point of +/// view, it will result in unexpected behavior in the form of lost +/// notifications and tasks hanging. +/// +/// ## Readiness events +/// +/// Besides just providing [`AsyncRead`] and [`AsyncWrite`] implementations, +/// this type also supports access to the underlying readiness event stream. +/// While similar in function to what [`Registration`] provides, the semantics +/// are a bit different. +/// +/// Two functions are provided to access the readiness events: +/// [`poll_read_ready`] and [`poll_write_ready`]. These functions return the +/// current readiness state of the `PollEvented` instance. If +/// [`poll_read_ready`] indicates read readiness, immediately calling +/// [`poll_read_ready`] again will also indicate read readiness. +/// +/// When the operation is attempted and is unable to succeed due to the I/O +/// resource not being ready, the caller must call [`clear_read_ready`] or +/// [`clear_write_ready`]. This clears the readiness state until a new readiness +/// event is received. +/// +/// This allows the caller to implement additional functions. For example, +/// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and +/// [`clear_read_ready`]. +/// +/// ```rust,ignore +/// pub fn poll_accept(&mut self) -> Poll<(net::TcpStream, SocketAddr), io::Error> { +/// let ready = Ready::readable(); +/// +/// try_ready!(self.poll_evented.poll_read_ready(ready)); +/// +/// match self.poll_evented.get_ref().accept_std() { +/// Ok(pair) => Ok(Async::Ready(pair)), +/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { +/// self.poll_evented.clear_read_ready(ready); +/// Ok(Async::NotReady) +/// } +/// Err(e) => Err(e), +/// } +/// } +/// ``` +/// +/// ## Platform-specific events +/// +/// `PollEvented` also allows receiving platform-specific `mio::Ready` events. +/// These events are included as part of the read readiness event stream. The +/// write readiness event stream is only for `Ready::writable()` events. +/// +/// [`std::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html +/// [`std::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html +/// [`AsyncRead`]: ../io/trait.AsyncRead.html +/// [`AsyncWrite`]: ../io/trait.AsyncWrite.html +/// [`mio::Evented`]: https://docs.rs/mio/0.6/mio/trait.Evented.html +/// [`Registration`]: struct.Registration.html +/// [`TcpListener`]: ../net/struct.TcpListener.html +/// [`clear_read_ready`]: #method.clear_read_ready +/// [`clear_read_ready`]: #method.clear_read_ready +/// [`poll_read_ready`]: #method.poll_read_ready +/// [`poll_write_ready`]: #method.poll_write_ready +pub struct PollEvented<E: Evented> { + io: Option<E>, + inner: Inner, +} + +struct Inner { + registration: Registration, + + /// Currently visible read readiness + read_readiness: AtomicUsize, + + /// Currently visible write readiness + write_readiness: AtomicUsize, +} + +// ===== impl PollEvented ===== + +macro_rules! poll_ready { + ($me:expr, $mask:expr, $cache:ident, $take:ident, $poll:expr) => {{ + $me.register()?; + + // Load cached & encoded readiness. + let mut cached = $me.inner.$cache.load(Relaxed); + let mask = $mask | ::platform::hup(); + + // See if the current readiness matches any bits. + let mut ret = mio::Ready::from_usize(cached) & $mask; + + if ret.is_empty() { + // Readiness does not match, consume the registration's readiness + // stream. This happens in a loop to ensure that the stream gets + // drained. + loop { + let ready = try_ready!($poll); + cached |= ready.as_usize(); + + // Update the cache store + $me.inner.$cache.store(cached, Relaxed); + + ret |= ready & mask; + + if !ret.is_empty() { + return Ok(ret.into()); + } + } + } else { + // Check what's new with the registration stream. This will not + // request to be notified + if let Some(ready) = $me.inner.registration.$take()? { + cached |= ready.as_usize(); + $me.inner.$cache.store(cached, Relaxed); + } + + Ok(mio::Ready::from_usize(cached).into()) + } + }} +} + +impl<E> PollEvented<E> +where E: Evented +{ + /// Creates a new `PollEvented` associated with the default reactor. + pub fn new(io: E) -> PollEvented<E> { + PollEvented { + io: Some(io), + inner: Inner { + registration: Registration::new(), + read_readiness: AtomicUsize::new(0), + write_readiness: AtomicUsize::new(0), + } + } + } + + /// Creates a new `PollEvented` associated with the specified reactor. + pub fn new_with_handle(io: E, handle: &Handle) -> io::Result<Self> { + let ret = PollEvented::new(io); + + if let Some(handle) = handle.as_priv() { + ret.inner.registration + .register_with_priv(ret.io.as_ref().unwrap(), handle)?; + } + + Ok(ret) + } + + /// Returns a shared reference to the underlying I/O object this readiness + /// stream is wrapping. + pub fn get_ref(&self) -> &E { + self.io.as_ref().unwrap() + } + + /// Returns a mutable reference to the underlying I/O object this readiness + /// stream is wrapping. + pub fn get_mut(&mut self) -> &mut E { + self.io.as_mut().unwrap() + } + + /// Consumes self, returning the inner I/O object + /// + /// This function will deregister the I/O resource from the reactor before + /// returning. If the deregistration operation fails, an error is returned. + /// + /// Note that deregistering does not guarantee that the I/O resource can be + /// registered with a different reactor. Some I/O resource types can only be + /// associated with a single reactor instance for their lifetime. + pub fn into_inner(mut self) -> io::Result<E> { + let io = self.io.take().unwrap(); + self.inner.registration.deregister(&io)?; + Ok(io) + } + + /// Check the I/O resource's read readiness state. + /// + /// The mask argument allows specifying what readiness to notify on. This + /// can be any value, including platform specific readiness, **except** + /// `writable`. HUP is always implicitly included on platforms that support + /// it. + /// + /// If the resource is not ready for a read then `Async::NotReady` is + /// returned and the current task is notified once a new event is received. + /// + /// The I/O resource will remain in a read-ready state until readiness is + /// cleared by calling [`clear_read_ready`]. + /// + /// [`clear_read_ready`]: #method.clear_read_ready + /// + /// # Panics + /// + /// This function panics if: + /// + /// * `ready` includes writable. + /// * called from outside of a task context. + pub fn poll_read_ready(&self, mask: mio::Ready) -> Poll<mio::Ready, io::Error> { + assert!(!mask.is_writable(), "cannot poll for write readiness"); + poll_ready!( + self, mask, read_readiness, take_read_ready, + self.inner.registration.poll_read_ready() + ) + } + + /// Like `poll_read_ready` but compatible with futures 0.2. + #[cfg(feature = "unstable-futures")] + pub fn poll_read_ready2(&self, cx: &mut futures2::task::Context, mask: mio::Ready) + -> futures2::Poll<mio::Ready, io::Error> + { + assert!(!mask.is_writable(), "cannot poll for write readiness"); + let mut res = || poll_ready!( + self, mask, read_readiness, take_read_ready, + self.inner.registration.poll_read_ready2(cx).map(::lower_async) + ); + res().map(::lift_async) + } + + /// Clears the I/O resource's read readiness state and registers the current + /// task to be notified once a read readiness event is received. + /// + /// After calling this function, `poll_read_ready` will return `NotReady` + /// until a new read readiness event has been received. + /// + /// The `mask` argument specifies the readiness bits to clear. This may not + /// include `writable` or `hup`. + /// + /// # Panics + /// + /// This function panics if: + /// + /// * `ready` includes writable or HUP + /// * called from outside of a task context. + pub fn clear_read_ready(&self, ready: mio::Ready) -> io::Result<()> { + // Cannot clear write readiness + assert!(!ready.is_writable(), "cannot clear write readiness"); + assert!(!::platform::is_hup(&ready), "cannot clear HUP readiness"); + + self.inner.read_readiness.fetch_and(!ready.as_usize(), Relaxed); + + if self.poll_read_ready(ready)?.is_ready() { + // Notify the current task + task::current().notify(); + } + + Ok(()) + } + + /// Like `clear_read_ready` but compatible with futures 0.2. + #[cfg(feature = "unstable-futures")] + pub fn clear_read_ready2(&self, cx: &mut futures2::task::Context, ready: mio::Ready) + -> io::Result<()> + { + // Cannot clear write readiness + assert!(!ready.is_writable(), "cannot clear write readiness"); + assert!(!::platform::is_hup(&ready), "cannot clear HUP readiness"); + + self.inner.read_readiness.fetch_and(!ready.as_usize(), Relaxed); + + if self.poll_read_ready2(cx, ready)?.is_ready() { + // Notify the current task + cx.waker().wake() + } + + Ok(()) + } + + /// Check the I/O resource's write readiness state. + /// + /// This always checks for writable readiness and also checks for HUP + /// readiness on platforms that support it. + /// + /// If the resource is not ready for a write then `Async::NotReady` is + /// returned and the current task is notified once a new event is received. + /// + /// The I/O resource will remain in a write-ready state until readiness is + /// cleared by calling [`clear_write_ready`]. + /// + /// [`clear_write_ready`]: #method.clear_write_ready + /// + /// # Panics + /// + /// This function panics if: + /// + /// * `ready` contains bits besides `writable` and `hup`. + /// * called from outside of a task context. + pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> { + poll_ready!( + self, + mio::Ready::writable(), + write_readiness, + take_write_ready, + self.inner.registration.poll_write_ready() + ) + } + + /// Like `poll_write_ready` but compatible with futures 0.2. + #[cfg(feature = "unstable-futures")] + pub fn poll_write_ready2(&self, cx: &mut futures2::task::Context) + -> futures2::Poll<mio::Ready, io::Error> + { + let mut res = || poll_ready!( + self, + mio::Ready::writable(), + write_readiness, + take_write_ready, + self.inner.registration.poll_write_ready2(cx).map(::lower_async) + ); + res().map(::lift_async) + } + + + /// Resets the I/O resource's write readiness state and registers the current + /// task to be notified once a write readiness event is received. + /// + /// This only clears writable readiness. HUP (on platforms that support HUP) + /// cannot be cleared as it is a final state. + /// + /// After calling this function, `poll_write_ready(Ready::writable())` will + /// return `NotReady` until a new write readiness event has been received. + /// + /// # Panics + /// + /// This function will panic if called from outside of a task context. + pub fn clear_write_ready(&self) -> io::Result<()> { + let ready = mio::Ready::writable(); + + self.inner.write_readiness.fetch_and(!ready.as_usize(), Relaxed); + + if self.poll_write_ready()?.is_ready() { + // Notify the current task + task::current().notify(); + } + + Ok(()) + } + + /// Like `clear_write_ready`, but compatible with futures 0.2. + #[cfg(feature = "unstable-futures")] + pub fn clear_write_ready2(&self, cx: &mut futures2::task::Context) -> io::Result<()> { + let ready = mio::Ready::writable(); + + self.inner.write_readiness.fetch_and(!ready.as_usize(), Relaxed); + + if self.poll_write_ready2(cx)?.is_ready() { + // Notify the current task + cx.waker().wake() + } + + Ok(()) + } + + /// Ensure that the I/O resource is registered with the reactor. + fn register(&self) -> io::Result<()> { + self.inner.registration.register(self.io.as_ref().unwrap())?; + Ok(()) + } +} + +// ===== Read / Write impls ===== + +impl<E> Read for PollEvented<E> +where E: Evented + Read, +{ + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + if let Async::NotReady = self.poll_read_ready(mio::Ready::readable())? { + return Err(io::ErrorKind::WouldBlock.into()) + } + + let r = self.get_mut().read(buf); + + if is_wouldblock(&r) { + self.clear_read_ready(mio::Ready::readable())?; + } + + return r + } +} + +#[cfg(feature = "unstable-futures")] +impl<E> futures2::io::AsyncRead for PollEvented<E> + where E: Evented, E: Read, +{ + fn poll_read(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8]) + -> futures2::Poll<usize, io::Error> + { + if let futures2::Async::Pending = self.poll_read_ready2(cx, mio::Ready::readable())? { + return Ok(futures2::Async::Pending); + } + + match self.get_mut().read(buf) { + Ok(n) => Ok(futures2::Async::Ready(n)), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_read_ready2(cx, mio::Ready::readable())?; + Ok(futures2::Async::Pending) + } + Err(e) => Err(e), + } + } +} + +impl<E> Write for PollEvented<E> +where E: Evented + Write, +{ + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + if let Async::NotReady = self.poll_write_ready()? { + return Err(io::ErrorKind::WouldBlock.into()) + } + + let r = self.get_mut().write(buf); + + if is_wouldblock(&r) { + self.clear_write_ready()?; + } + + return r + } + + fn flush(&mut self) -> io::Result<()> { + if let Async::NotReady = self.poll_write_ready()? { + return Err(io::ErrorKind::WouldBlock.into()) + } + + let r = self.get_mut().flush(); + + if is_wouldblock(&r) { + self.clear_write_ready()?; + } + + return r + } +} + +#[cfg(feature = "unstable-futures")] +impl<E> futures2::io::AsyncWrite for PollEvented<E> + where E: Evented, E: Write, +{ + fn poll_write(&mut self, cx: &mut futures2::task::Context, buf: &[u8]) + -> futures2::Poll<usize, io::Error> + { + if let futures2::Async::Pending = self.poll_write_ready2(cx)? { + return Ok(futures2::Async::Pending); + } + + match self.get_mut().write(buf) { + Ok(n) => Ok(futures2::Async::Ready(n)), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_write_ready2(cx)?; + Ok(futures2::Async::Pending) + } + Err(e) => Err(e), + } + } + + fn poll_flush(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> { + if let futures2::Async::Pending = self.poll_write_ready2(cx)? { + return Ok(futures2::Async::Pending); + } + + match self.get_mut().flush() { + Ok(_) => Ok(futures2::Async::Ready(())), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_write_ready2(cx)?; + Ok(futures2::Async::Pending) + } + Err(e) => Err(e), + } + } + + fn poll_close(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> { + futures2::io::AsyncWrite::poll_flush(self, cx) + } +} + + +impl<E> AsyncRead for PollEvented<E> +where E: Evented + Read, +{ +} + +impl<E> AsyncWrite for PollEvented<E> +where E: Evented + Write, +{ + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) + } +} + +// ===== &'a Read / &'a Write impls ===== + +impl<'a, E> Read for &'a PollEvented<E> +where E: Evented, &'a E: Read, +{ + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + if let Async::NotReady = self.poll_read_ready(mio::Ready::readable())? { + return Err(io::ErrorKind::WouldBlock.into()) + } + + let r = self.get_ref().read(buf); + + if is_wouldblock(&r) { + self.clear_read_ready(mio::Ready::readable())?; + } + + return r + } +} + +#[cfg(feature = "unstable-futures")] +impl<'a, E> futures2::io::AsyncRead for &'a PollEvented<E> + where E: Evented, &'a E: Read, +{ + fn poll_read(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8]) + -> futures2::Poll<usize, io::Error> + { + if let futures2::Async::Pending = self.poll_read_ready2(cx, mio::Ready::readable())? { + return Ok(futures2::Async::Pending); + } + + match self.get_ref().read(buf) { + Ok(n) => Ok(futures2::Async::Ready(n)), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_read_ready2(cx, mio::Ready::readable())?; + Ok(futures2::Async::Pending) + } + Err(e) => Err(e), + } + } +} + +impl<'a, E> Write for &'a PollEvented<E> +where E: Evented, &'a E: Write, +{ + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + if let Async::NotReady = self.poll_write_ready()? { + return Err(io::ErrorKind::WouldBlock.into()) + } + + let r = self.get_ref().write(buf); + + if is_wouldblock(&r) { + self.clear_write_ready()?; + } + + return r + } + + fn flush(&mut self) -> io::Result<()> { + if let Async::NotReady = self.poll_write_ready()? { + return Err(io::ErrorKind::WouldBlock.into()) + } + + let r = self.get_ref().flush(); + + if is_wouldblock(&r) { + self.clear_write_ready()?; + } + + return r + } +} + +#[cfg(feature = "unstable-futures")] +impl<'a, E> futures2::io::AsyncWrite for &'a PollEvented<E> + where E: Evented, &'a E: Write, +{ + fn poll_write(&mut self, cx: &mut futures2::task::Context, buf: &[u8]) + -> futures2::Poll<usize, io::Error> + { + if let futures2::Async::Pending = self.poll_write_ready2(cx)? { + return Ok(futures2::Async::Pending); + } + + match self.get_ref().write(buf) { + Ok(n) => Ok(futures2::Async::Ready(n)), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_write_ready2(cx)?; + Ok(futures2::Async::Pending) + } + Err(e) => Err(e), + } + } + + fn poll_flush(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> { + if let futures2::Async::Pending = self.poll_write_ready2(cx)? { + return Ok(futures2::Async::Pending); + } + + match self.get_ref().flush() { + Ok(_) => Ok(futures2::Async::Ready(())), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_write_ready2(cx)?; + Ok(futures2::Async::Pending) + } + Err(e) => Err(e), + } + } + + fn poll_close(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> { + futures2::io::AsyncWrite::poll_flush(self, cx) + } +} + +impl<'a, E> AsyncRead for &'a PollEvented<E> +where E: Evented, &'a E: Read, +{ +} + +impl<'a, E> AsyncWrite for &'a PollEvented<E> +where E: Evented, &'a E: Write, +{ + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) + } +} + +fn is_wouldblock<T>(r: &io::Result<T>) -> bool { + match *r { + Ok(_) => false, + Err(ref e) => e.kind() == io::ErrorKind::WouldBlock, + } +} + +impl<E: Evented + fmt::Debug> fmt::Debug for PollEvented<E> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("PollEvented") + .field("io", &self.io) + .finish() + } +} + +impl<E: Evented> Drop for PollEvented<E> { + fn drop(&mut self) { + if let Some(io) = self.io.take() { + // Ignore errors + let _ = self.inner.registration.deregister(&io); + } + } +} diff --git a/third_party/rust/tokio-reactor/src/registration.rs b/third_party/rust/tokio-reactor/src/registration.rs new file mode 100644 index 0000000000..278b57680c --- /dev/null +++ b/third_party/rust/tokio-reactor/src/registration.rs @@ -0,0 +1,569 @@ +use {Handle, HandlePriv, Direction, Task}; + +use futures::{Async, Poll, task}; +use mio::{self, Evented}; + +#[cfg(feature = "unstable-futures")] +use futures2; + +use std::{io, ptr, usize}; +use std::cell::UnsafeCell; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; + +/// Associates an I/O resource with the reactor instance that drives it. +/// +/// A registration represents an I/O resource registered with a Reactor such +/// that it will receive task notifications on readiness. This is the lowest +/// level API for integrating with a reactor. +/// +/// The association between an I/O resource is made by calling [`register`]. +/// Once the association is established, it remains established until the +/// registration instance is dropped. Subsequent calls to [`register`] are +/// no-ops. +/// +/// A registration instance represents two separate readiness streams. One for +/// the read readiness and one for write readiness. These streams are +/// independent and can be consumed from separate tasks. +/// +/// **Note**: while `Registration` is `Sync`, the caller must ensure that there +/// are at most two tasks that use a registration instance concurrently. One +/// task for [`poll_read_ready`] and one task for [`poll_write_ready`]. While +/// violating this requirement is "safe" from a Rust memory safety point of +/// view, it will result in unexpected behavior in the form of lost +/// notifications and tasks hanging. +/// +/// ## Platform-specific events +/// +/// `Registration` also allows receiving platform-specific `mio::Ready` events. +/// These events are included as part of the read readiness event stream. The +/// write readiness event stream is only for `Ready::writable()` events. +/// +/// [`register`]: #method.register +/// [`poll_read_ready`]: #method.poll_read_ready`] +/// [`poll_write_ready`]: #method.poll_write_ready`] +#[derive(Debug)] +pub struct Registration { + /// Stores the handle. Once set, the value is not changed. + /// + /// Setting this requires acquiring the lock from state. + inner: UnsafeCell<Option<Inner>>, + + /// Tracks the state of the registration. + /// + /// The least significant 2 bits are used to track the lifecycle of the + /// registration. The rest of the `state` variable is a pointer to tasks + /// that must be notified once the lock is released. + state: AtomicUsize, +} + +#[derive(Debug)] +struct Inner { + handle: HandlePriv, + token: usize, +} + +/// Tasks waiting on readiness notifications. +#[derive(Debug)] +struct Node { + direction: Direction, + task: Task, + next: *mut Node, +} + +/// Initial state. The handle is not set and the registration is idle. +const INIT: usize = 0; + +/// A thread locked the state and will associate a handle. +const LOCKED: usize = 1; + +/// A handle has been associated with the registration. +const READY: usize = 2; + +/// Masks the lifecycle state +const LIFECYCLE_MASK: usize = 0b11; + +/// A fake token used to identify error situations +const ERROR: usize = usize::MAX; + +// ===== impl Registration ===== + +impl Registration { + /// Create a new `Registration`. + /// + /// This registration is not associated with a Reactor instance. Call + /// `register` to establish the association. + pub fn new() -> Registration { + Registration { + inner: UnsafeCell::new(None), + state: AtomicUsize::new(INIT), + } + } + + /// Register the I/O resource with the default reactor. + /// + /// This function is safe to call concurrently and repeatedly. However, only + /// the first call will establish the registration. Subsequent calls will be + /// no-ops. + /// + /// # Return + /// + /// If the registration happened successfully, `Ok(true)` is returned. + /// + /// If an I/O resource has previously been successfully registered, + /// `Ok(false)` is returned. + /// + /// If an error is encountered during registration, `Err` is returned. + pub fn register<T>(&self, io: &T) -> io::Result<bool> + where T: Evented, + { + self.register2(io, || HandlePriv::try_current()) + } + + /// Deregister the I/O resource from the reactor it is associated with. + /// + /// This function must be called before the I/O resource associated with the + /// registration is dropped. + /// + /// Note that deregistering does not guarantee that the I/O resource can be + /// registered with a different reactor. Some I/O resource types can only be + /// associated with a single reactor instance for their lifetime. + /// + /// # Return + /// + /// If the deregistration was successful, `Ok` is returned. Any calls to + /// `Reactor::turn` that happen after a successful call to `deregister` will + /// no longer result in notifications getting sent for this registration. + /// + /// `Err` is returned if an error is encountered. + pub fn deregister<T>(&mut self, io: &T) -> io::Result<()> + where T: Evented, + { + // The state does not need to be checked and coordination is not + // necessary as this function takes `&mut self`. This guarantees a + // single thread is accessing the instance. + if let Some(inner) = unsafe { (*self.inner.get()).as_ref() } { + inner.deregister(io)?; + } + + Ok(()) + } + + /// Register the I/O resource with the specified reactor. + /// + /// This function is safe to call concurrently and repeatedly. However, only + /// the first call will establish the registration. Subsequent calls will be + /// no-ops. + /// + /// If the registration happened successfully, `Ok(true)` is returned. + /// + /// If an I/O resource has previously been successfully registered, + /// `Ok(false)` is returned. + /// + /// If an error is encountered during registration, `Err` is returned. + pub fn register_with<T>(&self, io: &T, handle: &Handle) -> io::Result<bool> + where T: Evented, + { + self.register2(io, || { + match handle.as_priv() { + Some(handle) => Ok(handle.clone()), + None => HandlePriv::try_current(), + } + }) + } + + pub(crate) fn register_with_priv<T>(&self, io: &T, handle: &HandlePriv) -> io::Result<bool> + where T: Evented, + { + self.register2(io, || Ok(handle.clone())) + } + + fn register2<T, F>(&self, io: &T, f: F) -> io::Result<bool> + where T: Evented, + F: Fn() -> io::Result<HandlePriv>, + { + let mut state = self.state.load(SeqCst); + + loop { + match state { + INIT => { + // Registration is currently not associated with a handle. + // Get a handle then attempt to lock the state. + let handle = f()?; + + let actual = self.state.compare_and_swap(INIT, LOCKED, SeqCst); + + if actual != state { + state = actual; + continue; + } + + // Create the actual registration + let (inner, res) = Inner::new(io, handle); + + unsafe { *self.inner.get() = Some(inner); } + + // Transition out of the locked state. This acquires the + // current value, potentially having a list of tasks that + // are pending readiness notifications. + let actual = self.state.swap(READY, SeqCst); + + // Consume the stack of nodes + + let mut read = false; + let mut write = false; + let mut ptr = (actual & !LIFECYCLE_MASK) as *mut Node; + + let inner = unsafe { (*self.inner.get()).as_ref().unwrap() }; + + while !ptr.is_null() { + let node = unsafe { Box::from_raw(ptr) }; + let node = *node; + let Node { + direction, + task, + next, + } = node; + + let flag = match direction { + Direction::Read => &mut read, + Direction::Write => &mut write, + }; + + if !*flag { + *flag = true; + + inner.register(direction, task); + } + + ptr = next; + } + + return res.map(|_| true); + } + _ => return Ok(false), + } + } + } + + /// Poll for events on the I/O resource's read readiness stream. + /// + /// If the I/O resource receives a new read readiness event since the last + /// call to `poll_read_ready`, it is returned. If it has not, the current + /// task is notified once a new event is received. + /// + /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned, + /// the function will always return `Ready(HUP)`. This should be treated as + /// the end of the readiness stream. + /// + /// Ensure that [`register`] has been called first. + /// + /// # Return value + /// + /// There are several possible return values: + /// + /// * `Ok(Async::Ready(readiness))` means that the I/O resource has received + /// a new readiness event. The readiness value is included. + /// + /// * `Ok(NotReady)` means that no new readiness events have been received + /// since the last call to `poll_read_ready`. + /// + /// * `Err(err)` means that the registration has encountered an error. This + /// error either represents a permanent internal error **or** the fact + /// that [`register`] was not called first. + /// + /// [`register`]: #method.register + /// [edge-triggered]: https://docs.rs/mio/0.6/mio/struct.Poll.html#edge-triggered-and-level-triggered + /// + /// # Panics + /// + /// This function will panic if called from outside of a task context. + pub fn poll_read_ready(&self) -> Poll<mio::Ready, io::Error> { + self.poll_ready(Direction::Read, true, || Task::Futures1(task::current())) + .map(|v| match v { + Some(v) => Async::Ready(v), + _ => Async::NotReady, + }) + } + + /// Like `poll_ready_ready`, but compatible with futures 0.2 + #[cfg(feature = "unstable-futures")] + pub fn poll_read_ready2(&self, cx: &mut futures2::task::Context) + -> futures2::Poll<mio::Ready, io::Error> + { + use futures2::Async as Async2; + self.poll_ready(Direction::Read, true, || Task::Futures2(cx.waker().clone())) + .map(|v| match v { + Some(v) => Async2::Ready(v), + _ => Async2::Pending, + }) + } + + /// Consume any pending read readiness event. + /// + /// This function is identical to [`poll_read_ready`] **except** that it + /// will not notify the current task when a new event is received. As such, + /// it is safe to call this function from outside of a task context. + /// + /// [`poll_read_ready`]: #method.poll_read_ready + pub fn take_read_ready(&self) -> io::Result<Option<mio::Ready>> { + self.poll_ready(Direction::Read, false, || panic!()) + + } + + /// Poll for events on the I/O resource's write readiness stream. + /// + /// If the I/O resource receives a new write readiness event since the last + /// call to `poll_write_ready`, it is returned. If it has not, the current + /// task is notified once a new event is received. + /// + /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned, + /// the function will always return `Ready(HUP)`. This should be treated as + /// the end of the readiness stream. + /// + /// Ensure that [`register`] has been called first. + /// + /// # Return value + /// + /// There are several possible return values: + /// + /// * `Ok(Async::Ready(readiness))` means that the I/O resource has received + /// a new readiness event. The readiness value is included. + /// + /// * `Ok(NotReady)` means that no new readiness events have been received + /// since the last call to `poll_write_ready`. + /// + /// * `Err(err)` means that the registration has encountered an error. This + /// error either represents a permanent internal error **or** the fact + /// that [`register`] was not called first. + /// + /// [`register`]: #method.register + /// [edge-triggered]: https://docs.rs/mio/0.6/mio/struct.Poll.html#edge-triggered-and-level-triggered + /// + /// # Panics + /// + /// This function will panic if called from outside of a task context. + pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> { + self.poll_ready(Direction::Write, true, || Task::Futures1(task::current())) + .map(|v| match v { + Some(v) => Async::Ready(v), + _ => Async::NotReady, + }) + } + + /// Like `poll_write_ready`, but compatible with futures 0.2 + #[cfg(feature = "unstable-futures")] + pub fn poll_write_ready2(&self, cx: &mut futures2::task::Context) + -> futures2::Poll<mio::Ready, io::Error> + { + use futures2::Async as Async2; + self.poll_ready(Direction::Write, true, || Task::Futures2(cx.waker().clone())) + .map(|v| match v { + Some(v) => Async2::Ready(v), + _ => Async2::Pending, + }) + } + + /// Consume any pending write readiness event. + /// + /// This function is identical to [`poll_write_ready`] **except** that it + /// will not notify the current task when a new event is received. As such, + /// it is safe to call this function from outside of a task context. + /// + /// [`poll_write_ready`]: #method.poll_write_ready + pub fn take_write_ready(&self) -> io::Result<Option<mio::Ready>> { + self.poll_ready(Direction::Write, false, || unreachable!()) + } + + fn poll_ready<F>(&self, direction: Direction, notify: bool, task: F) + -> io::Result<Option<mio::Ready>> + where F: Fn() -> Task + { + let mut state = self.state.load(SeqCst); + + // Cache the node pointer + let mut node = None; + + loop { + match state { + INIT => { + return Err(io::Error::new(io::ErrorKind::Other, "must call `register` + before poll_read_ready")); + } + READY => { + let inner = unsafe { (*self.inner.get()).as_ref().unwrap() }; + return inner.poll_ready(direction, notify, task); + } + LOCKED => { + if !notify { + // Skip the notification tracking junk. + return Ok(None); + } + + let next_ptr = (state & !LIFECYCLE_MASK) as *mut Node; + + let task = task(); + + // Get the node + let mut n = node.take().unwrap_or_else(|| { + Box::new(Node { + direction, + task: task, + next: ptr::null_mut(), + }) + }); + + n.next = next_ptr; + + let node_ptr = Box::into_raw(n); + let next = node_ptr as usize | (state & LIFECYCLE_MASK); + + let actual = self.state.compare_and_swap(state, next, SeqCst); + + if actual != state { + // Back out of the node boxing + let n = unsafe { Box::from_raw(node_ptr) }; + + // Save this for next loop + node = Some(n); + + state = actual; + continue; + } + + return Ok(None); + } + _ => unreachable!(), + } + } + } +} + +unsafe impl Send for Registration {} +unsafe impl Sync for Registration {} + +// ===== impl Inner ===== + +impl Inner { + fn new<T>(io: &T, handle: HandlePriv) -> (Self, io::Result<()>) + where T: Evented, + { + let mut res = Ok(()); + + let token = match handle.inner() { + Some(inner) => match inner.add_source(io) { + Ok(token) => token, + Err(e) => { + res = Err(e); + ERROR + } + }, + None => { + res = Err(io::Error::new(io::ErrorKind::Other, "event loop gone")); + ERROR + } + }; + + let inner = Inner { + handle, + token, + }; + + (inner, res) + } + + fn register(&self, direction: Direction, task: Task) { + if self.token == ERROR { + task.notify(); + return; + } + + let inner = match self.handle.inner() { + Some(inner) => inner, + None => { + task.notify(); + return; + } + }; + + inner.register(self.token, direction, task); + } + + fn deregister<E: Evented>(&self, io: &E) -> io::Result<()> { + if self.token == ERROR { + return Err(io::Error::new(io::ErrorKind::Other, "failed to associate with reactor")); + } + + let inner = match self.handle.inner() { + Some(inner) => inner, + None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), + }; + + inner.deregister_source(io) + } + + fn poll_ready<F>(&self, direction: Direction, notify: bool, task: F) + -> io::Result<Option<mio::Ready>> + where F: FnOnce() -> Task + { + if self.token == ERROR { + return Err(io::Error::new(io::ErrorKind::Other, "failed to associate with reactor")); + } + + let inner = match self.handle.inner() { + Some(inner) => inner, + None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), + }; + + let mask = direction.mask(); + let mask_no_hup = (mask - ::platform::hup()).as_usize(); + + let io_dispatch = inner.io_dispatch.read().unwrap(); + let sched = &io_dispatch[self.token]; + + // This consumes the current readiness state **except** for HUP. HUP is + // excluded because a) it is a final state and never transitions out of + // HUP and b) both the read AND the write directions need to be able to + // observe this state. + // + // If HUP were to be cleared when `direction` is `Read`, then when + // `poll_ready` is called again with a _`direction` of `Write`, the HUP + // state would not be visible. + let mut ready = mask & mio::Ready::from_usize( + sched.readiness.fetch_and(!mask_no_hup, SeqCst)); + + if ready.is_empty() && notify { + let task = task(); + // Update the task info + match direction { + Direction::Read => sched.reader.register_task(task), + Direction::Write => sched.writer.register_task(task), + } + + // Try again + ready = mask & mio::Ready::from_usize( + sched.readiness.fetch_and(!mask_no_hup, SeqCst)); + } + + if ready.is_empty() { + Ok(None) + } else { + Ok(Some(ready)) + } + } +} + +impl Drop for Inner { + fn drop(&mut self) { + if self.token == ERROR { + return; + } + + let inner = match self.handle.inner() { + Some(inner) => inner, + None => return, + }; + + inner.drop_source(self.token); + } +} |