diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
commit | 2aa4a82499d4becd2284cdb482213d541b8804dd (patch) | |
tree | b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/tokio-threadpool/src/blocking | |
parent | Initial commit. (diff) | |
download | firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip |
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-threadpool/src/blocking')
-rw-r--r-- | third_party/rust/tokio-threadpool/src/blocking/global.rs | 218 | ||||
-rw-r--r-- | third_party/rust/tokio-threadpool/src/blocking/mod.rs | 88 |
2 files changed, 306 insertions, 0 deletions
diff --git a/third_party/rust/tokio-threadpool/src/blocking/global.rs b/third_party/rust/tokio-threadpool/src/blocking/global.rs new file mode 100644 index 0000000000..c732a58335 --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/blocking/global.rs @@ -0,0 +1,218 @@ +use super::{BlockingError, BlockingImpl}; +use futures::Poll; +use std::cell::Cell; +use std::fmt; +use std::marker::PhantomData; +use tokio_executor::Enter; + +thread_local! { + static CURRENT: Cell<BlockingImpl> = Cell::new(super::default_blocking); +} + +/// Ensures that the executor is removed from the thread-local context +/// when leaving the scope. This handles cases that involve panicking. +/// +/// **NOTE:** This is intended specifically for use by `tokio` 0.2's +/// backwards-compatibility layer. In general, user code should not override the +/// blocking implementation. If you use this, make sure you know what you're +/// doing. +pub struct DefaultGuard<'a> { + prior: BlockingImpl, + _lifetime: PhantomData<&'a ()>, +} + +/// Set the default blocking implementation, returning a guard that resets the +/// blocking implementation when dropped. +/// +/// **NOTE:** This is intended specifically for use by `tokio` 0.2's +/// backwards-compatibility layer. In general, user code should not override the +/// blocking implementation. If you use this, make sure you know what you're +/// doing. +pub fn set_default<'a>(blocking: BlockingImpl) -> DefaultGuard<'a> { + CURRENT.with(|cell| { + let prior = cell.replace(blocking); + DefaultGuard { + prior, + _lifetime: PhantomData, + } + }) +} + +/// Set the default blocking implementation for the duration of the closure. +/// +/// **NOTE:** This is intended specifically for use by `tokio` 0.2's +/// backwards-compatibility layer. In general, user code should not override the +/// blocking implementation. If you use this, make sure you know what you're +/// doing. +pub fn with_default<F, R>(blocking: BlockingImpl, enter: &mut Enter, f: F) -> R +where + F: FnOnce(&mut Enter) -> R, +{ + let _guard = set_default(blocking); + f(enter) +} + +/// Enter a blocking section of code. +/// +/// The `blocking` function annotates a section of code that performs a blocking +/// operation, either by issuing a blocking syscall or by performing a long +/// running CPU-bound computation. +/// +/// When the `blocking` function enters, it hands off the responsibility of +/// processing the current work queue to another thread. Then, it calls the +/// supplied closure. The closure is permitted to block indefinitely. +/// +/// If the maximum number of concurrent `blocking` calls has been reached, then +/// `NotReady` is returned and the task is notified once existing `blocking` +/// calls complete. The maximum value is specified when creating a thread pool +/// using [`Builder::max_blocking`][build] +/// +/// NB: The entire task that called `blocking` is blocked whenever the supplied +/// closure blocks, even if you have used future combinators such as `select` - +/// the other futures in this task will not make progress until the closure +/// returns. +/// If this is not desired, ensure that `blocking` runs in its own task (e.g. +/// using `futures::sync::oneshot::spawn`). +/// +/// [build]: struct.Builder.html#method.max_blocking +/// +/// # Return +/// +/// When the blocking closure is executed, `Ok(Ready(T))` is returned, where +/// `T` is the closure's return value. +/// +/// If the thread pool has shutdown, `Err` is returned. +/// +/// If the number of concurrent `blocking` calls has reached the maximum, +/// `Ok(NotReady)` is returned and the current task is notified when a call to +/// `blocking` will succeed. +/// +/// If `blocking` is called from outside the context of a Tokio thread pool, +/// `Err` is returned. +/// +/// # Background +/// +/// By default, the Tokio thread pool expects that tasks will only run for short +/// periods at a time before yielding back to the thread pool. This is the basic +/// premise of cooperative multitasking. +/// +/// However, it is common to want to perform a blocking operation while +/// processing an asynchronous computation. Examples of blocking operation +/// include: +/// +/// * Performing synchronous file operations (reading and writing). +/// * Blocking on acquiring a mutex. +/// * Performing a CPU bound computation, like cryptographic encryption or +/// decryption. +/// +/// One option for dealing with blocking operations in an asynchronous context +/// is to use a thread pool dedicated to performing these operations. This not +/// ideal as it requires bidirectional message passing as well as a channel to +/// communicate which adds a level of buffering. +/// +/// Instead, `blocking` hands off the responsibility of processing the work queue +/// to another thread. This hand off is light compared to a channel and does not +/// require buffering. +/// +/// # Examples +/// +/// Block on receiving a message from a `std` channel. This example is a little +/// silly as using the non-blocking channel from the `futures` crate would make +/// more sense. The blocking receive can be replaced with any blocking operation +/// that needs to be performed. +/// +/// ```rust +/// # extern crate futures; +/// # extern crate tokio_threadpool; +/// +/// use tokio_threadpool::{ThreadPool, blocking}; +/// +/// use futures::Future; +/// use futures::future::{lazy, poll_fn}; +/// +/// use std::sync::mpsc; +/// use std::thread; +/// use std::time::Duration; +/// +/// pub fn main() { +/// // This is a *blocking* channel +/// let (tx, rx) = mpsc::channel(); +/// +/// // Spawn a thread to send a message +/// thread::spawn(move || { +/// thread::sleep(Duration::from_millis(500)); +/// tx.send("hello").unwrap(); +/// }); +/// +/// let pool = ThreadPool::new(); +/// +/// pool.spawn(lazy(move || { +/// // Because `blocking` returns `Poll`, it is intended to be used +/// // from the context of a `Future` implementation. Since we don't +/// // have a complicated requirement, we can use `poll_fn` in this +/// // case. +/// poll_fn(move || { +/// blocking(|| { +/// let msg = rx.recv().unwrap(); +/// println!("message = {}", msg); +/// }).map_err(|_| panic!("the threadpool shut down")) +/// }) +/// })); +/// +/// // Wait for the task we just spawned to complete. +/// pool.shutdown_on_idle().wait().unwrap(); +/// } +/// ``` +pub fn blocking<F, T>(f: F) -> Poll<T, BlockingError> +where + F: FnOnce() -> T, +{ + CURRENT.with(|cell| { + let blocking = cell.get(); + + // Object-safety workaround: the `Blocking` trait must be object-safe, + // since we use a trait object in the thread-local. However, a blocking + // _operation_ will be generic over the return type of the blocking + // function. Therefore, rather than passing a function with a return + // type to `Blocking::run_blocking`, we pass a _new_ closure which + // doesn't have a return value. That closure invokes the blocking + // function and assigns its value to `ret`, which we then unpack when + // the blocking call finishes. + let mut f = Some(f); + let mut ret = None; + { + let ret2 = &mut ret; + let mut run = move || { + let f = f + .take() + .expect("blocking closure invoked twice; this is a bug!"); + *ret2 = Some((f)()); + }; + + try_ready!((blocking)(&mut run)); + } + + // Return the result + let ret = + ret.expect("blocking function finished, but return value was unset; this is a bug!"); + Ok(ret.into()) + }) +} + +// === impl DefaultGuard === + +impl<'a> fmt::Debug for DefaultGuard<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("DefaultGuard { .. }") + } +} + +impl<'a> Drop for DefaultGuard<'a> { + fn drop(&mut self) { + // if the TLS value has already been torn down, there's nothing else we + // can do. we're almost certainly panicking anyway. + let _ = CURRENT.try_with(|cell| { + cell.set(self.prior); + }); + } +} diff --git a/third_party/rust/tokio-threadpool/src/blocking/mod.rs b/third_party/rust/tokio-threadpool/src/blocking/mod.rs new file mode 100644 index 0000000000..27151d44b8 --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/blocking/mod.rs @@ -0,0 +1,88 @@ +use worker::Worker; + +use futures::{Async, Poll}; +use tokio_executor; + +use std::error::Error; +use std::fmt; + +mod global; +pub use self::global::blocking; +#[doc(hidden)] +pub use self::global::{set_default, with_default, DefaultGuard}; + +/// Error raised by `blocking`. +pub struct BlockingError { + _p: (), +} + +/// A function implementing the behavior run on calls to `blocking`. +/// +/// **NOTE:** This is intended specifically for use by `tokio` 0.2's +/// backwards-compatibility layer. In general, user code should not override the +/// blocking implementation. If you use this, make sure you know what you're +/// doing. +#[doc(hidden)] +pub type BlockingImpl = fn(&mut dyn FnMut()) -> Poll<(), BlockingError>; + +fn default_blocking(f: &mut dyn FnMut()) -> Poll<(), BlockingError> { + let res = Worker::with_current(|worker| { + let worker = match worker { + Some(worker) => worker, + None => { + return Err(BlockingError::new()); + } + }; + + // Transition the worker state to blocking. This will exit the fn early + // with `NotReady` if the pool does not have enough capacity to enter + // blocking mode. + worker.transition_to_blocking() + }); + + // If the transition cannot happen, exit early + try_ready!(res); + + // Currently in blocking mode, so call the inner closure. + // + // "Exit" the current executor in case the blocking function wants + // to call a different executor. + tokio_executor::exit(move || (f)()); + + // Try to transition out of blocking mode. This is a fast path that takes + // back ownership of the worker if the worker handoff didn't complete yet. + Worker::with_current(|worker| { + // Worker must be set since it was above. + worker.unwrap().transition_from_blocking(); + }); + + Ok(Async::Ready(())) +} + +impl BlockingError { + /// Returns a new `BlockingError`. + #[doc(hidden)] + pub fn new() -> Self { + Self { _p: () } + } +} + +impl fmt::Display for BlockingError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.description()) + } +} + +impl fmt::Debug for BlockingError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("BlockingError") + .field("reason", &self.description()) + .finish() + } +} + +impl Error for BlockingError { + fn description(&self) -> &str { + "`blocking` annotation used from outside the context of a thread pool" + } +} |