summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-threadpool/src/blocking
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
commit2aa4a82499d4becd2284cdb482213d541b8804dd (patch)
treeb80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/tokio-threadpool/src/blocking
parentInitial commit. (diff)
downloadfirefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz
firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-threadpool/src/blocking')
-rw-r--r--third_party/rust/tokio-threadpool/src/blocking/global.rs218
-rw-r--r--third_party/rust/tokio-threadpool/src/blocking/mod.rs88
2 files changed, 306 insertions, 0 deletions
diff --git a/third_party/rust/tokio-threadpool/src/blocking/global.rs b/third_party/rust/tokio-threadpool/src/blocking/global.rs
new file mode 100644
index 0000000000..c732a58335
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/blocking/global.rs
@@ -0,0 +1,218 @@
+use super::{BlockingError, BlockingImpl};
+use futures::Poll;
+use std::cell::Cell;
+use std::fmt;
+use std::marker::PhantomData;
+use tokio_executor::Enter;
+
+thread_local! {
+ static CURRENT: Cell<BlockingImpl> = Cell::new(super::default_blocking);
+}
+
+/// Ensures that the executor is removed from the thread-local context
+/// when leaving the scope. This handles cases that involve panicking.
+///
+/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
+/// backwards-compatibility layer. In general, user code should not override the
+/// blocking implementation. If you use this, make sure you know what you're
+/// doing.
+pub struct DefaultGuard<'a> {
+ prior: BlockingImpl,
+ _lifetime: PhantomData<&'a ()>,
+}
+
+/// Set the default blocking implementation, returning a guard that resets the
+/// blocking implementation when dropped.
+///
+/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
+/// backwards-compatibility layer. In general, user code should not override the
+/// blocking implementation. If you use this, make sure you know what you're
+/// doing.
+pub fn set_default<'a>(blocking: BlockingImpl) -> DefaultGuard<'a> {
+ CURRENT.with(|cell| {
+ let prior = cell.replace(blocking);
+ DefaultGuard {
+ prior,
+ _lifetime: PhantomData,
+ }
+ })
+}
+
+/// Set the default blocking implementation for the duration of the closure.
+///
+/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
+/// backwards-compatibility layer. In general, user code should not override the
+/// blocking implementation. If you use this, make sure you know what you're
+/// doing.
+pub fn with_default<F, R>(blocking: BlockingImpl, enter: &mut Enter, f: F) -> R
+where
+ F: FnOnce(&mut Enter) -> R,
+{
+ let _guard = set_default(blocking);
+ f(enter)
+}
+
+/// Enter a blocking section of code.
+///
+/// The `blocking` function annotates a section of code that performs a blocking
+/// operation, either by issuing a blocking syscall or by performing a long
+/// running CPU-bound computation.
+///
+/// When the `blocking` function enters, it hands off the responsibility of
+/// processing the current work queue to another thread. Then, it calls the
+/// supplied closure. The closure is permitted to block indefinitely.
+///
+/// If the maximum number of concurrent `blocking` calls has been reached, then
+/// `NotReady` is returned and the task is notified once existing `blocking`
+/// calls complete. The maximum value is specified when creating a thread pool
+/// using [`Builder::max_blocking`][build]
+///
+/// NB: The entire task that called `blocking` is blocked whenever the supplied
+/// closure blocks, even if you have used future combinators such as `select` -
+/// the other futures in this task will not make progress until the closure
+/// returns.
+/// If this is not desired, ensure that `blocking` runs in its own task (e.g.
+/// using `futures::sync::oneshot::spawn`).
+///
+/// [build]: struct.Builder.html#method.max_blocking
+///
+/// # Return
+///
+/// When the blocking closure is executed, `Ok(Ready(T))` is returned, where
+/// `T` is the closure's return value.
+///
+/// If the thread pool has shutdown, `Err` is returned.
+///
+/// If the number of concurrent `blocking` calls has reached the maximum,
+/// `Ok(NotReady)` is returned and the current task is notified when a call to
+/// `blocking` will succeed.
+///
+/// If `blocking` is called from outside the context of a Tokio thread pool,
+/// `Err` is returned.
+///
+/// # Background
+///
+/// By default, the Tokio thread pool expects that tasks will only run for short
+/// periods at a time before yielding back to the thread pool. This is the basic
+/// premise of cooperative multitasking.
+///
+/// However, it is common to want to perform a blocking operation while
+/// processing an asynchronous computation. Examples of blocking operation
+/// include:
+///
+/// * Performing synchronous file operations (reading and writing).
+/// * Blocking on acquiring a mutex.
+/// * Performing a CPU bound computation, like cryptographic encryption or
+/// decryption.
+///
+/// One option for dealing with blocking operations in an asynchronous context
+/// is to use a thread pool dedicated to performing these operations. This not
+/// ideal as it requires bidirectional message passing as well as a channel to
+/// communicate which adds a level of buffering.
+///
+/// Instead, `blocking` hands off the responsibility of processing the work queue
+/// to another thread. This hand off is light compared to a channel and does not
+/// require buffering.
+///
+/// # Examples
+///
+/// Block on receiving a message from a `std` channel. This example is a little
+/// silly as using the non-blocking channel from the `futures` crate would make
+/// more sense. The blocking receive can be replaced with any blocking operation
+/// that needs to be performed.
+///
+/// ```rust
+/// # extern crate futures;
+/// # extern crate tokio_threadpool;
+///
+/// use tokio_threadpool::{ThreadPool, blocking};
+///
+/// use futures::Future;
+/// use futures::future::{lazy, poll_fn};
+///
+/// use std::sync::mpsc;
+/// use std::thread;
+/// use std::time::Duration;
+///
+/// pub fn main() {
+/// // This is a *blocking* channel
+/// let (tx, rx) = mpsc::channel();
+///
+/// // Spawn a thread to send a message
+/// thread::spawn(move || {
+/// thread::sleep(Duration::from_millis(500));
+/// tx.send("hello").unwrap();
+/// });
+///
+/// let pool = ThreadPool::new();
+///
+/// pool.spawn(lazy(move || {
+/// // Because `blocking` returns `Poll`, it is intended to be used
+/// // from the context of a `Future` implementation. Since we don't
+/// // have a complicated requirement, we can use `poll_fn` in this
+/// // case.
+/// poll_fn(move || {
+/// blocking(|| {
+/// let msg = rx.recv().unwrap();
+/// println!("message = {}", msg);
+/// }).map_err(|_| panic!("the threadpool shut down"))
+/// })
+/// }));
+///
+/// // Wait for the task we just spawned to complete.
+/// pool.shutdown_on_idle().wait().unwrap();
+/// }
+/// ```
+pub fn blocking<F, T>(f: F) -> Poll<T, BlockingError>
+where
+ F: FnOnce() -> T,
+{
+ CURRENT.with(|cell| {
+ let blocking = cell.get();
+
+ // Object-safety workaround: the `Blocking` trait must be object-safe,
+ // since we use a trait object in the thread-local. However, a blocking
+ // _operation_ will be generic over the return type of the blocking
+ // function. Therefore, rather than passing a function with a return
+ // type to `Blocking::run_blocking`, we pass a _new_ closure which
+ // doesn't have a return value. That closure invokes the blocking
+ // function and assigns its value to `ret`, which we then unpack when
+ // the blocking call finishes.
+ let mut f = Some(f);
+ let mut ret = None;
+ {
+ let ret2 = &mut ret;
+ let mut run = move || {
+ let f = f
+ .take()
+ .expect("blocking closure invoked twice; this is a bug!");
+ *ret2 = Some((f)());
+ };
+
+ try_ready!((blocking)(&mut run));
+ }
+
+ // Return the result
+ let ret =
+ ret.expect("blocking function finished, but return value was unset; this is a bug!");
+ Ok(ret.into())
+ })
+}
+
+// === impl DefaultGuard ===
+
+impl<'a> fmt::Debug for DefaultGuard<'a> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.pad("DefaultGuard { .. }")
+ }
+}
+
+impl<'a> Drop for DefaultGuard<'a> {
+ fn drop(&mut self) {
+ // if the TLS value has already been torn down, there's nothing else we
+ // can do. we're almost certainly panicking anyway.
+ let _ = CURRENT.try_with(|cell| {
+ cell.set(self.prior);
+ });
+ }
+}
diff --git a/third_party/rust/tokio-threadpool/src/blocking/mod.rs b/third_party/rust/tokio-threadpool/src/blocking/mod.rs
new file mode 100644
index 0000000000..27151d44b8
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/blocking/mod.rs
@@ -0,0 +1,88 @@
+use worker::Worker;
+
+use futures::{Async, Poll};
+use tokio_executor;
+
+use std::error::Error;
+use std::fmt;
+
+mod global;
+pub use self::global::blocking;
+#[doc(hidden)]
+pub use self::global::{set_default, with_default, DefaultGuard};
+
+/// Error raised by `blocking`.
+pub struct BlockingError {
+ _p: (),
+}
+
+/// A function implementing the behavior run on calls to `blocking`.
+///
+/// **NOTE:** This is intended specifically for use by `tokio` 0.2's
+/// backwards-compatibility layer. In general, user code should not override the
+/// blocking implementation. If you use this, make sure you know what you're
+/// doing.
+#[doc(hidden)]
+pub type BlockingImpl = fn(&mut dyn FnMut()) -> Poll<(), BlockingError>;
+
+fn default_blocking(f: &mut dyn FnMut()) -> Poll<(), BlockingError> {
+ let res = Worker::with_current(|worker| {
+ let worker = match worker {
+ Some(worker) => worker,
+ None => {
+ return Err(BlockingError::new());
+ }
+ };
+
+ // Transition the worker state to blocking. This will exit the fn early
+ // with `NotReady` if the pool does not have enough capacity to enter
+ // blocking mode.
+ worker.transition_to_blocking()
+ });
+
+ // If the transition cannot happen, exit early
+ try_ready!(res);
+
+ // Currently in blocking mode, so call the inner closure.
+ //
+ // "Exit" the current executor in case the blocking function wants
+ // to call a different executor.
+ tokio_executor::exit(move || (f)());
+
+ // Try to transition out of blocking mode. This is a fast path that takes
+ // back ownership of the worker if the worker handoff didn't complete yet.
+ Worker::with_current(|worker| {
+ // Worker must be set since it was above.
+ worker.unwrap().transition_from_blocking();
+ });
+
+ Ok(Async::Ready(()))
+}
+
+impl BlockingError {
+ /// Returns a new `BlockingError`.
+ #[doc(hidden)]
+ pub fn new() -> Self {
+ Self { _p: () }
+ }
+}
+
+impl fmt::Display for BlockingError {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "{}", self.description())
+ }
+}
+
+impl fmt::Debug for BlockingError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("BlockingError")
+ .field("reason", &self.description())
+ .finish()
+ }
+}
+
+impl Error for BlockingError {
+ fn description(&self) -> &str {
+ "`blocking` annotation used from outside the context of a thread pool"
+ }
+}