summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-reactor/src/background.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-reactor/src/background.rs')
-rw-r--r--third_party/rust/tokio-reactor/src/background.rs217
1 files changed, 217 insertions, 0 deletions
diff --git a/third_party/rust/tokio-reactor/src/background.rs b/third_party/rust/tokio-reactor/src/background.rs
new file mode 100644
index 0000000000..88f78a8bcb
--- /dev/null
+++ b/third_party/rust/tokio-reactor/src/background.rs
@@ -0,0 +1,217 @@
+use {Reactor, Handle, Task};
+use atomic_task::AtomicTask;
+
+use futures::{Future, Async, Poll, task};
+
+use std::io;
+use std::thread;
+use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+
+/// Handle to the reactor running on a background thread.
+///
+/// Instances are created by calling [`Reactor::background`].
+///
+/// [`Reactor::background`]: struct.Reactor.html#method.background
+#[derive(Debug)]
+pub struct Background {
+ /// When `None`, the reactor thread will run until the process terminates.
+ inner: Option<Inner>,
+}
+
+/// Future that resolves when the reactor thread has shutdown.
+#[derive(Debug)]
+pub struct Shutdown {
+ inner: Inner,
+}
+
+/// Actual Background handle.
+#[derive(Debug)]
+struct Inner {
+ /// Handle to the reactor
+ handle: Handle,
+
+ /// Shared state between the background handle and the reactor thread.
+ shared: Arc<Shared>,
+}
+
+#[derive(Debug)]
+struct Shared {
+ /// Signal the reactor thread to shutdown.
+ shutdown: AtomicUsize,
+
+ /// Task to notify when the reactor thread enters a shutdown state.
+ shutdown_task: AtomicTask,
+}
+
+/// Notifies the reactor thread to shutdown once the reactor becomes idle.
+const SHUTDOWN_IDLE: usize = 1;
+
+/// Notifies the reactor thread to shutdown immediately.
+const SHUTDOWN_NOW: usize = 2;
+
+/// The reactor is currently shutdown.
+const SHUTDOWN: usize = 3;
+
+// ===== impl Background =====
+
+impl Background {
+ /// Launch a reactor in the background and return a handle to the thread.
+ pub(crate) fn new(reactor: Reactor) -> io::Result<Background> {
+ // Grab a handle to the reactor
+ let handle = reactor.handle().clone();
+
+ // Create the state shared between the background handle and the reactor
+ // thread.
+ let shared = Arc::new(Shared {
+ shutdown: AtomicUsize::new(0),
+ shutdown_task: AtomicTask::new(),
+ });
+
+ // For the reactor thread
+ let shared2 = shared.clone();
+
+ // Start the reactor thread
+ thread::Builder::new()
+ .spawn(move || run(reactor, shared2))?;
+
+ Ok(Background {
+ inner: Some(Inner {
+ handle,
+ shared,
+ }),
+ })
+ }
+
+ /// Returns a reference to the reactor handle.
+ pub fn handle(&self) -> &Handle {
+ &self.inner.as_ref().unwrap().handle
+ }
+
+ /// Shutdown the reactor on idle.
+ ///
+ /// Returns a future that completes once the reactor thread has shutdown.
+ pub fn shutdown_on_idle(mut self) -> Shutdown {
+ let inner = self.inner.take().unwrap();
+ inner.shutdown_on_idle();
+
+ Shutdown { inner }
+ }
+
+ /// Shutdown the reactor immediately
+ ///
+ /// Returns a future that completes once the reactor thread has shutdown.
+ pub fn shutdown_now(mut self) -> Shutdown {
+ let inner = self.inner.take().unwrap();
+ inner.shutdown_now();
+
+ Shutdown { inner }
+ }
+
+ /// Run the reactor on its thread until the process terminates.
+ pub fn forget(mut self) {
+ drop(self.inner.take());
+ }
+}
+
+impl Drop for Background {
+ fn drop(&mut self) {
+ let inner = match self.inner.take() {
+ Some(i) => i,
+ None => return,
+ };
+
+ inner.shutdown_now();
+
+ let shutdown = Shutdown { inner };
+ let _ = shutdown.wait();
+ }
+}
+
+// ===== impl Shutdown =====
+
+impl Future for Shutdown {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ let task = Task::Futures1(task::current());
+ self.inner.shared.shutdown_task.register_task(task);
+
+ if !self.inner.is_shutdown() {
+ return Ok(Async::NotReady);
+ }
+
+ Ok(().into())
+ }
+}
+
+// ===== impl Inner =====
+
+impl Inner {
+ /// Returns true if the reactor thread is shutdown.
+ fn is_shutdown(&self) -> bool {
+ self.shared.shutdown.load(SeqCst) == SHUTDOWN
+ }
+
+ /// Notify the reactor thread to shutdown once the reactor transitions to an
+ /// idle state.
+ fn shutdown_on_idle(&self) {
+ self.shared.shutdown
+ .compare_and_swap(0, SHUTDOWN_IDLE, SeqCst);
+ self.handle.wakeup();
+ }
+
+ /// Notify the reactor thread to shutdown immediately.
+ fn shutdown_now(&self) {
+ let mut curr = self.shared.shutdown.load(SeqCst);
+
+ loop {
+ if curr >= SHUTDOWN_NOW {
+ return;
+ }
+
+ let act = self.shared.shutdown
+ .compare_and_swap(curr, SHUTDOWN_NOW, SeqCst);
+
+ if act == curr {
+ self.handle.wakeup();
+ return;
+ }
+
+ curr = act;
+ }
+ }
+}
+
+// ===== impl Reactor thread =====
+
+fn run(mut reactor: Reactor, shared: Arc<Shared>) {
+ debug!("starting background reactor");
+ loop {
+ let shutdown = shared.shutdown.load(SeqCst);
+
+ if shutdown == SHUTDOWN_NOW {
+ debug!("shutting background reactor down NOW");
+ break;
+ }
+
+ if shutdown == SHUTDOWN_IDLE && reactor.is_idle() {
+ debug!("shutting background reactor on idle");
+ break;
+ }
+
+ reactor.turn(None).unwrap();
+ }
+
+ drop(reactor);
+
+ // Transition the state to shutdown
+ shared.shutdown.store(SHUTDOWN, SeqCst);
+
+ // Notify any waiters
+ shared.shutdown_task.notify();
+
+ debug!("background reactor has shutdown");
+}