use crate::latch::Latch;
use crate::unwind;
use crossbeam_deque::{Injector, Steal};
use std::any::Any;
use std::cell::UnsafeCell;
use std::mem;
use std::sync::Arc;

/// Result slot of a job: still empty, a produced value, or the payload of a
/// panic that was caught while the job ran.
pub(super) enum JobResult<T> {
    /// Nothing has been stored yet.
    None,
    /// The job's closure returned this value.
    Ok(T),
    /// The job's closure panicked; this is the captured panic payload.
    Panic(Box<dyn Any + Send>),
}

/// A `Job` is used to advertise work for other threads that they may
/// want to steal. In accordance with time honored tradition, jobs are
/// arranged in a deque, so that thieves can take from the top of the
/// deque while the main worker manages the bottom of the deque. This
/// deque is managed by the `thread_pool` module.
pub(super) trait Job {
    /// Unsafe: this may be called from a different thread than the one
    /// which scheduled the job, so the implementer must ensure the
    /// appropriate traits are met, whether `Send`, `Sync`, or both.
    unsafe fn execute(this: *const ());
}

/// Effectively a Job trait object. Each JobRef **must** be executed
/// exactly once, or else data may leak.
///
/// Internally, we store the job's data in a `*const ()` pointer. The
/// true type is something like `*const StackJob<...>`, but we hide
/// it. We also carry the "execute fn" from the `Job` trait.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(super) struct JobRef {
    /// Type-erased pointer to the job's data.
    pointer: *const (),
    /// The `Job::execute` implementation matching the erased type.
    execute_fn: unsafe fn(*const ()),
}

unsafe impl Send for JobRef {}
unsafe impl Sync for JobRef {}

impl JobRef {
    /// Unsafe: caller asserts that `data` will remain valid until the
    /// job is executed.
    pub(super) unsafe fn new<T>(data: *const T) -> JobRef
    where
        T: Job,
    {
        // erase types:
        JobRef {
            pointer: data as *const (),
            execute_fn: <T as Job>::execute,
        }
    }

    /// Runs the job by calling the stored `execute_fn` on the stored pointer.
    ///
    /// Unsafe: the pointer handed to `new` must still be valid.
    #[inline]
    pub(super) unsafe fn execute(self) {
        (self.execute_fn)(self.pointer)
    }
}

/// A job that will be owned by a stack slot. This means that when it
/// executes it need not free any heap data, the cleanup occurs when
/// the stack frame is later popped. The function parameter indicates
/// `true` if the job was stolen -- executed on a different thread.
pub(super) struct StackJob where L: Latch + Sync, F: FnOnce(bool) -> R + Send, R: Send, { pub(super) latch: L, func: UnsafeCell>, result: UnsafeCell>, } impl StackJob where L: Latch + Sync, F: FnOnce(bool) -> R + Send, R: Send, { pub(super) fn new(func: F, latch: L) -> StackJob { StackJob { latch, func: UnsafeCell::new(Some(func)), result: UnsafeCell::new(JobResult::None), } } pub(super) unsafe fn as_job_ref(&self) -> JobRef { JobRef::new(self) } pub(super) unsafe fn run_inline(self, stolen: bool) -> R { self.func.into_inner().unwrap()(stolen) } pub(super) unsafe fn into_result(self) -> R { self.result.into_inner().into_return_value() } } impl Job for StackJob where L: Latch + Sync, F: FnOnce(bool) -> R + Send, R: Send, { unsafe fn execute(this: *const ()) { let this = &*(this as *const Self); let abort = unwind::AbortIfPanic; let func = (*this.func.get()).take().unwrap(); (*this.result.get()) = JobResult::call(func); Latch::set(&this.latch); mem::forget(abort); } } /// Represents a job stored in the heap. Used to implement /// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply /// invokes a closure, which then triggers the appropriate logic to /// signal that the job executed. /// /// (Probably `StackJob` should be refactored in a similar fashion.) pub(super) struct HeapJob where BODY: FnOnce() + Send, { job: BODY, } impl HeapJob where BODY: FnOnce() + Send, { pub(super) fn new(job: BODY) -> Box { Box::new(HeapJob { job }) } /// Creates a `JobRef` from this job -- note that this hides all /// lifetimes, so it is up to you to ensure that this JobRef /// doesn't outlive any data that it closes over. pub(super) unsafe fn into_job_ref(self: Box) -> JobRef { JobRef::new(Box::into_raw(self)) } /// Creates a static `JobRef` from this job. 
pub(super) fn into_static_job_ref(self: Box) -> JobRef where BODY: 'static, { unsafe { self.into_job_ref() } } } impl Job for HeapJob where BODY: FnOnce() + Send, { unsafe fn execute(this: *const ()) { let this = Box::from_raw(this as *mut Self); (this.job)(); } } /// Represents a job stored in an `Arc` -- like `HeapJob`, but may /// be turned into multiple `JobRef`s and called multiple times. pub(super) struct ArcJob where BODY: Fn() + Send + Sync, { job: BODY, } impl ArcJob where BODY: Fn() + Send + Sync, { pub(super) fn new(job: BODY) -> Arc { Arc::new(ArcJob { job }) } /// Creates a `JobRef` from this job -- note that this hides all /// lifetimes, so it is up to you to ensure that this JobRef /// doesn't outlive any data that it closes over. pub(super) unsafe fn as_job_ref(this: &Arc) -> JobRef { JobRef::new(Arc::into_raw(Arc::clone(this))) } /// Creates a static `JobRef` from this job. pub(super) fn as_static_job_ref(this: &Arc) -> JobRef where BODY: 'static, { unsafe { Self::as_job_ref(this) } } } impl Job for ArcJob where BODY: Fn() + Send + Sync, { unsafe fn execute(this: *const ()) { let this = Arc::from_raw(this as *mut Self); (this.job)(); } } impl JobResult { fn call(func: impl FnOnce(bool) -> T) -> Self { match unwind::halt_unwinding(|| func(true)) { Ok(x) => JobResult::Ok(x), Err(x) => JobResult::Panic(x), } } /// Convert the `JobResult` for a job that has finished (and hence /// its JobResult is populated) into its return value. /// /// NB. This will panic if the job panicked. pub(super) fn into_return_value(self) -> T { match self { JobResult::None => unreachable!(), JobResult::Ok(x) => x, JobResult::Panic(x) => unwind::resume_unwinding(x), } } } /// Indirect queue to provide FIFO job priority. 
pub(super) struct JobFifo { inner: Injector, } impl JobFifo { pub(super) fn new() -> Self { JobFifo { inner: Injector::new(), } } pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef { // A little indirection ensures that spawns are always prioritized in FIFO order. The // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front // (FIFO), but either way they will end up popping from the front of this queue. self.inner.push(job_ref); JobRef::new(self) } } impl Job for JobFifo { unsafe fn execute(this: *const ()) { // We "execute" a queue by executing its first job, FIFO. let this = &*(this as *const Self); loop { match this.inner.steal() { Steal::Success(job_ref) => break job_ref.execute(), Steal::Empty => panic!("FIFO is empty"), Steal::Retry => {} } } } }