summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-threadpool/src/shutdown.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-threadpool/src/shutdown.rs')
-rw-r--r--third_party/rust/tokio-threadpool/src/shutdown.rs103
1 files changed, 103 insertions, 0 deletions
diff --git a/third_party/rust/tokio-threadpool/src/shutdown.rs b/third_party/rust/tokio-threadpool/src/shutdown.rs
new file mode 100644
index 0000000000..c3d04a002d
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/shutdown.rs
@@ -0,0 +1,103 @@
+use task::Task;
+use worker;
+
+use crossbeam_deque::Injector;
+use futures::task::AtomicTask;
+use futures::{Async, Future, Poll};
+
+use std::sync::{Arc, Mutex};
+
+/// Future that resolves when the thread pool is shutdown.
+///
+/// A `ThreadPool` is shutdown once all the worker have drained their queues and
+/// shutdown their threads.
+///
+/// `Shutdown` is returned by [`shutdown`], [`shutdown_on_idle`], and
+/// [`shutdown_now`].
+///
+/// [`shutdown`]: struct.ThreadPool.html#method.shutdown
+/// [`shutdown_on_idle`]: struct.ThreadPool.html#method.shutdown_on_idle
+/// [`shutdown_now`]: struct.ThreadPool.html#method.shutdown_now
+#[derive(Debug)]
+pub struct Shutdown {
+ inner: Arc<Mutex<Inner>>,
+}
+
+/// Shared state between `Shutdown` and `ShutdownTrigger`.
+///
+/// This is used for notifying the `Shutdown` future when `ShutdownTrigger` gets dropped.
+#[derive(Debug)]
+struct Inner {
+ /// The task to notify when the threadpool completes the shutdown process.
+ task: AtomicTask,
+ /// `true` if the threadpool has been shut down.
+ completed: bool,
+}
+
+impl Shutdown {
+ pub(crate) fn new(trigger: &ShutdownTrigger) -> Shutdown {
+ Shutdown {
+ inner: trigger.inner.clone(),
+ }
+ }
+}
+
+impl Future for Shutdown {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ let inner = self.inner.lock().unwrap();
+
+ if !inner.completed {
+ inner.task.register();
+ Ok(Async::NotReady)
+ } else {
+ Ok(().into())
+ }
+ }
+}
+
+/// When dropped, cleans up threadpool's resources and completes the shutdown process.
+#[derive(Debug)]
+pub(crate) struct ShutdownTrigger {
+ inner: Arc<Mutex<Inner>>,
+ workers: Arc<[worker::Entry]>,
+ queue: Arc<Injector<Arc<Task>>>,
+}
+
+unsafe impl Send for ShutdownTrigger {}
+unsafe impl Sync for ShutdownTrigger {}
+
+impl ShutdownTrigger {
+ pub(crate) fn new(
+ workers: Arc<[worker::Entry]>,
+ queue: Arc<Injector<Arc<Task>>>,
+ ) -> ShutdownTrigger {
+ ShutdownTrigger {
+ inner: Arc::new(Mutex::new(Inner {
+ task: AtomicTask::new(),
+ completed: false,
+ })),
+ workers,
+ queue,
+ }
+ }
+}
+
+impl Drop for ShutdownTrigger {
+ fn drop(&mut self) {
+ // Drain the global task queue.
+ while !self.queue.steal().is_empty() {}
+
+ // Drop the remaining incomplete tasks and parkers assosicated with workers.
+ for worker in self.workers.iter() {
+ worker.shutdown();
+ }
+
+ // Notify the task interested in shutdown.
+ let mut inner = self.inner.lock().unwrap();
+ inner.completed = true;
+ inner.task.notify();
+ }
+}