use crossbeam_utils::thread; use std::any::Any; use std::env; use std::io; /// Represents the types of errors that may occur while using build-parallel. #[derive(Debug)] pub enum Error { /// Error occurred while internally performing I/O. IOError(io::Error), /// Error occurred during build callback. BuildError(E), /// Panic occurred during build callback. BuildPanic(Box), } fn compile_object(f: F, obj: &T) -> Result> where T: 'static + Sync, R: 'static + Sync + Send, E: 'static + Sync + Send, F: Fn(&T) -> Result + Sync + Send, { f(obj).map_err(Error::BuildError) } pub fn compile_objects(f: &F, objs: &[T]) -> Result, Error> where T: 'static + Sync, R: 'static + Sync + Send, E: 'static + Sync + Send, F: Fn(&T) -> Result + Sync + Send, { use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; use std::sync::Once; // Limit our parallelism globally with a jobserver. Start off by // releasing our own token for this process so we can have a bit of an // easier to write loop below. If this fails, though, then we're likely // on Windows with the main implicit token, so we just have a bit extra // parallelism for a bit and don't reacquire later. let server = jobserver(); let reacquire = server.release_raw().is_ok(); let res = thread::scope(|s| { // When compiling objects in parallel we do a few dirty tricks to speed // things up: // // * First is that we use the `jobserver` crate to limit the parallelism // of this build script. The `jobserver` crate will use a jobserver // configured by Cargo for build scripts to ensure that parallelism is // coordinated across C compilations and Rust compilations. Before we // compile anything we make sure to wait until we acquire a token. // // Note that this jobserver is cached globally so we only used one per // process and only worry about creating it once. // // * Next we use a raw `thread::spawn` per thread to actually compile // objects in parallel. We only actually spawn a thread after we've // acquired a token to perform some work // // * Finally though we want to keep the dependencies of this crate // pretty light, so we avoid using a safe abstraction like `rayon` and // instead rely on some bits of `unsafe` code. We know that this stack // frame persists while everything is compiling so we use all the // stack-allocated objects without cloning/reallocating. We use a // transmute to `State` with a `'static` lifetime to persist // everything we need across the boundary, and the join-on-drop // semantics of `JoinOnDrop` should ensure that our stack frame is // alive while threads are alive. // // With all that in mind we compile all objects in a loop here, after we // acquire the appropriate tokens, Once all objects have been compiled // we join on all the threads and propagate the results of compilation. // // Note that as a slight optimization we try to break out as soon as // possible as soon as any compilation fails to ensure that errors get // out to the user as fast as possible. let error = AtomicBool::new(false); let mut handles = Vec::new(); for obj in objs { if error.load(SeqCst) { break; } let token = server.acquire().map_err(Error::IOError)?; let state = State { obj, error: &error }; let state = unsafe { std::mem::transmute::, State<'static, T>>(state) }; handles.push(s.spawn(|_| { let state: State = state; // erase the `'static` lifetime let result = compile_object(f, state.obj); if result.is_err() { state.error.store(true, SeqCst); } drop(token); // make sure our jobserver token is released after the compile result })); } let mut output = Vec::new(); for handle in handles { match handle.join().map_err(Error::BuildPanic)? { Ok(r) => output.push(r), Err(err) => return Err(err), } } Ok(output) }) .map_err(Error::BuildPanic)?; // Reacquire our process's token before we proceed, which we released // before entering the loop above. if reacquire { server.acquire_raw().map_err(Error::IOError)?; } return res; /// Shared state from the parent thread to the child thread. This /// package of pointers is temporarily transmuted to a `'static` /// lifetime to cross the thread boundary and then once the thread is /// running we erase the `'static` to go back to an anonymous lifetime. struct State<'a, O> { obj: &'a O, error: &'a AtomicBool, } /// Returns a suitable `jobserver::Client` used to coordinate /// parallelism between build scripts. fn jobserver() -> &'static jobserver::Client { static INIT: Once = Once::new(); static mut JOBSERVER: Option = None; fn _assert_sync() {} _assert_sync::(); unsafe { INIT.call_once(|| { let server = default_jobserver(); JOBSERVER = Some(server); }); JOBSERVER.as_ref().unwrap() } } unsafe fn default_jobserver() -> jobserver::Client { // Try to use the environmental jobserver which Cargo typically // initializes for us... if let Some(client) = jobserver::Client::from_env() { return client; } // ... but if that fails for whatever reason fall back to the number // of cpus on the system or the `NUM_JOBS` env var. let mut parallelism = num_cpus::get(); if let Ok(amt) = env::var("NUM_JOBS") { if let Ok(amt) = amt.parse() { parallelism = amt; } } // If we create our own jobserver then be sure to reserve one token // for ourselves. let client = jobserver::Client::new(parallelism).expect("failed to create jobserver"); client.acquire_raw().expect("failed to acquire initial"); client } } #[test] fn it_works() { struct Object; let mut v = Vec::new(); for _ in 0..4000 { v.push(Object); } compile_objects::( &|_| { println!("compile {:?}", std::thread::current().id()); Ok(()) }, &v, ) .unwrap(); } #[test] fn test_build_error() { struct Object; let mut v = Vec::new(); v.push(Object); let err = compile_objects::( &|_| { return Err(()); }, &v, ) .unwrap_err(); match err { Error::BuildError(_) => {}, _ => panic!("Unexpected error."), } } #[test] fn test_build_panic() { struct Object; let mut v = Vec::new(); v.push(Object); let err = compile_objects::( &|_| { panic!("Panic."); }, &v, ) .unwrap_err(); match err { Error::BuildPanic(_) => {}, _ => panic!("Unexpected error."), } }