diff options
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/task_impl/std')
4 files changed, 1123 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/task_impl/std/data.rs b/third_party/rust/futures-0.1.31/src/task_impl/std/data.rs new file mode 100644 index 0000000000..770912b219 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/task_impl/std/data.rs @@ -0,0 +1,131 @@ +use std::prelude::v1::*; + +use std::any::TypeId; +use std::cell::RefCell; +use std::hash::{BuildHasherDefault, Hasher}; +use std::collections::HashMap; + +use task_impl::with; + +/// A macro to create a `static` of type `LocalKey` +/// +/// This macro is intentionally similar to the `thread_local!`, and creates a +/// `static` which has a `with` method to access the data on a task. +/// +/// The data associated with each task local is per-task, so different tasks +/// will contain different values. +#[macro_export] +macro_rules! task_local { + (static $NAME:ident: $t:ty = $e:expr) => ( + static $NAME: $crate::task::LocalKey<$t> = { + fn __init() -> $t { $e } + fn __key() -> ::std::any::TypeId { + struct __A; + ::std::any::TypeId::of::<__A>() + } + $crate::task::LocalKey { + __init: __init, + __key: __key, + } + }; + ) +} + +pub type LocalMap = RefCell<HashMap<TypeId, + Box<Opaque>, + BuildHasherDefault<IdHasher>>>; + +pub fn local_map() -> LocalMap { + RefCell::new(HashMap::default()) +} + +pub trait Opaque: Send {} +impl<T: Send> Opaque for T {} + +/// A key for task-local data stored in a future's task. +/// +/// This type is generated by the `task_local!` macro and performs very +/// similarly to the `thread_local!` macro and `std::thread::LocalKey` types. +/// Data associated with a `LocalKey<T>` is stored inside of a future's task, +/// and the data is destroyed when the future is completed and the task is +/// destroyed. +/// +/// Task-local data can migrate between threads and hence requires a `Send` +/// bound. Additionally, task-local data also requires the `'static` bound to +/// ensure it lives long enough. When a key is accessed for the first time the +/// task's data is initialized with the provided initialization expression to +/// the macro. +#[derive(Debug)] +pub struct LocalKey<T> { + // "private" fields which have to be public to get around macro hygiene, not + // included in the stability story for this type. Can change at any time. + #[doc(hidden)] + pub __key: fn() -> TypeId, + #[doc(hidden)] + pub __init: fn() -> T, +} + +pub struct IdHasher { + id: u64, +} + +impl Default for IdHasher { + fn default() -> IdHasher { + IdHasher { id: 0 } + } +} + +impl Hasher for IdHasher { + fn write(&mut self, _bytes: &[u8]) { + // TODO: need to do something sensible + panic!("can only hash u64"); + } + + fn write_u64(&mut self, u: u64) { + self.id = u; + } + + fn finish(&self) -> u64 { + self.id + } +} + +impl<T: Send + 'static> LocalKey<T> { + /// Access this task-local key, running the provided closure with a + /// reference to the value. + /// + /// This function will access this task-local key to retrieve the data + /// associated with the current task and this key. If this is the first time + /// this key has been accessed on this task, then the key will be + /// initialized with the initialization expression provided at the time the + /// `task_local!` macro was called. + /// + /// The provided closure will be provided a shared reference to the + /// underlying data associated with this task-local-key. The data itself is + /// stored inside of the current task. + /// + /// # Panics + /// + /// This function can possibly panic for a number of reasons: + /// + /// * If there is not a current task. + /// * If the initialization expression is run and it panics + /// * If the closure provided panics + pub fn with<F, R>(&'static self, f: F) -> R + where F: FnOnce(&T) -> R + { + let key = (self.__key)(); + with(|task| { + let raw_pointer = { + let mut data = task.map.borrow_mut(); + let entry = data.entry(key).or_insert_with(|| { + Box::new((self.__init)()) + }); + &**entry as *const Opaque as *const T + }; + unsafe { + f(&*raw_pointer) + } + }) + } +} diff --git a/third_party/rust/futures-0.1.31/src/task_impl/std/mod.rs b/third_party/rust/futures-0.1.31/src/task_impl/std/mod.rs new file mode 100644 index 0000000000..e82a23e5d0 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/task_impl/std/mod.rs @@ -0,0 +1,719 @@ +use std::prelude::v1::*; + +use std::cell::Cell; +use std::fmt; +use std::marker::PhantomData; +use std::mem; +use std::ptr; +use std::sync::{Arc, Mutex, Condvar, Once}; +#[allow(deprecated)] +use std::sync::ONCE_INIT; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use {Future, Stream, Sink, Poll, Async, StartSend, AsyncSink}; +use super::core; +use super::{BorrowedTask, NotifyHandle, Spawn, spawn, Notify, UnsafeNotify}; + +mod unpark_mutex; +pub use self::unpark_mutex::UnparkMutex; + +mod data; +pub use self::data::*; + +mod task_rc; +#[allow(deprecated)] +#[cfg(feature = "with-deprecated")] +pub use self::task_rc::TaskRc; + +pub use task_impl::core::init; + +thread_local!(static CURRENT_TASK: Cell<*mut u8> = Cell::new(ptr::null_mut())); + +/// Return whether the caller is running in a task (and so can use task_local!). +pub fn is_in_task() -> bool { + CURRENT_TASK.with(|task| !task.get().is_null()) +} + +#[allow(deprecated)] +static INIT: Once = ONCE_INIT; + +pub fn get_ptr() -> Option<*mut u8> { + // Since this condition will always return true when TLS task storage is + // used (the default), the branch predictor will be able to optimize the + // branching and a dynamic dispatch will be avoided, which makes the + // compiler happier. + if core::is_get_ptr(0x1) { + Some(CURRENT_TASK.with(|c| c.get())) + } else { + core::get_ptr() + } +} + +fn tls_slot() -> *const Cell<*mut u8> { + CURRENT_TASK.with(|c| c as *const _) +} + +pub fn set<'a, F, R>(task: &BorrowedTask<'a>, f: F) -> R + where F: FnOnce() -> R +{ + // Lazily initialize the get / set ptrs + // + // Note that we won't actually use these functions ever, we'll instead be + // testing the pointer's value elsewhere and calling our own functions. + INIT.call_once(|| unsafe { + let get = mem::transmute::<usize, _>(0x1); + let set = mem::transmute::<usize, _>(0x2); + init(get, set); + }); + + // Same as above. + if core::is_get_ptr(0x1) { + struct Reset(*const Cell<*mut u8>, *mut u8); + + impl Drop for Reset { + #[inline] + fn drop(&mut self) { + unsafe { + (*self.0).set(self.1); + } + } + } + + unsafe { + let slot = tls_slot(); + let _reset = Reset(slot, (*slot).get()); + (*slot).set(task as *const _ as *mut u8); + f() + } + } else { + core::set(task, f) + } +} + +#[derive(Copy, Clone)] +#[allow(deprecated)] +pub enum BorrowedUnpark<'a> { + Old(&'a Arc<Unpark>), + New(core::BorrowedUnpark<'a>), +} + +#[derive(Copy, Clone)] +#[allow(deprecated)] +pub enum BorrowedEvents<'a> { + None, + One(&'a UnparkEvent, &'a BorrowedEvents<'a>), +} + +#[derive(Clone)] +pub enum TaskUnpark { + #[allow(deprecated)] + Old(Arc<Unpark>), + New(core::TaskUnpark), +} + +#[derive(Clone)] +#[allow(deprecated)] +pub enum UnparkEvents { + None, + One(UnparkEvent), + Many(Box<[UnparkEvent]>), +} + +impl<'a> BorrowedUnpark<'a> { + #[inline] + pub fn new(f: &'a Fn() -> NotifyHandle, id: usize) -> BorrowedUnpark<'a> { + BorrowedUnpark::New(core::BorrowedUnpark::new(f, id)) + } + + #[inline] + pub fn to_owned(&self) -> TaskUnpark { + match *self { + BorrowedUnpark::Old(old) => TaskUnpark::Old(old.clone()), + BorrowedUnpark::New(new) => TaskUnpark::New(new.to_owned()), + } + } +} + +impl<'a> BorrowedEvents<'a> { + #[inline] + pub fn new() -> BorrowedEvents<'a> { + BorrowedEvents::None + } + + #[inline] + pub fn to_owned(&self) -> UnparkEvents { + let mut one_event = None; + let mut list = Vec::new(); + let mut cur = self; + while let BorrowedEvents::One(event, next) = *cur { + let event = event.clone(); + match one_event.take() { + None if list.len() == 0 => one_event = Some(event), + None => list.push(event), + Some(event2) => { + list.push(event2); + list.push(event); + } + } + cur = next; + } + + match one_event { + None if list.len() == 0 => UnparkEvents::None, + None => UnparkEvents::Many(list.into_boxed_slice()), + Some(e) => UnparkEvents::One(e), + } + } +} + +impl UnparkEvents { + pub fn notify(&self) { + match *self { + UnparkEvents::None => {} + UnparkEvents::One(ref e) => e.unpark(), + UnparkEvents::Many(ref list) => { + for event in list.iter() { + event.unpark(); + } + } + } + } + + pub fn will_notify(&self, events: &BorrowedEvents) -> bool { + // Pessimistically assume that any unpark events mean that we're not + // equivalent to the current task. + match *self { + UnparkEvents::None => {} + _ => return false, + } + + match *events { + BorrowedEvents::None => return true, + _ => {}, + } + + return false + } +} + +#[allow(deprecated)] +impl TaskUnpark { + pub fn notify(&self) { + match *self { + TaskUnpark::Old(ref old) => old.unpark(), + TaskUnpark::New(ref new) => new.notify(), + } + } + + pub fn will_notify(&self, unpark: &BorrowedUnpark) -> bool { + match (unpark, self) { + (&BorrowedUnpark::Old(old1), &TaskUnpark::Old(ref old2)) => { + &**old1 as *const Unpark == &**old2 as *const Unpark + } + (&BorrowedUnpark::New(ref new1), &TaskUnpark::New(ref new2)) => { + new2.will_notify(new1) + } + _ => false, + } + } +} + +impl<F: Future> Spawn<F> { + #[doc(hidden)] + #[deprecated(note = "recommended to use `poll_future_notify` instead")] + #[allow(deprecated)] + pub fn poll_future(&mut self, unpark: Arc<Unpark>) -> Poll<F::Item, F::Error> { + self.enter(BorrowedUnpark::Old(&unpark), |f| f.poll()) + } + + /// Waits for the internal future to complete, blocking this thread's + /// execution until it does. + /// + /// This function will call `poll_future` in a loop, waiting for the future + /// to complete. When a future cannot make progress it will use + /// `thread::park` to block the current thread. + pub fn wait_future(&mut self) -> Result<F::Item, F::Error> { + ThreadNotify::with_current(|notify| { + + loop { + match self.poll_future_notify(notify, 0)? { + Async::NotReady => notify.park(), + Async::Ready(e) => return Ok(e), + } + } + }) + } + + + #[doc(hidden)] + #[deprecated] + #[allow(deprecated)] + pub fn execute(self, exec: Arc<Executor>) + where F: Future<Item=(), Error=()> + Send + 'static, + { + exec.clone().execute(Run { + // Ideally this method would be defined directly on + // `Spawn<BoxFuture<(), ()>>` so we wouldn't have to box here and + // it'd be more explicit, but unfortunately that currently has a + // link error on nightly: rust-lang/rust#36155 + spawn: spawn(Box::new(self.into_inner())), + inner: Arc::new(RunInner { + exec: exec, + mutex: UnparkMutex::new() + }), + }) + } +} + +impl<S: Stream> Spawn<S> { + #[deprecated(note = "recommended to use `poll_stream_notify` instead")] + #[allow(deprecated)] + #[doc(hidden)] + pub fn poll_stream(&mut self, unpark: Arc<Unpark>) + -> Poll<Option<S::Item>, S::Error> { + self.enter(BorrowedUnpark::Old(&unpark), |s| s.poll()) + } + + /// Like `wait_future`, except only waits for the next element to arrive on + /// the underlying stream. + pub fn wait_stream(&mut self) -> Option<Result<S::Item, S::Error>> { + ThreadNotify::with_current(|notify| { + + loop { + match self.poll_stream_notify(notify, 0) { + Ok(Async::NotReady) => notify.park(), + Ok(Async::Ready(Some(e))) => return Some(Ok(e)), + Ok(Async::Ready(None)) => return None, + Err(e) => return Some(Err(e)), + } + } + }) + } +} + +impl<S: Sink> Spawn<S> { + #[doc(hidden)] + #[deprecated(note = "recommended to use `start_send_notify` instead")] + #[allow(deprecated)] + pub fn start_send(&mut self, value: S::SinkItem, unpark: &Arc<Unpark>) + -> StartSend<S::SinkItem, S::SinkError> { + self.enter(BorrowedUnpark::Old(unpark), |s| s.start_send(value)) + } + + #[deprecated(note = "recommended to use `poll_flush_notify` instead")] + #[allow(deprecated)] + #[doc(hidden)] + pub fn poll_flush(&mut self, unpark: &Arc<Unpark>) + -> Poll<(), S::SinkError> { + self.enter(BorrowedUnpark::Old(unpark), |s| s.poll_complete()) + } + + /// Blocks the current thread until it's able to send `value` on this sink. + /// + /// This function will send the `value` on the sink that this task wraps. If + /// the sink is not ready to send the value yet then the current thread will + /// be blocked until it's able to send the value. + pub fn wait_send(&mut self, mut value: S::SinkItem) + -> Result<(), S::SinkError> { + ThreadNotify::with_current(|notify| { + + loop { + value = match self.start_send_notify(value, notify, 0)? { + AsyncSink::NotReady(v) => v, + AsyncSink::Ready => return Ok(()), + }; + notify.park(); + } + }) + } + + /// Blocks the current thread until it's able to flush this sink. + /// + /// This function will call the underlying sink's `poll_complete` method + /// until it returns that it's ready, proxying out errors upwards to the + /// caller if one occurs. + /// + /// The thread will be blocked until `poll_complete` returns that it's + /// ready. + pub fn wait_flush(&mut self) -> Result<(), S::SinkError> { + ThreadNotify::with_current(|notify| { + + loop { + if self.poll_flush_notify(notify, 0)?.is_ready() { + return Ok(()) + } + notify.park(); + } + }) + } + + /// Blocks the current thread until it's able to close this sink. + /// + /// This function will close the sink that this task wraps. If the sink + /// is not ready to be close yet, then the current thread will be blocked + /// until it's closed. + pub fn wait_close(&mut self) -> Result<(), S::SinkError> { + ThreadNotify::with_current(|notify| { + + loop { + if self.close_notify(notify, 0)?.is_ready() { + return Ok(()) + } + notify.park(); + } + }) + } +} + +/// A trait which represents a sink of notifications that a future is ready to +/// make progress. +/// +/// This trait is provided as an argument to the `Spawn::poll_future` and +/// `Spawn::poll_stream` functions. It's transitively used as part of the +/// `Task::unpark` method to internally deliver notifications of readiness of a +/// future to move forward. +#[deprecated(note = "recommended to use `Notify` instead")] +pub trait Unpark: Send + Sync { + /// Indicates that an associated future and/or task are ready to make + /// progress. + /// + /// Typically this means that the receiver of the notification should + /// arrange for the future to get poll'd in a prompt fashion. + fn unpark(&self); +} + +/// A trait representing requests to poll futures. +/// +/// This trait is an argument to the `Spawn::execute` which is used to run a +/// future to completion. An executor will receive requests to run a future and +/// an executor is responsible for ensuring that happens in a timely fashion. +/// +/// Note that this trait is likely to be deprecated and/or renamed to avoid +/// clashing with the `future::Executor` trait. If you've got a use case for +/// this or would like to comment on the name please let us know! +#[deprecated] +#[allow(deprecated)] +pub trait Executor: Send + Sync + 'static { + /// Requests that `Run` is executed soon on the given executor. + fn execute(&self, r: Run); +} + +/// Units of work submitted to an `Executor`, currently only created +/// internally. +#[deprecated] +pub struct Run { + spawn: Spawn<Box<Future<Item = (), Error = ()> + Send>>, + inner: Arc<RunInner>, +} + +#[allow(deprecated)] +struct RunInner { + mutex: UnparkMutex<Run>, + exec: Arc<Executor>, +} + +#[allow(deprecated)] +impl Run { + /// Actually run the task (invoking `poll` on its future) on the current + /// thread. + pub fn run(self) { + let Run { mut spawn, inner } = self; + + // SAFETY: the ownership of this `Run` object is evidence that + // we are in the `POLLING`/`REPOLL` state for the mutex. + unsafe { + inner.mutex.start_poll(); + + loop { + match spawn.poll_future_notify(&inner, 0) { + Ok(Async::NotReady) => {} + Ok(Async::Ready(())) | + Err(()) => return inner.mutex.complete(), + } + let run = Run { spawn: spawn, inner: inner.clone() }; + match inner.mutex.wait(run) { + Ok(()) => return, // we've waited + Err(r) => spawn = r.spawn, // someone's notified us + } + } + } + } +} + +#[allow(deprecated)] +impl fmt::Debug for Run { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Run") + .field("contents", &"...") + .finish() + } +} + +#[allow(deprecated)] +impl Notify for RunInner { + fn notify(&self, _id: usize) { + match self.mutex.notify() { + Ok(run) => self.exec.execute(run), + Err(()) => {} + } + } +} + +// ===== ThreadNotify ===== + +struct ThreadNotify { + state: AtomicUsize, + mutex: Mutex<()>, + condvar: Condvar, +} + +const IDLE: usize = 0; +const NOTIFY: usize = 1; +const SLEEP: usize = 2; + +thread_local! { + static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify { + state: AtomicUsize::new(IDLE), + mutex: Mutex::new(()), + condvar: Condvar::new(), + }); +} + +impl ThreadNotify { + fn with_current<F, R>(f: F) -> R + where F: FnOnce(&Arc<ThreadNotify>) -> R, + { + CURRENT_THREAD_NOTIFY.with(|notify| f(notify)) + } + + fn park(&self) { + // If currently notified, then we skip sleeping. This is checked outside + // of the lock to avoid acquiring a mutex if not necessary. + match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) { + NOTIFY => return, + IDLE => {}, + _ => unreachable!(), + } + + // The state is currently idle, so obtain the lock and then try to + // transition to a sleeping state. + let mut m = self.mutex.lock().unwrap(); + + // Transition to sleeping + match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) { + NOTIFY => { + // Notified before we could sleep, consume the notification and + // exit + self.state.store(IDLE, Ordering::SeqCst); + return; + } + IDLE => {}, + _ => unreachable!(), + } + + // Loop until we've been notified + loop { + m = self.condvar.wait(m).unwrap(); + + // Transition back to idle, loop otherwise + if NOTIFY == self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) { + return; + } + } + } +} + +impl Notify for ThreadNotify { + fn notify(&self, _unpark_id: usize) { + // First, try transitioning from IDLE -> NOTIFY, this does not require a + // lock. + match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) { + IDLE | NOTIFY => return, + SLEEP => {} + _ => unreachable!(), + } + + // The other half is sleeping, this requires a lock + let _m = self.mutex.lock().unwrap(); + + // Transition from SLEEP -> NOTIFY + match self.state.compare_and_swap(SLEEP, NOTIFY, Ordering::SeqCst) { + SLEEP => {} + _ => return, + } + + // Wakeup the sleeper + self.condvar.notify_one(); + } +} + +// ===== UnparkEvent ===== + +/// For the duration of the given callback, add an "unpark event" to be +/// triggered when the task handle is used to unpark the task. +/// +/// Unpark events are used to pass information about what event caused a task to +/// be unparked. In some cases, tasks are waiting on a large number of possible +/// events, and need precise information about the wakeup to avoid extraneous +/// polling. +/// +/// Every `Task` handle comes with a set of unpark events which will fire when +/// `unpark` is called. When fired, these events insert an identifier into a +/// concurrent set, which the task can read from to determine what events +/// occurred. +/// +/// This function immediately invokes the closure, `f`, but arranges things so +/// that `task::park` will produce a `Task` handle that includes the given +/// unpark event. +/// +/// # Panics +/// +/// This function will panic if a task is not currently being executed. That +/// is, this method can be dangerous to call outside of an implementation of +/// `poll`. +#[deprecated(note = "recommended to use `FuturesUnordered` instead")] +#[allow(deprecated)] +pub fn with_unpark_event<F, R>(event: UnparkEvent, f: F) -> R + where F: FnOnce() -> R +{ + super::with(|task| { + let new_task = BorrowedTask { + id: task.id, + unpark: task.unpark, + events: BorrowedEvents::One(&event, &task.events), + map: task.map, + }; + + super::set(&new_task, f) + }) +} + +/// A set insertion to trigger upon `unpark`. +/// +/// Unpark events are used to communicate information about *why* an unpark +/// occurred, in particular populating sets with event identifiers so that the +/// unparked task can avoid extraneous polling. See `with_unpark_event` for +/// more. +#[derive(Clone)] +#[deprecated(note = "recommended to use `FuturesUnordered` instead")] +#[allow(deprecated)] +pub struct UnparkEvent { + set: Arc<EventSet>, + item: usize, +} + +#[allow(deprecated)] +impl UnparkEvent { + /// Construct an unpark event that will insert `id` into `set` when + /// triggered. + #[deprecated(note = "recommended to use `FuturesUnordered` instead")] + pub fn new(set: Arc<EventSet>, id: usize) -> UnparkEvent { + UnparkEvent { + set: set, + item: id, + } + } + + fn unpark(&self) { + self.set.insert(self.item); + } +} + +#[allow(deprecated)] +impl fmt::Debug for UnparkEvent { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("UnparkEvent") + .field("set", &"...") + .field("item", &self.item) + .finish() + } +} + +/// A concurrent set which allows for the insertion of `usize` values. +/// +/// `EventSet`s are used to communicate precise information about the event(s) +/// that triggered a task notification. See `task::with_unpark_event` for details. +#[deprecated(since="0.1.18", note = "recommended to use `FuturesUnordered` instead")] +pub trait EventSet: Send + Sync + 'static { + /// Insert the given ID into the set + fn insert(&self, id: usize); +} + +// Safe implementation of `UnsafeNotify` for `Arc` in the standard library. +// +// Note that this is a very unsafe implementation! The crucial pieces is that +// these two values are considered equivalent: +// +// * Arc<T> +// * *const ArcWrapped<T> +// +// We don't actually know the layout of `ArcWrapped<T>` as it's an +// implementation detail in the standard library. We can work, though, by +// casting it through and back an `Arc<T>`. +// +// This also means that you won't actually fine `UnsafeNotify for Arc<T>` +// because it's the wrong level of indirection. These methods are sort of +// receiving Arc<T>, but not an owned version. It's... complicated. We may be +// one of the first users of unsafe trait objects! + +struct ArcWrapped<T>(PhantomData<T>); + +impl<T: Notify + 'static> Notify for ArcWrapped<T> { + fn notify(&self, id: usize) { + unsafe { + let me: *const ArcWrapped<T> = self; + T::notify(&*(&me as *const *const ArcWrapped<T> as *const Arc<T>), + id) + } + } + + fn clone_id(&self, id: usize) -> usize { + unsafe { + let me: *const ArcWrapped<T> = self; + T::clone_id(&*(&me as *const *const ArcWrapped<T> as *const Arc<T>), + id) + } + } + + fn drop_id(&self, id: usize) { + unsafe { + let me: *const ArcWrapped<T> = self; + T::drop_id(&*(&me as *const *const ArcWrapped<T> as *const Arc<T>), + id) + } + } +} + +unsafe impl<T: Notify + 'static> UnsafeNotify for ArcWrapped<T> { + unsafe fn clone_raw(&self) -> NotifyHandle { + let me: *const ArcWrapped<T> = self; + let arc = (*(&me as *const *const ArcWrapped<T> as *const Arc<T>)).clone(); + NotifyHandle::from(arc) + } + + unsafe fn drop_raw(&self) { + let mut me: *const ArcWrapped<T> = self; + let me = &mut me as *mut *const ArcWrapped<T> as *mut Arc<T>; + ptr::drop_in_place(me); + } +} + +impl<T> From<Arc<T>> for NotifyHandle + where T: Notify + 'static, +{ + fn from(rc: Arc<T>) -> NotifyHandle { + unsafe { + let ptr = mem::transmute::<Arc<T>, *mut ArcWrapped<T>>(rc); + NotifyHandle::new(ptr) + } + } +} + +#[cfg(feature = "nightly")] +mod nightly { + use super::{TaskUnpark, UnparkEvents}; + use core::marker::Unpin; + + impl Unpin for TaskUnpark {} + impl Unpin for UnparkEvents {} +} diff --git a/third_party/rust/futures-0.1.31/src/task_impl/std/task_rc.rs b/third_party/rust/futures-0.1.31/src/task_impl/std/task_rc.rs new file mode 100644 index 0000000000..51bb44878d --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/task_impl/std/task_rc.rs @@ -0,0 +1,129 @@ +#![cfg(feature = "with-deprecated")] +#![allow(deprecated)] +#![deprecated(since = "0.1.4", + note = "replaced with `BiLock` in many cases, otherwise slated \ + for removal due to confusion")] + +use std::prelude::v1::*; +use std::sync::Arc; +use std::cell::UnsafeCell; +use task_impl; + +// One critical piece of this module's contents are the `TaskRc<A>` handles. +// The purpose of this is to conceptually be able to store data in a task, +// allowing it to be accessed within multiple futures at once. For example if +// you have some concurrent futures working, they may all want mutable access to +// some data. We already know that when the futures are being poll'd that we're +// entirely synchronized (aka `&mut Task`), so you shouldn't require an +// `Arc<Mutex<T>>` to share as the synchronization isn't necessary! +// +// So the idea here is that you insert data into a task via `Task::insert`, and +// a handle to that data is then returned to you. That handle can later get +// presented to the task itself to actually retrieve the underlying data. The +// invariant is that the data can only ever be accessed with the task present, +// and the lifetime of the actual data returned is connected to the lifetime of +// the task itself. +// +// Conceptually I at least like to think of this as "dynamically adding more +// struct fields to a `Task`". Each call to insert creates a new "name" for the +// struct field, a `TaskRc<A>`, and then you can access the fields of a struct +// with the struct itself (`Task`) as well as the name of the field +// (`TaskRc<A>`). If that analogy doesn't make sense then oh well, it at least +// helped me! +// +// So anyway, we do some interesting trickery here to actually get it to work. +// Each `TaskRc<A>` handle stores `Arc<UnsafeCell<A>>`. So it turns out, we're +// not even adding data to the `Task`! Each `TaskRc<A>` contains a reference +// to this `Arc`, and `TaskRc` handles can be cloned which just bumps the +// reference count on the `Arc` itself. +// +// As before, though, you can present the `Arc` to a `Task` and if they +// originated from the same place you're allowed safe access to the internals. +// We allow but shared and mutable access without the `Sync` bound on the data, +// crucially noting that a `Task` itself is not `Sync`. +// +// So hopefully I've convinced you of this point that the `get` and `get_mut` +// methods below are indeed safe. The data is always valid as it's stored in an +// `Arc`, and access is only allowed with the proof of the associated `Task`. +// One thing you might be asking yourself though is what exactly is this "proof +// of a task"? Right now it's a `usize` corresponding to the `Task`'s +// `TaskHandle` arc allocation. +// +// Wait a minute, isn't that the ABA problem! That is, we create a task A, add +// some data to it, destroy task A, do some work, create a task B, and then ask +// to get the data from task B. In this case though the point of the +// `task_inner` "proof" field is simply that there's some non-`Sync` token +// proving that you can get access to the data. So while weird, this case should +// still be safe, as the data's not stored in the task itself. + +/// A reference to a piece of data that's accessible only within a specific +/// `Task`. +/// +/// This data is `Send` even when `A` is not `Sync`, because the data stored +/// within is accessed in a single-threaded way. The thread accessing it may +/// change over time, if the task migrates, so `A` must be `Send`. +#[derive(Debug)] +pub struct TaskRc<A> { + task: task_impl::Task, + ptr: Arc<UnsafeCell<A>>, +} + +// for safety here, see docs at the top of this module +unsafe impl<A: Send> Send for TaskRc<A> {} +unsafe impl<A: Sync> Sync for TaskRc<A> {} + +impl<A> TaskRc<A> { + /// Inserts a new piece of task-local data into this task, returning a + /// reference to it. + /// + /// Ownership of the data will be transferred to the task, and the data will + /// be destroyed when the task itself is destroyed. The returned value can + /// be passed to the `with` method to get a reference back to the original + /// data. + /// + /// Note that the returned handle is cloneable and copyable and can be sent + /// to other futures which will be associated with the same task. All + /// futures will then have access to this data when passed the reference + /// back. + /// + /// # Panics + /// + /// This function will panic if a task is not currently running. + pub fn new(a: A) -> TaskRc<A> { + TaskRc { + task: task_impl::park(), + ptr: Arc::new(UnsafeCell::new(a)), + } + } + + /// Operate with a reference to the underlying data. + /// + /// This method should be passed a handle previously returned by + /// `Task::insert`. That handle, when passed back into this method, will + /// retrieve a reference to the original data. + /// + /// # Panics + /// + /// This method will panic if a task is not currently running or if `self` + /// does not belong to the task that is currently running. That is, if + /// another task generated the `data` handle passed in, this method will + /// panic. + pub fn with<F, R>(&self, f: F) -> R + where F: FnOnce(&A) -> R + { + if !self.task.is_current() { + panic!("TaskRc being accessed on task it does not belong to"); + } + + f(unsafe { &*self.ptr.get() }) + } +} + +impl<A> Clone for TaskRc<A> { + fn clone(&self) -> TaskRc<A> { + TaskRc { + task: self.task.clone(), + ptr: self.ptr.clone(), + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/task_impl/std/unpark_mutex.rs b/third_party/rust/futures-0.1.31/src/task_impl/std/unpark_mutex.rs new file mode 100644 index 0000000000..246def2753 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/task_impl/std/unpark_mutex.rs @@ -0,0 +1,144 @@ +use std::cell::UnsafeCell; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; + +/// A "lock" around data `D`, which employs a *helping* strategy. +/// +/// Used to ensure that concurrent `unpark` invocations lead to (1) `poll` being +/// invoked on only a single thread at a time (2) `poll` being invoked at least +/// once after each `unpark` (unless the future has completed). +pub struct UnparkMutex<D> { + // The state of task execution (state machine described below) + status: AtomicUsize, + + // The actual task data, accessible only in the POLLING state + inner: UnsafeCell<Option<D>>, +} + +// `UnparkMutex<D>` functions in many ways like a `Mutex<D>`, except that on +// acquisition failure, the current lock holder performs the desired work -- +// re-polling. +// +// As such, these impls mirror those for `Mutex<D>`. In particular, a reference +// to `UnparkMutex` can be used to gain `&mut` access to the inner data, which +// must therefore be `Send`. +unsafe impl<D: Send> Send for UnparkMutex<D> {} +unsafe impl<D: Send> Sync for UnparkMutex<D> {} + +// There are four possible task states, listed below with their possible +// transitions: + +// The task is blocked, waiting on an event +const WAITING: usize = 0; // --> POLLING + +// The task is actively being polled by a thread; arrival of additional events +// of interest should move it to the REPOLL state +const POLLING: usize = 1; // --> WAITING, REPOLL, or COMPLETE + +// The task is actively being polled, but will need to be re-polled upon +// completion to ensure that all events were observed. +const REPOLL: usize = 2; // --> POLLING + +// The task has finished executing (either successfully or with an error/panic) +const COMPLETE: usize = 3; // No transitions out + +impl<D> UnparkMutex<D> { + pub fn new() -> UnparkMutex<D> { + UnparkMutex { + status: AtomicUsize::new(WAITING), + inner: UnsafeCell::new(None), + } + } + + /// Attempt to "notify" the mutex that a poll should occur. + /// + /// An `Ok` result indicates that the `POLLING` state has been entered, and + /// the caller can proceed to poll the future. An `Err` result indicates + /// that polling is not necessary (because the task is finished or the + /// polling has been delegated). + pub fn notify(&self) -> Result<D, ()> { + let mut status = self.status.load(SeqCst); + loop { + match status { + // The task is idle, so try to run it immediately. + WAITING => { + match self.status.compare_exchange(WAITING, POLLING, + SeqCst, SeqCst) { + Ok(_) => { + let data = unsafe { + // SAFETY: we've ensured mutual exclusion via + // the status protocol; we are the only thread + // that has transitioned to the POLLING state, + // and we won't transition back to QUEUED until + // the lock is "released" by this thread. See + // the protocol diagram above. + (*self.inner.get()).take().unwrap() + }; + return Ok(data); + } + Err(cur) => status = cur, + } + } + + // The task is being polled, so we need to record that it should + // be *repolled* when complete. + POLLING => { + match self.status.compare_exchange(POLLING, REPOLL, + SeqCst, SeqCst) { + Ok(_) => return Err(()), + Err(cur) => status = cur, + } + } + + // The task is already scheduled for polling, or is complete, so + // we've got nothing to do. + _ => return Err(()), + } + } + } + + /// Alert the mutex that polling is about to begin, clearing any accumulated + /// re-poll requests. + /// + /// # Safety + /// + /// Callable only from the `POLLING`/`REPOLL` states, i.e. between + /// successful calls to `notify` and `wait`/`complete`. + pub unsafe fn start_poll(&self) { + self.status.store(POLLING, SeqCst); + } + + /// Alert the mutex that polling completed with NotReady. + /// + /// # Safety + /// + /// Callable only from the `POLLING`/`REPOLL` states, i.e. between + /// successful calls to `notify` and `wait`/`complete`. + pub unsafe fn wait(&self, data: D) -> Result<(), D> { + *self.inner.get() = Some(data); + + match self.status.compare_exchange(POLLING, WAITING, SeqCst, SeqCst) { + // no unparks came in while we were running + Ok(_) => Ok(()), + + // guaranteed to be in REPOLL state; just clobber the + // state and run again. + Err(status) => { + assert_eq!(status, REPOLL); + self.status.store(POLLING, SeqCst); + Err((*self.inner.get()).take().unwrap()) + } + } + } + + /// Alert the mutex that the task has completed execution and should not be + /// notified again. + /// + /// # Safety + /// + /// Callable only from the `POLLING`/`REPOLL` states, i.e. between + /// successful calls to `notify` and `wait`/`complete`. + pub unsafe fn complete(&self) { + self.status.store(COMPLETE, SeqCst); + } +} |