diff options
Diffstat (limited to 'third_party/rust/tokio-reactor/src/background.rs')
-rw-r--r-- | third_party/rust/tokio-reactor/src/background.rs | 217 |
1 files changed, 217 insertions, 0 deletions
diff --git a/third_party/rust/tokio-reactor/src/background.rs b/third_party/rust/tokio-reactor/src/background.rs new file mode 100644 index 0000000000..88f78a8bcb --- /dev/null +++ b/third_party/rust/tokio-reactor/src/background.rs @@ -0,0 +1,217 @@ +use {Reactor, Handle, Task}; +use atomic_task::AtomicTask; + +use futures::{Future, Async, Poll, task}; + +use std::io; +use std::thread; +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; + +/// Handle to the reactor running on a background thread. +/// +/// Instances are created by calling [`Reactor::background`]. +/// +/// [`Reactor::background`]: struct.Reactor.html#method.background +#[derive(Debug)] +pub struct Background { + /// When `None`, the reactor thread will run until the process terminates. + inner: Option<Inner>, +} + +/// Future that resolves when the reactor thread has shutdown. +#[derive(Debug)] +pub struct Shutdown { + inner: Inner, +} + +/// Actual Background handle. +#[derive(Debug)] +struct Inner { + /// Handle to the reactor + handle: Handle, + + /// Shared state between the background handle and the reactor thread. + shared: Arc<Shared>, +} + +#[derive(Debug)] +struct Shared { + /// Signal the reactor thread to shutdown. + shutdown: AtomicUsize, + + /// Task to notify when the reactor thread enters a shutdown state. + shutdown_task: AtomicTask, +} + +/// Notifies the reactor thread to shutdown once the reactor becomes idle. +const SHUTDOWN_IDLE: usize = 1; + +/// Notifies the reactor thread to shutdown immediately. +const SHUTDOWN_NOW: usize = 2; + +/// The reactor is currently shutdown. +const SHUTDOWN: usize = 3; + +// ===== impl Background ===== + +impl Background { + /// Launch a reactor in the background and return a handle to the thread. + pub(crate) fn new(reactor: Reactor) -> io::Result<Background> { + // Grab a handle to the reactor + let handle = reactor.handle().clone(); + + // Create the state shared between the background handle and the reactor + // thread. + let shared = Arc::new(Shared { + shutdown: AtomicUsize::new(0), + shutdown_task: AtomicTask::new(), + }); + + // For the reactor thread + let shared2 = shared.clone(); + + // Start the reactor thread + thread::Builder::new() + .spawn(move || run(reactor, shared2))?; + + Ok(Background { + inner: Some(Inner { + handle, + shared, + }), + }) + } + + /// Returns a reference to the reactor handle. + pub fn handle(&self) -> &Handle { + &self.inner.as_ref().unwrap().handle + } + + /// Shutdown the reactor on idle. + /// + /// Returns a future that completes once the reactor thread has shutdown. + pub fn shutdown_on_idle(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + inner.shutdown_on_idle(); + + Shutdown { inner } + } + + /// Shutdown the reactor immediately + /// + /// Returns a future that completes once the reactor thread has shutdown. + pub fn shutdown_now(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + inner.shutdown_now(); + + Shutdown { inner } + } + + /// Run the reactor on its thread until the process terminates. + pub fn forget(mut self) { + drop(self.inner.take()); + } +} + +impl Drop for Background { + fn drop(&mut self) { + let inner = match self.inner.take() { + Some(i) => i, + None => return, + }; + + inner.shutdown_now(); + + let shutdown = Shutdown { inner }; + let _ = shutdown.wait(); + } +} + +// ===== impl Shutdown ===== + +impl Future for Shutdown { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + let task = Task::Futures1(task::current()); + self.inner.shared.shutdown_task.register_task(task); + + if !self.inner.is_shutdown() { + return Ok(Async::NotReady); + } + + Ok(().into()) + } +} + +// ===== impl Inner ===== + +impl Inner { + /// Returns true if the reactor thread is shutdown. + fn is_shutdown(&self) -> bool { + self.shared.shutdown.load(SeqCst) == SHUTDOWN + } + + /// Notify the reactor thread to shutdown once the reactor transitions to an + /// idle state. + fn shutdown_on_idle(&self) { + self.shared.shutdown + .compare_and_swap(0, SHUTDOWN_IDLE, SeqCst); + self.handle.wakeup(); + } + + /// Notify the reactor thread to shutdown immediately. + fn shutdown_now(&self) { + let mut curr = self.shared.shutdown.load(SeqCst); + + loop { + if curr >= SHUTDOWN_NOW { + return; + } + + let act = self.shared.shutdown + .compare_and_swap(curr, SHUTDOWN_NOW, SeqCst); + + if act == curr { + self.handle.wakeup(); + return; + } + + curr = act; + } + } +} + +// ===== impl Reactor thread ===== + +fn run(mut reactor: Reactor, shared: Arc<Shared>) { + debug!("starting background reactor"); + loop { + let shutdown = shared.shutdown.load(SeqCst); + + if shutdown == SHUTDOWN_NOW { + debug!("shutting background reactor down NOW"); + break; + } + + if shutdown == SHUTDOWN_IDLE && reactor.is_idle() { + debug!("shutting background reactor on idle"); + break; + } + + reactor.turn(None).unwrap(); + } + + drop(reactor); + + // Transition the state to shutdown + shared.shutdown.store(SHUTDOWN, SeqCst); + + // Notify any waiters + shared.shutdown_task.notify(); + + debug!("background reactor has shutdown"); +} |