diff options
Diffstat (limited to 'third_party/rust/build-parallel')
-rw-r--r-- | third_party/rust/build-parallel/.cargo-checksum.json | 1 | ||||
-rw-r--r-- | third_party/rust/build-parallel/Cargo.toml | 31 | ||||
-rw-r--r-- | third_party/rust/build-parallel/src/lib.rs | 223 |
3 files changed, 255 insertions, 0 deletions
diff --git a/third_party/rust/build-parallel/.cargo-checksum.json b/third_party/rust/build-parallel/.cargo-checksum.json new file mode 100644 index 0000000000..0aaffc9bc5 --- /dev/null +++ b/third_party/rust/build-parallel/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"Cargo.toml":"b4a0a0ab788e6823f07a15a777ecfa23d6353a73620451b8db943cc3138e8e48","src/lib.rs":"3f3bbc7e3cb00c52db8cb4ba829808e36e92f711af4dff9d5f8546c9e26fe0a5"},"package":"b8e3ff9db740167616e528c509b3618046fc05d337f8f3182d300f4aa977d2bb"}
\ No newline at end of file diff --git a/third_party/rust/build-parallel/Cargo.toml b/third_party/rust/build-parallel/Cargo.toml new file mode 100644 index 0000000000..fe5c5b7f88 --- /dev/null +++ b/third_party/rust/build-parallel/Cargo.toml @@ -0,0 +1,31 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies +# +# If you believe there's an error in this file please file an +# issue against the rust-lang/cargo repository. If you're +# editing this file be aware that the upstream Cargo.toml +# will likely look very different (and much more reasonable) + +[package] +edition = "2018" +name = "build-parallel" +version = "0.1.2" +authors = ["Jeff Muizelaar <jrmuizel@gmail.com>"] +description = "A helper library to let you parallelize work in build.rs\nusing the jobserver\n" +documentation = "https://doc.rs/build-parallel" +keywords = ["build-dependencies"] +categories = ["development-tools::build-utils"] +license = "MIT/Apache-2.0" +repository = "https://github.com/jrmuizel/build-parallel" +[dependencies.crossbeam-utils] +version = "0.8" + +[dependencies.jobserver] +version = "0.1.19" + +[dependencies.num_cpus] +version = "1.0" diff --git a/third_party/rust/build-parallel/src/lib.rs b/third_party/rust/build-parallel/src/lib.rs new file mode 100644 index 0000000000..66a105e717 --- /dev/null +++ b/third_party/rust/build-parallel/src/lib.rs @@ -0,0 +1,223 @@ +use crossbeam_utils::thread; +use std::any::Any; +use std::env; +use std::io; + +/// Represents the types of errors that may occur while using build-parallel. +#[derive(Debug)] +pub enum Error<E> { + /// Error occurred while internally performing I/O. + IOError(io::Error), + /// Error occurred during build callback. + BuildError(E), + /// Panic occurred during build callback. + BuildPanic(Box<dyn Any + Send + 'static>), +} + +fn compile_object<T, R, E, F>(f: F, obj: &T) -> Result<R, Error<E>> +where + T: 'static + Sync, + R: 'static + Sync + Send, + E: 'static + Sync + Send, + F: Fn(&T) -> Result<R, E> + Sync + Send, +{ + f(obj).map_err(Error::BuildError) +} + +pub fn compile_objects<T, R, E, F>(f: &F, objs: &[T]) -> Result<Vec<R>, Error<E>> +where + T: 'static + Sync, + R: 'static + Sync + Send, + E: 'static + Sync + Send, + F: Fn(&T) -> Result<R, E> + Sync + Send, +{ + use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; + use std::sync::Once; + + // Limit our parallelism globally with a jobserver. Start off by + // releasing our own token for this process so we can have a bit of an + // easier to write loop below. If this fails, though, then we're likely + // on Windows with the main implicit token, so we just have a bit extra + // parallelism for a bit and don't reacquire later. + let server = jobserver(); + let reacquire = server.release_raw().is_ok(); + + let res = thread::scope(|s| { + // When compiling objects in parallel we do a few dirty tricks to speed + // things up: + // + // * First is that we use the `jobserver` crate to limit the parallelism + // of this build script. The `jobserver` crate will use a jobserver + // configured by Cargo for build scripts to ensure that parallelism is + // coordinated across C compilations and Rust compilations. Before we + // compile anything we make sure to wait until we acquire a token. + // + // Note that this jobserver is cached globally so we only used one per + // process and only worry about creating it once. + // + // * Next we use a raw `thread::spawn` per thread to actually compile + // objects in parallel. We only actually spawn a thread after we've + // acquired a token to perform some work + // + // * Finally though we want to keep the dependencies of this crate + // pretty light, so we avoid using a safe abstraction like `rayon` and + // instead rely on some bits of `unsafe` code. We know that this stack + // frame persists while everything is compiling so we use all the + // stack-allocated objects without cloning/reallocating. We use a + // transmute to `State` with a `'static` lifetime to persist + // everything we need across the boundary, and the join-on-drop + // semantics of `JoinOnDrop` should ensure that our stack frame is + // alive while threads are alive. + // + // With all that in mind we compile all objects in a loop here, after we + // acquire the appropriate tokens, Once all objects have been compiled + // we join on all the threads and propagate the results of compilation. + // + // Note that as a slight optimization we try to break out as soon as + // possible as soon as any compilation fails to ensure that errors get + // out to the user as fast as possible. + let error = AtomicBool::new(false); + let mut handles = Vec::new(); + for obj in objs { + if error.load(SeqCst) { + break; + } + let token = server.acquire().map_err(Error::IOError)?; + let state = State { obj, error: &error }; + let state = unsafe { std::mem::transmute::<State<T>, State<'static, T>>(state) }; + handles.push(s.spawn(|_| { + let state: State<T> = state; // erase the `'static` lifetime + let result = compile_object(f, state.obj); + if result.is_err() { + state.error.store(true, SeqCst); + } + drop(token); // make sure our jobserver token is released after the compile + result + })); + } + + let mut output = Vec::new(); + for handle in handles { + match handle.join().map_err(Error::BuildPanic)? { + Ok(r) => output.push(r), + Err(err) => return Err(err), + } + } + + Ok(output) + }) + .map_err(Error::BuildPanic)?; + + // Reacquire our process's token before we proceed, which we released + // before entering the loop above. + if reacquire { + server.acquire_raw().map_err(Error::IOError)?; + } + + return res; + + /// Shared state from the parent thread to the child thread. This + /// package of pointers is temporarily transmuted to a `'static` + /// lifetime to cross the thread boundary and then once the thread is + /// running we erase the `'static` to go back to an anonymous lifetime. + struct State<'a, O> { + obj: &'a O, + error: &'a AtomicBool, + } + + /// Returns a suitable `jobserver::Client` used to coordinate + /// parallelism between build scripts. + fn jobserver() -> &'static jobserver::Client { + static INIT: Once = Once::new(); + static mut JOBSERVER: Option<jobserver::Client> = None; + + fn _assert_sync<T: Sync>() {} + _assert_sync::<jobserver::Client>(); + + unsafe { + INIT.call_once(|| { + let server = default_jobserver(); + JOBSERVER = Some(server); + }); + JOBSERVER.as_ref().unwrap() + } + } + + unsafe fn default_jobserver() -> jobserver::Client { + // Try to use the environmental jobserver which Cargo typically + // initializes for us... + if let Some(client) = jobserver::Client::from_env() { + return client; + } + + // ... but if that fails for whatever reason fall back to the number + // of cpus on the system or the `NUM_JOBS` env var. + let mut parallelism = num_cpus::get(); + if let Ok(amt) = env::var("NUM_JOBS") { + if let Ok(amt) = amt.parse() { + parallelism = amt; + } + } + + // If we create our own jobserver then be sure to reserve one token + // for ourselves. + let client = jobserver::Client::new(parallelism).expect("failed to create jobserver"); + client.acquire_raw().expect("failed to acquire initial"); + client + } +} + +#[test] +fn it_works() { + struct Object; + let mut v = Vec::new(); + for _ in 0..4000 { + v.push(Object); + } + compile_objects::<Object, (), (), _>( + &|_| { + println!("compile {:?}", std::thread::current().id()); + Ok(()) + }, + &v, + ) + .unwrap(); +} + +#[test] +fn test_build_error() { + struct Object; + let mut v = Vec::new(); + v.push(Object); + let err = compile_objects::<Object, (), (), _>( + &|_| { + return Err(()); + }, + &v, + ) + .unwrap_err(); + + match err { + Error::BuildError(_) => {}, + _ => panic!("Unexpected error."), + } +} + +#[test] +fn test_build_panic() { + struct Object; + let mut v = Vec::new(); + v.push(Object); + let err = compile_objects::<Object, (), (), _>( + &|_| { + panic!("Panic."); + }, + &v, + ) + .unwrap_err(); + + match err { + Error::BuildPanic(_) => {}, + _ => panic!("Unexpected error."), + } +} |