Diffstat (limited to 'third_party/rust/futures-0.1.31/src/task_impl/std')
-rw-r--r--  third_party/rust/futures-0.1.31/src/task_impl/std/data.rs          | 131
-rw-r--r--  third_party/rust/futures-0.1.31/src/task_impl/std/mod.rs           | 719
-rw-r--r--  third_party/rust/futures-0.1.31/src/task_impl/std/task_rc.rs       | 129
-rw-r--r--  third_party/rust/futures-0.1.31/src/task_impl/std/unpark_mutex.rs  | 144
4 files changed, 1123 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/task_impl/std/data.rs b/third_party/rust/futures-0.1.31/src/task_impl/std/data.rs
new file mode 100644
index 0000000000..770912b219
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/task_impl/std/data.rs
@@ -0,0 +1,131 @@
+use std::prelude::v1::*;
+
+use std::any::TypeId;
+use std::cell::RefCell;
+use std::hash::{BuildHasherDefault, Hasher};
+use std::collections::HashMap;
+
+use task_impl::with;
+
+/// A macro to create a `static` of type `LocalKey`
+///
+/// This macro is intentionally similar to `thread_local!` and creates a
+/// `static` which has a `with` method to access the data on a task.
+///
+/// The data associated with each task local is per-task, so different tasks
+/// will contain different values.
+#[macro_export]
+macro_rules! task_local {
+ (static $NAME:ident: $t:ty = $e:expr) => (
+ static $NAME: $crate::task::LocalKey<$t> = {
+ fn __init() -> $t { $e }
+ fn __key() -> ::std::any::TypeId {
+ struct __A;
+ ::std::any::TypeId::of::<__A>()
+ }
+ $crate::task::LocalKey {
+ __init: __init,
+ __key: __key,
+ }
+ };
+ )
+}
+
+pub type LocalMap = RefCell<HashMap<TypeId,
+ Box<Opaque>,
+ BuildHasherDefault<IdHasher>>>;
+
+pub fn local_map() -> LocalMap {
+ RefCell::new(HashMap::default())
+}
+
+pub trait Opaque: Send {}
+impl<T: Send> Opaque for T {}
+
+/// A key for task-local data stored in a future's task.
+///
+/// This type is generated by the `task_local!` macro and behaves much like
+/// the `thread_local!` macro and the `std::thread::LocalKey` type.
+/// Data associated with a `LocalKey<T>` is stored inside of a future's task,
+/// and the data is destroyed when the future is completed and the task is
+/// destroyed.
+///
+/// Task-local data can migrate between threads and hence requires a `Send`
+/// bound. Additionally, task-local data also requires the `'static` bound to
+/// ensure it lives long enough. When a key is accessed for the first time, the
+/// task's data is initialized with the initialization expression provided to
+/// the macro.
+#[derive(Debug)]
+pub struct LocalKey<T> {
+ // "private" fields which have to be public to get around macro hygiene, not
+ // included in the stability story for this type. Can change at any time.
+ #[doc(hidden)]
+ pub __key: fn() -> TypeId,
+ #[doc(hidden)]
+ pub __init: fn() -> T,
+}
+
+pub struct IdHasher {
+ id: u64,
+}
+
+impl Default for IdHasher {
+ fn default() -> IdHasher {
+ IdHasher { id: 0 }
+ }
+}
+
+impl Hasher for IdHasher {
+ fn write(&mut self, _bytes: &[u8]) {
+ // TODO: need to do something sensible
+ panic!("can only hash u64");
+ }
+
+ fn write_u64(&mut self, u: u64) {
+ self.id = u;
+ }
+
+ fn finish(&self) -> u64 {
+ self.id
+ }
+}
+
+impl<T: Send + 'static> LocalKey<T> {
+ /// Access this task-local key, running the provided closure with a
+ /// reference to the value.
+ ///
+ /// This function will access this task-local key to retrieve the data
+ /// associated with the current task and this key. If this is the first time
+ /// this key has been accessed on this task, then the key will be
+ /// initialized with the initialization expression provided at the time the
+ /// `task_local!` macro was called.
+ ///
+    /// The provided closure will be given a shared reference to the
+    /// underlying data associated with this task-local key. The data itself is
+ /// stored inside of the current task.
+ ///
+ /// # Panics
+ ///
+    /// This function can panic for a number of reasons:
+    ///
+    /// * If there is no current task.
+    /// * If the initialization expression panics when run.
+    /// * If the provided closure panics.
+ pub fn with<F, R>(&'static self, f: F) -> R
+ where F: FnOnce(&T) -> R
+ {
+ let key = (self.__key)();
+ with(|task| {
+ let raw_pointer = {
+ let mut data = task.map.borrow_mut();
+ let entry = data.entry(key).or_insert_with(|| {
+ Box::new((self.__init)())
+ });
+ &**entry as *const Opaque as *const T
+ };
+ unsafe {
+ f(&*raw_pointer)
+ }
+ })
+ }
+}
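+
+// Illustrative sketch only (not part of the upstream futures source): declare
+// a task-local value with `task_local!` and read it through `LocalKey::with`
+// from inside a task. The `executor::spawn(..).wait_future()` driver and the
+// `future::lazy` constructor are assumed from the public futures 0.1 API; the
+// names `HITS` and `bump_inside_a_task` are made up for this example.
+#[cfg(test)]
+mod task_local_example {
+    use std::cell::Cell;
+
+    use executor;
+    use future;
+
+    task_local!(static HITS: Cell<u32> = Cell::new(0));
+
+    #[test]
+    fn bump_inside_a_task() {
+        // `LocalKey::with` panics outside of a task, so establish a task
+        // context by driving a `lazy` future on this thread.
+        let hits = executor::spawn(future::lazy(|| {
+            HITS.with(|c| c.set(c.get() + 1));
+            HITS.with(|c| Ok::<u32, ()>(c.get()))
+        })).wait_future().unwrap();
+        assert_eq!(hits, 1);
+    }
+}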
diff --git a/third_party/rust/futures-0.1.31/src/task_impl/std/mod.rs b/third_party/rust/futures-0.1.31/src/task_impl/std/mod.rs
new file mode 100644
index 0000000000..e82a23e5d0
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/task_impl/std/mod.rs
@@ -0,0 +1,719 @@
+use std::prelude::v1::*;
+
+use std::cell::Cell;
+use std::fmt;
+use std::marker::PhantomData;
+use std::mem;
+use std::ptr;
+use std::sync::{Arc, Mutex, Condvar, Once};
+#[allow(deprecated)]
+use std::sync::ONCE_INIT;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+use {Future, Stream, Sink, Poll, Async, StartSend, AsyncSink};
+use super::core;
+use super::{BorrowedTask, NotifyHandle, Spawn, spawn, Notify, UnsafeNotify};
+
+mod unpark_mutex;
+pub use self::unpark_mutex::UnparkMutex;
+
+mod data;
+pub use self::data::*;
+
+mod task_rc;
+#[allow(deprecated)]
+#[cfg(feature = "with-deprecated")]
+pub use self::task_rc::TaskRc;
+
+pub use task_impl::core::init;
+
+thread_local!(static CURRENT_TASK: Cell<*mut u8> = Cell::new(ptr::null_mut()));
+
+/// Returns whether the caller is running in a task (and so can use `task_local!`).
+pub fn is_in_task() -> bool {
+ CURRENT_TASK.with(|task| !task.get().is_null())
+}
+
+#[allow(deprecated)]
+static INIT: Once = ONCE_INIT;
+
+pub fn get_ptr() -> Option<*mut u8> {
+ // Since this condition will always return true when TLS task storage is
+ // used (the default), the branch predictor will be able to optimize the
+ // branching and a dynamic dispatch will be avoided, which makes the
+ // compiler happier.
+ if core::is_get_ptr(0x1) {
+ Some(CURRENT_TASK.with(|c| c.get()))
+ } else {
+ core::get_ptr()
+ }
+}
+
+fn tls_slot() -> *const Cell<*mut u8> {
+ CURRENT_TASK.with(|c| c as *const _)
+}
+
+pub fn set<'a, F, R>(task: &BorrowedTask<'a>, f: F) -> R
+ where F: FnOnce() -> R
+{
+ // Lazily initialize the get / set ptrs
+ //
+ // Note that we won't actually use these functions ever, we'll instead be
+ // testing the pointer's value elsewhere and calling our own functions.
+ INIT.call_once(|| unsafe {
+ let get = mem::transmute::<usize, _>(0x1);
+ let set = mem::transmute::<usize, _>(0x2);
+ init(get, set);
+ });
+
+ // Same as above.
+ if core::is_get_ptr(0x1) {
+ struct Reset(*const Cell<*mut u8>, *mut u8);
+
+ impl Drop for Reset {
+ #[inline]
+ fn drop(&mut self) {
+ unsafe {
+ (*self.0).set(self.1);
+ }
+ }
+ }
+
+ unsafe {
+ let slot = tls_slot();
+ let _reset = Reset(slot, (*slot).get());
+ (*slot).set(task as *const _ as *mut u8);
+ f()
+ }
+ } else {
+ core::set(task, f)
+ }
+}
+
+#[derive(Copy, Clone)]
+#[allow(deprecated)]
+pub enum BorrowedUnpark<'a> {
+ Old(&'a Arc<Unpark>),
+ New(core::BorrowedUnpark<'a>),
+}
+
+#[derive(Copy, Clone)]
+#[allow(deprecated)]
+pub enum BorrowedEvents<'a> {
+ None,
+ One(&'a UnparkEvent, &'a BorrowedEvents<'a>),
+}
+
+#[derive(Clone)]
+pub enum TaskUnpark {
+ #[allow(deprecated)]
+ Old(Arc<Unpark>),
+ New(core::TaskUnpark),
+}
+
+#[derive(Clone)]
+#[allow(deprecated)]
+pub enum UnparkEvents {
+ None,
+ One(UnparkEvent),
+ Many(Box<[UnparkEvent]>),
+}
+
+impl<'a> BorrowedUnpark<'a> {
+ #[inline]
+ pub fn new(f: &'a Fn() -> NotifyHandle, id: usize) -> BorrowedUnpark<'a> {
+ BorrowedUnpark::New(core::BorrowedUnpark::new(f, id))
+ }
+
+ #[inline]
+ pub fn to_owned(&self) -> TaskUnpark {
+ match *self {
+ BorrowedUnpark::Old(old) => TaskUnpark::Old(old.clone()),
+ BorrowedUnpark::New(new) => TaskUnpark::New(new.to_owned()),
+ }
+ }
+}
+
+impl<'a> BorrowedEvents<'a> {
+ #[inline]
+ pub fn new() -> BorrowedEvents<'a> {
+ BorrowedEvents::None
+ }
+
+ #[inline]
+ pub fn to_owned(&self) -> UnparkEvents {
+ let mut one_event = None;
+ let mut list = Vec::new();
+ let mut cur = self;
+ while let BorrowedEvents::One(event, next) = *cur {
+ let event = event.clone();
+ match one_event.take() {
+ None if list.len() == 0 => one_event = Some(event),
+ None => list.push(event),
+ Some(event2) => {
+ list.push(event2);
+ list.push(event);
+ }
+ }
+ cur = next;
+ }
+
+ match one_event {
+ None if list.len() == 0 => UnparkEvents::None,
+ None => UnparkEvents::Many(list.into_boxed_slice()),
+ Some(e) => UnparkEvents::One(e),
+ }
+ }
+}
+
+impl UnparkEvents {
+ pub fn notify(&self) {
+ match *self {
+ UnparkEvents::None => {}
+ UnparkEvents::One(ref e) => e.unpark(),
+ UnparkEvents::Many(ref list) => {
+ for event in list.iter() {
+ event.unpark();
+ }
+ }
+ }
+ }
+
+ pub fn will_notify(&self, events: &BorrowedEvents) -> bool {
+ // Pessimistically assume that any unpark events mean that we're not
+ // equivalent to the current task.
+ match *self {
+ UnparkEvents::None => {}
+ _ => return false,
+ }
+
+ match *events {
+ BorrowedEvents::None => return true,
+ _ => {},
+ }
+
+ return false
+ }
+}
+
+#[allow(deprecated)]
+impl TaskUnpark {
+ pub fn notify(&self) {
+ match *self {
+ TaskUnpark::Old(ref old) => old.unpark(),
+ TaskUnpark::New(ref new) => new.notify(),
+ }
+ }
+
+ pub fn will_notify(&self, unpark: &BorrowedUnpark) -> bool {
+ match (unpark, self) {
+ (&BorrowedUnpark::Old(old1), &TaskUnpark::Old(ref old2)) => {
+ &**old1 as *const Unpark == &**old2 as *const Unpark
+ }
+ (&BorrowedUnpark::New(ref new1), &TaskUnpark::New(ref new2)) => {
+ new2.will_notify(new1)
+ }
+ _ => false,
+ }
+ }
+}
+
+impl<F: Future> Spawn<F> {
+ #[doc(hidden)]
+ #[deprecated(note = "recommended to use `poll_future_notify` instead")]
+ #[allow(deprecated)]
+ pub fn poll_future(&mut self, unpark: Arc<Unpark>) -> Poll<F::Item, F::Error> {
+ self.enter(BorrowedUnpark::Old(&unpark), |f| f.poll())
+ }
+
+ /// Waits for the internal future to complete, blocking this thread's
+ /// execution until it does.
+ ///
+ /// This function will call `poll_future` in a loop, waiting for the future
+ /// to complete. When a future cannot make progress it will use
+ /// `thread::park` to block the current thread.
+ pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
+ ThreadNotify::with_current(|notify| {
+
+ loop {
+ match self.poll_future_notify(notify, 0)? {
+ Async::NotReady => notify.park(),
+ Async::Ready(e) => return Ok(e),
+ }
+ }
+ })
+ }
+
+
+ #[doc(hidden)]
+ #[deprecated]
+ #[allow(deprecated)]
+ pub fn execute(self, exec: Arc<Executor>)
+ where F: Future<Item=(), Error=()> + Send + 'static,
+ {
+ exec.clone().execute(Run {
+ // Ideally this method would be defined directly on
+ // `Spawn<BoxFuture<(), ()>>` so we wouldn't have to box here and
+ // it'd be more explicit, but unfortunately that currently has a
+ // link error on nightly: rust-lang/rust#36155
+ spawn: spawn(Box::new(self.into_inner())),
+ inner: Arc::new(RunInner {
+ exec: exec,
+ mutex: UnparkMutex::new()
+ }),
+ })
+ }
+}
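+
+// Illustrative sketch only (not upstream code): block the current thread on a
+// future via `wait_future`, which parks the thread through the `ThreadNotify`
+// machinery below. The crate-root paths `future` and `executor` are assumed
+// from the public futures 0.1 API.
+#[cfg(test)]
+mod wait_future_example {
+    use executor;
+    use future;
+
+    #[test]
+    fn waits_for_a_ready_future() {
+        // `executor::spawn` wraps the future in a `Spawn`; `wait_future`
+        // then polls it, parking this thread until the future resolves.
+        let fut = future::ok::<u32, ()>(42);
+        assert_eq!(executor::spawn(fut).wait_future(), Ok(42));
+    }
+}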
+
+impl<S: Stream> Spawn<S> {
+ #[deprecated(note = "recommended to use `poll_stream_notify` instead")]
+ #[allow(deprecated)]
+ #[doc(hidden)]
+ pub fn poll_stream(&mut self, unpark: Arc<Unpark>)
+ -> Poll<Option<S::Item>, S::Error> {
+ self.enter(BorrowedUnpark::Old(&unpark), |s| s.poll())
+ }
+
+ /// Like `wait_future`, except only waits for the next element to arrive on
+ /// the underlying stream.
+ pub fn wait_stream(&mut self) -> Option<Result<S::Item, S::Error>> {
+ ThreadNotify::with_current(|notify| {
+
+ loop {
+ match self.poll_stream_notify(notify, 0) {
+ Ok(Async::NotReady) => notify.park(),
+ Ok(Async::Ready(Some(e))) => return Some(Ok(e)),
+ Ok(Async::Ready(None)) => return None,
+ Err(e) => return Some(Err(e)),
+ }
+ }
+ })
+ }
+}
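+
+// Illustrative sketch only (not upstream code): pull elements off a stream one
+// at a time with `wait_stream`, blocking between items. `stream::iter_ok` is
+// assumed from the public futures 0.1 API.
+#[cfg(test)]
+mod wait_stream_example {
+    use executor;
+    use stream;
+
+    #[test]
+    fn drains_a_stream() {
+        let mut spawned = executor::spawn(stream::iter_ok::<_, ()>(vec![1, 2]));
+        assert_eq!(spawned.wait_stream(), Some(Ok(1)));
+        assert_eq!(spawned.wait_stream(), Some(Ok(2)));
+        // `None` signals that the underlying stream has terminated.
+        assert_eq!(spawned.wait_stream(), None);
+    }
+}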
+
+impl<S: Sink> Spawn<S> {
+ #[doc(hidden)]
+ #[deprecated(note = "recommended to use `start_send_notify` instead")]
+ #[allow(deprecated)]
+ pub fn start_send(&mut self, value: S::SinkItem, unpark: &Arc<Unpark>)
+ -> StartSend<S::SinkItem, S::SinkError> {
+ self.enter(BorrowedUnpark::Old(unpark), |s| s.start_send(value))
+ }
+
+ #[deprecated(note = "recommended to use `poll_flush_notify` instead")]
+ #[allow(deprecated)]
+ #[doc(hidden)]
+ pub fn poll_flush(&mut self, unpark: &Arc<Unpark>)
+ -> Poll<(), S::SinkError> {
+ self.enter(BorrowedUnpark::Old(unpark), |s| s.poll_complete())
+ }
+
+ /// Blocks the current thread until it's able to send `value` on this sink.
+ ///
+ /// This function will send the `value` on the sink that this task wraps. If
+ /// the sink is not ready to send the value yet then the current thread will
+ /// be blocked until it's able to send the value.
+ pub fn wait_send(&mut self, mut value: S::SinkItem)
+ -> Result<(), S::SinkError> {
+ ThreadNotify::with_current(|notify| {
+
+ loop {
+ value = match self.start_send_notify(value, notify, 0)? {
+ AsyncSink::NotReady(v) => v,
+ AsyncSink::Ready => return Ok(()),
+ };
+ notify.park();
+ }
+ })
+ }
+
+ /// Blocks the current thread until it's able to flush this sink.
+ ///
+ /// This function will call the underlying sink's `poll_complete` method
+    /// until it returns that it's ready, propagating any errors that occur to
+    /// the caller.
+ ///
+ /// The thread will be blocked until `poll_complete` returns that it's
+ /// ready.
+ pub fn wait_flush(&mut self) -> Result<(), S::SinkError> {
+ ThreadNotify::with_current(|notify| {
+
+ loop {
+ if self.poll_flush_notify(notify, 0)?.is_ready() {
+ return Ok(())
+ }
+ notify.park();
+ }
+ })
+ }
+
+ /// Blocks the current thread until it's able to close this sink.
+ ///
+ /// This function will close the sink that this task wraps. If the sink
+    /// is not ready to be closed yet, then the current thread will be blocked
+ /// until it's closed.
+ pub fn wait_close(&mut self) -> Result<(), S::SinkError> {
+ ThreadNotify::with_current(|notify| {
+
+ loop {
+ if self.close_notify(notify, 0)?.is_ready() {
+ return Ok(())
+ }
+ notify.park();
+ }
+ })
+ }
+}
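+
+// Illustrative sketch only (not upstream code): blocking sends through a
+// bounded `sync::mpsc` channel, whose `Sender` implements `Sink`. The
+// `sync::mpsc` and `stream::Stream::collect` APIs are assumed from the public
+// futures 0.1 crate.
+#[cfg(test)]
+mod sink_wait_example {
+    use executor;
+    use stream::Stream;
+    use sync::mpsc;
+
+    #[test]
+    fn sends_then_drains() {
+        let (tx, rx) = mpsc::channel::<u32>(1);
+        let mut tx = executor::spawn(tx);
+        // Each call blocks (via `ThreadNotify`) until the channel has room,
+        // and `wait_flush` blocks until buffered items are processed.
+        tx.wait_send(1).unwrap();
+        tx.wait_flush().unwrap();
+        // Dropping the sender terminates the stream on the receiver side.
+        drop(tx);
+        let items = executor::spawn(rx.collect()).wait_future().unwrap();
+        assert_eq!(items, vec![1]);
+    }
+}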
+
+/// A trait which represents a sink of notifications that a future is ready to
+/// make progress.
+///
+/// This trait is provided as an argument to the `Spawn::poll_future` and
+/// `Spawn::poll_stream` functions. It's transitively used as part of the
+/// `Task::unpark` method to internally deliver notifications of readiness of a
+/// future to move forward.
+#[deprecated(note = "recommended to use `Notify` instead")]
+pub trait Unpark: Send + Sync {
+ /// Indicates that an associated future and/or task are ready to make
+ /// progress.
+ ///
+ /// Typically this means that the receiver of the notification should
+ /// arrange for the future to get poll'd in a prompt fashion.
+ fn unpark(&self);
+}
+
+/// A trait representing requests to poll futures.
+///
+/// This trait is an argument to `Spawn::execute`, which is used to run a
+/// future to completion. An executor receives requests to run a future and is
+/// responsible for ensuring that happens in a timely fashion.
+///
+/// Note that this trait is likely to be deprecated and/or renamed to avoid
+/// clashing with the `future::Executor` trait. If you've got a use case for
+/// this or would like to comment on the name please let us know!
+#[deprecated]
+#[allow(deprecated)]
+pub trait Executor: Send + Sync + 'static {
+ /// Requests that `Run` is executed soon on the given executor.
+ fn execute(&self, r: Run);
+}
+
+/// Units of work submitted to an `Executor`, currently only created
+/// internally.
+#[deprecated]
+pub struct Run {
+ spawn: Spawn<Box<Future<Item = (), Error = ()> + Send>>,
+ inner: Arc<RunInner>,
+}
+
+#[allow(deprecated)]
+struct RunInner {
+ mutex: UnparkMutex<Run>,
+ exec: Arc<Executor>,
+}
+
+#[allow(deprecated)]
+impl Run {
+ /// Actually run the task (invoking `poll` on its future) on the current
+ /// thread.
+ pub fn run(self) {
+ let Run { mut spawn, inner } = self;
+
+ // SAFETY: the ownership of this `Run` object is evidence that
+ // we are in the `POLLING`/`REPOLL` state for the mutex.
+ unsafe {
+ inner.mutex.start_poll();
+
+ loop {
+ match spawn.poll_future_notify(&inner, 0) {
+ Ok(Async::NotReady) => {}
+ Ok(Async::Ready(())) |
+ Err(()) => return inner.mutex.complete(),
+ }
+ let run = Run { spawn: spawn, inner: inner.clone() };
+ match inner.mutex.wait(run) {
+ Ok(()) => return, // we've waited
+ Err(r) => spawn = r.spawn, // someone's notified us
+ }
+ }
+ }
+ }
+}
+
+#[allow(deprecated)]
+impl fmt::Debug for Run {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Run")
+ .field("contents", &"...")
+ .finish()
+ }
+}
+
+#[allow(deprecated)]
+impl Notify for RunInner {
+ fn notify(&self, _id: usize) {
+ match self.mutex.notify() {
+ Ok(run) => self.exec.execute(run),
+ Err(()) => {}
+ }
+ }
+}
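+
+// Illustrative sketch only (not upstream code): the simplest possible
+// `Executor`, which runs each `Run` inline on the calling thread. A real
+// executor would queue the `Run` onto a thread pool instead. The crate-root
+// `future` and `executor` paths are assumed from the public futures 0.1 API;
+// `Inline` is a made-up name for this example.
+#[cfg(test)]
+#[allow(deprecated)]
+mod inline_executor_example {
+    use std::sync::Arc;
+    use std::sync::atomic::{AtomicBool, Ordering};
+
+    use super::{Executor, Run};
+    use executor;
+    use future;
+
+    struct Inline;
+
+    impl Executor for Inline {
+        fn execute(&self, r: Run) {
+            // `Run::run` polls the future, using `UnparkMutex` to coordinate
+            // with any concurrent notifications.
+            r.run();
+        }
+    }
+
+    #[test]
+    fn runs_a_future_inline() {
+        let done = Arc::new(AtomicBool::new(false));
+        let done2 = done.clone();
+        let fut = future::lazy(move || {
+            done2.store(true, Ordering::SeqCst);
+            Ok::<(), ()>(())
+        });
+        executor::spawn(fut).execute(Arc::new(Inline));
+        assert!(done.load(Ordering::SeqCst));
+    }
+}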
+
+// ===== ThreadNotify =====
+
+struct ThreadNotify {
+ state: AtomicUsize,
+ mutex: Mutex<()>,
+ condvar: Condvar,
+}
+
+const IDLE: usize = 0;
+const NOTIFY: usize = 1;
+const SLEEP: usize = 2;
+
+thread_local! {
+ static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
+ state: AtomicUsize::new(IDLE),
+ mutex: Mutex::new(()),
+ condvar: Condvar::new(),
+ });
+}
+
+impl ThreadNotify {
+ fn with_current<F, R>(f: F) -> R
+ where F: FnOnce(&Arc<ThreadNotify>) -> R,
+ {
+ CURRENT_THREAD_NOTIFY.with(|notify| f(notify))
+ }
+
+ fn park(&self) {
+ // If currently notified, then we skip sleeping. This is checked outside
+ // of the lock to avoid acquiring a mutex if not necessary.
+ match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
+ NOTIFY => return,
+ IDLE => {},
+ _ => unreachable!(),
+ }
+
+ // The state is currently idle, so obtain the lock and then try to
+ // transition to a sleeping state.
+ let mut m = self.mutex.lock().unwrap();
+
+ // Transition to sleeping
+ match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) {
+ NOTIFY => {
+ // Notified before we could sleep, consume the notification and
+ // exit
+ self.state.store(IDLE, Ordering::SeqCst);
+ return;
+ }
+ IDLE => {},
+ _ => unreachable!(),
+ }
+
+ // Loop until we've been notified
+ loop {
+ m = self.condvar.wait(m).unwrap();
+
+ // Transition back to idle, loop otherwise
+ if NOTIFY == self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
+ return;
+ }
+ }
+ }
+}
+
+impl Notify for ThreadNotify {
+ fn notify(&self, _unpark_id: usize) {
+ // First, try transitioning from IDLE -> NOTIFY, this does not require a
+ // lock.
+ match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) {
+ IDLE | NOTIFY => return,
+ SLEEP => {}
+ _ => unreachable!(),
+ }
+
+ // The other half is sleeping, this requires a lock
+ let _m = self.mutex.lock().unwrap();
+
+ // Transition from SLEEP -> NOTIFY
+ match self.state.compare_and_swap(SLEEP, NOTIFY, Ordering::SeqCst) {
+ SLEEP => {}
+ _ => return,
+ }
+
+        // Wake up the sleeper
+ self.condvar.notify_one();
+ }
+}
+
+// ===== UnparkEvent =====
+
+/// For the duration of the given callback, add an "unpark event" to be
+/// triggered when the task handle is used to unpark the task.
+///
+/// Unpark events are used to pass information about what event caused a task to
+/// be unparked. In some cases, tasks are waiting on a large number of possible
+/// events, and need precise information about the wakeup to avoid extraneous
+/// polling.
+///
+/// Every `Task` handle comes with a set of unpark events which will fire when
+/// `unpark` is called. When fired, these events insert an identifier into a
+/// concurrent set, which the task can read from to determine what events
+/// occurred.
+///
+/// This function immediately invokes the closure, `f`, but arranges things so
+/// that `task::park` will produce a `Task` handle that includes the given
+/// unpark event.
+///
+/// # Panics
+///
+/// This function will panic if a task is not currently being executed. That
+/// is, this method can be dangerous to call outside of an implementation of
+/// `poll`.
+#[deprecated(note = "recommended to use `FuturesUnordered` instead")]
+#[allow(deprecated)]
+pub fn with_unpark_event<F, R>(event: UnparkEvent, f: F) -> R
+ where F: FnOnce() -> R
+{
+ super::with(|task| {
+ let new_task = BorrowedTask {
+ id: task.id,
+ unpark: task.unpark,
+ events: BorrowedEvents::One(&event, &task.events),
+ map: task.map,
+ };
+
+ super::set(&new_task, f)
+ })
+}
+
+/// A set insertion to trigger upon `unpark`.
+///
+/// Unpark events are used to communicate information about *why* an unpark
+/// occurred, in particular populating sets with event identifiers so that the
+/// unparked task can avoid extraneous polling. See `with_unpark_event` for
+/// more.
+#[derive(Clone)]
+#[deprecated(note = "recommended to use `FuturesUnordered` instead")]
+#[allow(deprecated)]
+pub struct UnparkEvent {
+ set: Arc<EventSet>,
+ item: usize,
+}
+
+#[allow(deprecated)]
+impl UnparkEvent {
+ /// Construct an unpark event that will insert `id` into `set` when
+ /// triggered.
+ #[deprecated(note = "recommended to use `FuturesUnordered` instead")]
+ pub fn new(set: Arc<EventSet>, id: usize) -> UnparkEvent {
+ UnparkEvent {
+ set: set,
+ item: id,
+ }
+ }
+
+ fn unpark(&self) {
+ self.set.insert(self.item);
+ }
+}
+
+#[allow(deprecated)]
+impl fmt::Debug for UnparkEvent {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("UnparkEvent")
+ .field("set", &"...")
+ .field("item", &self.item)
+ .finish()
+ }
+}
+
+/// A concurrent set which allows for the insertion of `usize` values.
+///
+/// `EventSet`s are used to communicate precise information about the event(s)
+/// that triggered a task notification. See `task::with_unpark_event` for details.
+#[deprecated(since="0.1.18", note = "recommended to use `FuturesUnordered` instead")]
+pub trait EventSet: Send + Sync + 'static {
+ /// Insert the given ID into the set
+ fn insert(&self, id: usize);
+}
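+
+// Illustrative sketch only (not upstream code): a `Mutex<HashSet>`-backed
+// `EventSet` that records which child triggered a wakeup. This API is
+// deprecated in favour of `FuturesUnordered`; the sketch just shows the
+// mechanism. The crate-root `executor`, `future`, and `task` paths are assumed
+// from the public futures 0.1 API.
+#[cfg(test)]
+#[allow(deprecated)]
+mod unpark_event_example {
+    use std::collections::HashSet;
+    use std::sync::{Arc, Mutex};
+
+    use super::{EventSet, UnparkEvent, with_unpark_event};
+    use executor;
+    use future;
+    use task;
+
+    struct Set(Mutex<HashSet<usize>>);
+
+    impl EventSet for Set {
+        fn insert(&self, id: usize) {
+            self.0.lock().unwrap().insert(id);
+        }
+    }
+
+    #[test]
+    fn records_the_event_id() {
+        let set = Arc::new(Set(Mutex::new(HashSet::new())));
+        let set2 = set.clone();
+        executor::spawn(future::lazy(move || {
+            // Task handles taken inside the closure carry the event, so
+            // notifying the task inserts `7` into the set.
+            let task = with_unpark_event(UnparkEvent::new(set2, 7), task::current);
+            task.notify();
+            Ok::<(), ()>(())
+        })).wait_future().unwrap();
+        assert!(set.0.lock().unwrap().contains(&7));
+    }
+}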
+
+// Safe implementation of `UnsafeNotify` for `Arc` in the standard library.
+//
+// Note that this is a very unsafe implementation! The crucial piece is that
+// these two values are considered equivalent:
+//
+// * Arc<T>
+// * *const ArcWrapped<T>
+//
+// We don't actually know the layout of `ArcWrapped<T>` as it's an
+// implementation detail in the standard library. We can make this work,
+// though, by casting to and from an `Arc<T>`.
+//
+// This also means that you won't actually find `UnsafeNotify for Arc<T>`
+// because it's the wrong level of indirection. These methods are sort of
+// receiving Arc<T>, but not an owned version. It's... complicated. We may be
+// one of the first users of unsafe trait objects!
+
+struct ArcWrapped<T>(PhantomData<T>);
+
+impl<T: Notify + 'static> Notify for ArcWrapped<T> {
+ fn notify(&self, id: usize) {
+ unsafe {
+ let me: *const ArcWrapped<T> = self;
+ T::notify(&*(&me as *const *const ArcWrapped<T> as *const Arc<T>),
+ id)
+ }
+ }
+
+ fn clone_id(&self, id: usize) -> usize {
+ unsafe {
+ let me: *const ArcWrapped<T> = self;
+ T::clone_id(&*(&me as *const *const ArcWrapped<T> as *const Arc<T>),
+ id)
+ }
+ }
+
+ fn drop_id(&self, id: usize) {
+ unsafe {
+ let me: *const ArcWrapped<T> = self;
+ T::drop_id(&*(&me as *const *const ArcWrapped<T> as *const Arc<T>),
+ id)
+ }
+ }
+}
+
+unsafe impl<T: Notify + 'static> UnsafeNotify for ArcWrapped<T> {
+ unsafe fn clone_raw(&self) -> NotifyHandle {
+ let me: *const ArcWrapped<T> = self;
+ let arc = (*(&me as *const *const ArcWrapped<T> as *const Arc<T>)).clone();
+ NotifyHandle::from(arc)
+ }
+
+ unsafe fn drop_raw(&self) {
+ let mut me: *const ArcWrapped<T> = self;
+ let me = &mut me as *mut *const ArcWrapped<T> as *mut Arc<T>;
+ ptr::drop_in_place(me);
+ }
+}
+
+impl<T> From<Arc<T>> for NotifyHandle
+ where T: Notify + 'static,
+{
+ fn from(rc: Arc<T>) -> NotifyHandle {
+ unsafe {
+ let ptr = mem::transmute::<Arc<T>, *mut ArcWrapped<T>>(rc);
+ NotifyHandle::new(ptr)
+ }
+ }
+}
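+
+// Illustrative sketch only (not upstream code): a user-defined `Notify` that
+// counts wakeups, handed to `poll_future_notify` as an `Arc` and converted to
+// a `NotifyHandle` via the `From<Arc<T>>` impl above when the task needs to
+// retain it. `Counter` is a made-up name for this example.
+#[cfg(test)]
+mod notify_example {
+    use std::sync::Arc;
+    use std::sync::atomic::{AtomicUsize, Ordering};
+
+    use executor::{self, Notify};
+    use future;
+    use Async;
+
+    struct Counter(AtomicUsize);
+
+    impl Notify for Counter {
+        fn notify(&self, _id: usize) {
+            self.0.fetch_add(1, Ordering::SeqCst);
+        }
+    }
+
+    #[test]
+    fn polls_with_a_custom_notify() {
+        let counter = Arc::new(Counter(AtomicUsize::new(0)));
+        let mut spawned = executor::spawn(future::ok::<u32, ()>(7));
+        match spawned.poll_future_notify(&counter, 0) {
+            Ok(Async::Ready(v)) => assert_eq!(v, 7),
+            other => panic!("future should be immediately ready: {:?}", other),
+        }
+        // A future that resolves on the first poll never has to notify.
+        assert_eq!(counter.0.load(Ordering::SeqCst), 0);
+    }
+}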
+
+#[cfg(feature = "nightly")]
+mod nightly {
+ use super::{TaskUnpark, UnparkEvents};
+ use core::marker::Unpin;
+
+ impl Unpin for TaskUnpark {}
+ impl Unpin for UnparkEvents {}
+}
diff --git a/third_party/rust/futures-0.1.31/src/task_impl/std/task_rc.rs b/third_party/rust/futures-0.1.31/src/task_impl/std/task_rc.rs
new file mode 100644
index 0000000000..51bb44878d
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/task_impl/std/task_rc.rs
@@ -0,0 +1,129 @@
+#![cfg(feature = "with-deprecated")]
+#![allow(deprecated)]
+#![deprecated(since = "0.1.4",
+ note = "replaced with `BiLock` in many cases, otherwise slated \
+ for removal due to confusion")]
+
+use std::prelude::v1::*;
+use std::sync::Arc;
+use std::cell::UnsafeCell;
+use task_impl;
+
+// One critical piece of this module's contents is the `TaskRc<A>` handle.
+// Its purpose is, conceptually, to be able to store data in a task,
+// allowing it to be accessed within multiple futures at once. For example if
+// you have some concurrent futures working, they may all want mutable access to
+// some data. We already know that when the futures are being poll'd we're
+// entirely synchronized (aka `&mut Task`), so you shouldn't require an
+// `Arc<Mutex<T>>` to share it, as the synchronization isn't necessary!
+//
+// So the idea here is that you associate data with a task via `TaskRc::new`,
+// which returns a handle to that data. That handle can later get
+// presented to the task itself to actually retrieve the underlying data. The
+// invariant is that the data can only ever be accessed with the task present,
+// and the lifetime of the actual data returned is connected to the lifetime of
+// the task itself.
+//
+// Conceptually I at least like to think of this as "dynamically adding more
+// struct fields to a `Task`". Each call to `TaskRc::new` creates a new "name" for the
+// struct field, a `TaskRc<A>`, and then you can access the fields of a struct
+// with the struct itself (`Task`) as well as the name of the field
+// (`TaskRc<A>`). If that analogy doesn't make sense then oh well, it at least
+// helped me!
+//
+// So anyway, we do some interesting trickery here to actually get it to work.
+// Each `TaskRc<A>` handle stores `Arc<UnsafeCell<A>>`. So it turns out, we're
+// not even adding data to the `Task`! Each `TaskRc<A>` simply holds that
+// `Arc`, and `TaskRc` handles can be cloned, which just bumps the
+// reference count on the `Arc` itself.
+//
+// As before, though, you can present the `Arc` to a `Task` and if they
+// originated from the same place you're allowed safe access to the internals.
+// We allow both shared and mutable access without the `Sync` bound on the data,
+// crucially noting that a `Task` itself is not `Sync`.
+//
+// So hopefully I've convinced you that the `with` method below is indeed
+// safe. The data is always valid as it's stored in an `Arc`, and access is
+// only allowed with the proof of the associated `Task`.
+// One thing you might be asking yourself though is what exactly is this "proof
+// of a task"? Right now it's a `usize` corresponding to the `Task`'s
+// `TaskHandle` arc allocation.
+//
+// Wait a minute, isn't that the ABA problem! That is, we create a task A, add
+// some data to it, destroy task A, do some work, create a task B, and then ask
+// to get the data from task B. In this case though the point of the
+// `task_inner` "proof" field is simply that there's some non-`Sync` token
+// proving that you can get access to the data. So while weird, this case should
+// still be safe, as the data's not stored in the task itself.
+
+/// A reference to a piece of data that's accessible only within a specific
+/// `Task`.
+///
+/// This data is `Send` even when `A` is not `Sync`, because the data stored
+/// within is accessed in a single-threaded way. The thread accessing it may
+/// change over time, if the task migrates, so `A` must be `Send`.
+#[derive(Debug)]
+pub struct TaskRc<A> {
+ task: task_impl::Task,
+ ptr: Arc<UnsafeCell<A>>,
+}
+
+// for safety here, see docs at the top of this module
+unsafe impl<A: Send> Send for TaskRc<A> {}
+unsafe impl<A: Sync> Sync for TaskRc<A> {}
+
+impl<A> TaskRc<A> {
+ /// Inserts a new piece of task-local data into this task, returning a
+ /// reference to it.
+ ///
+ /// Ownership of the data will be transferred to the task, and the data will
+ /// be destroyed when the task itself is destroyed. The returned value can
+ /// be passed to the `with` method to get a reference back to the original
+ /// data.
+ ///
+    /// Note that the returned handle is cloneable and can be sent to other
+    /// futures which will be associated with the same task. All
+ /// futures will then have access to this data when passed the reference
+ /// back.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if a task is not currently running.
+ pub fn new(a: A) -> TaskRc<A> {
+ TaskRc {
+ task: task_impl::park(),
+ ptr: Arc::new(UnsafeCell::new(a)),
+ }
+ }
+
+ /// Operate with a reference to the underlying data.
+ ///
+    /// The provided closure is given a shared reference to the underlying
+    /// data. The data may only be accessed from the task that originally
+    /// created this `TaskRc`.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if a task is not currently running or if `self`
+ /// does not belong to the task that is currently running. That is, if
+    /// another task created this `TaskRc`, this method will
+ /// panic.
+ pub fn with<F, R>(&self, f: F) -> R
+ where F: FnOnce(&A) -> R
+ {
+ if !self.task.is_current() {
+ panic!("TaskRc being accessed on task it does not belong to");
+ }
+
+ f(unsafe { &*self.ptr.get() })
+ }
+}
+
+impl<A> Clone for TaskRc<A> {
+ fn clone(&self) -> TaskRc<A> {
+ TaskRc {
+ task: self.task.clone(),
+ ptr: self.ptr.clone(),
+ }
+ }
+}
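+
+// Illustrative sketch only (not upstream code): sharing a value between
+// handles polled on the same task. `TaskRc` is deprecated; `BiLock` is the
+// usual replacement. The crate-root `executor` and `future` paths are assumed
+// from the public futures 0.1 API.
+#[cfg(test)]
+mod task_rc_example {
+    use super::TaskRc;
+    use executor;
+    use future;
+
+    #[test]
+    fn shares_within_one_task() {
+        let answer = executor::spawn(future::lazy(|| {
+            // `TaskRc::new` must run inside a task, hence `future::lazy`.
+            let rc = TaskRc::new(21u32);
+            let rc2 = rc.clone();
+            // Both handles resolve to the same underlying value, as long as
+            // they are used on the task that created them.
+            rc.with(|a| rc2.with(|b| Ok::<u32, ()>(a + b)))
+        })).wait_future().unwrap();
+        assert_eq!(answer, 42);
+    }
+}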
diff --git a/third_party/rust/futures-0.1.31/src/task_impl/std/unpark_mutex.rs b/third_party/rust/futures-0.1.31/src/task_impl/std/unpark_mutex.rs
new file mode 100644
index 0000000000..246def2753
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/task_impl/std/unpark_mutex.rs
@@ -0,0 +1,144 @@
+use std::cell::UnsafeCell;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+
+/// A "lock" around data `D`, which employs a *helping* strategy.
+///
+/// Used to ensure that concurrent `unpark` invocations lead to (1) `poll` being
+/// invoked on only a single thread at a time and (2) `poll` being invoked at least
+/// once after each `unpark` (unless the future has completed).
+pub struct UnparkMutex<D> {
+ // The state of task execution (state machine described below)
+ status: AtomicUsize,
+
+ // The actual task data, accessible only in the POLLING state
+ inner: UnsafeCell<Option<D>>,
+}
+
+// `UnparkMutex<D>` functions in many ways like a `Mutex<D>`, except that on
+// acquisition failure, the current lock holder performs the desired work --
+// re-polling.
+//
+// As such, these impls mirror those for `Mutex<D>`. In particular, a reference
+// to `UnparkMutex` can be used to gain `&mut` access to the inner data, which
+// must therefore be `Send`.
+unsafe impl<D: Send> Send for UnparkMutex<D> {}
+unsafe impl<D: Send> Sync for UnparkMutex<D> {}
+
+// There are four possible task states, listed below with their possible
+// transitions:
+
+// The task is blocked, waiting on an event
+const WAITING: usize = 0; // --> POLLING
+
+// The task is actively being polled by a thread; arrival of additional events
+// of interest should move it to the REPOLL state
+const POLLING: usize = 1; // --> WAITING, REPOLL, or COMPLETE
+
+// The task is actively being polled, but will need to be re-polled upon
+// completion to ensure that all events were observed.
+const REPOLL: usize = 2; // --> POLLING
+
+// The task has finished executing (either successfully or with an error/panic)
+const COMPLETE: usize = 3; // No transitions out
+
+impl<D> UnparkMutex<D> {
+ pub fn new() -> UnparkMutex<D> {
+ UnparkMutex {
+ status: AtomicUsize::new(WAITING),
+ inner: UnsafeCell::new(None),
+ }
+ }
+
+ /// Attempt to "notify" the mutex that a poll should occur.
+ ///
+ /// An `Ok` result indicates that the `POLLING` state has been entered, and
+ /// the caller can proceed to poll the future. An `Err` result indicates
+ /// that polling is not necessary (because the task is finished or the
+ /// polling has been delegated).
+ pub fn notify(&self) -> Result<D, ()> {
+ let mut status = self.status.load(SeqCst);
+ loop {
+ match status {
+ // The task is idle, so try to run it immediately.
+ WAITING => {
+ match self.status.compare_exchange(WAITING, POLLING,
+ SeqCst, SeqCst) {
+ Ok(_) => {
+ let data = unsafe {
+ // SAFETY: we've ensured mutual exclusion via
+ // the status protocol; we are the only thread
+ // that has transitioned to the POLLING state,
+                            // and we won't transition back to WAITING until
+ // the lock is "released" by this thread. See
+ // the protocol diagram above.
+ (*self.inner.get()).take().unwrap()
+ };
+ return Ok(data);
+ }
+ Err(cur) => status = cur,
+ }
+ }
+
+ // The task is being polled, so we need to record that it should
+ // be *repolled* when complete.
+ POLLING => {
+ match self.status.compare_exchange(POLLING, REPOLL,
+ SeqCst, SeqCst) {
+ Ok(_) => return Err(()),
+ Err(cur) => status = cur,
+ }
+ }
+
+ // The task is already scheduled for polling, or is complete, so
+ // we've got nothing to do.
+ _ => return Err(()),
+ }
+ }
+ }
+
+ /// Alert the mutex that polling is about to begin, clearing any accumulated
+ /// re-poll requests.
+ ///
+ /// # Safety
+ ///
+ /// Callable only from the `POLLING`/`REPOLL` states, i.e. between
+ /// successful calls to `notify` and `wait`/`complete`.
+ pub unsafe fn start_poll(&self) {
+ self.status.store(POLLING, SeqCst);
+ }
+
+ /// Alert the mutex that polling completed with NotReady.
+ ///
+ /// # Safety
+ ///
+ /// Callable only from the `POLLING`/`REPOLL` states, i.e. between
+ /// successful calls to `notify` and `wait`/`complete`.
+ pub unsafe fn wait(&self, data: D) -> Result<(), D> {
+ *self.inner.get() = Some(data);
+
+ match self.status.compare_exchange(POLLING, WAITING, SeqCst, SeqCst) {
+ // no unparks came in while we were running
+ Ok(_) => Ok(()),
+
+ // guaranteed to be in REPOLL state; just clobber the
+ // state and run again.
+ Err(status) => {
+ assert_eq!(status, REPOLL);
+ self.status.store(POLLING, SeqCst);
+ Err((*self.inner.get()).take().unwrap())
+ }
+ }
+ }
+
+ /// Alert the mutex that the task has completed execution and should not be
+ /// notified again.
+ ///
+ /// # Safety
+ ///
+ /// Callable only from the `POLLING`/`REPOLL` states, i.e. between
+ /// successful calls to `notify` and `wait`/`complete`.
+ pub unsafe fn complete(&self) {
+ self.status.store(COMPLETE, SeqCst);
+ }
+}
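+
+// Illustrative sketch only (not upstream code): drive the WAITING / POLLING /
+// REPOLL / COMPLETE protocol by hand, playing both the executor role
+// (`start_poll` / `wait` / `complete`) and the waker role (`notify`).
+#[cfg(test)]
+mod tests {
+    use super::UnparkMutex;
+
+    #[test]
+    fn hand_driven_protocol() {
+        let mutex: UnparkMutex<&'static str> = UnparkMutex::new();
+
+        unsafe {
+            // The first poll is entered directly by the executor (compare
+            // `Run::run` in the parent module), so flag POLLING by hand.
+            mutex.start_poll();
+        }
+
+        // A wakeup that races with the ongoing poll only records REPOLL.
+        assert_eq!(mutex.notify(), Err(()));
+
+        unsafe {
+            // Because a wakeup arrived mid-poll, `wait` refuses to park the
+            // data and hands it straight back for another poll.
+            assert_eq!(mutex.wait("task"), Err("task"));
+            // Nothing raced with us this time, so the data is parked.
+            assert_eq!(mutex.wait("task"), Ok(()));
+        }
+
+        // A wakeup while parked hands the data out exactly once.
+        assert_eq!(mutex.notify(), Ok("task"));
+
+        unsafe {
+            // The future finished; no further wakeups reach the executor.
+            mutex.complete();
+        }
+        assert_eq!(mutex.notify(), Err(()));
+    }
+}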