diff options
Diffstat (limited to '')
-rw-r--r-- | third_party/rust/tokio/src/coop.rs | 284 |
1 files changed, 284 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/coop.rs b/third_party/rust/tokio/src/coop.rs new file mode 100644 index 0000000000..96905319b6 --- /dev/null +++ b/third_party/rust/tokio/src/coop.rs @@ -0,0 +1,284 @@ +#![cfg_attr(not(feature = "full"), allow(dead_code))] + +//! Yield points for improved cooperative scheduling. +//! +//! Documentation for this can be found in the [`tokio::task`] module. +//! +//! [`tokio::task`]: crate::task. + +// ```ignore +// # use tokio_stream::{Stream, StreamExt}; +// async fn drop_all<I: Stream + Unpin>(mut input: I) { +// while let Some(_) = input.next().await { +// tokio::coop::proceed().await; +// } +// } +// ``` +// +// The `proceed` future will coordinate with the executor to make sure that +// every so often control is yielded back to the executor so it can run other +// tasks. +// +// # Placing yield points +// +// Voluntary yield points should be placed _after_ at least some work has been +// done. If they are not, a future sufficiently deep in the task hierarchy may +// end up _never_ getting to run because of the number of yield points that +// inevitably appear before it is reached. In general, you will want yield +// points to only appear in "leaf" futures -- those that do not themselves poll +// other futures. By doing this, you avoid double-counting each iteration of +// the outer future against the cooperating budget. + +use std::cell::Cell; + +thread_local! { + static CURRENT: Cell<Budget> = Cell::new(Budget::unconstrained()); +} + +/// Opaque type tracking the amount of "work" a task may still do before +/// yielding back to the scheduler. +#[derive(Debug, Copy, Clone)] +pub(crate) struct Budget(Option<u8>); + +impl Budget { + /// Budget assigned to a task on each poll. + /// + /// The value itself is chosen somewhat arbitrarily. It needs to be high + /// enough to amortize wakeup and scheduling costs, but low enough that we + /// do not starve other tasks for too long. The value also needs to be high + /// enough that particularly deep tasks are able to do at least some useful + /// work at all. + /// + /// Note that as more yield points are added in the ecosystem, this value + /// will probably also have to be raised. + const fn initial() -> Budget { + Budget(Some(128)) + } + + /// Returns an unconstrained budget. Operations will not be limited. + const fn unconstrained() -> Budget { + Budget(None) + } + + fn has_remaining(self) -> bool { + self.0.map(|budget| budget > 0).unwrap_or(true) + } +} + +/// Runs the given closure with a cooperative task budget. When the function +/// returns, the budget is reset to the value prior to calling the function. +#[inline(always)] +pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R { + with_budget(Budget::initial(), f) +} + +/// Runs the given closure with an unconstrained task budget. When the function returns, the budget +/// is reset to the value prior to calling the function. +#[inline(always)] +pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R { + with_budget(Budget::unconstrained(), f) +} + +#[inline(always)] +fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R { + struct ResetGuard<'a> { + cell: &'a Cell<Budget>, + prev: Budget, + } + + impl<'a> Drop for ResetGuard<'a> { + fn drop(&mut self) { + self.cell.set(self.prev); + } + } + + CURRENT.with(move |cell| { + let prev = cell.get(); + + cell.set(budget); + + let _guard = ResetGuard { cell, prev }; + + f() + }) +} + +#[inline(always)] +pub(crate) fn has_budget_remaining() -> bool { + CURRENT.with(|cell| cell.get().has_remaining()) +} + +cfg_rt_multi_thread! { + /// Sets the current task's budget. + pub(crate) fn set(budget: Budget) { + CURRENT.with(|cell| cell.set(budget)) + } +} + +cfg_rt! { + /// Forcibly removes the budgeting constraints early. + /// + /// Returns the remaining budget + pub(crate) fn stop() -> Budget { + CURRENT.with(|cell| { + let prev = cell.get(); + cell.set(Budget::unconstrained()); + prev + }) + } +} + +cfg_coop! { + use std::task::{Context, Poll}; + + #[must_use] + pub(crate) struct RestoreOnPending(Cell<Budget>); + + impl RestoreOnPending { + pub(crate) fn made_progress(&self) { + self.0.set(Budget::unconstrained()); + } + } + + impl Drop for RestoreOnPending { + fn drop(&mut self) { + // Don't reset if budget was unconstrained or if we made progress. + // They are both represented as the remembered budget being unconstrained. + let budget = self.0.get(); + if !budget.is_unconstrained() { + CURRENT.with(|cell| { + cell.set(budget); + }); + } + } + } + + /// Returns `Poll::Pending` if the current task has exceeded its budget and should yield. + /// + /// When you call this method, the current budget is decremented. However, to ensure that + /// progress is made every time a task is polled, the budget is automatically restored to its + /// former value if the returned `RestoreOnPending` is dropped. It is the caller's + /// responsibility to call `RestoreOnPending::made_progress` if it made progress, to ensure + /// that the budget empties appropriately. + /// + /// Note that `RestoreOnPending` restores the budget **as it was before `poll_proceed`**. + /// Therefore, if the budget is _further_ adjusted between when `poll_proceed` returns and + /// `RestRestoreOnPending` is dropped, those adjustments are erased unless the caller indicates + /// that progress was made. + #[inline] + pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> { + CURRENT.with(|cell| { + let mut budget = cell.get(); + + if budget.decrement() { + let restore = RestoreOnPending(Cell::new(cell.get())); + cell.set(budget); + Poll::Ready(restore) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + } + }) + } + + impl Budget { + /// Decrements the budget. Returns `true` if successful. Decrementing fails + /// when there is not enough remaining budget. + fn decrement(&mut self) -> bool { + if let Some(num) = &mut self.0 { + if *num > 0 { + *num -= 1; + true + } else { + false + } + } else { + true + } + } + + fn is_unconstrained(self) -> bool { + self.0.is_none() + } + } +} + +#[cfg(all(test, not(loom)))] +mod test { + use super::*; + + #[cfg(target_arch = "wasm32")] + use wasm_bindgen_test::wasm_bindgen_test as test; + + fn get() -> Budget { + CURRENT.with(|cell| cell.get()) + } + + #[test] + fn bugeting() { + use futures::future::poll_fn; + use tokio_test::*; + + assert!(get().0.is_none()); + + let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); + + assert!(get().0.is_none()); + drop(coop); + assert!(get().0.is_none()); + + budget(|| { + assert_eq!(get().0, Budget::initial().0); + + let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); + assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1); + drop(coop); + // we didn't make progress + assert_eq!(get().0, Budget::initial().0); + + let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); + assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1); + coop.made_progress(); + drop(coop); + // we _did_ make progress + assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1); + + let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); + assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2); + coop.made_progress(); + drop(coop); + assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2); + + budget(|| { + assert_eq!(get().0, Budget::initial().0); + + let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); + assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1); + coop.made_progress(); + drop(coop); + assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1); + }); + + assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2); + }); + + assert!(get().0.is_none()); + + budget(|| { + let n = get().0.unwrap(); + + for _ in 0..n { + let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); + coop.made_progress(); + } + + let mut task = task::spawn(poll_fn(|cx| { + let coop = ready!(poll_proceed(cx)); + coop.made_progress(); + Poll::Ready(()) + })); + + assert_pending!(task.poll()); + }); + } +} |