author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-28 14:29:10 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-28 14:29:10 +0000
commit     2aa4a82499d4becd2284cdb482213d541b8804dd (patch)
tree       b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/tokio-threadpool/src/task
parent     Initial commit. (diff)
Adding upstream version 86.0.1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-threadpool/src/task')
-rw-r--r--  third_party/rust/tokio-threadpool/src/task/blocking.rs         496
-rw-r--r--  third_party/rust/tokio-threadpool/src/task/blocking_state.rs    89
-rw-r--r--  third_party/rust/tokio-threadpool/src/task/mod.rs              308
-rw-r--r--  third_party/rust/tokio-threadpool/src/task/state.rs             57
4 files changed, 950 insertions, 0 deletions
diff --git a/third_party/rust/tokio-threadpool/src/task/blocking.rs b/third_party/rust/tokio-threadpool/src/task/blocking.rs
new file mode 100644
index 0000000000..ded59edfef
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/task/blocking.rs
@@ -0,0 +1,496 @@
+use pool::Pool;
+use task::{BlockingState, Task};
+
+use futures::{Async, Poll};
+
+use std::cell::UnsafeCell;
+use std::fmt;
+use std::ptr;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
+use std::sync::Arc;
+use std::thread;
+
+/// Manages the state around entering a blocking section and tasks that are
+/// queued pending the ability to block.
+///
+/// This is a hybrid counter and intrusive mpsc channel (like `Queue`).
+#[derive(Debug)]
+pub(crate) struct Blocking {
+ /// Queue head.
+ ///
+ /// This is either the current remaining capacity for blocking sections
+ /// **or** if the max has been reached, the head of a pending blocking
+ /// capacity channel of tasks.
+ ///
+ /// When this points to a task, it represents a strong reference, i.e.
+ /// `Arc<Task>`.
+ state: AtomicUsize,
+
+ /// Tail pointer. This is `Arc<Task>` unless it points to `stub`.
+ tail: UnsafeCell<*mut Task>,
+
+ /// Stub pointer, used as part of the intrusive mpsc channel algorithm
+ /// described by 1024cores.
+ stub: Box<Task>,
+
+ /// The channel algorithm is MPSC. This means that, in order to pop tasks,
+ /// coordination is required.
+ ///
+ /// Since it doesn't matter *which* task pops & notifies the queued task, we
+ /// can avoid a full mutex and make the "lock" lock free.
+ ///
+ /// Instead, threads race to set the "entered" bit. When the transition is
+ /// successfully made, the thread has permission to pop tasks off of the
+ /// queue. If a thread loses the race, instead of waiting to pop a task, it
+ /// signals to the winning thread that it should pop an additional task.
+ lock: AtomicUsize,
+}
+
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub(crate) enum CanBlock {
+ /// Blocking capacity has been allocated to this task.
+ ///
+ /// The capacity allocation is initially checked before a task is polled. If
+ /// capacity has been allocated, it is consumed and tracked as `Allocated`.
+ Allocated,
+
+    /// Blocking capacity is either available to the task when it is polled
+    /// or it is not; a task may only ask for capacity once. This state
+    /// tracks a task that has not yet asked for blocking capacity. When a
+    /// task in this state needs blocking capacity, it can immediately try
+    /// to get an allocation.
+ CanRequest,
+
+ /// The task has requested blocking capacity, but none is available.
+ NoCapacity,
+}
+
+/// Decorates the `usize` value of `Blocking::state`, providing fns to
+/// manipulate the state instead of requiring bit ops.
+#[derive(Copy, Clone, Eq, PartialEq)]
+struct State(usize);
+
+/// Flag differentiating between remaining capacity and task pointers.
+///
+/// If we assume pointers are properly aligned, then the least significant bit
+/// will always be zero. So, we use that bit to track if the value represents a
+/// number.
+const NUM_FLAG: usize = 1;
+
+/// When representing "numbers", the state has to be shifted this much (to get
+/// rid of the flag bit).
+const NUM_SHIFT: usize = 1;
+
+// ===== impl Blocking =====
+impl Blocking {
+ /// Create a new `Blocking`.
+ pub fn new(capacity: usize) -> Blocking {
+ assert!(capacity > 0, "blocking capacity must be greater than zero");
+
+ let stub = Box::new(Task::stub());
+ let ptr = &*stub as *const _ as *mut _;
+
+ // Allocations are aligned
+ debug_assert!(ptr as usize & NUM_FLAG == 0);
+
+ // The initial state value. This starts at the max capacity.
+ let init = State::new(capacity);
+
+ Blocking {
+ state: AtomicUsize::new(init.into()),
+ tail: UnsafeCell::new(ptr),
+ stub: stub,
+ lock: AtomicUsize::new(0),
+ }
+ }
+
+ /// Atomically either acquire blocking capacity or queue the task to be
+ /// notified once capacity becomes available.
+ ///
+ /// The caller must ensure that `task` has not previously been queued to be
+ /// notified when capacity becomes available.
+ pub fn poll_blocking_capacity(&self, task: &Arc<Task>) -> Poll<(), ::BlockingError> {
+ // This requires atomically claiming blocking capacity and if none is
+ // available, queuing &task.
+
+ // The task cannot be queued at this point. The caller must ensure this.
+ debug_assert!(!BlockingState::from(task.blocking.load(Acquire)).is_queued());
+
+ // Don't bump the ref count unless necessary.
+ let mut strong: Option<*const Task> = None;
+
+ // Load the state
+ let mut curr: State = self.state.load(Acquire).into();
+
+ loop {
+ let mut next = curr;
+
+ if !next.claim_capacity(&self.stub) {
+ debug_assert!(curr.ptr().is_some());
+
+ // Unable to claim capacity, so we must queue `task` onto the
+ // channel.
+ //
+                // This guard also ensures that the queuing work, which only
+                // needs to happen once, is in fact performed only once.
+ if strong.is_none() {
+ // First, transition the task to a "queued" state. This
+ // prevents double queuing.
+ //
+ // This is also the only thread that can set the queued flag
+ // at this point. And, the goal is for this to only be
+ // visible when the task node is polled from the channel.
+ // The memory ordering is established by MPSC queue
+ // operation.
+ //
+ // Note that, if the task doesn't get queued (because the
+ // CAS fails and capacity is now available) then this flag
+ // must be unset. Again, there is no race because until the
+ // task is queued, no other thread can see it.
+ let prev = BlockingState::toggle_queued(&task.blocking, Relaxed);
+ debug_assert!(!prev.is_queued());
+
+ // Bump the ref count
+ strong = Some(Arc::into_raw(task.clone()));
+
+ // Set the next pointer. This does not require an atomic
+ // operation as this node is not currently accessible to
+ // other threads via the queue.
+ task.next_blocking.store(ptr::null_mut(), Relaxed);
+ }
+
+ let ptr = strong.unwrap();
+
+ // Update the head to point to the new node. We need to see the
+ // previous node in order to update the next pointer as well as
+ // release `task` to any other threads calling `push`.
+ next.set_ptr(ptr);
+ }
+
+ debug_assert_ne!(curr.0, 0);
+ debug_assert_ne!(next.0, 0);
+
+ let actual = self
+ .state
+ .compare_and_swap(curr.into(), next.into(), AcqRel)
+ .into();
+
+ if curr == actual {
+ break;
+ }
+
+ curr = actual;
+ }
+
+ match curr.ptr() {
+ Some(prev) => {
+ let ptr = strong.unwrap();
+
+ // Finish pushing
+ unsafe {
+ (*prev).next_blocking.store(ptr as *mut _, Release);
+ }
+
+ // The node was queued to be notified once capacity is made
+ // available.
+ Ok(Async::NotReady)
+ }
+ None => {
+ debug_assert!(curr.remaining_capacity() > 0);
+
+ // If `strong` is set, gotta undo a bunch of work
+ if let Some(ptr) = strong {
+ let _ = unsafe { Arc::from_raw(ptr) };
+
+ // Unset the queued flag.
+ let prev = BlockingState::toggle_queued(&task.blocking, Relaxed);
+ debug_assert!(prev.is_queued());
+ }
+
+ // Capacity has been obtained
+ Ok(().into())
+ }
+ }
+ }
+
+ unsafe fn push_stub(&self) {
+ let task: *mut Task = &*self.stub as *const _ as *mut _;
+
+ // Set the next pointer. This does not require an atomic operation as
+ // this node is not accessible. The write will be flushed with the next
+        // operation.
+ (*task).next_blocking.store(ptr::null_mut(), Relaxed);
+
+ // Update the head to point to the new node. We need to see the previous
+ // node in order to update the next pointer as well as release `task`
+ // to any other threads calling `push`.
+ let prev = self.state.swap(task as usize, AcqRel);
+
+ // The stub is only pushed when there are pending tasks. Because of
+ // this, the state must *always* be in pointer mode.
+ debug_assert!(State::from(prev).is_ptr());
+
+ let prev = prev as *const Task;
+
+ // We don't want the *existing* pointer to be a stub.
+ debug_assert_ne!(prev, task);
+
+ // Release `task` to the consume end.
+ (*prev).next_blocking.store(task, Release);
+ }
+
+ pub fn notify_task(&self, pool: &Arc<Pool>) {
+ let prev = self.lock.fetch_add(1, AcqRel);
+
+ if prev != 0 {
+ // Another thread has the lock and will be responsible for notifying
+ // pending tasks.
+ return;
+ }
+
+ let mut dec = 1;
+
+ loop {
+ let mut remaining_pops = dec;
+ while remaining_pops > 0 {
+ remaining_pops -= 1;
+
+ let task = match self.pop(remaining_pops) {
+ Some(t) => t,
+ None => break,
+ };
+
+ Task::notify_blocking(task, pool);
+ }
+
+ // Decrement the number of handled notifications
+ let actual = self.lock.fetch_sub(dec, AcqRel);
+
+ if actual == dec {
+ break;
+ }
+
+ // This can only be greater than expected as we are the only thread
+ // that is decrementing.
+ debug_assert!(actual > dec);
+ dec = actual - dec;
+ }
+ }
+
+ /// Pop a task
+ ///
+ /// `rem` represents the remaining number of times the caller will pop. If
+ /// there are no more tasks to pop, `rem` is used to set the remaining
+ /// capacity.
+ fn pop(&self, rem: usize) -> Option<Arc<Task>> {
+ 'outer: loop {
+ unsafe {
+ let mut tail = *self.tail.get();
+ let mut next = (*tail).next_blocking.load(Acquire);
+
+ let stub = &*self.stub as *const _ as *mut _;
+
+ if tail == stub {
+ if next.is_null() {
+ // This loop is not part of the standard intrusive mpsc
+ // channel algorithm. This is where we atomically pop
+ // the last task and add `rem` to the remaining capacity.
+ //
+ // This modification to the pop algorithm works because,
+ // at this point, we have not done any work (only done
+ // reading). We have a *pretty* good idea that there is
+ // no concurrent pusher.
+ //
+ // The capacity is then atomically added by doing an
+ // AcqRel CAS on `state`. The `state` cell is the
+ // linchpin of the algorithm.
+ //
+ // By successfully CASing `head` w/ AcqRel, we ensure
+ // that, if any thread was racing and entered a push, we
+ // see that and abort pop, retrying as it is
+ // "inconsistent".
+ let mut curr: State = self.state.load(Acquire).into();
+
+ loop {
+ if curr.has_task(&self.stub) {
+ // Inconsistent state, yield the thread and try
+ // again.
+ thread::yield_now();
+ continue 'outer;
+ }
+
+ let mut after = curr;
+
+ // +1 here because `rem` represents the number of
+ // pops that will come after the current one.
+ after.add_capacity(rem + 1, &self.stub);
+
+ let actual: State = self
+ .state
+ .compare_and_swap(curr.into(), after.into(), AcqRel)
+ .into();
+
+ if actual == curr {
+ // Successfully returned the remaining capacity
+ return None;
+ }
+
+ curr = actual;
+ }
+ }
+
+ *self.tail.get() = next;
+ tail = next;
+ next = (*next).next_blocking.load(Acquire);
+ }
+
+ if !next.is_null() {
+ *self.tail.get() = next;
+
+                    // No ref_count inc is necessary here as this pop is paired
+                    // with a `push` which "forgets" the handle.
+ return Some(Arc::from_raw(tail));
+ }
+
+ let state = self.state.load(Acquire);
+
+ // This must always be a pointer
+ debug_assert!(State::from(state).is_ptr());
+
+ if state != tail as usize {
+ // Try again
+ thread::yield_now();
+ continue 'outer;
+ }
+
+ self.push_stub();
+
+ next = (*tail).next_blocking.load(Acquire);
+
+ if !next.is_null() {
+ *self.tail.get() = next;
+
+ return Some(Arc::from_raw(tail));
+ }
+
+ thread::yield_now();
+ // Try again
+ }
+ }
+ }
+}
+
+// ====== impl State =====
+
+impl State {
+ /// Return a new `State` representing the remaining capacity at the maximum
+ /// value.
+ fn new(capacity: usize) -> State {
+ State((capacity << NUM_SHIFT) | NUM_FLAG)
+ }
+
+ fn remaining_capacity(&self) -> usize {
+ if !self.has_remaining_capacity() {
+ return 0;
+ }
+
+ self.0 >> 1
+ }
+
+ fn has_remaining_capacity(&self) -> bool {
+ self.0 & NUM_FLAG == NUM_FLAG
+ }
+
+ fn has_task(&self, stub: &Task) -> bool {
+ !(self.has_remaining_capacity() || self.is_stub(stub))
+ }
+
+ fn is_stub(&self, stub: &Task) -> bool {
+ self.0 == stub as *const _ as usize
+ }
+
+ /// Try to claim blocking capacity.
+ ///
+ /// # Return
+ ///
+ /// Returns `true` if the capacity was claimed, `false` otherwise. If
+ /// `false` is returned, it can be assumed that `State` represents the head
+ /// pointer in the mpsc channel.
+ fn claim_capacity(&mut self, stub: &Task) -> bool {
+ if !self.has_remaining_capacity() {
+ return false;
+ }
+
+ debug_assert!(self.0 != 1);
+
+ self.0 -= 1 << NUM_SHIFT;
+
+ if self.0 == NUM_FLAG {
+ // Set the state to the stub pointer.
+ self.0 = stub as *const _ as usize;
+ }
+
+ true
+ }
+
+ /// Add blocking capacity.
+ fn add_capacity(&mut self, capacity: usize, stub: &Task) -> bool {
+ debug_assert!(capacity > 0);
+
+ if self.is_stub(stub) {
+ self.0 = (capacity << NUM_SHIFT) | NUM_FLAG;
+ true
+ } else if self.has_remaining_capacity() {
+ self.0 += capacity << NUM_SHIFT;
+ true
+ } else {
+ false
+ }
+ }
+
+ fn is_ptr(&self) -> bool {
+ self.0 & NUM_FLAG == 0
+ }
+
+ fn ptr(&self) -> Option<*const Task> {
+ if self.is_ptr() {
+ Some(self.0 as *const Task)
+ } else {
+ None
+ }
+ }
+
+ fn set_ptr(&mut self, ptr: *const Task) {
+ let ptr = ptr as usize;
+ debug_assert!(ptr & NUM_FLAG == 0);
+ self.0 = ptr
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> State {
+ State(src)
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> usize {
+ src.0
+ }
+}
+
+impl fmt::Debug for State {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ let mut fmt = fmt.debug_struct("State");
+
+ if self.is_ptr() {
+ fmt.field("ptr", &self.0);
+ } else {
+ fmt.field("remaining", &self.remaining_capacity());
+ }
+
+ fmt.finish()
+ }
+}
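Aside (not part of the diff): the tagged-word encoding that `State` implements above can be hard to see through the bit operations. The following standalone sketch is illustrative only; `Decoded`, `encode_capacity`, and `decode` are names invented here, not part of the vendored crate. The low bit marks a shifted capacity count, while an even value is treated as an aligned `Task` pointer.

// Standalone sketch of the tagged-word encoding used by `State` above.
// Illustrative only; none of these names exist in the vendored crate.
const NUM_FLAG: usize = 1;
const NUM_SHIFT: usize = 1;

#[derive(Debug, PartialEq)]
enum Decoded {
    Capacity(usize),
    Ptr(*const ()),
}

// Capacity is stored shifted left by one, with the low bit set.
fn encode_capacity(n: usize) -> usize {
    (n << NUM_SHIFT) | NUM_FLAG
}

// An odd word is a capacity count; an even word is an aligned pointer.
fn decode(word: usize) -> Decoded {
    if word & NUM_FLAG == NUM_FLAG {
        Decoded::Capacity(word >> NUM_SHIFT)
    } else {
        Decoded::Ptr(word as *const ())
    }
}

fn main() {
    let word = encode_capacity(8);
    assert_eq!(decode(word), Decoded::Capacity(8));

    // A well-aligned allocation has a zero low bit, so a pointer can share
    // the same word without ambiguity -- the property checked by
    // `debug_assert!(ptr as usize & NUM_FLAG == 0)` in `Blocking::new`.
    let value = 0u64;
    let ptr = &value as *const u64 as usize;
    assert_eq!(ptr & NUM_FLAG, 0);
    assert_eq!(decode(ptr), Decoded::Ptr(ptr as *const ()));
}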
diff --git a/third_party/rust/tokio-threadpool/src/task/blocking_state.rs b/third_party/rust/tokio-threadpool/src/task/blocking_state.rs
new file mode 100644
index 0000000000..b41fc4868d
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/task/blocking_state.rs
@@ -0,0 +1,89 @@
+use task::CanBlock;
+
+use std::fmt;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+/// Task-level state used to support `blocking`.
+///
+/// This tracks two separate flags.
+///
+/// a) If the task is queued in the pending blocking channel. This prevents
+/// double queuing (which would break the linked list).
+///
+/// b) If the task has been allocated capacity to block.
+#[derive(Eq, PartialEq)]
+pub(crate) struct BlockingState(usize);
+
+const QUEUED: usize = 0b01;
+const ALLOCATED: usize = 0b10;
+
+impl BlockingState {
+ /// Create a new, default, `BlockingState`.
+ pub fn new() -> BlockingState {
+ BlockingState(0)
+ }
+
+ /// Returns `true` if the state represents the associated task being queued
+ /// in the pending blocking capacity channel
+ pub fn is_queued(&self) -> bool {
+ self.0 & QUEUED == QUEUED
+ }
+
+ /// Toggle the queued flag
+ ///
+ /// Returns the state before the flag has been toggled.
+ pub fn toggle_queued(state: &AtomicUsize, ordering: Ordering) -> BlockingState {
+ state.fetch_xor(QUEUED, ordering).into()
+ }
+
+ /// Returns `true` if the state represents the associated task having been
+ /// allocated capacity to block.
+ pub fn is_allocated(&self) -> bool {
+ self.0 & ALLOCATED == ALLOCATED
+ }
+
+    /// Atomically consume the capacity allocation and return whether the
+    /// allocation was present.
+    ///
+    /// If this returns `CanBlock::Allocated`, then the task has the ability to
+    /// block for the duration of the `poll`.
+ pub fn consume_allocation(state: &AtomicUsize, ordering: Ordering) -> CanBlock {
+ let state: Self = state.fetch_and(!ALLOCATED, ordering).into();
+
+ if state.is_allocated() {
+ CanBlock::Allocated
+ } else if state.is_queued() {
+ CanBlock::NoCapacity
+ } else {
+ CanBlock::CanRequest
+ }
+ }
+
+ pub fn notify_blocking(state: &AtomicUsize, ordering: Ordering) {
+ let prev: Self = state.fetch_xor(ALLOCATED | QUEUED, ordering).into();
+
+ debug_assert!(prev.is_queued());
+ debug_assert!(!prev.is_allocated());
+ }
+}
+
+impl From<usize> for BlockingState {
+ fn from(src: usize) -> BlockingState {
+ BlockingState(src)
+ }
+}
+
+impl From<BlockingState> for usize {
+ fn from(src: BlockingState) -> usize {
+ src.0
+ }
+}
+
+impl fmt::Debug for BlockingState {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("BlockingState")
+ .field("is_queued", &self.is_queued())
+ .field("is_allocated", &self.is_allocated())
+ .finish()
+ }
+}
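Aside (not part of the diff): a minimal sketch of how the two flags above are manipulated on a single atomic word, mirroring what `toggle_queued`, `notify_blocking`, and `consume_allocation` do. The constants match the ones in the file; the sequence in `main` is a made-up scenario for illustration.

// Sketch of the QUEUED/ALLOCATED bit manipulation; illustrative only.
use std::sync::atomic::{AtomicUsize, Ordering::{AcqRel, Acquire}};

const QUEUED: usize = 0b01;
const ALLOCATED: usize = 0b10;

fn main() {
    let state = AtomicUsize::new(0);

    // Queue the task: toggle the QUEUED bit on.
    let prev = state.fetch_xor(QUEUED, AcqRel);
    assert_eq!(prev & QUEUED, 0);

    // Grant capacity and dequeue in one step, as `notify_blocking` does:
    // XOR with both bits sets ALLOCATED and clears QUEUED.
    let prev = state.fetch_xor(ALLOCATED | QUEUED, AcqRel);
    assert_eq!(prev, QUEUED);

    // Consume the allocation: clear ALLOCATED and inspect the prior value,
    // as `consume_allocation` does.
    let prev = state.fetch_and(!ALLOCATED, AcqRel);
    assert_eq!(prev & ALLOCATED, ALLOCATED);
    assert_eq!(state.load(Acquire), 0);
}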
diff --git a/third_party/rust/tokio-threadpool/src/task/mod.rs b/third_party/rust/tokio-threadpool/src/task/mod.rs
new file mode 100644
index 0000000000..9c5a448fcd
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/task/mod.rs
@@ -0,0 +1,308 @@
+mod blocking;
+mod blocking_state;
+mod state;
+
+pub(crate) use self::blocking::{Blocking, CanBlock};
+use self::blocking_state::BlockingState;
+use self::state::State;
+
+use notifier::Notifier;
+use pool::Pool;
+
+use futures::executor::{self, Spawn};
+use futures::{self, Async, Future};
+
+use std::cell::{Cell, UnsafeCell};
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
+use std::sync::atomic::{AtomicPtr, AtomicUsize};
+use std::sync::Arc;
+use std::{fmt, panic, ptr};
+
+/// Harness around a future.
+///
+/// This also behaves as a node in the inbound work queue and the blocking
+/// queue.
+pub(crate) struct Task {
+ /// Task lifecycle state
+ state: AtomicUsize,
+
+ /// Task blocking related state
+ blocking: AtomicUsize,
+
+ /// Next pointer in the queue of tasks pending blocking capacity.
+ next_blocking: AtomicPtr<Task>,
+
+ /// ID of the worker that polled this task first.
+ ///
+ /// This field can be a `Cell` because it's only accessed by the worker thread that is
+ /// executing the task.
+ ///
+ /// The worker ID is represented by a `u32` rather than `usize` in order to save some space
+ /// on 64-bit platforms.
+ pub reg_worker: Cell<Option<u32>>,
+
+ /// The key associated with this task in the `Slab` it was registered in.
+ ///
+ /// This field can be a `Cell` because it's only accessed by the worker thread that has
+ /// registered the task.
+ pub reg_index: Cell<usize>,
+
+ /// Store the future at the head of the struct
+ ///
+ /// The future is dropped immediately when it transitions to Complete
+ future: UnsafeCell<Option<Spawn<BoxFuture>>>,
+}
+
+#[derive(Debug)]
+pub(crate) enum Run {
+ Idle,
+ Schedule,
+ Complete,
+}
+
+type BoxFuture = Box<dyn Future<Item = (), Error = ()> + Send + 'static>;
+
+// ===== impl Task =====
+
+impl Task {
+ /// Create a new `Task` as a harness for `future`.
+ pub fn new(future: BoxFuture) -> Task {
+ // Wrap the future with an execution context.
+ let task_fut = executor::spawn(future);
+
+ Task {
+ state: AtomicUsize::new(State::new().into()),
+ blocking: AtomicUsize::new(BlockingState::new().into()),
+ next_blocking: AtomicPtr::new(ptr::null_mut()),
+ reg_worker: Cell::new(None),
+ reg_index: Cell::new(0),
+ future: UnsafeCell::new(Some(task_fut)),
+ }
+ }
+
+ /// Create a fake `Task` to be used as part of the intrusive mpsc channel
+ /// algorithm.
+ fn stub() -> Task {
+ let future = Box::new(futures::empty()) as BoxFuture;
+ let task_fut = executor::spawn(future);
+
+ Task {
+ state: AtomicUsize::new(State::stub().into()),
+ blocking: AtomicUsize::new(BlockingState::new().into()),
+ next_blocking: AtomicPtr::new(ptr::null_mut()),
+ reg_worker: Cell::new(None),
+ reg_index: Cell::new(0),
+ future: UnsafeCell::new(Some(task_fut)),
+ }
+ }
+
+ /// Execute the task returning `Run::Schedule` if the task needs to be
+ /// scheduled again.
+ pub fn run(&self, unpark: &Arc<Notifier>) -> Run {
+ use self::State::*;
+
+ // Transition task to running state. At this point, the task must be
+ // scheduled.
+ let actual: State = self
+ .state
+ .compare_and_swap(Scheduled.into(), Running.into(), AcqRel)
+ .into();
+
+ match actual {
+ Scheduled => {}
+ _ => panic!("unexpected task state; {:?}", actual),
+ }
+
+ trace!(
+ "Task::run; state={:?}",
+ State::from(self.state.load(Relaxed))
+ );
+
+ // The transition to `Running` done above ensures that a lock on the
+ // future has been obtained.
+ let fut = unsafe { &mut (*self.future.get()) };
+
+ // This block deals with the future panicking while being polled.
+ //
+ // If the future panics, then the drop handler must be called such that
+ // `thread::panicking() -> true`. To do this, the future is dropped from
+ // within the catch_unwind block.
+ let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ struct Guard<'a>(&'a mut Option<Spawn<BoxFuture>>, bool);
+
+ impl<'a> Drop for Guard<'a> {
+ fn drop(&mut self) {
+ // This drops the future
+ if self.1 {
+ let _ = self.0.take();
+ }
+ }
+ }
+
+ let mut g = Guard(fut, true);
+
+ let ret =
+ g.0.as_mut()
+ .unwrap()
+ .poll_future_notify(unpark, self as *const _ as usize);
+
+ g.1 = false;
+
+ ret
+ }));
+
+ match res {
+ Ok(Ok(Async::Ready(_))) | Ok(Err(_)) | Err(_) => {
+ trace!(" -> task complete");
+
+ // The future has completed. Drop it immediately to free
+ // resources and run drop handlers.
+ //
+ // The `Task` harness will stay around longer if it is contained
+ // by any of the various queues.
+ self.drop_future();
+
+ // Transition to the completed state
+ self.state.store(State::Complete.into(), Release);
+
+ if let Err(panic_err) = res {
+ if let Some(ref f) = unpark.pool.config.panic_handler {
+ f(panic_err);
+ }
+ }
+
+ Run::Complete
+ }
+ Ok(Ok(Async::NotReady)) => {
+ trace!(" -> not ready");
+
+                // Attempt to transition from Running -> Idle. If successful,
+                // the task does not need to be scheduled again. If the CAS
+                // fails, then the task has been notified concurrently with
+                // running, in which case it transitions immediately back to
+                // scheduled and we return `Run::Schedule`.
+ let prev: State = self
+ .state
+ .compare_and_swap(Running.into(), Idle.into(), AcqRel)
+ .into();
+
+ match prev {
+ Running => Run::Idle,
+ Notified => {
+ self.state.store(Scheduled.into(), Release);
+ Run::Schedule
+ }
+ _ => unreachable!(),
+ }
+ }
+ }
+ }
+
+ /// Aborts this task.
+ ///
+    /// This is called when the threadpool shuts down and the task has already been polled but not
+ /// completed.
+ pub fn abort(&self) {
+ use self::State::*;
+
+ let mut state = self.state.load(Acquire).into();
+
+ loop {
+ match state {
+ Idle | Scheduled => {}
+ Running | Notified | Complete | Aborted => {
+ // It is assumed that no worker threads are running so the task must be either
+ // in the idle or scheduled state.
+ panic!("unexpected state while aborting task: {:?}", state);
+ }
+ }
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), Aborted.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ // The future has been aborted. Drop it immediately to free resources and run drop
+ // handlers.
+ self.drop_future();
+ break;
+ }
+
+ state = actual;
+ }
+ }
+
+ /// Notify the task
+ pub fn notify(me: Arc<Task>, pool: &Arc<Pool>) {
+ if me.schedule() {
+ let _ = pool.submit(me, pool);
+ }
+ }
+
+ /// Notify the task it has been allocated blocking capacity
+ pub fn notify_blocking(me: Arc<Task>, pool: &Arc<Pool>) {
+ BlockingState::notify_blocking(&me.blocking, AcqRel);
+ Task::notify(me, pool);
+ }
+
+ /// Transition the task state to scheduled.
+ ///
+ /// Returns `true` if the caller is permitted to schedule the task.
+ pub fn schedule(&self) -> bool {
+ use self::State::*;
+
+ loop {
+ // Scheduling can only be done from the `Idle` state.
+ let actual = self
+ .state
+ .compare_and_swap(Idle.into(), Scheduled.into(), AcqRel)
+ .into();
+
+ match actual {
+ Idle => return true,
+ Running => {
+ // The task is already running on another thread. Transition
+ // the state to `Notified`. If this CAS fails, then restart
+ // the logic again from `Idle`.
+ let actual = self
+ .state
+ .compare_and_swap(Running.into(), Notified.into(), AcqRel)
+ .into();
+
+ match actual {
+ Idle => continue,
+ _ => return false,
+ }
+ }
+ Complete | Aborted | Notified | Scheduled => return false,
+ }
+ }
+ }
+
+ /// Consumes any allocated capacity to block.
+ ///
+    /// Returns the resulting `CanBlock` state, indicating whether blocking
+    /// capacity was allocated.
+ pub fn consume_blocking_allocation(&self) -> CanBlock {
+ // This flag is the primary point of coordination. The queued flag
+ // happens "around" setting the blocking capacity.
+ BlockingState::consume_allocation(&self.blocking, AcqRel)
+ }
+
+ /// Drop the future
+ ///
+ /// This must only be called by the thread that successfully transitioned
+ /// the future state to `Running`.
+ fn drop_future(&self) {
+ let _ = unsafe { (*self.future.get()).take() };
+ }
+}
+
+impl fmt::Debug for Task {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Task")
+ .field("state", &self.state)
+ .field("future", &"Spawn<BoxFuture>")
+ .finish()
+ }
+}
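Aside (not part of the diff): the `schedule` state machine above is easier to follow in isolation. This compressed sketch is illustrative only; it uses `compare_exchange` where the vendored futures-0.1-era code uses the older `compare_and_swap`, and it collapses the state enum to bare constants. It captures the two transitions that matter: Idle -> Scheduled for the caller, and Running -> Notified so the polling thread reschedules the task itself when its poll returns NotReady.

// Sketch of the task scheduling transition; illustrative only.
use std::sync::atomic::{AtomicUsize, Ordering::{AcqRel, Acquire, Release}};

const IDLE: usize = 0;
const RUNNING: usize = 1;
const NOTIFIED: usize = 2;
const SCHEDULED: usize = 3;

// Returns true if the caller should push the task onto the run queue.
fn schedule(state: &AtomicUsize) -> bool {
    loop {
        match state.compare_exchange(IDLE, SCHEDULED, AcqRel, Acquire) {
            Ok(_) => return true,
            Err(RUNNING) => {
                // The task is being polled right now; leave a note so the
                // polling thread reschedules it instead of going idle.
                match state.compare_exchange(RUNNING, NOTIFIED, AcqRel, Acquire) {
                    // The poll finished in the meantime; retry from Idle.
                    Err(IDLE) => continue,
                    _ => return false,
                }
            }
            // Already scheduled/notified or in a terminal state.
            Err(_) => return false,
        }
    }
}

fn main() {
    let state = AtomicUsize::new(IDLE);
    assert!(schedule(&state));              // Idle -> Scheduled
    state.store(RUNNING, Release);          // pretend a worker started polling it
    assert!(!schedule(&state));             // Running -> Notified, no reschedule
    assert_eq!(state.load(Acquire), NOTIFIED);
}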
diff --git a/third_party/rust/tokio-threadpool/src/task/state.rs b/third_party/rust/tokio-threadpool/src/task/state.rs
new file mode 100644
index 0000000000..3e00f89bc5
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/task/state.rs
@@ -0,0 +1,57 @@
+#[repr(usize)]
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub(crate) enum State {
+ /// Task is currently idle
+ Idle = 0,
+
+ /// Task is currently running
+ Running = 1,
+
+ /// Task is currently running, but has been notified that it must run again.
+ Notified = 2,
+
+ /// Task has been scheduled
+ Scheduled = 3,
+
+ /// Task is complete
+ Complete = 4,
+
+ /// Task was aborted because the thread pool has been shut down
+ Aborted = 5,
+}
+
+// ===== impl State =====
+
+impl State {
+ /// Returns the initial task state.
+ ///
+ /// Tasks start in the scheduled state as they are immediately scheduled on
+ /// creation.
+ pub fn new() -> State {
+ State::Scheduled
+ }
+
+ pub fn stub() -> State {
+ State::Idle
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> Self {
+ use self::State::*;
+
+ debug_assert!(
+ src >= Idle as usize && src <= Aborted as usize,
+ "actual={}",
+ src
+ );
+
+ unsafe { ::std::mem::transmute(src) }
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> Self {
+ src as usize
+ }
+}
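Aside (not part of the diff): `From<usize> for State` above relies on a `transmute` guarded by a `debug_assert`. As a point of comparison only, not a change to the vendored code, the same mapping can be written as an explicit match, which also rejects out-of-range values in release builds.

// Match-based alternative to the transmute conversion; illustrative only.
#[repr(usize)]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum State {
    Idle = 0,
    Running = 1,
    Notified = 2,
    Scheduled = 3,
    Complete = 4,
    Aborted = 5,
}

fn state_from_usize(src: usize) -> State {
    match src {
        0 => State::Idle,
        1 => State::Running,
        2 => State::Notified,
        3 => State::Scheduled,
        4 => State::Complete,
        5 => State::Aborted,
        other => panic!("invalid task state: {}", other),
    }
}

fn main() {
    // Round-trip every variant through its discriminant.
    for &s in &[State::Idle, State::Running, State::Notified,
                State::Scheduled, State::Complete, State::Aborted] {
        assert_eq!(state_from_usize(s as usize), s);
    }
}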