use task::Task; use worker; use crossbeam_deque::Injector; use futures::task::AtomicTask; use futures::{Async, Future, Poll}; use std::sync::{Arc, Mutex}; /// Future that resolves when the thread pool is shutdown. /// /// A `ThreadPool` is shutdown once all the worker have drained their queues and /// shutdown their threads. /// /// `Shutdown` is returned by [`shutdown`], [`shutdown_on_idle`], and /// [`shutdown_now`]. /// /// [`shutdown`]: struct.ThreadPool.html#method.shutdown /// [`shutdown_on_idle`]: struct.ThreadPool.html#method.shutdown_on_idle /// [`shutdown_now`]: struct.ThreadPool.html#method.shutdown_now #[derive(Debug)] pub struct Shutdown { inner: Arc>, } /// Shared state between `Shutdown` and `ShutdownTrigger`. /// /// This is used for notifying the `Shutdown` future when `ShutdownTrigger` gets dropped. #[derive(Debug)] struct Inner { /// The task to notify when the threadpool completes the shutdown process. task: AtomicTask, /// `true` if the threadpool has been shut down. completed: bool, } impl Shutdown { pub(crate) fn new(trigger: &ShutdownTrigger) -> Shutdown { Shutdown { inner: trigger.inner.clone(), } } } impl Future for Shutdown { type Item = (); type Error = (); fn poll(&mut self) -> Poll<(), ()> { let inner = self.inner.lock().unwrap(); if !inner.completed { inner.task.register(); Ok(Async::NotReady) } else { Ok(().into()) } } } /// When dropped, cleans up threadpool's resources and completes the shutdown process. #[derive(Debug)] pub(crate) struct ShutdownTrigger { inner: Arc>, workers: Arc<[worker::Entry]>, queue: Arc>>, } unsafe impl Send for ShutdownTrigger {} unsafe impl Sync for ShutdownTrigger {} impl ShutdownTrigger { pub(crate) fn new( workers: Arc<[worker::Entry]>, queue: Arc>>, ) -> ShutdownTrigger { ShutdownTrigger { inner: Arc::new(Mutex::new(Inner { task: AtomicTask::new(), completed: false, })), workers, queue, } } } impl Drop for ShutdownTrigger { fn drop(&mut self) { // Drain the global task queue. while !self.queue.steal().is_empty() {} // Drop the remaining incomplete tasks and parkers assosicated with workers. for worker in self.workers.iter() { worker.shutdown(); } // Notify the task interested in shutdown. let mut inner = self.inner.lock().unwrap(); inner.completed = true; inner.task.notify(); } }