diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/tokio-threadpool/src/pool | |
parent | Initial commit. (diff) | |
download | firefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-threadpool/src/pool')
-rw-r--r-- | third_party/rust/tokio-threadpool/src/pool/backup.rs | 308 | ||||
-rw-r--r-- | third_party/rust/tokio-threadpool/src/pool/backup_stack.rs | 191 | ||||
-rw-r--r-- | third_party/rust/tokio-threadpool/src/pool/mod.rs | 475 | ||||
-rw-r--r-- | third_party/rust/tokio-threadpool/src/pool/state.rs | 132 |
4 files changed, 1106 insertions, 0 deletions
diff --git a/third_party/rust/tokio-threadpool/src/pool/backup.rs b/third_party/rust/tokio-threadpool/src/pool/backup.rs new file mode 100644 index 0000000000..e94e95d6f2 --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/pool/backup.rs @@ -0,0 +1,308 @@ +use park::DefaultPark; +use worker::WorkerId; + +use std::cell::UnsafeCell; +use std::fmt; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed}; +use std::time::{Duration, Instant}; + +/// State associated with a thread in the thread pool. +/// +/// The pool manages a number of threads. Some of those threads are considered +/// "primary" threads and process the work queue. When a task being run on a +/// primary thread enters a blocking context, the responsibility of processing +/// the work queue must be handed off to another thread. This is done by first +/// checking for idle threads on the backup stack. If one is found, the worker +/// token (`WorkerId`) is handed off to that running thread. If none are found, +/// a new thread is spawned. +/// +/// This state manages the exchange. A thread that is idle, not assigned to a +/// work queue, sits around for a specified amount of time. When the worker +/// token is handed off, it is first stored in `handoff`. The backup thread is +/// then signaled. At this point, the backup thread wakes up from sleep and +/// reads `handoff`. At that point, it has been promoted to a primary thread and +/// will begin processing inbound work on the work queue. +/// +/// The name `Backup` isn't really great for what the type does, but I have not +/// come up with a better name... Maybe it should just be named `Thread`. +#[derive(Debug)] +pub(crate) struct Backup { + /// Worker ID that is being handed to this thread. + handoff: UnsafeCell<Option<WorkerId>>, + + /// Thread state. + /// + /// This tracks: + /// + /// * Is queued flag + /// * If the pool is shutting down. + /// * If the thread is running + state: AtomicUsize, + + /// Next entry in the Treiber stack. + next_sleeper: UnsafeCell<BackupId>, + + /// Used to put the thread to sleep + park: DefaultPark, +} + +#[derive(Debug, Eq, PartialEq, Copy, Clone)] +pub(crate) struct BackupId(pub(crate) usize); + +#[derive(Debug)] +pub(crate) enum Handoff { + Worker(WorkerId), + Idle, + Terminated, +} + +/// Tracks thread state. +#[derive(Clone, Copy, Eq, PartialEq)] +struct State(usize); + +/// Set when the worker is pushed onto the scheduler's stack of sleeping +/// threads. +/// +/// This flag also serves as a "notification" bit. If another thread is +/// attempting to hand off a worker to the backup thread, then the pushed bit +/// will not be set when the thread tries to shutdown. +pub const PUSHED: usize = 0b001; + +/// Set when the thread is running +pub const RUNNING: usize = 0b010; + +/// Set when the thread pool has terminated +pub const TERMINATED: usize = 0b100; + +// ===== impl Backup ===== + +impl Backup { + pub fn new() -> Backup { + Backup { + handoff: UnsafeCell::new(None), + state: AtomicUsize::new(State::new().into()), + next_sleeper: UnsafeCell::new(BackupId(0)), + park: DefaultPark::new(), + } + } + + /// Called when the thread is starting + pub fn start(&self, worker_id: &WorkerId) { + debug_assert!({ + let state: State = self.state.load(Relaxed).into(); + + debug_assert!(!state.is_pushed()); + debug_assert!(state.is_running()); + debug_assert!(!state.is_terminated()); + + true + }); + + // The handoff value is equal to `worker_id` + debug_assert_eq!(unsafe { (*self.handoff.get()).as_ref() }, Some(worker_id)); + + unsafe { + *self.handoff.get() = None; + } + } + + pub fn is_running(&self) -> bool { + let state: State = self.state.load(Relaxed).into(); + state.is_running() + } + + /// Hands off the worker to a thread. + /// + /// Returns `true` if the thread needs to be spawned. + pub fn worker_handoff(&self, worker_id: WorkerId) -> bool { + unsafe { + // The backup worker should not already have been handoff a worker. + debug_assert!((*self.handoff.get()).is_none()); + + // Set the handoff + *self.handoff.get() = Some(worker_id); + } + + // This *probably* can just be `Release`... memory orderings, how do + // they work? + let prev = State::worker_handoff(&self.state); + debug_assert!(prev.is_pushed()); + + if prev.is_running() { + // Wakeup the backup thread + self.park.notify(); + false + } else { + true + } + } + + /// Terminate the worker + pub fn signal_stop(&self) { + let prev: State = self.state.fetch_xor(TERMINATED | PUSHED, AcqRel).into(); + + debug_assert!(!prev.is_terminated()); + debug_assert!(prev.is_pushed()); + + if prev.is_running() { + self.park.notify(); + } + } + + /// Release the worker + pub fn release(&self) { + let prev: State = self.state.fetch_xor(RUNNING, AcqRel).into(); + + debug_assert!(prev.is_running()); + } + + /// Wait for a worker handoff + pub fn wait_for_handoff(&self, timeout: Option<Duration>) -> Handoff { + let sleep_until = timeout.map(|dur| Instant::now() + dur); + let mut state: State = self.state.load(Acquire).into(); + + // Run in a loop since there can be spurious wakeups + loop { + if !state.is_pushed() { + if state.is_terminated() { + return Handoff::Terminated; + } + + let worker_id = unsafe { (*self.handoff.get()).take().expect("no worker handoff") }; + return Handoff::Worker(worker_id); + } + + match sleep_until { + None => { + self.park.park_sync(None); + state = self.state.load(Acquire).into(); + } + Some(when) => { + let now = Instant::now(); + + if now < when { + self.park.park_sync(Some(when - now)); + state = self.state.load(Acquire).into(); + } else { + debug_assert!(state.is_running()); + + // Transition out of running + let mut next = state; + next.unset_running(); + + let actual = self + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); + + if actual == state { + debug_assert!(!next.is_running()); + return Handoff::Idle; + } + + state = actual; + } + } + } + } + } + + pub fn is_pushed(&self) -> bool { + let state: State = self.state.load(Relaxed).into(); + state.is_pushed() + } + + pub fn set_pushed(&self, ordering: Ordering) { + let prev: State = self.state.fetch_or(PUSHED, ordering).into(); + debug_assert!(!prev.is_pushed()); + } + + #[inline] + pub fn next_sleeper(&self) -> BackupId { + unsafe { *self.next_sleeper.get() } + } + + #[inline] + pub fn set_next_sleeper(&self, val: BackupId) { + unsafe { + *self.next_sleeper.get() = val; + } + } +} + +// ===== impl State ===== + +impl State { + /// Returns a new, default, thread `State` + pub fn new() -> State { + State(0) + } + + /// Returns true if the thread entry is pushed in the sleeper stack + pub fn is_pushed(&self) -> bool { + self.0 & PUSHED == PUSHED + } + + fn unset_pushed(&mut self) { + self.0 &= !PUSHED; + } + + pub fn is_running(&self) -> bool { + self.0 & RUNNING == RUNNING + } + + pub fn set_running(&mut self) { + self.0 |= RUNNING; + } + + pub fn unset_running(&mut self) { + self.0 &= !RUNNING; + } + + pub fn is_terminated(&self) -> bool { + self.0 & TERMINATED == TERMINATED + } + + fn worker_handoff(state: &AtomicUsize) -> State { + let mut curr: State = state.load(Acquire).into(); + + loop { + let mut next = curr; + next.set_running(); + next.unset_pushed(); + + let actual = state + .compare_and_swap(curr.into(), next.into(), AcqRel) + .into(); + + if actual == curr { + return curr; + } + + curr = actual; + } + } +} + +impl From<usize> for State { + fn from(src: usize) -> State { + State(src) + } +} + +impl From<State> for usize { + fn from(src: State) -> usize { + src.0 + } +} + +impl fmt::Debug for State { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("backup::State") + .field("is_pushed", &self.is_pushed()) + .field("is_running", &self.is_running()) + .field("is_terminated", &self.is_terminated()) + .finish() + } +} diff --git a/third_party/rust/tokio-threadpool/src/pool/backup_stack.rs b/third_party/rust/tokio-threadpool/src/pool/backup_stack.rs new file mode 100644 index 0000000000..b9a46d08ef --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/pool/backup_stack.rs @@ -0,0 +1,191 @@ +use pool::{Backup, BackupId}; + +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{AcqRel, Acquire}; + +#[derive(Debug)] +pub(crate) struct BackupStack { + state: AtomicUsize, +} + +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +struct State(usize); + +pub(crate) const MAX_BACKUP: usize = 1 << 15; + +/// Extracts the head of the backup stack from the state +const STACK_MASK: usize = ((1 << 16) - 1); + +/// Used to mark the stack as empty +pub(crate) const EMPTY: BackupId = BackupId(MAX_BACKUP); + +/// Used to mark the stack as terminated +pub(crate) const TERMINATED: BackupId = BackupId(EMPTY.0 + 1); + +/// How many bits the Treiber ABA guard is offset by +const ABA_GUARD_SHIFT: usize = 16; + +#[cfg(target_pointer_width = "64")] +const ABA_GUARD_MASK: usize = (1 << (64 - ABA_GUARD_SHIFT)) - 1; + +#[cfg(target_pointer_width = "32")] +const ABA_GUARD_MASK: usize = (1 << (32 - ABA_GUARD_SHIFT)) - 1; + +// ===== impl BackupStack ===== + +impl BackupStack { + pub fn new() -> BackupStack { + let state = AtomicUsize::new(State::new().into()); + BackupStack { state } + } + + /// Push a backup thread onto the stack + /// + /// # Return + /// + /// Returns `Ok` on success. + /// + /// Returns `Err` if the pool has transitioned to the `TERMINATED` state. + /// When terminated, pushing new entries is no longer permitted. + pub fn push(&self, entries: &[Backup], id: BackupId) -> Result<(), ()> { + let mut state: State = self.state.load(Acquire).into(); + + entries[id.0].set_pushed(AcqRel); + + loop { + let mut next = state; + + let head = state.head(); + + if head == TERMINATED { + // The pool is terminated, cannot push the sleeper. + return Err(()); + } + + entries[id.0].set_next_sleeper(head); + next.set_head(id); + + let actual = self + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); + + if state == actual { + return Ok(()); + } + + state = actual; + } + } + + /// Pop a backup thread off the stack. + /// + /// If `terminate` is set and the stack is empty when this function is + /// called, the state of the stack is transitioned to "terminated". At this + /// point, no further entries can be pushed onto the stack. + /// + /// # Return + /// + /// * Returns the index of the popped worker and the worker's observed + /// state. + /// + /// * `Ok(None)` if the stack is empty. + /// * `Err(_)` is returned if the pool has been shutdown. + pub fn pop(&self, entries: &[Backup], terminate: bool) -> Result<Option<BackupId>, ()> { + // Figure out the empty value + let terminal = match terminate { + true => TERMINATED, + false => EMPTY, + }; + + let mut state: State = self.state.load(Acquire).into(); + + loop { + let head = state.head(); + + if head == EMPTY { + let mut next = state; + next.set_head(terminal); + + if next == state { + debug_assert!(terminal == EMPTY); + return Ok(None); + } + + let actual = self + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); + + if actual != state { + state = actual; + continue; + } + + return Ok(None); + } else if head == TERMINATED { + return Err(()); + } + + debug_assert!(head.0 < MAX_BACKUP); + + let mut next = state; + + let next_head = entries[head.0].next_sleeper(); + + // TERMINATED can never be set as the "next pointer" on a worker. + debug_assert!(next_head != TERMINATED); + + if next_head == EMPTY { + next.set_head(terminal); + } else { + next.set_head(next_head); + } + + let actual = self + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); + + if actual == state { + debug_assert!(entries[head.0].is_pushed()); + return Ok(Some(head)); + } + + state = actual; + } + } +} + +// ===== impl State ===== + +impl State { + fn new() -> State { + State(EMPTY.0) + } + + fn head(&self) -> BackupId { + BackupId(self.0 & STACK_MASK) + } + + fn set_head(&mut self, val: BackupId) { + let val = val.0; + + // The ABA guard protects against the ABA problem w/ Treiber stacks + let aba_guard = ((self.0 >> ABA_GUARD_SHIFT) + 1) & ABA_GUARD_MASK; + + self.0 = (aba_guard << ABA_GUARD_SHIFT) | val; + } +} + +impl From<usize> for State { + fn from(src: usize) -> Self { + State(src) + } +} + +impl From<State> for usize { + fn from(src: State) -> Self { + src.0 + } +} diff --git a/third_party/rust/tokio-threadpool/src/pool/mod.rs b/third_party/rust/tokio-threadpool/src/pool/mod.rs new file mode 100644 index 0000000000..0a42359b3c --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/pool/mod.rs @@ -0,0 +1,475 @@ +mod backup; +mod backup_stack; +mod state; + +pub(crate) use self::backup::{Backup, BackupId}; +pub(crate) use self::backup_stack::MAX_BACKUP; +pub(crate) use self::state::{Lifecycle, State, MAX_FUTURES}; + +use self::backup::Handoff; +use self::backup_stack::BackupStack; + +use config::Config; +use shutdown::ShutdownTrigger; +use task::{Blocking, Task}; +use worker::{self, Worker, WorkerId}; + +use futures::Poll; + +use std::cell::Cell; +use std::collections::hash_map::RandomState; +use std::hash::{BuildHasher, Hash, Hasher}; +use std::num::Wrapping; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{AcqRel, Acquire}; +use std::sync::{Arc, Weak}; +use std::thread; + +use crossbeam_deque::Injector; +use crossbeam_utils::CachePadded; + +#[derive(Debug)] +pub(crate) struct Pool { + // Tracks the state of the thread pool (running, shutting down, ...). + // + // While workers check this field as a hint to detect shutdown, it is + // **not** used as a primary point of coordination for workers. The sleep + // stack is used as the primary point of coordination for workers. + // + // The value of this atomic is deserialized into a `pool::State` instance. + // See comments for that type. + pub state: CachePadded<AtomicUsize>, + + // Stack tracking sleeping workers. + sleep_stack: CachePadded<worker::Stack>, + + // Worker state + // + // A worker is a thread that is processing the work queue and polling + // futures. + // + // The number of workers will *usually* be small. + pub workers: Arc<[worker::Entry]>, + + // The global MPMC queue of tasks. + // + // Spawned tasks are pushed into this queue. Although worker threads have their own dedicated + // task queues, they periodically steal tasks from this global queue, too. + pub queue: Arc<Injector<Arc<Task>>>, + + // Completes the shutdown process when the `ThreadPool` and all `Worker`s get dropped. + // + // When spawning a new `Worker`, this weak reference is upgraded and handed out to the new + // thread. + pub trigger: Weak<ShutdownTrigger>, + + // Backup thread state + // + // In order to efficiently support `blocking`, a pool of backup threads is + // needed. These backup threads are ready to take over a worker if the + // future being processed requires blocking. + backup: Box<[Backup]>, + + // Stack of sleeping backup threads + pub backup_stack: BackupStack, + + // State regarding coordinating blocking sections and tracking tasks that + // are pending blocking capacity. + blocking: Blocking, + + // Configuration + pub config: Config, +} + +impl Pool { + /// Create a new `Pool` + pub fn new( + workers: Arc<[worker::Entry]>, + trigger: Weak<ShutdownTrigger>, + max_blocking: usize, + config: Config, + queue: Arc<Injector<Arc<Task>>>, + ) -> Pool { + let pool_size = workers.len(); + let total_size = max_blocking + pool_size; + + // Create the set of backup entries + // + // This is `backup + pool_size` because the core thread pool running the + // workers is spawned from backup as well. + let backup = (0..total_size) + .map(|_| Backup::new()) + .collect::<Vec<_>>() + .into_boxed_slice(); + + let backup_stack = BackupStack::new(); + + for i in (0..backup.len()).rev() { + backup_stack.push(&backup, BackupId(i)).unwrap(); + } + + // Initialize the blocking state + let blocking = Blocking::new(max_blocking); + + let ret = Pool { + state: CachePadded::new(AtomicUsize::new(State::new().into())), + sleep_stack: CachePadded::new(worker::Stack::new()), + workers, + queue, + trigger, + backup, + backup_stack, + blocking, + config, + }; + + // Now, we prime the sleeper stack + for i in 0..pool_size { + ret.sleep_stack.push(&ret.workers, i).unwrap(); + } + + ret + } + + /// Start shutting down the pool. This means that no new futures will be + /// accepted. + pub fn shutdown(&self, now: bool, purge_queue: bool) { + let mut state: State = self.state.load(Acquire).into(); + + trace!("shutdown; state={:?}", state); + + // For now, this must be true + debug_assert!(!purge_queue || now); + + // Start by setting the shutdown flag + loop { + let mut next = state; + + let num_futures = next.num_futures(); + + if next.lifecycle() == Lifecycle::ShutdownNow { + // Already transitioned to shutting down state + + if !purge_queue || num_futures == 0 { + // Nothing more to do + return; + } + + // The queue must be purged + debug_assert!(purge_queue); + next.clear_num_futures(); + } else { + next.set_lifecycle(if now || num_futures == 0 { + // If already idle, always transition to shutdown now. + Lifecycle::ShutdownNow + } else { + Lifecycle::ShutdownOnIdle + }); + + if purge_queue { + next.clear_num_futures(); + } + } + + let actual = self + .state + .compare_and_swap(state.into(), next.into(), AcqRel) + .into(); + + if state == actual { + state = next; + break; + } + + state = actual; + } + + trace!(" -> transitioned to shutdown"); + + // Only transition to terminate if there are no futures currently on the + // pool + if state.num_futures() != 0 { + return; + } + + self.terminate_sleeping_workers(); + } + + /// Called by `Worker` as it tries to enter a sleeping state. Before it + /// sleeps, it must push itself onto the sleep stack. This enables other + /// threads to see it when signaling work. + pub fn push_sleeper(&self, idx: usize) -> Result<(), ()> { + self.sleep_stack.push(&self.workers, idx) + } + + pub fn terminate_sleeping_workers(&self) { + use worker::Lifecycle::Signaled; + + trace!(" -> shutting down workers"); + // Wakeup all sleeping workers. They will wake up, see the state + // transition, and terminate. + while let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, true) { + self.workers[idx].signal_stop(worker_state); + } + + // Now terminate any backup threads + // + // The call to `pop` must be successful because shutting down the pool + // is coordinated and at this point, this is the only thread that will + // attempt to transition the backup stack to "terminated". + while let Ok(Some(backup_id)) = self.backup_stack.pop(&self.backup, true) { + self.backup[backup_id.0].signal_stop(); + } + } + + pub fn poll_blocking_capacity(&self, task: &Arc<Task>) -> Poll<(), ::BlockingError> { + self.blocking.poll_blocking_capacity(task) + } + + /// Submit a task to the scheduler. + /// + /// Called from either inside or outside of the scheduler. If currently on + /// the scheduler, then a fast path is taken. + pub fn submit(&self, task: Arc<Task>, pool: &Arc<Pool>) { + debug_assert_eq!(*self, **pool); + + Worker::with_current(|worker| { + if let Some(worker) = worker { + // If the worker is in blocking mode, then even though the + // thread-local variable is set, the current thread does not + // have ownership of that worker entry. This is because the + // worker entry has already been handed off to another thread. + // + // The second check handles the case where the current thread is + // part of a different threadpool than the one being submitted + // to. + if !worker.is_blocking() && *self == *worker.pool { + let idx = worker.id.0; + + trace!(" -> submit internal; idx={}", idx); + + worker.pool.workers[idx].submit_internal(task); + worker.pool.signal_work(pool); + return; + } + } + + self.submit_external(task, pool); + }); + } + + /// Submit a task to the scheduler from off worker + /// + /// Called from outside of the scheduler, this function is how new tasks + /// enter the system. + pub fn submit_external(&self, task: Arc<Task>, pool: &Arc<Pool>) { + debug_assert_eq!(*self, **pool); + + trace!(" -> submit external"); + + self.queue.push(task); + self.signal_work(pool); + } + + pub fn release_backup(&self, backup_id: BackupId) -> Result<(), ()> { + // First update the state, this cannot fail because the caller must have + // exclusive access to the backup token. + self.backup[backup_id.0].release(); + + // Push the backup entry back on the stack + self.backup_stack.push(&self.backup, backup_id) + } + + pub fn notify_blocking_task(&self, pool: &Arc<Pool>) { + debug_assert_eq!(*self, **pool); + self.blocking.notify_task(&pool); + } + + /// Provision a thread to run a worker + pub fn spawn_thread(&self, id: WorkerId, pool: &Arc<Pool>) { + debug_assert_eq!(*self, **pool); + + let backup_id = match self.backup_stack.pop(&self.backup, false) { + Ok(Some(backup_id)) => backup_id, + Ok(None) => panic!("no thread available"), + Err(_) => { + debug!("failed to spawn worker thread due to the thread pool shutting down"); + return; + } + }; + + let need_spawn = self.backup[backup_id.0].worker_handoff(id.clone()); + + if !need_spawn { + return; + } + + let trigger = match self.trigger.upgrade() { + None => { + // The pool is shutting down. + return; + } + Some(t) => t, + }; + + let mut th = thread::Builder::new(); + + if let Some(ref prefix) = pool.config.name_prefix { + th = th.name(format!("{}{}", prefix, backup_id.0)); + } + + if let Some(stack) = pool.config.stack_size { + th = th.stack_size(stack); + } + + let pool = pool.clone(); + + let res = th.spawn(move || { + if let Some(ref f) = pool.config.after_start { + f(); + } + + let mut worker_id = id; + + pool.backup[backup_id.0].start(&worker_id); + + loop { + // The backup token should be in the running state. + debug_assert!(pool.backup[backup_id.0].is_running()); + + // TODO: Avoid always cloning + let worker = Worker::new(worker_id, backup_id, pool.clone(), trigger.clone()); + + // Run the worker. If the worker transitioned to a "blocking" + // state, then `is_blocking` will be true. + if !worker.do_run() { + // The worker shutdown, so exit the thread. + break; + } + + debug_assert!(!pool.backup[backup_id.0].is_pushed()); + + // Push the thread back onto the backup stack. This makes it + // available for future handoffs. + // + // This **must** happen before notifying the task. + let res = pool.backup_stack.push(&pool.backup, backup_id); + + if res.is_err() { + // The pool is being shutdown. + break; + } + + // The task switched the current thread to blocking mode. + // Now that the blocking task completed, any tasks + pool.notify_blocking_task(&pool); + + debug_assert!(pool.backup[backup_id.0].is_running()); + + // Wait for a handoff + let handoff = pool.backup[backup_id.0].wait_for_handoff(pool.config.keep_alive); + + match handoff { + Handoff::Worker(id) => { + debug_assert!(pool.backup[backup_id.0].is_running()); + worker_id = id; + } + Handoff::Idle | Handoff::Terminated => { + break; + } + } + } + + if let Some(ref f) = pool.config.before_stop { + f(); + } + }); + + if let Err(e) = res { + error!("failed to spawn worker thread; err={:?}", e); + panic!("failed to spawn worker thread: {:?}", e); + } + } + + /// If there are any other workers currently relaxing, signal them that work + /// is available so that they can try to find more work to process. + pub fn signal_work(&self, pool: &Arc<Pool>) { + debug_assert_eq!(*self, **pool); + + use worker::Lifecycle::Signaled; + + if let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, false) { + let entry = &self.workers[idx]; + + debug_assert!( + worker_state.lifecycle() != Signaled, + "actual={:?}", + worker_state.lifecycle(), + ); + + trace!("signal_work -- notify; idx={}", idx); + + if !entry.notify(worker_state) { + trace!("signal_work -- spawn; idx={}", idx); + self.spawn_thread(WorkerId(idx), pool); + } + } + } + + /// Generates a random number + /// + /// Uses a thread-local random number generator based on XorShift. + pub fn rand_usize(&self) -> usize { + thread_local! { + static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(prng_seed())); + } + + RNG.with(|rng| { + // This is the 32-bit variant of Xorshift. + // https://en.wikipedia.org/wiki/Xorshift + let mut x = rng.get(); + x ^= x << 13; + x ^= x >> 17; + x ^= x << 5; + rng.set(x); + x.0 as usize + }) + } +} + +impl PartialEq for Pool { + fn eq(&self, other: &Pool) -> bool { + self as *const _ == other as *const _ + } +} + +unsafe impl Send for Pool {} +unsafe impl Sync for Pool {} + +// Return a thread-specific, 32-bit, non-zero seed value suitable for a 32-bit +// PRNG. This uses one libstd RandomState for a default hasher and hashes on +// the current thread ID to obtain an unpredictable, collision resistant seed. +fn prng_seed() -> u32 { + // This obtains a small number of random bytes from the host system (for + // example, on unix via getrandom(2)) in order to seed an unpredictable and + // HashDoS resistant 64-bit hash function (currently: `SipHasher13` with + // 128-bit state). We only need one of these, to make the seeds for all + // process threads different via hashed IDs, collision resistant, and + // unpredictable. + lazy_static! { + static ref RND_STATE: RandomState = RandomState::new(); + } + + // Hash the current thread ID to produce a u32 value + let mut hasher = RND_STATE.build_hasher(); + thread::current().id().hash(&mut hasher); + let hash: u64 = hasher.finish(); + let seed = (hash as u32) ^ ((hash >> 32) as u32); + + // Ensure non-zero seed (Xorshift yields only zero's for that seed) + if seed == 0 { + 0x9b4e_6d25 // misc bits, could be any non-zero + } else { + seed + } +} diff --git a/third_party/rust/tokio-threadpool/src/pool/state.rs b/third_party/rust/tokio-threadpool/src/pool/state.rs new file mode 100644 index 0000000000..5ecb514e5c --- /dev/null +++ b/third_party/rust/tokio-threadpool/src/pool/state.rs @@ -0,0 +1,132 @@ +use std::{fmt, usize}; + +/// ThreadPool state. +/// +/// The two least significant bits are the shutdown flags. (0 for active, 1 for +/// shutdown on idle, 2 for shutting down). The remaining bits represent the +/// number of futures that still need to complete. +#[derive(Eq, PartialEq, Clone, Copy)] +pub(crate) struct State(usize); + +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone, Copy)] +#[repr(usize)] +pub(crate) enum Lifecycle { + /// The thread pool is currently running + Running = 0, + + /// The thread pool should shutdown once it reaches an idle state. + ShutdownOnIdle = 1, + + /// The thread pool should start the process of shutting down. + ShutdownNow = 2, +} + +/// Mask used to extract the number of futures from the state +const LIFECYCLE_MASK: usize = 0b11; +const NUM_FUTURES_MASK: usize = !LIFECYCLE_MASK; +const NUM_FUTURES_OFFSET: usize = 2; + +/// Max number of futures the pool can handle. +pub(crate) const MAX_FUTURES: usize = usize::MAX >> NUM_FUTURES_OFFSET; + +// ===== impl State ===== + +impl State { + #[inline] + pub fn new() -> State { + State(0) + } + + /// Returns the number of futures still pending completion. + pub fn num_futures(&self) -> usize { + self.0 >> NUM_FUTURES_OFFSET + } + + /// Increment the number of futures pending completion. + /// + /// Returns false on failure. + pub fn inc_num_futures(&mut self) { + debug_assert!(self.num_futures() < MAX_FUTURES); + debug_assert!(self.lifecycle() < Lifecycle::ShutdownNow); + + self.0 += 1 << NUM_FUTURES_OFFSET; + } + + /// Decrement the number of futures pending completion. + pub fn dec_num_futures(&mut self) { + let num_futures = self.num_futures(); + + if num_futures == 0 { + // Already zero + return; + } + + self.0 -= 1 << NUM_FUTURES_OFFSET; + + if self.lifecycle() == Lifecycle::ShutdownOnIdle && num_futures == 1 { + self.set_lifecycle(Lifecycle::ShutdownNow); + } + } + + /// Set the number of futures pending completion to zero + pub fn clear_num_futures(&mut self) { + self.0 = self.0 & LIFECYCLE_MASK; + } + + pub fn lifecycle(&self) -> Lifecycle { + (self.0 & LIFECYCLE_MASK).into() + } + + pub fn set_lifecycle(&mut self, val: Lifecycle) { + self.0 = (self.0 & NUM_FUTURES_MASK) | (val as usize); + } + + pub fn is_terminated(&self) -> bool { + self.lifecycle() == Lifecycle::ShutdownNow && self.num_futures() == 0 + } +} + +impl From<usize> for State { + fn from(src: usize) -> Self { + State(src) + } +} + +impl From<State> for usize { + fn from(src: State) -> Self { + src.0 + } +} + +impl fmt::Debug for State { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("pool::State") + .field("lifecycle", &self.lifecycle()) + .field("num_futures", &self.num_futures()) + .finish() + } +} + +// ===== impl Lifecycle ===== + +impl From<usize> for Lifecycle { + fn from(src: usize) -> Lifecycle { + use self::Lifecycle::*; + + debug_assert!( + src == Running as usize + || src == ShutdownOnIdle as usize + || src == ShutdownNow as usize + ); + + unsafe { ::std::mem::transmute(src) } + } +} + +impl From<Lifecycle> for usize { + fn from(src: Lifecycle) -> usize { + let v = src as usize; + debug_assert!(v & LIFECYCLE_MASK == v); + v + } +} |