summaryrefslogtreecommitdiffstats
path: root/vendor/tokio/src/runtime/blocking
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
commitdc0db358abe19481e475e10c32149b53370f1a1c (patch)
treeab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/src/runtime/blocking
parentReleasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff)
downloadrustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz
rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/runtime/blocking')
-rw-r--r--vendor/tokio/src/runtime/blocking/mod.rs36
-rw-r--r--vendor/tokio/src/runtime/blocking/pool.rs398
-rw-r--r--vendor/tokio/src/runtime/blocking/schedule.rs50
-rw-r--r--vendor/tokio/src/runtime/blocking/shutdown.rs8
-rw-r--r--vendor/tokio/src/runtime/blocking/task.rs6
5 files changed, 382 insertions, 116 deletions
diff --git a/vendor/tokio/src/runtime/blocking/mod.rs b/vendor/tokio/src/runtime/blocking/mod.rs
index fece3c279..c42924be7 100644
--- a/vendor/tokio/src/runtime/blocking/mod.rs
+++ b/vendor/tokio/src/runtime/blocking/mod.rs
@@ -6,37 +6,21 @@
mod pool;
pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner};
+cfg_fs! {
+ pub(crate) use pool::spawn_mandatory_blocking;
+}
+
+cfg_trace! {
+ pub(crate) use pool::Mandatory;
+}
+
mod schedule;
mod shutdown;
-pub(crate) mod task;
+mod task;
+pub(crate) use task::BlockingTask;
use crate::runtime::Builder;
pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool {
BlockingPool::new(builder, thread_cap)
}
-
-/*
-cfg_not_blocking_impl! {
- use crate::runtime::Builder;
- use std::time::Duration;
-
- #[derive(Debug, Clone)]
- pub(crate) struct BlockingPool {}
-
- pub(crate) use BlockingPool as Spawner;
-
- pub(crate) fn create_blocking_pool(_builder: &Builder, _thread_cap: usize) -> BlockingPool {
- BlockingPool {}
- }
-
- impl BlockingPool {
- pub(crate) fn spawner(&self) -> &BlockingPool {
- self
- }
-
- pub(crate) fn shutdown(&mut self, _duration: Option<Duration>) {
- }
- }
-}
-*/
diff --git a/vendor/tokio/src/runtime/blocking/pool.rs b/vendor/tokio/src/runtime/blocking/pool.rs
index b7d725128..a23b0a0d2 100644
--- a/vendor/tokio/src/runtime/blocking/pool.rs
+++ b/vendor/tokio/src/runtime/blocking/pool.rs
@@ -2,16 +2,16 @@
use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::thread;
-use crate::runtime::blocking::schedule::NoopSchedule;
-use crate::runtime::blocking::shutdown;
+use crate::runtime::blocking::schedule::BlockingSchedule;
+use crate::runtime::blocking::{shutdown, BlockingTask};
use crate::runtime::builder::ThreadNameFn;
-use crate::runtime::context;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};
-use crate::util::error::CONTEXT_MISSING_ERROR;
use std::collections::{HashMap, VecDeque};
use std::fmt;
+use std::io;
+use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
pub(crate) struct BlockingPool {
@@ -24,36 +24,84 @@ pub(crate) struct Spawner {
inner: Arc<Inner>,
}
+#[derive(Default)]
+pub(crate) struct SpawnerMetrics {
+ num_threads: AtomicUsize,
+ num_idle_threads: AtomicUsize,
+ queue_depth: AtomicUsize,
+}
+
+impl SpawnerMetrics {
+ fn num_threads(&self) -> usize {
+ self.num_threads.load(Ordering::Relaxed)
+ }
+
+ fn num_idle_threads(&self) -> usize {
+ self.num_idle_threads.load(Ordering::Relaxed)
+ }
+
+ cfg_metrics! {
+ fn queue_depth(&self) -> usize {
+ self.queue_depth.load(Ordering::Relaxed)
+ }
+ }
+
+ fn inc_num_threads(&self) {
+ self.num_threads.fetch_add(1, Ordering::Relaxed);
+ }
+
+ fn dec_num_threads(&self) {
+ self.num_threads.fetch_sub(1, Ordering::Relaxed);
+ }
+
+ fn inc_num_idle_threads(&self) {
+ self.num_idle_threads.fetch_add(1, Ordering::Relaxed);
+ }
+
+ fn dec_num_idle_threads(&self) -> usize {
+ self.num_idle_threads.fetch_sub(1, Ordering::Relaxed)
+ }
+
+ fn inc_queue_depth(&self) {
+ self.queue_depth.fetch_add(1, Ordering::Relaxed);
+ }
+
+ fn dec_queue_depth(&self) {
+ self.queue_depth.fetch_sub(1, Ordering::Relaxed);
+ }
+}
+
struct Inner {
- /// State shared between worker threads
+ /// State shared between worker threads.
shared: Mutex<Shared>,
/// Pool threads wait on this.
condvar: Condvar,
- /// Spawned threads use this name
+ /// Spawned threads use this name.
thread_name: ThreadNameFn,
- /// Spawned thread stack size
+ /// Spawned thread stack size.
stack_size: Option<usize>,
- /// Call after a thread starts
+ /// Call after a thread starts.
after_start: Option<Callback>,
- /// Call before a thread stops
+ /// Call before a thread stops.
before_stop: Option<Callback>,
- // Maximum number of threads
+ // Maximum number of threads.
thread_cap: usize,
- // Customizable wait timeout
+ // Customizable wait timeout.
keep_alive: Duration,
+
+ // Metrics about the pool.
+ metrics: SpawnerMetrics,
}
struct Shared {
queue: VecDeque<Task>,
- num_th: usize,
- num_idle: u32,
num_notify: u32,
shutdown: bool,
shutdown_tx: Option<shutdown::Sender>,
@@ -67,24 +115,93 @@ struct Shared {
/// calling shutdown handles joining on these.
worker_threads: HashMap<usize, thread::JoinHandle<()>>,
/// This is a counter used to iterate worker_threads in a consistent order (for loom's
- /// benefit)
+ /// benefit).
worker_thread_index: usize,
}
-type Task = task::Notified<NoopSchedule>;
+pub(crate) struct Task {
+ task: task::UnownedTask<BlockingSchedule>,
+ mandatory: Mandatory,
+}
+
+#[derive(PartialEq, Eq)]
+pub(crate) enum Mandatory {
+ #[cfg_attr(not(fs), allow(dead_code))]
+ Mandatory,
+ NonMandatory,
+}
+
+pub(crate) enum SpawnError {
+ /// Pool is shutting down and the task was not scheduled
+ ShuttingDown,
+ /// There are no worker threads available to take the task
+ /// and the OS failed to spawn a new one
+ NoThreads(io::Error),
+}
+
+impl From<SpawnError> for io::Error {
+ fn from(e: SpawnError) -> Self {
+ match e {
+ SpawnError::ShuttingDown => {
+ io::Error::new(io::ErrorKind::Other, "blocking pool shutting down")
+ }
+ SpawnError::NoThreads(e) => e,
+ }
+ }
+}
+
+impl Task {
+ pub(crate) fn new(task: task::UnownedTask<BlockingSchedule>, mandatory: Mandatory) -> Task {
+ Task { task, mandatory }
+ }
+
+ fn run(self) {
+ self.task.run();
+ }
+
+ fn shutdown_or_run_if_mandatory(self) {
+ match self.mandatory {
+ Mandatory::NonMandatory => self.task.shutdown(),
+ Mandatory::Mandatory => self.task.run(),
+ }
+ }
+}
const KEEP_ALIVE: Duration = Duration::from_secs(10);
-/// Run the provided function on an executor dedicated to blocking operations.
+/// Runs the provided function on an executor dedicated to blocking operations.
+/// Tasks will be scheduled as non-mandatory, meaning they may not get executed
+/// in case of runtime shutdown.
+#[track_caller]
+#[cfg_attr(tokio_wasi, allow(dead_code))]
pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
- let rt = context::current().expect(CONTEXT_MISSING_ERROR);
+ let rt = Handle::current();
rt.spawn_blocking(func)
}
+cfg_fs! {
+ #[cfg_attr(any(
+ all(loom, not(test)), // the function is covered by loom tests
+ test
+ ), allow(dead_code))]
+ /// Runs the provided function on an executor dedicated to blocking
+ /// operations. Tasks will be scheduled as mandatory, meaning they are
+ /// guaranteed to run unless a shutdown is already taking place. In case a
+ /// shutdown is already taking place, `None` will be returned.
+ pub(crate) fn spawn_mandatory_blocking<F, R>(func: F) -> Option<JoinHandle<R>>
+ where
+ F: FnOnce() -> R + Send + 'static,
+ R: Send + 'static,
+ {
+ let rt = Handle::current();
+ rt.inner.blocking_spawner().spawn_mandatory_blocking(&rt, func)
+ }
+}
+
// ===== impl BlockingPool =====
impl BlockingPool {
@@ -97,8 +214,6 @@ impl BlockingPool {
inner: Arc::new(Inner {
shared: Mutex::new(Shared {
queue: VecDeque::new(),
- num_th: 0,
- num_idle: 0,
num_notify: 0,
shutdown: false,
shutdown_tx: Some(shutdown_tx),
@@ -113,6 +228,7 @@ impl BlockingPool {
before_stop: builder.before_stop.clone(),
thread_cap,
keep_alive,
+ metrics: Default::default(),
}),
},
shutdown_rx,
@@ -172,53 +288,164 @@ impl fmt::Debug for BlockingPool {
// ===== impl Spawner =====
impl Spawner {
- pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
- let shutdown_tx = {
- let mut shared = self.inner.shared.lock();
-
- if shared.shutdown {
- // Shutdown the task
- task.shutdown();
-
- // no need to even push this task; it would never get picked up
- return Err(());
+ #[track_caller]
+ pub(crate) fn spawn_blocking<F, R>(&self, rt: &Handle, func: F) -> JoinHandle<R>
+ where
+ F: FnOnce() -> R + Send + 'static,
+ R: Send + 'static,
+ {
+ let (join_handle, spawn_result) =
+ if cfg!(debug_assertions) && std::mem::size_of::<F>() > 2048 {
+ self.spawn_blocking_inner(Box::new(func), Mandatory::NonMandatory, None, rt)
+ } else {
+ self.spawn_blocking_inner(func, Mandatory::NonMandatory, None, rt)
+ };
+
+ match spawn_result {
+ Ok(()) => join_handle,
+ // Compat: do not panic here, return the join_handle even though it will never resolve
+ Err(SpawnError::ShuttingDown) => join_handle,
+ Err(SpawnError::NoThreads(e)) => {
+ panic!("OS can't spawn worker thread: {}", e)
}
+ }
+ }
- shared.queue.push_back(task);
-
- if shared.num_idle == 0 {
- // No threads are able to process the task.
-
- if shared.num_th == self.inner.thread_cap {
- // At max number of threads
- None
- } else {
- shared.num_th += 1;
- assert!(shared.shutdown_tx.is_some());
- shared.shutdown_tx.clone()
- }
+ cfg_fs! {
+ #[track_caller]
+ #[cfg_attr(any(
+ all(loom, not(test)), // the function is covered by loom tests
+ test
+ ), allow(dead_code))]
+ pub(crate) fn spawn_mandatory_blocking<F, R>(&self, rt: &Handle, func: F) -> Option<JoinHandle<R>>
+ where
+ F: FnOnce() -> R + Send + 'static,
+ R: Send + 'static,
+ {
+ let (join_handle, spawn_result) = if cfg!(debug_assertions) && std::mem::size_of::<F>() > 2048 {
+ self.spawn_blocking_inner(
+ Box::new(func),
+ Mandatory::Mandatory,
+ None,
+ rt,
+ )
+ } else {
+ self.spawn_blocking_inner(
+ func,
+ Mandatory::Mandatory,
+ None,
+ rt,
+ )
+ };
+
+ if spawn_result.is_ok() {
+ Some(join_handle)
} else {
- // Notify an idle worker thread. The notification counter
- // is used to count the needed amount of notifications
- // exactly. Thread libraries may generate spurious
- // wakeups, this counter is used to keep us in a
- // consistent state.
- shared.num_idle -= 1;
- shared.num_notify += 1;
- self.inner.condvar.notify_one();
None
}
+ }
+ }
+
+ #[track_caller]
+ pub(crate) fn spawn_blocking_inner<F, R>(
+ &self,
+ func: F,
+ is_mandatory: Mandatory,
+ name: Option<&str>,
+ rt: &Handle,
+ ) -> (JoinHandle<R>, Result<(), SpawnError>)
+ where
+ F: FnOnce() -> R + Send + 'static,
+ R: Send + 'static,
+ {
+ let fut = BlockingTask::new(func);
+ let id = task::Id::next();
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let fut = {
+ use tracing::Instrument;
+ let location = std::panic::Location::caller();
+ let span = tracing::trace_span!(
+ target: "tokio::task::blocking",
+ "runtime.spawn",
+ kind = %"blocking",
+ task.name = %name.unwrap_or_default(),
+ task.id = id.as_u64(),
+ "fn" = %std::any::type_name::<F>(),
+ loc.file = location.file(),
+ loc.line = location.line(),
+ loc.col = location.column(),
+ );
+ fut.instrument(span)
};
- if let Some(shutdown_tx) = shutdown_tx {
- let mut shared = self.inner.shared.lock();
+ #[cfg(not(all(tokio_unstable, feature = "tracing")))]
+ let _ = name;
- let id = shared.worker_thread_index;
- shared.worker_thread_index += 1;
+ let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id);
- let handle = self.spawn_thread(shutdown_tx, rt, id);
+ let spawned = self.spawn_task(Task::new(task, is_mandatory), rt);
+ (handle, spawned)
+ }
- shared.worker_threads.insert(id, handle);
+ fn spawn_task(&self, task: Task, rt: &Handle) -> Result<(), SpawnError> {
+ let mut shared = self.inner.shared.lock();
+
+ if shared.shutdown {
+ // Shutdown the task: it's fine to shutdown this task (even if
+ // mandatory) because it was scheduled after the shutdown of the
+ // runtime began.
+ task.task.shutdown();
+
+ // no need to even push this task; it would never get picked up
+ return Err(SpawnError::ShuttingDown);
+ }
+
+ shared.queue.push_back(task);
+ self.inner.metrics.inc_queue_depth();
+
+ if self.inner.metrics.num_idle_threads() == 0 {
+ // No threads are able to process the task.
+
+ if self.inner.metrics.num_threads() == self.inner.thread_cap {
+ // At max number of threads
+ } else {
+ assert!(shared.shutdown_tx.is_some());
+ let shutdown_tx = shared.shutdown_tx.clone();
+
+ if let Some(shutdown_tx) = shutdown_tx {
+ let id = shared.worker_thread_index;
+
+ match self.spawn_thread(shutdown_tx, rt, id) {
+ Ok(handle) => {
+ self.inner.metrics.inc_num_threads();
+ shared.worker_thread_index += 1;
+ shared.worker_threads.insert(id, handle);
+ }
+ Err(ref e)
+ if is_temporary_os_thread_error(e)
+ && self.inner.metrics.num_threads() > 0 =>
+ {
+ // OS temporarily failed to spawn a new thread.
+ // The task will be picked up eventually by a currently
+ // busy thread.
+ }
+ Err(e) => {
+ // The OS refused to spawn the thread and there is no thread
+ // to pick up the task that has just been pushed to the queue.
+ return Err(SpawnError::NoThreads(e));
+ }
+ }
+ }
+ }
+ } else {
+ // Notify an idle worker thread. The notification counter
+ // is used to count the needed amount of notifications
+ // exactly. Thread libraries may generate spurious
+ // wakeups, this counter is used to keep us in a
+ // consistent state.
+ self.inner.metrics.dec_num_idle_threads();
+ shared.num_notify += 1;
+ self.inner.condvar.notify_one();
}
Ok(())
@@ -229,7 +456,7 @@ impl Spawner {
shutdown_tx: shutdown::Sender,
rt: &Handle,
id: usize,
- ) -> thread::JoinHandle<()> {
+ ) -> std::io::Result<thread::JoinHandle<()>> {
let mut builder = thread::Builder::new().name((self.inner.thread_name)());
if let Some(stack_size) = self.inner.stack_size {
@@ -238,17 +465,37 @@ impl Spawner {
let rt = rt.clone();
- builder
- .spawn(move || {
- // Only the reference should be moved into the closure
- let _enter = crate::runtime::context::enter(rt.clone());
- rt.blocking_spawner.inner.run(id);
- drop(shutdown_tx);
- })
- .unwrap()
+ builder.spawn(move || {
+ // Only the reference should be moved into the closure
+ let _enter = rt.enter();
+ rt.inner.blocking_spawner().inner.run(id);
+ drop(shutdown_tx);
+ })
+ }
+}
+
+cfg_metrics! {
+ impl Spawner {
+ pub(crate) fn num_threads(&self) -> usize {
+ self.inner.metrics.num_threads()
+ }
+
+ pub(crate) fn num_idle_threads(&self) -> usize {
+ self.inner.metrics.num_idle_threads()
+ }
+
+ pub(crate) fn queue_depth(&self) -> usize {
+ self.inner.metrics.queue_depth()
+ }
}
}
+// Tells whether the error when spawning a thread is temporary.
+#[inline]
+fn is_temporary_os_thread_error(error: &std::io::Error) -> bool {
+ matches!(error.kind(), std::io::ErrorKind::WouldBlock)
+}
+
impl Inner {
fn run(&self, worker_thread_id: usize) {
if let Some(f) = &self.after_start {
@@ -261,6 +508,7 @@ impl Inner {
'main: loop {
// BUSY
while let Some(task) = shared.queue.pop_front() {
+ self.metrics.dec_queue_depth();
drop(shared);
task.run();
@@ -268,7 +516,7 @@ impl Inner {
}
// IDLE
- shared.num_idle += 1;
+ self.metrics.inc_num_idle_threads();
while !shared.shutdown {
let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
@@ -302,8 +550,10 @@ impl Inner {
if shared.shutdown {
// Drain the queue
while let Some(task) = shared.queue.pop_front() {
+ self.metrics.dec_queue_depth();
drop(shared);
- task.shutdown();
+
+ task.shutdown_or_run_if_mandatory();
shared = self.shared.lock();
}
@@ -311,7 +561,7 @@ impl Inner {
// Work was produced, and we "took" it (by decrementing num_notify).
// This means that num_idle was decremented once for our wakeup.
// But, since we are exiting, we need to "undo" that, as we'll stay idle.
- shared.num_idle += 1;
+ self.metrics.inc_num_idle_threads();
// NOTE: Technically we should also do num_notify++ and notify again,
// but since we're shutting down anyway, that won't be necessary.
break;
@@ -319,17 +569,17 @@ impl Inner {
}
// Thread exit
- shared.num_th -= 1;
+ self.metrics.dec_num_threads();
// num_idle should now be tracked exactly, panic
// with a descriptive message if it is not the
// case.
- shared.num_idle = shared
- .num_idle
- .checked_sub(1)
- .expect("num_idle underflowed on thread exit");
+ let prev_idle = self.metrics.dec_num_idle_threads();
+ if prev_idle < self.metrics.num_idle_threads() {
+ panic!("num_idle_threads underflowed on thread exit")
+ }
- if shared.shutdown && shared.num_th == 0 {
+ if shared.shutdown && self.metrics.num_threads() == 0 {
self.condvar.notify_one();
}
diff --git a/vendor/tokio/src/runtime/blocking/schedule.rs b/vendor/tokio/src/runtime/blocking/schedule.rs
index 4e044ab29..edf775be8 100644
--- a/vendor/tokio/src/runtime/blocking/schedule.rs
+++ b/vendor/tokio/src/runtime/blocking/schedule.rs
@@ -1,20 +1,52 @@
+#[cfg(feature = "test-util")]
+use crate::runtime::scheduler;
use crate::runtime::task::{self, Task};
+use crate::runtime::Handle;
-/// `task::Schedule` implementation that does nothing. This is unique to the
-/// blocking scheduler as tasks scheduled are not really futures but blocking
-/// operations.
+/// `task::Schedule` implementation that does nothing (except some bookkeeping
+/// in test-util builds). This is unique to the blocking scheduler as tasks
+/// scheduled are not really futures but blocking operations.
///
/// We avoid storing the task by forgetting it in `bind` and re-materializing it
-/// in `release.
-pub(crate) struct NoopSchedule;
+/// in `release`.
+pub(crate) struct BlockingSchedule {
+ #[cfg(feature = "test-util")]
+ handle: Handle,
+}
-impl task::Schedule for NoopSchedule {
- fn bind(_task: Task<Self>) -> NoopSchedule {
- // Do nothing w/ the task
- NoopSchedule
+impl BlockingSchedule {
+ #[cfg_attr(not(feature = "test-util"), allow(unused_variables))]
+ pub(crate) fn new(handle: &Handle) -> Self {
+ #[cfg(feature = "test-util")]
+ {
+ match &handle.inner {
+ scheduler::Handle::CurrentThread(handle) => {
+ handle.driver.clock.inhibit_auto_advance();
+ }
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ scheduler::Handle::MultiThread(_) => {}
+ }
+ }
+ BlockingSchedule {
+ #[cfg(feature = "test-util")]
+ handle: handle.clone(),
+ }
}
+}
+impl task::Schedule for BlockingSchedule {
fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
+ #[cfg(feature = "test-util")]
+ {
+ match &self.handle.inner {
+ scheduler::Handle::CurrentThread(handle) => {
+ handle.driver.clock.allow_auto_advance();
+ handle.driver.unpark();
+ }
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ scheduler::Handle::MultiThread(_) => {}
+ }
+ }
None
}
diff --git a/vendor/tokio/src/runtime/blocking/shutdown.rs b/vendor/tokio/src/runtime/blocking/shutdown.rs
index 0cf22859b..fe5abae07 100644
--- a/vendor/tokio/src/runtime/blocking/shutdown.rs
+++ b/vendor/tokio/src/runtime/blocking/shutdown.rs
@@ -10,7 +10,7 @@ use std::time::Duration;
#[derive(Debug, Clone)]
pub(super) struct Sender {
- tx: Arc<oneshot::Sender<()>>,
+ _tx: Arc<oneshot::Sender<()>>,
}
#[derive(Debug)]
@@ -20,7 +20,7 @@ pub(super) struct Receiver {
pub(super) fn channel() -> (Sender, Receiver) {
let (tx, rx) = oneshot::channel();
- let tx = Sender { tx: Arc::new(tx) };
+ let tx = Sender { _tx: Arc::new(tx) };
let rx = Receiver { rx };
(tx, rx)
@@ -35,13 +35,13 @@ impl Receiver {
///
/// If the timeout has elapsed, it returns `false`, otherwise it returns `true`.
pub(crate) fn wait(&mut self, timeout: Option<Duration>) -> bool {
- use crate::runtime::enter::try_enter;
+ use crate::runtime::context::try_enter_blocking_region;
if timeout == Some(Duration::from_nanos(0)) {
return false;
}
- let mut e = match try_enter(false) {
+ let mut e = match try_enter_blocking_region() {
Some(enter) => enter,
_ => {
if std::thread::panicking() {
diff --git a/vendor/tokio/src/runtime/blocking/task.rs b/vendor/tokio/src/runtime/blocking/task.rs
index ee2d8d6d6..c44617540 100644
--- a/vendor/tokio/src/runtime/blocking/task.rs
+++ b/vendor/tokio/src/runtime/blocking/task.rs
@@ -2,13 +2,13 @@ use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
-/// Converts a function to a future that completes on poll
+/// Converts a function to a future that completes on poll.
pub(crate) struct BlockingTask<T> {
func: Option<T>,
}
impl<T> BlockingTask<T> {
- /// Initializes a new blocking task from the given function
+ /// Initializes a new blocking task from the given function.
pub(crate) fn new(func: T) -> BlockingTask<T> {
BlockingTask { func: Some(func) }
}
@@ -37,7 +37,7 @@ where
// currently goes through Task::poll(), and so is subject to budgeting. That isn't really
// what we want; a blocking task may itself want to run tasks (it might be a Worker!), so
// we want it to start without any budgeting.
- crate::coop::stop();
+ crate::runtime::coop::stop();
Poll::Ready(func())
}