diff options
Diffstat (limited to 'third_party/rust/tokio-threadpool/src/thread_pool.rs')
-rw-r--r-- | third_party/rust/tokio-threadpool/src/thread_pool.rs | 217 |
1 files changed, 217 insertions, 0 deletions
diff --git a/third_party/rust/tokio-threadpool/src/thread_pool.rs b/third_party/rust/tokio-threadpool/src/thread_pool.rs new file mode 100644 index 0000000000..30f58e96ad --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/thread_pool.rs @@ -0,0 +1,217 @@ +use builder::Builder; +use pool::Pool; +use sender::Sender; +use shutdown::{Shutdown, ShutdownTrigger}; + +use futures::sync::oneshot; +use futures::{Future, Poll}; + +use std::sync::Arc; + +/// Work-stealing based thread pool for executing futures. +/// +/// If a `ThreadPool` instance is dropped without explicitly being shutdown, +/// `shutdown_now` is called implicitly, forcing all tasks that have not yet +/// completed to be dropped. +/// +/// Create `ThreadPool` instances using `Builder`. +#[derive(Debug)] +pub struct ThreadPool { + inner: Option<Inner>, +} + +#[derive(Debug)] +struct Inner { + sender: Sender, + trigger: Arc<ShutdownTrigger>, +} + +impl ThreadPool { + /// Create a new `ThreadPool` with default values. + /// + /// Use [`Builder`] for creating a configured thread pool. + /// + /// [`Builder`]: struct.Builder.html + pub fn new() -> ThreadPool { + Builder::new().build() + } + + pub(crate) fn new2(pool: Arc<Pool>, trigger: Arc<ShutdownTrigger>) -> ThreadPool { + ThreadPool { + inner: Some(Inner { + sender: Sender { pool }, + trigger, + }), + } + } + + /// Spawn a future onto the thread pool. + /// + /// This function takes ownership of the future and randomly assigns it to a + /// worker thread. The thread will then start executing the future. + /// + /// # Examples + /// + /// ```rust + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::ThreadPool; + /// use futures::future::{Future, lazy}; + /// + /// # pub fn main() { + /// // Create a thread pool with default configuration values + /// let thread_pool = ThreadPool::new(); + /// + /// thread_pool.spawn(lazy(|| { + /// println!("called from a worker thread"); + /// Ok(()) + /// })); + /// + /// // Gracefully shutdown the threadpool + /// thread_pool.shutdown().wait().unwrap(); + /// # } + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. Use [`Sender::spawn`] for a + /// version that returns a `Result` instead of panicking. + pub fn spawn<F>(&self, future: F) + where + F: Future<Item = (), Error = ()> + Send + 'static, + { + self.sender().spawn(future).unwrap(); + } + + /// Spawn a future on to the thread pool, return a future representing + /// the produced value. + /// + /// The SpawnHandle returned is a future that is a proxy for future itself. + /// When future completes on this thread pool then the SpawnHandle will itself + /// be resolved. + /// + /// # Examples + /// + /// ```rust + /// # extern crate tokio_threadpool; + /// # extern crate futures; + /// # use tokio_threadpool::ThreadPool; + /// use futures::future::{Future, lazy}; + /// + /// # pub fn main() { + /// // Create a thread pool with default configuration values + /// let thread_pool = ThreadPool::new(); + /// + /// let handle = thread_pool.spawn_handle(lazy(|| Ok::<_, ()>(42))); + /// + /// let value = handle.wait().unwrap(); + /// assert_eq!(value, 42); + /// + /// // Gracefully shutdown the threadpool + /// thread_pool.shutdown().wait().unwrap(); + /// # } + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. + pub fn spawn_handle<F>(&self, future: F) -> SpawnHandle<F::Item, F::Error> + where + F: Future + Send + 'static, + F::Item: Send + 'static, + F::Error: Send + 'static, + { + SpawnHandle(oneshot::spawn(future, self.sender())) + } + + /// Return a reference to the sender handle + /// + /// The handle is used to spawn futures onto the thread pool. It also + /// implements the `Executor` trait. + pub fn sender(&self) -> &Sender { + &self.inner.as_ref().unwrap().sender + } + + /// Return a mutable reference to the sender handle + pub fn sender_mut(&mut self) -> &mut Sender { + &mut self.inner.as_mut().unwrap().sender + } + + /// Shutdown the pool once it becomes idle. + /// + /// Idle is defined as the completion of all futures that have been spawned + /// onto the thread pool. There may still be outstanding handles when the + /// thread pool reaches an idle state. + /// + /// Once the idle state is reached, calling `spawn` on any outstanding + /// handle will result in an error. All worker threads are signaled and will + /// shutdown. The returned future completes once all worker threads have + /// completed the shutdown process. + pub fn shutdown_on_idle(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + inner.sender.pool.shutdown(false, false); + Shutdown::new(&inner.trigger) + } + + /// Shutdown the pool + /// + /// This prevents the thread pool from accepting new tasks but will allow + /// any existing tasks to complete. + /// + /// Calling `spawn` on any outstanding handle will result in an error. All + /// worker threads are signaled and will shutdown. The returned future + /// completes once all worker threads have completed the shutdown process. + pub fn shutdown(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + inner.sender.pool.shutdown(true, false); + Shutdown::new(&inner.trigger) + } + + /// Shutdown the pool immediately + /// + /// This will prevent the thread pool from accepting new tasks **and** + /// abort any tasks that are currently running on the thread pool. + /// + /// Calling `spawn` on any outstanding handle will result in an error. All + /// worker threads are signaled and will shutdown. The returned future + /// completes once all worker threads have completed the shutdown process. + pub fn shutdown_now(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + inner.sender.pool.shutdown(true, true); + Shutdown::new(&inner.trigger) + } +} + +impl Drop for ThreadPool { + fn drop(&mut self) { + if let Some(inner) = self.inner.take() { + // Begin the shutdown process. + inner.sender.pool.shutdown(true, true); + let shutdown = Shutdown::new(&inner.trigger); + + // Drop `inner` in order to drop its shutdown trigger. + drop(inner); + + // Wait until all worker threads terminate and the threadpool's resources clean up. + let _ = shutdown.wait(); + } + } +} + +/// Handle returned from ThreadPool::spawn_handle. +/// +/// This handle is a future representing the completion of a different future +/// spawned on to the thread pool. Created through the ThreadPool::spawn_handle +/// function this handle will resolve when the future provided resolves on the +/// thread pool. +#[derive(Debug)] +pub struct SpawnHandle<T, E>(oneshot::SpawnHandle<T, E>); + +impl<T, E> Future for SpawnHandle<T, E> { + type Item = T; + type Error = E; + + fn poll(&mut self) -> Poll<T, E> { + self.0.poll() + } +} |