summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-threadpool/src/task/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-threadpool/src/task/mod.rs')
-rw-r--r--third_party/rust/tokio-threadpool/src/task/mod.rs308
1 files changed, 308 insertions, 0 deletions
diff --git a/third_party/rust/tokio-threadpool/src/task/mod.rs b/third_party/rust/tokio-threadpool/src/task/mod.rs
new file mode 100644
index 0000000000..9c5a448fcd
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/task/mod.rs
@@ -0,0 +1,308 @@
+mod blocking;
+mod blocking_state;
+mod state;
+
+pub(crate) use self::blocking::{Blocking, CanBlock};
+use self::blocking_state::BlockingState;
+use self::state::State;
+
+use notifier::Notifier;
+use pool::Pool;
+
+use futures::executor::{self, Spawn};
+use futures::{self, Async, Future};
+
+use std::cell::{Cell, UnsafeCell};
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
+use std::sync::atomic::{AtomicPtr, AtomicUsize};
+use std::sync::Arc;
+use std::{fmt, panic, ptr};
+
+/// Harness around a future.
+///
+/// This also behaves as a node in the inbound work queue and the blocking
+/// queue.
+pub(crate) struct Task {
+ /// Task lifecycle state
+ state: AtomicUsize,
+
+ /// Task blocking related state
+ blocking: AtomicUsize,
+
+ /// Next pointer in the queue of tasks pending blocking capacity.
+ next_blocking: AtomicPtr<Task>,
+
+ /// ID of the worker that polled this task first.
+ ///
+ /// This field can be a `Cell` because it's only accessed by the worker thread that is
+ /// executing the task.
+ ///
+ /// The worker ID is represented by a `u32` rather than `usize` in order to save some space
+ /// on 64-bit platforms.
+ pub reg_worker: Cell<Option<u32>>,
+
+ /// The key associated with this task in the `Slab` it was registered in.
+ ///
+ /// This field can be a `Cell` because it's only accessed by the worker thread that has
+ /// registered the task.
+ pub reg_index: Cell<usize>,
+
+ /// Store the future at the head of the struct
+ ///
+ /// The future is dropped immediately when it transitions to Complete
+ future: UnsafeCell<Option<Spawn<BoxFuture>>>,
+}
+
+#[derive(Debug)]
+pub(crate) enum Run {
+ Idle,
+ Schedule,
+ Complete,
+}
+
+type BoxFuture = Box<dyn Future<Item = (), Error = ()> + Send + 'static>;
+
+// ===== impl Task =====
+
+impl Task {
+ /// Create a new `Task` as a harness for `future`.
+ pub fn new(future: BoxFuture) -> Task {
+ // Wrap the future with an execution context.
+ let task_fut = executor::spawn(future);
+
+ Task {
+ state: AtomicUsize::new(State::new().into()),
+ blocking: AtomicUsize::new(BlockingState::new().into()),
+ next_blocking: AtomicPtr::new(ptr::null_mut()),
+ reg_worker: Cell::new(None),
+ reg_index: Cell::new(0),
+ future: UnsafeCell::new(Some(task_fut)),
+ }
+ }
+
+ /// Create a fake `Task` to be used as part of the intrusive mpsc channel
+ /// algorithm.
+ fn stub() -> Task {
+ let future = Box::new(futures::empty()) as BoxFuture;
+ let task_fut = executor::spawn(future);
+
+ Task {
+ state: AtomicUsize::new(State::stub().into()),
+ blocking: AtomicUsize::new(BlockingState::new().into()),
+ next_blocking: AtomicPtr::new(ptr::null_mut()),
+ reg_worker: Cell::new(None),
+ reg_index: Cell::new(0),
+ future: UnsafeCell::new(Some(task_fut)),
+ }
+ }
+
+ /// Execute the task returning `Run::Schedule` if the task needs to be
+ /// scheduled again.
+ pub fn run(&self, unpark: &Arc<Notifier>) -> Run {
+ use self::State::*;
+
+ // Transition task to running state. At this point, the task must be
+ // scheduled.
+ let actual: State = self
+ .state
+ .compare_and_swap(Scheduled.into(), Running.into(), AcqRel)
+ .into();
+
+ match actual {
+ Scheduled => {}
+ _ => panic!("unexpected task state; {:?}", actual),
+ }
+
+ trace!(
+ "Task::run; state={:?}",
+ State::from(self.state.load(Relaxed))
+ );
+
+ // The transition to `Running` done above ensures that a lock on the
+ // future has been obtained.
+ let fut = unsafe { &mut (*self.future.get()) };
+
+ // This block deals with the future panicking while being polled.
+ //
+ // If the future panics, then the drop handler must be called such that
+ // `thread::panicking() -> true`. To do this, the future is dropped from
+ // within the catch_unwind block.
+ let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ struct Guard<'a>(&'a mut Option<Spawn<BoxFuture>>, bool);
+
+ impl<'a> Drop for Guard<'a> {
+ fn drop(&mut self) {
+ // This drops the future
+ if self.1 {
+ let _ = self.0.take();
+ }
+ }
+ }
+
+ let mut g = Guard(fut, true);
+
+ let ret =
+ g.0.as_mut()
+ .unwrap()
+ .poll_future_notify(unpark, self as *const _ as usize);
+
+ g.1 = false;
+
+ ret
+ }));
+
+ match res {
+ Ok(Ok(Async::Ready(_))) | Ok(Err(_)) | Err(_) => {
+ trace!(" -> task complete");
+
+ // The future has completed. Drop it immediately to free
+ // resources and run drop handlers.
+ //
+ // The `Task` harness will stay around longer if it is contained
+ // by any of the various queues.
+ self.drop_future();
+
+ // Transition to the completed state
+ self.state.store(State::Complete.into(), Release);
+
+ if let Err(panic_err) = res {
+ if let Some(ref f) = unpark.pool.config.panic_handler {
+ f(panic_err);
+ }
+ }
+
+ Run::Complete
+ }
+ Ok(Ok(Async::NotReady)) => {
+ trace!(" -> not ready");
+
+ // Attempt to transition from Running -> Idle, if successful,
+ // then the task does not need to be scheduled again. If the CAS
+ // fails, then the task has been unparked concurrent to running,
+ // in which case it transitions immediately back to scheduled
+ // and we return `true`.
+ let prev: State = self
+ .state
+ .compare_and_swap(Running.into(), Idle.into(), AcqRel)
+ .into();
+
+ match prev {
+ Running => Run::Idle,
+ Notified => {
+ self.state.store(Scheduled.into(), Release);
+ Run::Schedule
+ }
+ _ => unreachable!(),
+ }
+ }
+ }
+ }
+
+ /// Aborts this task.
+ ///
+ /// This is called when the threadpool shuts down and the task has already beed polled but not
+ /// completed.
+ pub fn abort(&self) {
+ use self::State::*;
+
+ let mut state = self.state.load(Acquire).into();
+
+ loop {
+ match state {
+ Idle | Scheduled => {}
+ Running | Notified | Complete | Aborted => {
+ // It is assumed that no worker threads are running so the task must be either
+ // in the idle or scheduled state.
+ panic!("unexpected state while aborting task: {:?}", state);
+ }
+ }
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), Aborted.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ // The future has been aborted. Drop it immediately to free resources and run drop
+ // handlers.
+ self.drop_future();
+ break;
+ }
+
+ state = actual;
+ }
+ }
+
+ /// Notify the task
+ pub fn notify(me: Arc<Task>, pool: &Arc<Pool>) {
+ if me.schedule() {
+ let _ = pool.submit(me, pool);
+ }
+ }
+
+ /// Notify the task it has been allocated blocking capacity
+ pub fn notify_blocking(me: Arc<Task>, pool: &Arc<Pool>) {
+ BlockingState::notify_blocking(&me.blocking, AcqRel);
+ Task::notify(me, pool);
+ }
+
+ /// Transition the task state to scheduled.
+ ///
+ /// Returns `true` if the caller is permitted to schedule the task.
+ pub fn schedule(&self) -> bool {
+ use self::State::*;
+
+ loop {
+ // Scheduling can only be done from the `Idle` state.
+ let actual = self
+ .state
+ .compare_and_swap(Idle.into(), Scheduled.into(), AcqRel)
+ .into();
+
+ match actual {
+ Idle => return true,
+ Running => {
+ // The task is already running on another thread. Transition
+ // the state to `Notified`. If this CAS fails, then restart
+ // the logic again from `Idle`.
+ let actual = self
+ .state
+ .compare_and_swap(Running.into(), Notified.into(), AcqRel)
+ .into();
+
+ match actual {
+ Idle => continue,
+ _ => return false,
+ }
+ }
+ Complete | Aborted | Notified | Scheduled => return false,
+ }
+ }
+ }
+
+ /// Consumes any allocated capacity to block.
+ ///
+ /// Returns `true` if capacity was allocated, `false` otherwise.
+ pub fn consume_blocking_allocation(&self) -> CanBlock {
+ // This flag is the primary point of coordination. The queued flag
+ // happens "around" setting the blocking capacity.
+ BlockingState::consume_allocation(&self.blocking, AcqRel)
+ }
+
+ /// Drop the future
+ ///
+ /// This must only be called by the thread that successfully transitioned
+ /// the future state to `Running`.
+ fn drop_future(&self) {
+ let _ = unsafe { (*self.future.get()).take() };
+ }
+}
+
+impl fmt::Debug for Task {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Task")
+ .field("state", &self.state)
+ .field("future", &"Spawn<BoxFuture>")
+ .finish()
+ }
+}