diff options
Diffstat (limited to 'third_party/rust/futures-0.1.29/src/task_impl')
7 files changed, 2322 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.29/src/task_impl/atomic_task.rs b/third_party/rust/futures-0.1.29/src/task_impl/atomic_task.rs new file mode 100644 index 0000000000..d73954e617 --- /dev/null +++ b/third_party/rust/futures-0.1.29/src/task_impl/atomic_task.rs @@ -0,0 +1,283 @@ +use super::Task; + +use core::fmt; +use core::cell::UnsafeCell; +use core::sync::atomic::AtomicUsize; +use core::sync::atomic::Ordering::{Acquire, Release, AcqRel}; + +/// A synchronization primitive for task notification. +/// +/// `AtomicTask` will coordinate concurrent notifications with the consumer +/// potentially "updating" the underlying task to notify. This is useful in +/// scenarios where a computation completes in another thread and wants to +/// notify the consumer, but the consumer is in the process of being migrated to +/// a new logical task. +/// +/// Consumers should call `register` before checking the result of a computation +/// and producers should call `notify` after producing the computation (this +/// differs from the usual `thread::park` pattern). It is also permitted for +/// `notify` to be called **before** `register`. This results in a no-op. +/// +/// A single `AtomicTask` may be reused for any number of calls to `register` or +/// `notify`. +/// +/// `AtomicTask` does not provide any memory ordering guarantees, as such the +/// user should use caution and use other synchronization primitives to guard +/// the result of the underlying computation. +pub struct AtomicTask { + state: AtomicUsize, + task: UnsafeCell<Option<Task>>, +} + +// `AtomicTask` is a multi-consumer, single-producer transfer cell. The cell +// stores a `Task` value produced by calls to `register` and many threads can +// race to take the task (to notify it) by calling `notify. +// +// If a new `Task` instance is produced by calling `register` before an existing +// one is consumed, then the existing one is overwritten. +// +// While `AtomicTask` is single-producer, the implementation ensures memory +// safety. In the event of concurrent calls to `register`, there will be a +// single winner whose task will get stored in the cell. The losers will not +// have their tasks notified. As such, callers should ensure to add +// synchronization to calls to `register`. +// +// The implementation uses a single `AtomicUsize` value to coordinate access to +// the `Task` cell. There are two bits that are operated on independently. These +// are represented by `REGISTERING` and `NOTIFYING`. +// +// The `REGISTERING` bit is set when a producer enters the critical section. The +// `NOTIFYING` bit is set when a consumer enters the critical section. Neither +// bit being set is represented by `WAITING`. +// +// A thread obtains an exclusive lock on the task cell by transitioning the +// state from `WAITING` to `REGISTERING` or `NOTIFYING`, depending on the +// operation the thread wishes to perform. When this transition is made, it is +// guaranteed that no other thread will access the task cell. +// +// # Registering +// +// On a call to `register`, an attempt to transition the state from WAITING to +// REGISTERING is made. On success, the caller obtains a lock on the task cell. +// +// If the lock is obtained, then the thread sets the task cell to the task +// provided as an argument. Then it attempts to transition the state back from +// `REGISTERING` -> `WAITING`. +// +// If this transition is successful, then the registering process is complete +// and the next call to `notify` will observe the task. +// +// If the transition fails, then there was a concurrent call to `notify` that +// was unable to access the task cell (due to the registering thread holding the +// lock). To handle this, the registering thread removes the task it just set +// from the cell and calls `notify` on it. This call to notify represents the +// attempt to notify by the other thread (that set the `NOTIFYING` bit). The +// state is then transitioned from `REGISTERING | NOTIFYING` back to `WAITING`. +// This transition must succeed because, at this point, the state cannot be +// transitioned by another thread. +// +// # Notifying +// +// On a call to `notify`, an attempt to transition the state from `WAITING` to +// `NOTIFYING` is made. On success, the caller obtains a lock on the task cell. +// +// If the lock is obtained, then the thread takes ownership of the current value +// in teh task cell, and calls `notify` on it. The state is then transitioned +// back to `WAITING`. This transition must succeed as, at this point, the state +// cannot be transitioned by another thread. +// +// If the thread is unable to obtain the lock, the `NOTIFYING` bit is still. +// This is because it has either been set by the current thread but the previous +// value included the `REGISTERING` bit **or** a concurrent thread is in the +// `NOTIFYING` critical section. Either way, no action must be taken. +// +// If the current thread is the only concurrent call to `notify` and another +// thread is in the `register` critical section, when the other thread **exits** +// the `register` critical section, it will observe the `NOTIFYING` bit and +// handle the notify itself. +// +// If another thread is in the `notify` critical section, then it will handle +// notifying the task. +// +// # A potential race (is safely handled). +// +// Imagine the following situation: +// +// * Thread A obtains the `notify` lock and notifies a task. +// +// * Before thread A releases the `notify` lock, the notified task is scheduled. +// +// * Thread B attempts to notify the task. In theory this should result in the +// task being notified, but it cannot because thread A still holds the notify +// lock. +// +// This case is handled by requiring users of `AtomicTask` to call `register` +// **before** attempting to observe the application state change that resulted +// in the task being notified. The notifiers also change the application state +// before calling notify. +// +// Because of this, the task will do one of two things. +// +// 1) Observe the application state change that Thread B is notifying on. In +// this case, it is OK for Thread B's notification to be lost. +// +// 2) Call register before attempting to observe the application state. Since +// Thread A still holds the `notify` lock, the call to `register` will result +// in the task notifying itself and get scheduled again. + +/// Idle state +const WAITING: usize = 0; + +/// A new task value is being registered with the `AtomicTask` cell. +const REGISTERING: usize = 0b01; + +/// The task currently registered with the `AtomicTask` cell is being notified. +const NOTIFYING: usize = 0b10; + +impl AtomicTask { + /// Create an `AtomicTask` initialized with the given `Task` + pub fn new() -> AtomicTask { + // Make sure that task is Sync + trait AssertSync: Sync {} + impl AssertSync for Task {} + + AtomicTask { + state: AtomicUsize::new(WAITING), + task: UnsafeCell::new(None), + } + } + + /// Registers the current task to be notified on calls to `notify`. + /// + /// This is the same as calling `register_task` with `task::current()`. + pub fn register(&self) { + self.register_task(super::current()); + } + + /// Registers the provided task to be notified on calls to `notify`. + /// + /// The new task will take place of any previous tasks that were registered + /// by previous calls to `register`. Any calls to `notify` that happen after + /// a call to `register` (as defined by the memory ordering rules), will + /// notify the `register` caller's task. + /// + /// It is safe to call `register` with multiple other threads concurrently + /// calling `notify`. This will result in the `register` caller's current + /// task being notified once. + /// + /// This function is safe to call concurrently, but this is generally a bad + /// idea. Concurrent calls to `register` will attempt to register different + /// tasks to be notified. One of the callers will win and have its task set, + /// but there is no guarantee as to which caller will succeed. + pub fn register_task(&self, task: Task) { + match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) { + WAITING => { + unsafe { + // Locked acquired, update the waker cell + *self.task.get() = Some(task.clone()); + + // Release the lock. If the state transitioned to include + // the `NOTIFYING` bit, this means that a notify has been + // called concurrently, so we have to remove the task and + // notify it.` + // + // Start by assuming that the state is `REGISTERING` as this + // is what we jut set it to. + let res = self.state.compare_exchange( + REGISTERING, WAITING, AcqRel, Acquire); + + match res { + Ok(_) => {} + Err(actual) => { + // This branch can only be reached if a + // concurrent thread called `notify`. In this + // case, `actual` **must** be `REGISTERING | + // `NOTIFYING`. + debug_assert_eq!(actual, REGISTERING | NOTIFYING); + + // Take the task to notify once the atomic operation has + // completed. + let notify = (*self.task.get()).take().unwrap(); + + // Just swap, because no one could change state + // while state == `Registering | `Waking` + self.state.swap(WAITING, AcqRel); + + // The atomic swap was complete, now + // notify the task and return. + notify.notify(); + } + } + } + } + NOTIFYING => { + // Currently in the process of notifying the task, i.e., + // `notify` is currently being called on the old task handle. + // So, we call notify on the new task handle + task.notify(); + } + state => { + // In this case, a concurrent thread is holding the + // "registering" lock. This probably indicates a bug in the + // caller's code as racing to call `register` doesn't make much + // sense. + // + // We just want to maintain memory safety. It is ok to drop the + // call to `register`. + debug_assert!( + state == REGISTERING || + state == REGISTERING | NOTIFYING); + } + } + } + + /// Notifies the task that last called `register`. + /// + /// If `register` has not been called yet, then this does nothing. + pub fn notify(&self) { + // AcqRel ordering is used in order to acquire the value of the `task` + // cell as well as to establish a `release` ordering with whatever + // memory the `AtomicTask` is associated with. + match self.state.fetch_or(NOTIFYING, AcqRel) { + WAITING => { + // The notifying lock has been acquired. + let task = unsafe { (*self.task.get()).take() }; + + // Release the lock + self.state.fetch_and(!NOTIFYING, Release); + + if let Some(task) = task { + task.notify(); + } + } + state => { + // There is a concurrent thread currently updating the + // associated task. + // + // Nothing more to do as the `NOTIFYING` bit has been set. It + // doesn't matter if there are concurrent registering threads or + // not. + // + debug_assert!( + state == REGISTERING || + state == REGISTERING | NOTIFYING || + state == NOTIFYING); + } + } + } +} + +impl Default for AtomicTask { + fn default() -> Self { + AtomicTask::new() + } +} + +impl fmt::Debug for AtomicTask { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "AtomicTask") + } +} + +unsafe impl Send for AtomicTask {} +unsafe impl Sync for AtomicTask {} diff --git a/third_party/rust/futures-0.1.29/src/task_impl/core.rs b/third_party/rust/futures-0.1.29/src/task_impl/core.rs new file mode 100644 index 0000000000..d454116012 --- /dev/null +++ b/third_party/rust/futures-0.1.29/src/task_impl/core.rs @@ -0,0 +1,186 @@ +#![cfg_attr(feature = "use_std", allow(dead_code))] + +use core::marker; +use core::mem; +use core::sync::atomic::AtomicUsize; +#[allow(deprecated)] +use core::sync::atomic::ATOMIC_USIZE_INIT; +use core::sync::atomic::Ordering::{SeqCst, Relaxed}; + +use super::{BorrowedTask, NotifyHandle}; + +pub struct LocalKey; +pub struct LocalMap; +pub fn local_map() -> LocalMap { LocalMap } + +#[derive(Copy, Clone)] +pub struct BorrowedEvents<'a>(marker::PhantomData<&'a ()>); + +#[derive(Copy, Clone)] +pub struct BorrowedUnpark<'a> { + f: &'a Fn() -> NotifyHandle, + id: usize, +} + +pub struct TaskUnpark { + handle: NotifyHandle, + id: usize, +} + +#[derive(Clone)] +pub struct UnparkEvents; + +impl<'a> BorrowedEvents<'a> { + pub fn new() -> BorrowedEvents<'a> { + BorrowedEvents(marker::PhantomData) + } + + pub fn to_owned(&self) -> UnparkEvents { + UnparkEvents + } +} + +impl<'a> BorrowedUnpark<'a> { + #[inline] + pub fn new(f: &'a Fn() -> NotifyHandle, id: usize) -> BorrowedUnpark<'a> { + BorrowedUnpark { f: f, id: id } + } + + #[inline] + pub fn to_owned(&self) -> TaskUnpark { + let handle = (self.f)(); + let id = handle.clone_id(self.id); + TaskUnpark { handle: handle, id: id } + } +} + +impl UnparkEvents { + pub fn notify(&self) {} + + pub fn will_notify(&self, _other: &BorrowedEvents) -> bool { + true + } +} + +impl TaskUnpark { + pub fn notify(&self) { + self.handle.notify(self.id); + } + + pub fn will_notify(&self, other: &BorrowedUnpark) -> bool { + self.id == other.id && self.handle.inner == (other.f)().inner + } +} + +impl Clone for TaskUnpark { + fn clone(&self) -> TaskUnpark { + let handle = self.handle.clone(); + let id = handle.clone_id(self.id); + TaskUnpark { handle: handle, id: id } + } +} + +impl Drop for TaskUnpark { + fn drop(&mut self) { + self.handle.drop_id(self.id); + } +} + +#[allow(deprecated)] +static GET: AtomicUsize = ATOMIC_USIZE_INIT; +#[allow(deprecated)] +static SET: AtomicUsize = ATOMIC_USIZE_INIT; + +/// Initialize the `futures` task system. +/// +/// This function is an unsafe low-level implementation detail typically only +/// used by crates using `futures` in `no_std` context. Users of this crate +/// who also use the standard library never need to invoke this function. +/// +/// The task system in the `futures` crate relies on some notion of "local +/// storage" for the running thread and/or context. The `task::current` function +/// can get invoked in any context, for example, and needs to be able to return +/// a `Task`. Typically with the standard library this is supported with +/// thread-local-storage, but this is not available in `no_std` contexts! +/// +/// This function is provided to allow `no_std` contexts to continue to be able +/// to use the standard task system in this crate. The functions provided here +/// will be used as-if they were thread-local-storage getters/setters. The `get` +/// function provided is used to retrieve the current thread-local value of the +/// task system's pointer, returning null if not initialized. The `set` function +/// updates the value of the pointer. +/// +/// # Return value +/// +/// This function will return whether initialization succeeded or not. This +/// function can be called concurrently and only the first invocation will +/// succeed. If `false` is returned then the `get` and `set` pointers provided +/// were *not* registered for use with the task system, but if `true` was +/// provided then they will be called when the task system is used. +/// +/// Note that while safe to call concurrently it's recommended to still perform +/// external synchronization when calling this function. This task system is +/// not guaranteed to be ready to go until a call to this function returns +/// `true`. In other words, if you call this function and see `false`, the +/// task system may not be ready to go as another thread may still be calling +/// `init`. +/// +/// # Unsafety +/// +/// This function is unsafe due to the requirements on the behavior of the +/// `get` and `set` functions. The pointers returned from these functions must +/// reflect the semantics specified above and must also be thread-local, +/// depending on the definition of a "thread" in the calling context. +pub unsafe fn init(get: fn() -> *mut u8, set: fn(*mut u8)) -> bool { + if GET.compare_exchange(0, get as usize, SeqCst, SeqCst).is_ok() { + SET.store(set as usize, SeqCst); + true + } else { + false + } +} + +/// Return whether the caller is running in a task (and so can use task_local!). +pub fn is_in_task() -> bool { + if let Some(ptr) = get_ptr() { + !ptr.is_null() + } else { + false + } +} + +#[inline] +pub fn get_ptr() -> Option<*mut u8> { + match GET.load(Relaxed) { + 0 => None, + n => Some(unsafe { mem::transmute::<usize, fn() -> *mut u8>(n)() }), + } +} + +#[cfg(feature = "use_std")] +#[inline] +pub fn is_get_ptr(f: usize) -> bool { + GET.load(Relaxed) == f +} + +pub fn set<'a, F, R>(task: &BorrowedTask<'a>, f: F) -> R + where F: FnOnce() -> R +{ + let set = match SET.load(Relaxed) { + 0 => panic!("not initialized"), + n => unsafe { mem::transmute::<usize, fn(*mut u8)>(n) }, + }; + + struct Reset(fn(*mut u8), *mut u8); + + impl Drop for Reset { + #[inline] + fn drop(&mut self) { + (self.0)(self.1); + } + } + + let _reset = Reset(set, get_ptr().unwrap()); + set(task as *const _ as *mut u8); + f() +} diff --git a/third_party/rust/futures-0.1.29/src/task_impl/mod.rs b/third_party/rust/futures-0.1.29/src/task_impl/mod.rs new file mode 100644 index 0000000000..6f1cf36c0c --- /dev/null +++ b/third_party/rust/futures-0.1.29/src/task_impl/mod.rs @@ -0,0 +1,733 @@ +use core::fmt; +use core::marker::PhantomData; + +use {Poll, Future, Stream, Sink, StartSend}; + +mod atomic_task; +pub use self::atomic_task::AtomicTask; + +mod core; + +#[cfg(feature = "use_std")] +mod std; +#[cfg(feature = "use_std")] +pub use self::std::*; +#[cfg(not(feature = "use_std"))] +pub use self::core::*; + +pub struct BorrowedTask<'a> { + id: usize, + unpark: BorrowedUnpark<'a>, + events: BorrowedEvents<'a>, + // Task-local storage + map: &'a LocalMap, +} + +fn fresh_task_id() -> usize { + use core::sync::atomic::{AtomicUsize, Ordering}; + #[allow(deprecated)] + use core::sync::atomic::ATOMIC_USIZE_INIT; + + // TODO: this assert is a real bummer, need to figure out how to reuse + // old IDs that are no longer in use. + // + // Note, though, that it is intended that these ids go away entirely + // eventually, see the comment on `is_current` below. + #[allow(deprecated)] + static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT; + let id = NEXT_ID.fetch_add(1, Ordering::Relaxed); + assert!(id < usize::max_value() / 2, + "too many previous tasks have been allocated"); + id +} + +fn with<F: FnOnce(&BorrowedTask) -> R, R>(f: F) -> R { + unsafe { + let task = get_ptr().expect("no Task is currently running"); + assert!(!task.is_null(), "no Task is currently running"); + f(&*(task as *const BorrowedTask)) + } +} + +/// A handle to a "task", which represents a single lightweight "thread" of +/// execution driving a future to completion. +/// +/// In general, futures are composed into large units of work, which are then +/// spawned as tasks onto an *executor*. The executor is responsible for polling +/// the future as notifications arrive, until the future terminates. +/// +/// This is obtained by the `task::current` function. +/// +/// # FAQ +/// +/// ### Why does `Task` not implement `Eq` and `Hash`? +/// +/// A valid use case for `Task` to implement these two traits has not been +/// encountered. +/// +/// Usually, this question is asked by someone who wants to store a `Task` +/// instance in a `HashSet`. This seems like an obvious way to implement a +/// future aware, multi-handle structure; e.g. a multi-producer channel. +/// +/// In this case, the idea is that whenever a `start_send` is called on one of +/// the channel's send handles, if the channel is at capacity, the current task +/// is stored in a set. Then, when capacity is available, a task is removed from +/// the set and notified. +/// +/// The problem with this strategy is that multiple `Sender` handles can be used +/// on the same task. In this case, when the second handle is used and the task +/// is stored in a set, there already is an entry. Then, when the first +/// handle is dropped, this entry is cleared, resulting in a dead lock. +/// +/// See [here](https://github.com/rust-lang-nursery/futures-rs/issues/670) for +/// more discussion. +/// +#[derive(Clone)] +pub struct Task { + id: usize, + unpark: TaskUnpark, + events: UnparkEvents, +} + +trait AssertSend: Send {} +impl AssertSend for Task {} + +/// Returns a handle to the current task to call `notify` at a later date. +/// +/// The returned handle implements the `Send` and `'static` bounds and may also +/// be cheaply cloned. This is useful for squirreling away the handle into a +/// location which is then later signaled that a future can make progress. +/// +/// Implementations of the `Future` trait typically use this function if they +/// would otherwise perform a blocking operation. When something isn't ready +/// yet, this `current` function is called to acquire a handle to the current +/// task, and then the future arranges it such that when the blocking operation +/// otherwise finishes (perhaps in the background) it will `notify` the +/// returned handle. +/// +/// It's sometimes necessary to pass extra information to the task when +/// unparking it, so that the task knows something about *why* it was woken. +/// See the `FutureQueue` documentation for details on how to do this. +/// +/// # Panics +/// +/// This function will panic if a task is not currently being executed. That +/// is, this method can be dangerous to call outside of an implementation of +/// `poll`. +pub fn current() -> Task { + with(|borrowed| { + let unpark = borrowed.unpark.to_owned(); + let events = borrowed.events.to_owned(); + + Task { + id: borrowed.id, + unpark: unpark, + events: events, + } + }) +} + +#[doc(hidden)] +#[deprecated(note = "renamed to `current`")] +pub fn park() -> Task { + current() +} + +impl Task { + /// Indicate that the task should attempt to poll its future in a timely + /// fashion. + /// + /// It's typically guaranteed that, after calling `notify`, `poll` will + /// be called at least once subsequently (unless the future has terminated). + /// If the task is currently polling its future when `notify` is called, it + /// must poll the future *again* afterwards, ensuring that all relevant + /// events are eventually observed by the future. + pub fn notify(&self) { + self.events.notify(); + self.unpark.notify(); + } + + #[doc(hidden)] + #[deprecated(note = "renamed to `notify`")] + pub fn unpark(&self) { + self.notify() + } + + /// Returns `true` when called from within the context of the task. + /// + /// In other words, the task is currently running on the thread calling the + /// function. Note that this is currently, and has historically, been + /// implemented by tracking an `id` on every instance of `Spawn` created. + /// When a `Spawn` is being polled it stores in thread-local-storage the id + /// of the instance, and then `task::current` will return a `Task` that also + /// stores this id. + /// + /// The intention of this function was to answer questions like "if I + /// `notify` this task, is it equivalent to `task::current().notify()`?" + /// The answer "yes" may be able to avoid some extra work to block the + /// current task, such as sending a task along a channel or updating a + /// stored `Task` somewhere. An answer of "no" typically results in doing + /// the work anyway. + /// + /// Unfortunately this function has been somewhat buggy in the past and is + /// not intended to be supported in the future. By simply matching `id` the + /// intended question above isn't accurately taking into account, for + /// example, unpark events (now deprecated, but still a feature). Thus many + /// old users of this API weren't fully accounting for the question it was + /// intended they were asking. + /// + /// This API continues to be implemented but will in the future, e.g. in the + /// 0.1.x series of this crate, eventually return `false` unconditionally. + /// It is intended that this function will be removed in the next breaking + /// change of this crate. If you'd like to continue to be able to answer the + /// example question above, it's recommended you use the + /// `will_notify_current` method. + /// + /// If you've got questions about this though please let us know! We'd like + /// to learn about other use cases here that we did not consider. + /// + /// # Panics + /// + /// This function will panic if no current future is being polled. + #[deprecated(note = "intended to be removed, see docs for details")] + pub fn is_current(&self) -> bool { + with(|current| current.id == self.id) + } + + /// This function is intended as a performance optimization for structures + /// which store a `Task` internally. + /// + /// The purpose of this function is to answer the question "if I `notify` + /// this task is it equivalent to `task::current().notify()`". An answer + /// "yes" may mean that you don't actually need to call `task::current()` + /// and store it, but rather you can simply leave a stored task in place. An + /// answer of "no" typically means that you need to call `task::current()` + /// and store it somewhere. + /// + /// As this is purely a performance optimization a valid implementation for + /// this function is to always return `false`. A best effort is done to + /// return `true` where possible, but false negatives may happen. Note that + /// this function will not return a false positive, however. + /// + /// # Panics + /// + /// This function will panic if no current future is being polled. + #[allow(deprecated)] + pub fn will_notify_current(&self) -> bool { + with(|current| { + self.unpark.will_notify(¤t.unpark) && + self.events.will_notify(¤t.events) + }) + } +} + +impl fmt::Debug for Task { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Task") + .finish() + } +} + +/// Representation of a spawned future/stream. +/// +/// This object is returned by the `spawn` function in this module. This +/// represents a "fused task and future", storing all necessary pieces of a task +/// and owning the top-level future that's being driven as well. +/// +/// A `Spawn` can be poll'd for completion or execution of the current thread +/// can be blocked indefinitely until a notification arrives. This can be used +/// with either futures or streams, with different methods being available on +/// `Spawn` depending which is used. +pub struct Spawn<T: ?Sized> { + id: usize, + data: LocalMap, + obj: T, +} + +/// Spawns a future or stream, returning it and the new task responsible for +/// running it to completion. +/// +/// This function is the termination endpoint for running futures. This method +/// will conceptually allocate a new task to run the given object, which is +/// normally either a `Future` or `Stream`. +/// +/// This function is similar to the `thread::spawn` function but does not +/// attempt to run code in the background. The future will not make progress +/// until the methods on `Spawn` are called in turn. +pub fn spawn<T>(obj: T) -> Spawn<T> { + Spawn { + id: fresh_task_id(), + obj: obj, + data: local_map(), + } +} + +impl<T: ?Sized> Spawn<T> { + /// Get a shared reference to the object the Spawn is wrapping. + pub fn get_ref(&self) -> &T { + &self.obj + } + + /// Get a mutable reference to the object the Spawn is wrapping. + pub fn get_mut(&mut self) -> &mut T { + &mut self.obj + } + + /// Consume the Spawn, returning its inner object + pub fn into_inner(self) -> T where T: Sized { + self.obj + } + + /// Calls the provided closure, scheduling notifications to be sent to the + /// `notify` argument. + pub fn poll_fn_notify<N, F, R>(&mut self, + notify: &N, + id: usize, + f: F) -> R + where F: FnOnce(&mut T) -> R, + N: Clone + Into<NotifyHandle>, + { + let mk = || notify.clone().into(); + self.enter(BorrowedUnpark::new(&mk, id), f) + } + + /// Polls the internal future, scheduling notifications to be sent to the + /// `notify` argument. + /// + /// This method will poll the internal future, testing if it's completed + /// yet. The `notify` argument is used as a sink for notifications sent to + /// this future. That is, while the future is being polled, any call to + /// `task::current()` will return a handle that contains the `notify` + /// specified. + /// + /// If this function returns `NotReady`, then the `notify` should have been + /// scheduled to receive a notification when poll can be called again. + /// Otherwise if `Ready` or `Err` is returned, the `Spawn` task can be + /// safely destroyed. + /// + /// Note that `notify` itself is passed as a shared reference, and is itself + /// not required to be a `NotifyHandle`. The `Clone` and `Into` trait bounds + /// will be used to convert this `notify` to a `NotifyHandle` if necessary. + /// This construction can avoid an unnecessary atomic reference count bump + /// in some situations. + /// + /// ## Unsafety and `id` + /// + /// This function and all other `*_notify` functions on this type will treat + /// the `id` specified very carefully, explicitly calling functions like the + /// `notify` argument's `clone_id` and `drop_id` functions. It should be + /// safe to encode a pointer itself into the `id` specified, such as an + /// `Arc<N>` or a `Box<N>`. The `clone_id` and `drop_id` functions are then + /// intended to be sufficient for the memory management related to that + /// pointer. + pub fn poll_future_notify<N>(&mut self, + notify: &N, + id: usize) -> Poll<T::Item, T::Error> + where N: Clone + Into<NotifyHandle>, + T: Future, + { + self.poll_fn_notify(notify, id, |f| f.poll()) + } + + /// Like `poll_future_notify`, except polls the underlying stream. + pub fn poll_stream_notify<N>(&mut self, + notify: &N, + id: usize) + -> Poll<Option<T::Item>, T::Error> + where N: Clone + Into<NotifyHandle>, + T: Stream, + { + self.poll_fn_notify(notify, id, |s| s.poll()) + } + + /// Invokes the underlying `start_send` method with this task in place. + /// + /// If the underlying operation returns `NotReady` then the `notify` value + /// passed in will receive a notification when the operation is ready to be + /// attempted again. + pub fn start_send_notify<N>(&mut self, + value: T::SinkItem, + notify: &N, + id: usize) + -> StartSend<T::SinkItem, T::SinkError> + where N: Clone + Into<NotifyHandle>, + T: Sink, + { + self.poll_fn_notify(notify, id, |s| s.start_send(value)) + } + + /// Invokes the underlying `poll_complete` method with this task in place. + /// + /// If the underlying operation returns `NotReady` then the `notify` value + /// passed in will receive a notification when the operation is ready to be + /// attempted again. + pub fn poll_flush_notify<N>(&mut self, + notify: &N, + id: usize) + -> Poll<(), T::SinkError> + where N: Clone + Into<NotifyHandle>, + T: Sink, + { + self.poll_fn_notify(notify, id, |s| s.poll_complete()) + } + + /// Invokes the underlying `close` method with this task in place. + /// + /// If the underlying operation returns `NotReady` then the `notify` value + /// passed in will receive a notification when the operation is ready to be + /// attempted again. + pub fn close_notify<N>(&mut self, + notify: &N, + id: usize) + -> Poll<(), T::SinkError> + where N: Clone + Into<NotifyHandle>, + T: Sink, + { + self.poll_fn_notify(notify, id, |s| s.close()) + } + + fn enter<F, R>(&mut self, unpark: BorrowedUnpark, f: F) -> R + where F: FnOnce(&mut T) -> R + { + let borrowed = BorrowedTask { + id: self.id, + unpark: unpark, + events: BorrowedEvents::new(), + map: &self.data, + }; + let obj = &mut self.obj; + set(&borrowed, || f(obj)) + } +} + +impl<T: fmt::Debug + ?Sized> fmt::Debug for Spawn<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Spawn") + .field("obj", &&self.obj) + .finish() + } +} + +/// A trait which represents a sink of notifications that a future is ready to +/// make progress. +/// +/// This trait is provided as an argument to the `Spawn::*_notify` family of +/// functions. It's transitively used as part of the `Task::notify` method to +/// internally deliver notifications of readiness of a future to move forward. +/// +/// An instance of `Notify` has one primary method, `notify`, which is given a +/// contextual argument as to what's being notified. This contextual argument is +/// *also* provided to the `Spawn::*_notify` family of functions and can be used +/// to reuse an instance of `Notify` across many futures. +/// +/// Instances of `Notify` must be safe to share across threads, and the methods +/// be invoked concurrently. They must also live for the `'static` lifetime, +/// not containing any stack references. +pub trait Notify: Send + Sync { + /// Indicates that an associated future and/or task are ready to make + /// progress. + /// + /// Typically this means that the receiver of the notification should + /// arrange for the future to get poll'd in a prompt fashion. + /// + /// This method takes an `id` as an argument which was transitively passed + /// in from the original call to `Spawn::*_notify`. This id can be used to + /// disambiguate which precise future became ready for polling. + /// + /// # Panics + /// + /// Since `unpark` may be invoked from arbitrary contexts, it should + /// endeavor not to panic and to do as little work as possible. However, it + /// is not guaranteed not to panic, and callers should be wary. If a panic + /// occurs, that panic may or may not be propagated to the end-user of the + /// future that you'd otherwise wake up. + fn notify(&self, id: usize); + + /// This function is called whenever a new copy of `id` is needed. + /// + /// This is called in one of two situations: + /// + /// * A `Task` is being created through `task::current` while a future is + /// being polled. In that case the instance of `Notify` passed in to one + /// of the `poll_*` functions is called with the `id` passed into the same + /// `poll_*` function. + /// * A `Task` is itself being cloned. Each `Task` contains its own id and a + /// handle to the `Notify` behind it, and the task's `Notify` is used to + /// clone the internal `id` to assign to the new task. + /// + /// The `id` returned here will be stored in the `Task`-to-be and used later + /// to pass to `notify` when the `Task::notify` function is called on that + /// `Task`. + /// + /// Note that typically this is just the identity function, passing through + /// the identifier. For more unsafe situations, however, if `id` is itself a + /// pointer of some kind this can be used as a hook to "clone" the pointer, + /// depending on what that means for the specified pointer. + fn clone_id(&self, id: usize) -> usize { + id + } + + /// All instances of `Task` store an `id` that they're going to internally + /// notify with, and this function is called when the `Task` is dropped. + /// + /// This function provides a hook for schemes which encode pointers in this + /// `id` argument to deallocate resources associated with the pointer. It's + /// guaranteed that after this function is called the `Task` containing this + /// `id` will no longer use the `id`. + fn drop_id(&self, id: usize) { + drop(id); + } +} + +/// Sets the `NotifyHandle` of the current task for the duration of the provided +/// closure. +/// +/// This function takes a type that can be converted into a notify handle, +/// `notify` and `id`, and a closure `f`. The closure `f` will be executed such +/// that calls to `task::current()` will store a reference to the notify handle +/// provided, not the one previously in the environment. +/// +/// Note that calls to `task::current()` in the closure provided *will not* be +/// equivalent to `task::current()` before this method is called. The two tasks +/// returned will notify different handles, and the task handles pulled out +/// during the duration of this closure will not notify the previous task. It's +/// recommended that you call `task::current()` in some capacity before calling +/// this function to ensure that calls to `task::current()` inside of this +/// closure can transitively wake up the outer task. +/// +/// # Panics +/// +/// This function will panic if it is called outside the context of a future's +/// task. This is only valid to call once you've already entered a future via +/// `Spawn::poll_*` functions. +pub fn with_notify<F, T, R>(notify: &T, id: usize, f: F) -> R + where F: FnOnce() -> R, + T: Clone + Into<NotifyHandle>, +{ + with(|task| { + let mk = || notify.clone().into(); + let new_task = BorrowedTask { + id: task.id, + unpark: BorrowedUnpark::new(&mk, id), + events: task.events, + map: task.map, + }; + + set(&new_task, f) + }) +} + +/// An unsafe trait for implementing custom forms of memory management behind a +/// `Task`. +/// +/// The `futures` critically relies on "notification handles" to extract for +/// futures to contain and then later inform that they're ready to make +/// progress. These handles, however, must be cheap to create and cheap +/// to clone to ensure that this operation is efficient throughout the +/// execution of a program. +/// +/// Typically this sort of memory management is done in the standard library +/// with the `Arc` type. An `Arc` is relatively cheap to allocate an is +/// quite cheap to clone and pass around. Plus, it's 100% safe! +/// +/// When working outside the standard library, however, you don't always have +/// and `Arc` type available to you. This trait, `UnsafeNotify`, is intended +/// to be the "unsafe version" of the `Notify` trait. This trait encodes the +/// memory management operations of a `Task`'s notification handle, allowing +/// custom implementations for the memory management of a notification handle. +/// +/// Put another way, the core notification type in this library, +/// `NotifyHandle`, simply internally contains an instance of +/// `*mut UnsafeNotify`. This "unsafe trait object" is then used exclusively +/// to operate with, dynamically dispatching calls to clone, drop, and notify. +/// Critically though as a raw pointer it doesn't require a particular form +/// of memory management, allowing external implementations. +/// +/// A default implementation of the `UnsafeNotify` trait is provided for the +/// `Arc` type in the standard library. If the `use_std` feature of this crate +/// is not available however, you'll be required to implement your own +/// instance of this trait to pass it into `NotifyHandle::new`. +/// +/// # Unsafety +/// +/// This trait is manually encoding the memory management of the underlying +/// handle, and as a result is quite unsafe to implement! Implementors of +/// this trait must guarantee: +/// +/// * Calls to `clone_raw` produce uniquely owned handles. It should be safe +/// to drop the current handle and have the returned handle still be valid. +/// * Calls to `drop_raw` work with `self` as a raw pointer, deallocating +/// resources associated with it. This is a pretty unsafe operation as it's +/// invalidating the `self` pointer, so extreme care needs to be taken. +/// +/// In general it's recommended to review the trait documentation as well as +/// the implementation for `Arc` in this crate. When in doubt ping the +/// `futures` authors to clarify an unsafety question here. +pub unsafe trait UnsafeNotify: Notify { + /// Creates a new `NotifyHandle` from this instance of `UnsafeNotify`. + /// + /// This function will create a new uniquely owned handle that under the + /// hood references the same notification instance. In other words calls + /// to `notify` on the returned handle should be equivalent to calls to + /// `notify` on this handle. + /// + /// # Unsafety + /// + /// This trait is unsafe to implement, as are all these methods. This + /// method is also unsafe to call as it's asserting the `UnsafeNotify` + /// value is in a consistent state. In general it's recommended to + /// review the trait documentation as well as the implementation for `Arc` + /// in this crate. When in doubt ping the `futures` authors to clarify + /// an unsafety question here. + unsafe fn clone_raw(&self) -> NotifyHandle; + + /// Drops this instance of `UnsafeNotify`, deallocating resources + /// associated with it. + /// + /// This method is intended to have a signature such as: + /// + /// ```ignore + /// fn drop_raw(self: *mut Self); + /// ``` + /// + /// Unfortunately in Rust today that signature is not object safe. + /// Nevertheless it's recommended to implement this function *as if* that + /// were its signature. As such it is not safe to call on an invalid + /// pointer, nor is the validity of the pointer guaranteed after this + /// function returns. + /// + /// # Unsafety + /// + /// This trait is unsafe to implement, as are all these methods. This + /// method is also unsafe to call as it's asserting the `UnsafeNotify` + /// value is in a consistent state. In general it's recommended to + /// review the trait documentation as well as the implementation for `Arc` + /// in this crate. When in doubt ping the `futures` authors to clarify + /// an unsafety question here. + unsafe fn drop_raw(&self); +} + +/// A `NotifyHandle` is the core value through which notifications are routed +/// in the `futures` crate. +/// +/// All instances of `Task` will contain a `NotifyHandle` handle internally. +/// This handle itself contains a trait object pointing to an instance of the +/// `Notify` trait, allowing notifications to get routed through it. +/// +/// The `NotifyHandle` type internally does not codify any particular memory +/// management strategy. Internally it contains an instance of `*mut +/// UnsafeNotify`, and more details about that trait can be found on its own +/// documentation. Consequently, though, the one constructor of this type, +/// `NotifyHandle::new`, is `unsafe` to call. It is not recommended to call +/// this constructor directly. +/// +/// If you're working with the standard library then it's recommended to +/// work with the `Arc` type. If you have a struct, `T`, which implements the +/// `Notify` trait, then you can construct this with +/// `NotifyHandle::from(t: Arc<T>)`. The coercion to `UnsafeNotify` will +/// happen automatically and safely for you. +/// +/// When working externally from the standard library it's recommended to +/// provide a similar safe constructor for your custom type as opposed to +/// recommending an invocation of `NotifyHandle::new` directly. +pub struct NotifyHandle { + inner: *mut UnsafeNotify, +} + +unsafe impl Send for NotifyHandle {} +unsafe impl Sync for NotifyHandle {} + +impl NotifyHandle { + /// Constructs a new `NotifyHandle` directly. + /// + /// Note that most code will not need to call this. Implementers of the + /// `UnsafeNotify` trait will typically provide a wrapper that calls this + /// but you otherwise shouldn't call it directly. + /// + /// If you're working with the standard library then it's recommended to + /// use the `NotifyHandle::from` function instead which works with the safe + /// `Arc` type and the safe `Notify` trait. + #[inline] + pub unsafe fn new(inner: *mut UnsafeNotify) -> NotifyHandle { + NotifyHandle { inner: inner } + } + + /// Invokes the underlying instance of `Notify` with the provided `id`. + pub fn notify(&self, id: usize) { + unsafe { (*self.inner).notify(id) } + } + + fn clone_id(&self, id: usize) -> usize { + unsafe { (*self.inner).clone_id(id) } + } + + fn drop_id(&self, id: usize) { + unsafe { (*self.inner).drop_id(id) } + } +} + +impl Clone for NotifyHandle { + #[inline] + fn clone(&self) -> Self { + unsafe { + (*self.inner).clone_raw() + } + } +} + +impl fmt::Debug for NotifyHandle { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("NotifyHandle") + .finish() + } +} + +impl Drop for NotifyHandle { + fn drop(&mut self) { + unsafe { + (*self.inner).drop_raw() + } + } +} + +/// Marker for a `T` that is behind &'static. +struct StaticRef<T>(PhantomData<T>); + +impl<T: Notify> Notify for StaticRef<T> { + fn notify(&self, id: usize) { + let me = unsafe { &*(self as *const _ as *const T) }; + me.notify(id); + } + + fn clone_id(&self, id: usize) -> usize { + let me = unsafe { &*(self as *const _ as *const T) }; + me.clone_id(id) + } + + fn drop_id(&self, id: usize) { + let me = unsafe { &*(self as *const _ as *const T) }; + me.drop_id(id); + } +} + +unsafe impl<T: Notify + 'static> UnsafeNotify for StaticRef<T> { + unsafe fn clone_raw(&self) -> NotifyHandle { + NotifyHandle::new(self as *const _ as *mut StaticRef<T>) + } + + unsafe fn drop_raw(&self) {} +} + +impl<T: Notify> From<&'static T> for NotifyHandle { + fn from(src : &'static T) -> NotifyHandle { + unsafe { NotifyHandle::new(src as *const _ as *mut StaticRef<T>) } + } +} + +#[cfg(feature = "nightly")] +mod nightly { + use super::NotifyHandle; + use core::marker::Unpin; + + impl Unpin for NotifyHandle {} +} diff --git a/third_party/rust/futures-0.1.29/src/task_impl/std/data.rs b/third_party/rust/futures-0.1.29/src/task_impl/std/data.rs new file mode 100644 index 0000000000..770912b219 --- /dev/null +++ b/third_party/rust/futures-0.1.29/src/task_impl/std/data.rs @@ -0,0 +1,131 @@ +use std::prelude::v1::*; + +use std::any::TypeId; +use std::cell::RefCell; +use std::hash::{BuildHasherDefault, Hasher}; +use std::collections::HashMap; + +use task_impl::with; + +/// A macro to create a `static` of type `LocalKey` +/// +/// This macro is intentionally similar to the `thread_local!`, and creates a +/// `static` which has a `with` method to access the data on a task. +/// +/// The data associated with each task local is per-task, so different tasks +/// will contain different values. +#[macro_export] +macro_rules! task_local { + (static $NAME:ident: $t:ty = $e:expr) => ( + static $NAME: $crate::task::LocalKey<$t> = { + fn __init() -> $t { $e } + fn __key() -> ::std::any::TypeId { + struct __A; + ::std::any::TypeId::of::<__A>() + } + $crate::task::LocalKey { + __init: __init, + __key: __key, + } + }; + ) +} + +pub type LocalMap = RefCell<HashMap<TypeId, + Box<Opaque>, + BuildHasherDefault<IdHasher>>>; + +pub fn local_map() -> LocalMap { + RefCell::new(HashMap::default()) +} + +pub trait Opaque: Send {} +impl<T: Send> Opaque for T {} + +/// A key for task-local data stored in a future's task. +/// +/// This type is generated by the `task_local!` macro and performs very +/// similarly to the `thread_local!` macro and `std::thread::LocalKey` types. +/// Data associated with a `LocalKey<T>` is stored inside of a future's task, +/// and the data is destroyed when the future is completed and the task is +/// destroyed. +/// +/// Task-local data can migrate between threads and hence requires a `Send` +/// bound. Additionally, task-local data also requires the `'static` bound to +/// ensure it lives long enough. When a key is accessed for the first time the +/// task's data is initialized with the provided initialization expression to +/// the macro. +#[derive(Debug)] +pub struct LocalKey<T> { + // "private" fields which have to be public to get around macro hygiene, not + // included in the stability story for this type. Can change at any time. + #[doc(hidden)] + pub __key: fn() -> TypeId, + #[doc(hidden)] + pub __init: fn() -> T, +} + +pub struct IdHasher { + id: u64, +} + +impl Default for IdHasher { + fn default() -> IdHasher { + IdHasher { id: 0 } + } +} + +impl Hasher for IdHasher { + fn write(&mut self, _bytes: &[u8]) { + // TODO: need to do something sensible + panic!("can only hash u64"); + } + + fn write_u64(&mut self, u: u64) { + self.id = u; + } + + fn finish(&self) -> u64 { + self.id + } +} + +impl<T: Send + 'static> LocalKey<T> { + /// Access this task-local key, running the provided closure with a + /// reference to the value. + /// + /// This function will access this task-local key to retrieve the data + /// associated with the current task and this key. If this is the first time + /// this key has been accessed on this task, then the key will be + /// initialized with the initialization expression provided at the time the + /// `task_local!` macro was called. + /// + /// The provided closure will be provided a shared reference to the + /// underlying data associated with this task-local-key. The data itself is + /// stored inside of the current task. + /// + /// # Panics + /// + /// This function can possibly panic for a number of reasons: + /// + /// * If there is not a current task. + /// * If the initialization expression is run and it panics + /// * If the closure provided panics + pub fn with<F, R>(&'static self, f: F) -> R + where F: FnOnce(&T) -> R + { + let key = (self.__key)(); + with(|task| { + let raw_pointer = { + let mut data = task.map.borrow_mut(); + let entry = data.entry(key).or_insert_with(|| { + Box::new((self.__init)()) + }); + &**entry as *const Opaque as *const T + }; + unsafe { + f(&*raw_pointer) + } + }) + } +} diff --git a/third_party/rust/futures-0.1.29/src/task_impl/std/mod.rs b/third_party/rust/futures-0.1.29/src/task_impl/std/mod.rs new file mode 100644 index 0000000000..eca750f7d8 --- /dev/null +++ b/third_party/rust/futures-0.1.29/src/task_impl/std/mod.rs @@ -0,0 +1,716 @@ +use std::prelude::v1::*; + +use std::cell::Cell; +use std::fmt; +use std::marker::PhantomData; +use std::mem; +use std::ptr; +use std::sync::{Arc, Mutex, Condvar, Once, ONCE_INIT}; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use {Future, Stream, Sink, Poll, Async, StartSend, AsyncSink}; +use super::core; +use super::{BorrowedTask, NotifyHandle, Spawn, spawn, Notify, UnsafeNotify}; + +mod unpark_mutex; +pub use self::unpark_mutex::UnparkMutex; + +mod data; +pub use self::data::*; + +mod task_rc; +#[allow(deprecated)] +#[cfg(feature = "with-deprecated")] +pub use self::task_rc::TaskRc; + +pub use task_impl::core::init; + +thread_local!(static CURRENT_TASK: Cell<*mut u8> = Cell::new(ptr::null_mut())); + +/// Return whether the caller is running in a task (and so can use task_local!). +pub fn is_in_task() -> bool { + CURRENT_TASK.with(|task| !task.get().is_null()) +} + +static INIT: Once = ONCE_INIT; + +pub fn get_ptr() -> Option<*mut u8> { + // Since this condition will always return true when TLS task storage is + // used (the default), the branch predictor will be able to optimize the + // branching and a dynamic dispatch will be avoided, which makes the + // compiler happier. + if core::is_get_ptr(0x1) { + Some(CURRENT_TASK.with(|c| c.get())) + } else { + core::get_ptr() + } +} + +fn tls_slot() -> *const Cell<*mut u8> { + CURRENT_TASK.with(|c| c as *const _) +} + +pub fn set<'a, F, R>(task: &BorrowedTask<'a>, f: F) -> R + where F: FnOnce() -> R +{ + // Lazily initialize the get / set ptrs + // + // Note that we won't actually use these functions ever, we'll instead be + // testing the pointer's value elsewhere and calling our own functions. + INIT.call_once(|| unsafe { + let get = mem::transmute::<usize, _>(0x1); + let set = mem::transmute::<usize, _>(0x2); + init(get, set); + }); + + // Same as above. + if core::is_get_ptr(0x1) { + struct Reset(*const Cell<*mut u8>, *mut u8); + + impl Drop for Reset { + #[inline] + fn drop(&mut self) { + unsafe { + (*self.0).set(self.1); + } + } + } + + unsafe { + let slot = tls_slot(); + let _reset = Reset(slot, (*slot).get()); + (*slot).set(task as *const _ as *mut u8); + f() + } + } else { + core::set(task, f) + } +} + +#[derive(Copy, Clone)] +#[allow(deprecated)] +pub enum BorrowedUnpark<'a> { + Old(&'a Arc<Unpark>), + New(core::BorrowedUnpark<'a>), +} + +#[derive(Copy, Clone)] +#[allow(deprecated)] +pub enum BorrowedEvents<'a> { + None, + One(&'a UnparkEvent, &'a BorrowedEvents<'a>), +} + +#[derive(Clone)] +pub enum TaskUnpark { + #[allow(deprecated)] + Old(Arc<Unpark>), + New(core::TaskUnpark), +} + +#[derive(Clone)] +#[allow(deprecated)] +pub enum UnparkEvents { + None, + One(UnparkEvent), + Many(Box<[UnparkEvent]>), +} + +impl<'a> BorrowedUnpark<'a> { + #[inline] + pub fn new(f: &'a Fn() -> NotifyHandle, id: usize) -> BorrowedUnpark<'a> { + BorrowedUnpark::New(core::BorrowedUnpark::new(f, id)) + } + + #[inline] + pub fn to_owned(&self) -> TaskUnpark { + match *self { + BorrowedUnpark::Old(old) => TaskUnpark::Old(old.clone()), + BorrowedUnpark::New(new) => TaskUnpark::New(new.to_owned()), + } + } +} + +impl<'a> BorrowedEvents<'a> { + #[inline] + pub fn new() -> BorrowedEvents<'a> { + BorrowedEvents::None + } + + #[inline] + pub fn to_owned(&self) -> UnparkEvents { + let mut one_event = None; + let mut list = Vec::new(); + let mut cur = self; + while let BorrowedEvents::One(event, next) = *cur { + let event = event.clone(); + match one_event.take() { + None if list.len() == 0 => one_event = Some(event), + None => list.push(event), + Some(event2) => { + list.push(event2); + list.push(event); + } + } + cur = next; + } + + match one_event { + None if list.len() == 0 => UnparkEvents::None, + None => UnparkEvents::Many(list.into_boxed_slice()), + Some(e) => UnparkEvents::One(e), + } + } +} + +impl UnparkEvents { + pub fn notify(&self) { + match *self { + UnparkEvents::None => {} + UnparkEvents::One(ref e) => e.unpark(), + UnparkEvents::Many(ref list) => { + for event in list.iter() { + event.unpark(); + } + } + } + } + + pub fn will_notify(&self, events: &BorrowedEvents) -> bool { + // Pessimistically assume that any unpark events mean that we're not + // equivalent to the current task. + match *self { + UnparkEvents::None => {} + _ => return false, + } + + match *events { + BorrowedEvents::None => return true, + _ => {}, + } + + return false + } +} + +#[allow(deprecated)] +impl TaskUnpark { + pub fn notify(&self) { + match *self { + TaskUnpark::Old(ref old) => old.unpark(), + TaskUnpark::New(ref new) => new.notify(), + } + } + + pub fn will_notify(&self, unpark: &BorrowedUnpark) -> bool { + match (unpark, self) { + (&BorrowedUnpark::Old(old1), &TaskUnpark::Old(ref old2)) => { + &**old1 as *const Unpark == &**old2 as *const Unpark + } + (&BorrowedUnpark::New(ref new1), &TaskUnpark::New(ref new2)) => { + new2.will_notify(new1) + } + _ => false, + } + } +} + +impl<F: Future> Spawn<F> { + #[doc(hidden)] + #[deprecated(note = "recommended to use `poll_future_notify` instead")] + #[allow(deprecated)] + pub fn poll_future(&mut self, unpark: Arc<Unpark>) -> Poll<F::Item, F::Error> { + self.enter(BorrowedUnpark::Old(&unpark), |f| f.poll()) + } + + /// Waits for the internal future to complete, blocking this thread's + /// execution until it does. + /// + /// This function will call `poll_future` in a loop, waiting for the future + /// to complete. When a future cannot make progress it will use + /// `thread::park` to block the current thread. + pub fn wait_future(&mut self) -> Result<F::Item, F::Error> { + ThreadNotify::with_current(|notify| { + + loop { + match self.poll_future_notify(notify, 0)? { + Async::NotReady => notify.park(), + Async::Ready(e) => return Ok(e), + } + } + }) + } + + + #[doc(hidden)] + #[deprecated] + #[allow(deprecated)] + pub fn execute(self, exec: Arc<Executor>) + where F: Future<Item=(), Error=()> + Send + 'static, + { + exec.clone().execute(Run { + // Ideally this method would be defined directly on + // `Spawn<BoxFuture<(), ()>>` so we wouldn't have to box here and + // it'd be more explicit, but unfortunately that currently has a + // link error on nightly: rust-lang/rust#36155 + spawn: spawn(Box::new(self.into_inner())), + inner: Arc::new(RunInner { + exec: exec, + mutex: UnparkMutex::new() + }), + }) + } +} + +impl<S: Stream> Spawn<S> { + #[deprecated(note = "recommended to use `poll_stream_notify` instead")] + #[allow(deprecated)] + #[doc(hidden)] + pub fn poll_stream(&mut self, unpark: Arc<Unpark>) + -> Poll<Option<S::Item>, S::Error> { + self.enter(BorrowedUnpark::Old(&unpark), |s| s.poll()) + } + + /// Like `wait_future`, except only waits for the next element to arrive on + /// the underlying stream. + pub fn wait_stream(&mut self) -> Option<Result<S::Item, S::Error>> { + ThreadNotify::with_current(|notify| { + + loop { + match self.poll_stream_notify(notify, 0) { + Ok(Async::NotReady) => notify.park(), + Ok(Async::Ready(Some(e))) => return Some(Ok(e)), + Ok(Async::Ready(None)) => return None, + Err(e) => return Some(Err(e)), + } + } + }) + } +} + +impl<S: Sink> Spawn<S> { + #[doc(hidden)] + #[deprecated(note = "recommended to use `start_send_notify` instead")] + #[allow(deprecated)] + pub fn start_send(&mut self, value: S::SinkItem, unpark: &Arc<Unpark>) + -> StartSend<S::SinkItem, S::SinkError> { + self.enter(BorrowedUnpark::Old(unpark), |s| s.start_send(value)) + } + + #[deprecated(note = "recommended to use `poll_flush_notify` instead")] + #[allow(deprecated)] + #[doc(hidden)] + pub fn poll_flush(&mut self, unpark: &Arc<Unpark>) + -> Poll<(), S::SinkError> { + self.enter(BorrowedUnpark::Old(unpark), |s| s.poll_complete()) + } + + /// Blocks the current thread until it's able to send `value` on this sink. + /// + /// This function will send the `value` on the sink that this task wraps. If + /// the sink is not ready to send the value yet then the current thread will + /// be blocked until it's able to send the value. + pub fn wait_send(&mut self, mut value: S::SinkItem) + -> Result<(), S::SinkError> { + ThreadNotify::with_current(|notify| { + + loop { + value = match self.start_send_notify(value, notify, 0)? { + AsyncSink::NotReady(v) => v, + AsyncSink::Ready => return Ok(()), + }; + notify.park(); + } + }) + } + + /// Blocks the current thread until it's able to flush this sink. + /// + /// This function will call the underlying sink's `poll_complete` method + /// until it returns that it's ready, proxying out errors upwards to the + /// caller if one occurs. + /// + /// The thread will be blocked until `poll_complete` returns that it's + /// ready. + pub fn wait_flush(&mut self) -> Result<(), S::SinkError> { + ThreadNotify::with_current(|notify| { + + loop { + if self.poll_flush_notify(notify, 0)?.is_ready() { + return Ok(()) + } + notify.park(); + } + }) + } + + /// Blocks the current thread until it's able to close this sink. + /// + /// This function will close the sink that this task wraps. If the sink + /// is not ready to be close yet, then the current thread will be blocked + /// until it's closed. + pub fn wait_close(&mut self) -> Result<(), S::SinkError> { + ThreadNotify::with_current(|notify| { + + loop { + if self.close_notify(notify, 0)?.is_ready() { + return Ok(()) + } + notify.park(); + } + }) + } +} + +/// A trait which represents a sink of notifications that a future is ready to +/// make progress. +/// +/// This trait is provided as an argument to the `Spawn::poll_future` and +/// `Spawn::poll_stream` functions. It's transitively used as part of the +/// `Task::unpark` method to internally deliver notifications of readiness of a +/// future to move forward. +#[deprecated(note = "recommended to use `Notify` instead")] +pub trait Unpark: Send + Sync { + /// Indicates that an associated future and/or task are ready to make + /// progress. + /// + /// Typically this means that the receiver of the notification should + /// arrange for the future to get poll'd in a prompt fashion. + fn unpark(&self); +} + +/// A trait representing requests to poll futures. +/// +/// This trait is an argument to the `Spawn::execute` which is used to run a +/// future to completion. An executor will receive requests to run a future and +/// an executor is responsible for ensuring that happens in a timely fashion. +/// +/// Note that this trait is likely to be deprecated and/or renamed to avoid +/// clashing with the `future::Executor` trait. If you've got a use case for +/// this or would like to comment on the name please let us know! +#[deprecated] +#[allow(deprecated)] +pub trait Executor: Send + Sync + 'static { + /// Requests that `Run` is executed soon on the given executor. + fn execute(&self, r: Run); +} + +/// Units of work submitted to an `Executor`, currently only created +/// internally. +#[deprecated] +pub struct Run { + spawn: Spawn<Box<Future<Item = (), Error = ()> + Send>>, + inner: Arc<RunInner>, +} + +#[allow(deprecated)] +struct RunInner { + mutex: UnparkMutex<Run>, + exec: Arc<Executor>, +} + +#[allow(deprecated)] +impl Run { + /// Actually run the task (invoking `poll` on its future) on the current + /// thread. + pub fn run(self) { + let Run { mut spawn, inner } = self; + + // SAFETY: the ownership of this `Run` object is evidence that + // we are in the `POLLING`/`REPOLL` state for the mutex. + unsafe { + inner.mutex.start_poll(); + + loop { + match spawn.poll_future_notify(&inner, 0) { + Ok(Async::NotReady) => {} + Ok(Async::Ready(())) | + Err(()) => return inner.mutex.complete(), + } + let run = Run { spawn: spawn, inner: inner.clone() }; + match inner.mutex.wait(run) { + Ok(()) => return, // we've waited + Err(r) => spawn = r.spawn, // someone's notified us + } + } + } + } +} + +#[allow(deprecated)] +impl fmt::Debug for Run { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Run") + .field("contents", &"...") + .finish() + } +} + +#[allow(deprecated)] +impl Notify for RunInner { + fn notify(&self, _id: usize) { + match self.mutex.notify() { + Ok(run) => self.exec.execute(run), + Err(()) => {} + } + } +} + +// ===== ThreadNotify ===== + +struct ThreadNotify { + state: AtomicUsize, + mutex: Mutex<()>, + condvar: Condvar, +} + +const IDLE: usize = 0; +const NOTIFY: usize = 1; +const SLEEP: usize = 2; + +thread_local! { + static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify { + state: AtomicUsize::new(IDLE), + mutex: Mutex::new(()), + condvar: Condvar::new(), + }); +} + +impl ThreadNotify { + fn with_current<F, R>(f: F) -> R + where F: FnOnce(&Arc<ThreadNotify>) -> R, + { + CURRENT_THREAD_NOTIFY.with(|notify| f(notify)) + } + + fn park(&self) { + // If currently notified, then we skip sleeping. This is checked outside + // of the lock to avoid acquiring a mutex if not necessary. + match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) { + NOTIFY => return, + IDLE => {}, + _ => unreachable!(), + } + + // The state is currently idle, so obtain the lock and then try to + // transition to a sleeping state. + let mut m = self.mutex.lock().unwrap(); + + // Transition to sleeping + match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) { + NOTIFY => { + // Notified before we could sleep, consume the notification and + // exit + self.state.store(IDLE, Ordering::SeqCst); + return; + } + IDLE => {}, + _ => unreachable!(), + } + + // Loop until we've been notified + loop { + m = self.condvar.wait(m).unwrap(); + + // Transition back to idle, loop otherwise + if NOTIFY == self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) { + return; + } + } + } +} + +impl Notify for ThreadNotify { + fn notify(&self, _unpark_id: usize) { + // First, try transitioning from IDLE -> NOTIFY, this does not require a + // lock. + match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) { + IDLE | NOTIFY => return, + SLEEP => {} + _ => unreachable!(), + } + + // The other half is sleeping, this requires a lock + let _m = self.mutex.lock().unwrap(); + + // Transition from SLEEP -> NOTIFY + match self.state.compare_and_swap(SLEEP, NOTIFY, Ordering::SeqCst) { + SLEEP => {} + _ => return, + } + + // Wakeup the sleeper + self.condvar.notify_one(); + } +} + +// ===== UnparkEvent ===== + +/// For the duration of the given callback, add an "unpark event" to be +/// triggered when the task handle is used to unpark the task. +/// +/// Unpark events are used to pass information about what event caused a task to +/// be unparked. In some cases, tasks are waiting on a large number of possible +/// events, and need precise information about the wakeup to avoid extraneous +/// polling. +/// +/// Every `Task` handle comes with a set of unpark events which will fire when +/// `unpark` is called. When fired, these events insert an identifier into a +/// concurrent set, which the task can read from to determine what events +/// occurred. +/// +/// This function immediately invokes the closure, `f`, but arranges things so +/// that `task::park` will produce a `Task` handle that includes the given +/// unpark event. +/// +/// # Panics +/// +/// This function will panic if a task is not currently being executed. That +/// is, this method can be dangerous to call outside of an implementation of +/// `poll`. +#[deprecated(note = "recommended to use `FuturesUnordered` instead")] +#[allow(deprecated)] +pub fn with_unpark_event<F, R>(event: UnparkEvent, f: F) -> R + where F: FnOnce() -> R +{ + super::with(|task| { + let new_task = BorrowedTask { + id: task.id, + unpark: task.unpark, + events: BorrowedEvents::One(&event, &task.events), + map: task.map, + }; + + super::set(&new_task, f) + }) +} + +/// A set insertion to trigger upon `unpark`. +/// +/// Unpark events are used to communicate information about *why* an unpark +/// occurred, in particular populating sets with event identifiers so that the +/// unparked task can avoid extraneous polling. See `with_unpark_event` for +/// more. +#[derive(Clone)] +#[deprecated(note = "recommended to use `FuturesUnordered` instead")] +#[allow(deprecated)] +pub struct UnparkEvent { + set: Arc<EventSet>, + item: usize, +} + +#[allow(deprecated)] +impl UnparkEvent { + /// Construct an unpark event that will insert `id` into `set` when + /// triggered. + #[deprecated(note = "recommended to use `FuturesUnordered` instead")] + pub fn new(set: Arc<EventSet>, id: usize) -> UnparkEvent { + UnparkEvent { + set: set, + item: id, + } + } + + fn unpark(&self) { + self.set.insert(self.item); + } +} + +#[allow(deprecated)] +impl fmt::Debug for UnparkEvent { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("UnparkEvent") + .field("set", &"...") + .field("item", &self.item) + .finish() + } +} + +/// A concurrent set which allows for the insertion of `usize` values. +/// +/// `EventSet`s are used to communicate precise information about the event(s) +/// that triggered a task notification. See `task::with_unpark_event` for details. +#[deprecated(since="0.1.18", note = "recommended to use `FuturesUnordered` instead")] +pub trait EventSet: Send + Sync + 'static { + /// Insert the given ID into the set + fn insert(&self, id: usize); +} + +// Safe implementation of `UnsafeNotify` for `Arc` in the standard library. +// +// Note that this is a very unsafe implementation! The crucial pieces is that +// these two values are considered equivalent: +// +// * Arc<T> +// * *const ArcWrapped<T> +// +// We don't actually know the layout of `ArcWrapped<T>` as it's an +// implementation detail in the standard library. We can work, though, by +// casting it through and back an `Arc<T>`. +// +// This also means that you won't actually fine `UnsafeNotify for Arc<T>` +// because it's the wrong level of indirection. These methods are sort of +// receiving Arc<T>, but not an owned version. It's... complicated. We may be +// one of the first users of unsafe trait objects! + +struct ArcWrapped<T>(PhantomData<T>); + +impl<T: Notify + 'static> Notify for ArcWrapped<T> { + fn notify(&self, id: usize) { + unsafe { + let me: *const ArcWrapped<T> = self; + T::notify(&*(&me as *const *const ArcWrapped<T> as *const Arc<T>), + id) + } + } + + fn clone_id(&self, id: usize) -> usize { + unsafe { + let me: *const ArcWrapped<T> = self; + T::clone_id(&*(&me as *const *const ArcWrapped<T> as *const Arc<T>), + id) + } + } + + fn drop_id(&self, id: usize) { + unsafe { + let me: *const ArcWrapped<T> = self; + T::drop_id(&*(&me as *const *const ArcWrapped<T> as *const Arc<T>), + id) + } + } +} + +unsafe impl<T: Notify + 'static> UnsafeNotify for ArcWrapped<T> { + unsafe fn clone_raw(&self) -> NotifyHandle { + let me: *const ArcWrapped<T> = self; + let arc = (*(&me as *const *const ArcWrapped<T> as *const Arc<T>)).clone(); + NotifyHandle::from(arc) + } + + unsafe fn drop_raw(&self) { + let mut me: *const ArcWrapped<T> = self; + let me = &mut me as *mut *const ArcWrapped<T> as *mut Arc<T>; + ptr::drop_in_place(me); + } +} + +impl<T> From<Arc<T>> for NotifyHandle + where T: Notify + 'static, +{ + fn from(rc: Arc<T>) -> NotifyHandle { + unsafe { + let ptr = mem::transmute::<Arc<T>, *mut ArcWrapped<T>>(rc); + NotifyHandle::new(ptr) + } + } +} + +#[cfg(feature = "nightly")] +mod nightly { + use super::{TaskUnpark, UnparkEvents}; + use core::marker::Unpin; + + impl Unpin for TaskUnpark {} + impl Unpin for UnparkEvents {} +} diff --git a/third_party/rust/futures-0.1.29/src/task_impl/std/task_rc.rs b/third_party/rust/futures-0.1.29/src/task_impl/std/task_rc.rs new file mode 100644 index 0000000000..51bb44878d --- /dev/null +++ b/third_party/rust/futures-0.1.29/src/task_impl/std/task_rc.rs @@ -0,0 +1,129 @@ +#![cfg(feature = "with-deprecated")] +#![allow(deprecated)] +#![deprecated(since = "0.1.4", + note = "replaced with `BiLock` in many cases, otherwise slated \ + for removal due to confusion")] + +use std::prelude::v1::*; +use std::sync::Arc; +use std::cell::UnsafeCell; +use task_impl; + +// One critical piece of this module's contents are the `TaskRc<A>` handles. +// The purpose of this is to conceptually be able to store data in a task, +// allowing it to be accessed within multiple futures at once. For example if +// you have some concurrent futures working, they may all want mutable access to +// some data. We already know that when the futures are being poll'd that we're +// entirely synchronized (aka `&mut Task`), so you shouldn't require an +// `Arc<Mutex<T>>` to share as the synchronization isn't necessary! +// +// So the idea here is that you insert data into a task via `Task::insert`, and +// a handle to that data is then returned to you. That handle can later get +// presented to the task itself to actually retrieve the underlying data. The +// invariant is that the data can only ever be accessed with the task present, +// and the lifetime of the actual data returned is connected to the lifetime of +// the task itself. +// +// Conceptually I at least like to think of this as "dynamically adding more +// struct fields to a `Task`". Each call to insert creates a new "name" for the +// struct field, a `TaskRc<A>`, and then you can access the fields of a struct +// with the struct itself (`Task`) as well as the name of the field +// (`TaskRc<A>`). If that analogy doesn't make sense then oh well, it at least +// helped me! +// +// So anyway, we do some interesting trickery here to actually get it to work. +// Each `TaskRc<A>` handle stores `Arc<UnsafeCell<A>>`. So it turns out, we're +// not even adding data to the `Task`! Each `TaskRc<A>` contains a reference +// to this `Arc`, and `TaskRc` handles can be cloned which just bumps the +// reference count on the `Arc` itself. +// +// As before, though, you can present the `Arc` to a `Task` and if they +// originated from the same place you're allowed safe access to the internals. +// We allow but shared and mutable access without the `Sync` bound on the data, +// crucially noting that a `Task` itself is not `Sync`. +// +// So hopefully I've convinced you of this point that the `get` and `get_mut` +// methods below are indeed safe. The data is always valid as it's stored in an +// `Arc`, and access is only allowed with the proof of the associated `Task`. +// One thing you might be asking yourself though is what exactly is this "proof +// of a task"? Right now it's a `usize` corresponding to the `Task`'s +// `TaskHandle` arc allocation. +// +// Wait a minute, isn't that the ABA problem! That is, we create a task A, add +// some data to it, destroy task A, do some work, create a task B, and then ask +// to get the data from task B. In this case though the point of the +// `task_inner` "proof" field is simply that there's some non-`Sync` token +// proving that you can get access to the data. So while weird, this case should +// still be safe, as the data's not stored in the task itself. + +/// A reference to a piece of data that's accessible only within a specific +/// `Task`. +/// +/// This data is `Send` even when `A` is not `Sync`, because the data stored +/// within is accessed in a single-threaded way. The thread accessing it may +/// change over time, if the task migrates, so `A` must be `Send`. +#[derive(Debug)] +pub struct TaskRc<A> { + task: task_impl::Task, + ptr: Arc<UnsafeCell<A>>, +} + +// for safety here, see docs at the top of this module +unsafe impl<A: Send> Send for TaskRc<A> {} +unsafe impl<A: Sync> Sync for TaskRc<A> {} + +impl<A> TaskRc<A> { + /// Inserts a new piece of task-local data into this task, returning a + /// reference to it. + /// + /// Ownership of the data will be transferred to the task, and the data will + /// be destroyed when the task itself is destroyed. The returned value can + /// be passed to the `with` method to get a reference back to the original + /// data. + /// + /// Note that the returned handle is cloneable and copyable and can be sent + /// to other futures which will be associated with the same task. All + /// futures will then have access to this data when passed the reference + /// back. + /// + /// # Panics + /// + /// This function will panic if a task is not currently running. + pub fn new(a: A) -> TaskRc<A> { + TaskRc { + task: task_impl::park(), + ptr: Arc::new(UnsafeCell::new(a)), + } + } + + /// Operate with a reference to the underlying data. + /// + /// This method should be passed a handle previously returned by + /// `Task::insert`. That handle, when passed back into this method, will + /// retrieve a reference to the original data. + /// + /// # Panics + /// + /// This method will panic if a task is not currently running or if `self` + /// does not belong to the task that is currently running. That is, if + /// another task generated the `data` handle passed in, this method will + /// panic. + pub fn with<F, R>(&self, f: F) -> R + where F: FnOnce(&A) -> R + { + if !self.task.is_current() { + panic!("TaskRc being accessed on task it does not belong to"); + } + + f(unsafe { &*self.ptr.get() }) + } +} + +impl<A> Clone for TaskRc<A> { + fn clone(&self) -> TaskRc<A> { + TaskRc { + task: self.task.clone(), + ptr: self.ptr.clone(), + } + } +} diff --git a/third_party/rust/futures-0.1.29/src/task_impl/std/unpark_mutex.rs b/third_party/rust/futures-0.1.29/src/task_impl/std/unpark_mutex.rs new file mode 100644 index 0000000000..246def2753 --- /dev/null +++ b/third_party/rust/futures-0.1.29/src/task_impl/std/unpark_mutex.rs @@ -0,0 +1,144 @@ +use std::cell::UnsafeCell; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; + +/// A "lock" around data `D`, which employs a *helping* strategy. +/// +/// Used to ensure that concurrent `unpark` invocations lead to (1) `poll` being +/// invoked on only a single thread at a time (2) `poll` being invoked at least +/// once after each `unpark` (unless the future has completed). +pub struct UnparkMutex<D> { + // The state of task execution (state machine described below) + status: AtomicUsize, + + // The actual task data, accessible only in the POLLING state + inner: UnsafeCell<Option<D>>, +} + +// `UnparkMutex<D>` functions in many ways like a `Mutex<D>`, except that on +// acquisition failure, the current lock holder performs the desired work -- +// re-polling. +// +// As such, these impls mirror those for `Mutex<D>`. In particular, a reference +// to `UnparkMutex` can be used to gain `&mut` access to the inner data, which +// must therefore be `Send`. +unsafe impl<D: Send> Send for UnparkMutex<D> {} +unsafe impl<D: Send> Sync for UnparkMutex<D> {} + +// There are four possible task states, listed below with their possible +// transitions: + +// The task is blocked, waiting on an event +const WAITING: usize = 0; // --> POLLING + +// The task is actively being polled by a thread; arrival of additional events +// of interest should move it to the REPOLL state +const POLLING: usize = 1; // --> WAITING, REPOLL, or COMPLETE + +// The task is actively being polled, but will need to be re-polled upon +// completion to ensure that all events were observed. +const REPOLL: usize = 2; // --> POLLING + +// The task has finished executing (either successfully or with an error/panic) +const COMPLETE: usize = 3; // No transitions out + +impl<D> UnparkMutex<D> { + pub fn new() -> UnparkMutex<D> { + UnparkMutex { + status: AtomicUsize::new(WAITING), + inner: UnsafeCell::new(None), + } + } + + /// Attempt to "notify" the mutex that a poll should occur. + /// + /// An `Ok` result indicates that the `POLLING` state has been entered, and + /// the caller can proceed to poll the future. An `Err` result indicates + /// that polling is not necessary (because the task is finished or the + /// polling has been delegated). + pub fn notify(&self) -> Result<D, ()> { + let mut status = self.status.load(SeqCst); + loop { + match status { + // The task is idle, so try to run it immediately. + WAITING => { + match self.status.compare_exchange(WAITING, POLLING, + SeqCst, SeqCst) { + Ok(_) => { + let data = unsafe { + // SAFETY: we've ensured mutual exclusion via + // the status protocol; we are the only thread + // that has transitioned to the POLLING state, + // and we won't transition back to QUEUED until + // the lock is "released" by this thread. See + // the protocol diagram above. + (*self.inner.get()).take().unwrap() + }; + return Ok(data); + } + Err(cur) => status = cur, + } + } + + // The task is being polled, so we need to record that it should + // be *repolled* when complete. + POLLING => { + match self.status.compare_exchange(POLLING, REPOLL, + SeqCst, SeqCst) { + Ok(_) => return Err(()), + Err(cur) => status = cur, + } + } + + // The task is already scheduled for polling, or is complete, so + // we've got nothing to do. + _ => return Err(()), + } + } + } + + /// Alert the mutex that polling is about to begin, clearing any accumulated + /// re-poll requests. + /// + /// # Safety + /// + /// Callable only from the `POLLING`/`REPOLL` states, i.e. between + /// successful calls to `notify` and `wait`/`complete`. + pub unsafe fn start_poll(&self) { + self.status.store(POLLING, SeqCst); + } + + /// Alert the mutex that polling completed with NotReady. + /// + /// # Safety + /// + /// Callable only from the `POLLING`/`REPOLL` states, i.e. between + /// successful calls to `notify` and `wait`/`complete`. + pub unsafe fn wait(&self, data: D) -> Result<(), D> { + *self.inner.get() = Some(data); + + match self.status.compare_exchange(POLLING, WAITING, SeqCst, SeqCst) { + // no unparks came in while we were running + Ok(_) => Ok(()), + + // guaranteed to be in REPOLL state; just clobber the + // state and run again. + Err(status) => { + assert_eq!(status, REPOLL); + self.status.store(POLLING, SeqCst); + Err((*self.inner.get()).take().unwrap()) + } + } + } + + /// Alert the mutex that the task has completed execution and should not be + /// notified again. + /// + /// # Safety + /// + /// Callable only from the `POLLING`/`REPOLL` states, i.e. between + /// successful calls to `notify` and `wait`/`complete`. + pub unsafe fn complete(&self) { + self.status.store(COMPLETE, SeqCst); + } +} |