use crate::latch::Latch;
use crate::tlv;
use crate::unwind;
use crossbeam_deque::{Injector, Steal};
use std::any::Any;
use std::cell::UnsafeCell;
use std::mem;

/// Outcome slot of a job.
///
/// Starts out as `None`; once the job has run it holds either the value the
/// job produced (`Ok`) or the payload of the panic it raised (`Panic`).
pub(super) enum JobResult<T> {
    None,
    Ok(T),
    Panic(Box<dyn Any + Send>),
}

/// A `Job` is used to advertise work for other threads that they may
/// want to steal. In accordance with time honored tradition, jobs are
/// arranged in a deque, so that thieves can take from the top of the
/// deque while the main worker manages the bottom of the deque. This
/// deque is managed by the `thread_pool` module.
pub(super) trait Job {
    /// Runs the job behind `this`.
    ///
    /// # Safety
    ///
    /// This may be called from a different thread than the one
    /// which scheduled the job, so the implementer must ensure the
    /// appropriate traits are met, whether `Send`, `Sync`, or both.
    unsafe fn execute(this: *const Self);
}

/// Effectively a Job trait object. Each JobRef **must** be executed
/// exactly once, or else data may leak.
///
/// Internally, we store the job's data in a `*const ()` pointer. The
/// true type is something like `*const StackJob<...>`, but we hide
/// it. We also carry the "execute fn" from the `Job` trait.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(super) struct JobRef {
    /// Type-erased pointer to the concrete job.
    pointer: *const (),
    /// Type-erased `Job::execute` of the concrete job type.
    execute_fn: unsafe fn(*const ()),
}

// SAFETY: a `JobRef` is only a raw pointer plus a function pointer; the
// obligations for touching the pointee from another thread are placed on
// the `Job` implementer (see `Job::execute`).
unsafe impl Send for JobRef {}
unsafe impl Sync for JobRef {}

impl JobRef {
    /// Builds a type-erased reference to the job stored at `data`.
    ///
    /// # Safety
    ///
    /// The caller asserts that `data` will remain valid until the
    /// job is executed.
    pub(super) unsafe fn new<T>(data: *const T) -> JobRef
    where
        T: Job,
    {
        let fn_ptr: unsafe fn(*const T) = <T as Job>::execute;

        // erase types:
        JobRef {
            pointer: data as *const (),
            // SAFETY: only the pointee type of the single thin-pointer
            // argument changes, and `execute` below always hands back
            // the very pointer that was erased here.
            execute_fn: mem::transmute(fn_ptr),
        }
    }

    /// Invokes the stored `execute` function on the stored pointer.
    ///
    /// # Safety
    ///
    /// The job this reference was created from must still be valid, and
    /// (per the type-level contract) it must be executed exactly once.
    #[inline]
    pub(super) unsafe fn execute(&self) {
        (self.execute_fn)(self.pointer)
    }
}

/// A job that will be owned by a stack slot. This means that when it
/// executes it need not free any heap data, the cleanup occurs when
/// the stack frame is later popped. The function parameter indicates
/// `true` if the job was stolen -- executed on a different thread.
pub(super) struct StackJob where L: Latch + Sync, F: FnOnce(bool) -> R + Send, R: Send, { pub(super) latch: L, func: UnsafeCell>, result: UnsafeCell>, tlv: usize, } impl StackJob where L: Latch + Sync, F: FnOnce(bool) -> R + Send, R: Send, { pub(super) fn new(tlv: usize, func: F, latch: L) -> StackJob { StackJob { latch, func: UnsafeCell::new(Some(func)), result: UnsafeCell::new(JobResult::None), tlv, } } pub(super) unsafe fn as_job_ref(&self) -> JobRef { JobRef::new(self) } pub(super) unsafe fn run_inline(self, stolen: bool) -> R { self.func.into_inner().unwrap()(stolen) } pub(super) unsafe fn into_result(self) -> R { self.result.into_inner().into_return_value() } } impl Job for StackJob where L: Latch + Sync, F: FnOnce(bool) -> R + Send, R: Send, { unsafe fn execute(this: *const Self) { fn call(func: impl FnOnce(bool) -> R) -> impl FnOnce() -> R { move || func(true) } let this = &*this; tlv::set(this.tlv); let abort = unwind::AbortIfPanic; let func = (*this.func.get()).take().unwrap(); (*this.result.get()) = match unwind::halt_unwinding(call(func)) { Ok(x) => JobResult::Ok(x), Err(x) => JobResult::Panic(x), }; this.latch.set(); mem::forget(abort); } } /// Represents a job stored in the heap. Used to implement /// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply /// invokes a closure, which then triggers the appropriate logic to /// signal that the job executed. /// /// (Probably `StackJob` should be refactored in a similar fashion.) pub(super) struct HeapJob where BODY: FnOnce() + Send, { job: UnsafeCell>, tlv: usize, } impl HeapJob where BODY: FnOnce() + Send, { pub(super) fn new(tlv: usize, func: BODY) -> Self { HeapJob { job: UnsafeCell::new(Some(func)), tlv, } } /// Creates a `JobRef` from this job -- note that this hides all /// lifetimes, so it is up to you to ensure that this JobRef /// doesn't outlive any data that it closes over. 
pub(super) unsafe fn as_job_ref(self: Box) -> JobRef { let this: *const Self = mem::transmute(self); JobRef::new(this) } } impl Job for HeapJob where BODY: FnOnce() + Send, { unsafe fn execute(this: *const Self) { let this: Box = mem::transmute(this); tlv::set(this.tlv); let job = (*this.job.get()).take().unwrap(); job(); } } impl JobResult { /// Convert the `JobResult` for a job that has finished (and hence /// its JobResult is populated) into its return value. /// /// NB. This will panic if the job panicked. pub(super) fn into_return_value(self) -> T { match self { JobResult::None => unreachable!(), JobResult::Ok(x) => x, JobResult::Panic(x) => unwind::resume_unwinding(x), } } } /// Indirect queue to provide FIFO job priority. pub(super) struct JobFifo { inner: Injector, } impl JobFifo { pub(super) fn new() -> Self { JobFifo { inner: Injector::new(), } } pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef { // A little indirection ensures that spawns are always prioritized in FIFO order. The // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front // (FIFO), but either way they will end up popping from the front of this queue. self.inner.push(job_ref); JobRef::new(self) } } impl Job for JobFifo { unsafe fn execute(this: *const Self) { // We "execute" a queue by executing its first job, FIFO. loop { match (*this).inner.steal() { Steal::Success(job_ref) => break job_ref.execute(), Steal::Empty => panic!("FIFO is empty"), Steal::Retry => {} } } } }