diff options
Diffstat (limited to 'third_party/rust/rayon/src/iter/collect')
-rw-r--r-- | third_party/rust/rayon/src/iter/collect/consumer.rs | 186 | ||||
-rw-r--r-- | third_party/rust/rayon/src/iter/collect/mod.rs | 116 | ||||
-rw-r--r-- | third_party/rust/rayon/src/iter/collect/test.rs | 371 |
3 files changed, 673 insertions, 0 deletions
diff --git a/third_party/rust/rayon/src/iter/collect/consumer.rs b/third_party/rust/rayon/src/iter/collect/consumer.rs new file mode 100644 index 0000000000..acd67dffa7 --- /dev/null +++ b/third_party/rust/rayon/src/iter/collect/consumer.rs @@ -0,0 +1,186 @@ +use super::super::plumbing::*; +use crate::SendPtr; +use std::marker::PhantomData; +use std::ptr; +use std::slice; + +pub(super) struct CollectConsumer<'c, T: Send> { + /// See `CollectResult` for explanation of why this is not a slice + start: SendPtr<T>, + len: usize, + marker: PhantomData<&'c mut T>, +} + +impl<T: Send> CollectConsumer<'_, T> { + /// Create a collector for `len` items in the unused capacity of the vector. + pub(super) fn appender(vec: &mut Vec<T>, len: usize) -> CollectConsumer<'_, T> { + let start = vec.len(); + assert!(vec.capacity() - start >= len); + + // SAFETY: We already made sure to have the additional space allocated. + // The pointer is derived from `Vec` directly, not through a `Deref`, + // so it has provenance over the whole allocation. + unsafe { CollectConsumer::new(vec.as_mut_ptr().add(start), len) } + } +} + +impl<'c, T: Send + 'c> CollectConsumer<'c, T> { + /// The target memory is considered uninitialized, and will be + /// overwritten without reading or dropping existing values. + unsafe fn new(start: *mut T, len: usize) -> Self { + CollectConsumer { + start: SendPtr(start), + len, + marker: PhantomData, + } + } +} + +/// CollectResult represents an initialized part of the target slice. +/// +/// This is a proxy owner of the elements in the slice; when it drops, +/// the elements will be dropped, unless its ownership is released before then. +#[must_use] +pub(super) struct CollectResult<'c, T> { + /// This pointer and length has the same representation as a slice, + /// but retains the provenance of the entire array so that we can merge + /// these regions together in `CollectReducer`. 
+ start: SendPtr<T>, + total_len: usize, + /// The current initialized length after `start` + initialized_len: usize, + /// Lifetime invariance guarantees that the data flows from consumer to result, + /// especially for the `scope_fn` callback in `Collect::with_consumer`. + invariant_lifetime: PhantomData<&'c mut &'c mut [T]>, +} + +unsafe impl<'c, T> Send for CollectResult<'c, T> where T: Send {} + +impl<'c, T> CollectResult<'c, T> { + /// The current length of the collect result + pub(super) fn len(&self) -> usize { + self.initialized_len + } + + /// Release ownership of the slice of elements, and return the length + pub(super) fn release_ownership(mut self) -> usize { + let ret = self.initialized_len; + self.initialized_len = 0; + ret + } +} + +impl<'c, T> Drop for CollectResult<'c, T> { + fn drop(&mut self) { + // Drop the first `self.initialized_len` elements, which have been recorded + // to be initialized by the folder. + unsafe { + ptr::drop_in_place(slice::from_raw_parts_mut( + self.start.0, + self.initialized_len, + )); + } + } +} + +impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> { + type Folder = CollectResult<'c, T>; + type Reducer = CollectReducer; + type Result = CollectResult<'c, T>; + + fn split_at(self, index: usize) -> (Self, Self, CollectReducer) { + let CollectConsumer { start, len, .. } = self; + + // Produce new consumers. + // SAFETY: This assert checks that `index` is a valid offset for `start` + unsafe { + assert!(index <= len); + ( + CollectConsumer::new(start.0, index), + CollectConsumer::new(start.0.add(index), len - index), + CollectReducer, + ) + } + } + + fn into_folder(self) -> Self::Folder { + // Create a result/folder that consumes values and writes them + // into the region after start. The initial result has length 0. 
+ CollectResult { + start: self.start, + total_len: self.len, + initialized_len: 0, + invariant_lifetime: PhantomData, + } + } + + fn full(&self) -> bool { + false + } +} + +impl<'c, T: Send + 'c> Folder<T> for CollectResult<'c, T> { + type Result = Self; + + fn consume(mut self, item: T) -> Self { + assert!( + self.initialized_len < self.total_len, + "too many values pushed to consumer" + ); + + // SAFETY: The assert above is a bounds check for this write, and we + // avoid assignment here so we do not drop an uninitialized T. + unsafe { + // Write item and increase the initialized length + self.start.0.add(self.initialized_len).write(item); + self.initialized_len += 1; + } + + self + } + + fn complete(self) -> Self::Result { + // NB: We don't explicitly check that the local writes were complete, + // but Collect will assert the total result length in the end. + self + } + + fn full(&self) -> bool { + false + } +} + +/// Pretend to be unindexed for `special_collect_into_vec`, +/// but we should never actually get used that way... +impl<'c, T: Send + 'c> UnindexedConsumer<T> for CollectConsumer<'c, T> { + fn split_off_left(&self) -> Self { + unreachable!("CollectConsumer must be indexed!") + } + fn to_reducer(&self) -> Self::Reducer { + CollectReducer + } +} + +/// CollectReducer combines adjacent chunks; the result must always +/// be contiguous so that it is one combined slice. +pub(super) struct CollectReducer; + +impl<'c, T> Reducer<CollectResult<'c, T>> for CollectReducer { + fn reduce( + self, + mut left: CollectResult<'c, T>, + right: CollectResult<'c, T>, + ) -> CollectResult<'c, T> { + // Merge if the CollectResults are adjacent and in left to right order + // else: drop the right piece now and total length will end up short in the end, + // when the correctness of the collected result is asserted. 
+ unsafe { + let left_end = left.start.0.add(left.initialized_len); + if left_end == right.start.0 { + left.total_len += right.total_len; + left.initialized_len += right.release_ownership(); + } + left + } + } +} diff --git a/third_party/rust/rayon/src/iter/collect/mod.rs b/third_party/rust/rayon/src/iter/collect/mod.rs new file mode 100644 index 0000000000..4044a685b8 --- /dev/null +++ b/third_party/rust/rayon/src/iter/collect/mod.rs @@ -0,0 +1,116 @@ +use super::{IndexedParallelIterator, ParallelIterator}; + +mod consumer; +use self::consumer::CollectConsumer; +use self::consumer::CollectResult; +use super::unzip::unzip_indexed; + +mod test; + +/// Collects the results of the exact iterator into the specified vector. +/// +/// This is called by `IndexedParallelIterator::collect_into_vec`. +pub(super) fn collect_into_vec<I, T>(pi: I, v: &mut Vec<T>) +where + I: IndexedParallelIterator<Item = T>, + T: Send, +{ + v.truncate(0); // clear any old data + let len = pi.len(); + collect_with_consumer(v, len, |consumer| pi.drive(consumer)); +} + +/// Collects the results of the iterator into the specified vector. +/// +/// Technically, this only works for `IndexedParallelIterator`, but we're faking a +/// bit of specialization here until Rust can do that natively. Callers are +/// using `opt_len` to find the length before calling this, and only exact +/// iterators will return anything but `None` there. +/// +/// Since the type system doesn't understand that contract, we have to allow +/// *any* `ParallelIterator` here, and `CollectConsumer` has to also implement +/// `UnindexedConsumer`. That implementation panics `unreachable!` in case +/// there's a bug where we actually do try to use this unindexed. 
+pub(super) fn special_extend<I, T>(pi: I, len: usize, v: &mut Vec<T>) +where + I: ParallelIterator<Item = T>, + T: Send, +{ + collect_with_consumer(v, len, |consumer| pi.drive_unindexed(consumer)); +} + +/// Unzips the results of the exact iterator into the specified vectors. +/// +/// This is called by `IndexedParallelIterator::unzip_into_vecs`. +pub(super) fn unzip_into_vecs<I, A, B>(pi: I, left: &mut Vec<A>, right: &mut Vec<B>) +where + I: IndexedParallelIterator<Item = (A, B)>, + A: Send, + B: Send, +{ + // clear any old data + left.truncate(0); + right.truncate(0); + + let len = pi.len(); + collect_with_consumer(right, len, |right_consumer| { + let mut right_result = None; + collect_with_consumer(left, len, |left_consumer| { + let (left_r, right_r) = unzip_indexed(pi, left_consumer, right_consumer); + right_result = Some(right_r); + left_r + }); + right_result.unwrap() + }); +} + +/// Create a consumer on the slice of memory we are collecting into. +/// +/// The consumer needs to be used inside the scope function, and the +/// complete collect result passed back. +/// +/// This method will verify the collect result, and panic if the slice +/// was not fully written into. Otherwise, in the successful case, +/// the vector is complete with the collected result. +fn collect_with_consumer<T, F>(vec: &mut Vec<T>, len: usize, scope_fn: F) +where + T: Send, + F: FnOnce(CollectConsumer<'_, T>) -> CollectResult<'_, T>, +{ + // Reserve space for `len` more elements in the vector, + vec.reserve(len); + + // Create the consumer and run the callback for collection. + let result = scope_fn(CollectConsumer::appender(vec, len)); + + // The `CollectResult` represents a contiguous part of the slice, that has + // been written to. On unwind here, the `CollectResult` will be dropped. 
If + // some producers on the way did not produce enough elements, partial + // `CollectResult`s may have been dropped without being reduced to the final + // result, and we will see that as the length coming up short. + // + // Here, we assert that added length is fully initialized. This is checked + // by the following assert, which verifies if a complete `CollectResult` + // was produced; if the length is correct, it is necessarily covering the + // target slice. Since we know that the consumer cannot have escaped from + // `drive` (by parametricity, essentially), we know that any stores that + // will happen, have happened. Unless some code is buggy, that means we + // should have seen `len` total writes. + let actual_writes = result.len(); + assert!( + actual_writes == len, + "expected {} total writes, but got {}", + len, + actual_writes + ); + + // Release the result's mutable borrow and "proxy ownership" + // of the elements, before the vector takes it over. + result.release_ownership(); + + let new_len = vec.len() + len; + + unsafe { + vec.set_len(new_len); + } +} diff --git a/third_party/rust/rayon/src/iter/collect/test.rs b/third_party/rust/rayon/src/iter/collect/test.rs new file mode 100644 index 0000000000..b5f676f5de --- /dev/null +++ b/third_party/rust/rayon/src/iter/collect/test.rs @@ -0,0 +1,371 @@ +#![cfg(test)] +#![allow(unused_assignments)] + +// These tests are primarily targeting "abusive" producers that will +// try to drive the "collect consumer" incorrectly. These should +// result in panics. + +use super::collect_with_consumer; +use crate::iter::plumbing::*; +use rayon_core::join; + +use std::fmt; +use std::panic; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::thread::Result as ThreadResult; + +/// Promises to produce 2 items, but then produces 3. Does not do any +/// splits at all. 
#[test]
#[should_panic(expected = "too many values")]
fn produce_too_many_items() {
    let mut v = vec![];
    collect_with_consumer(&mut v, 2, |consumer| {
        // Fill both promised slots ...
        let folder = consumer.into_folder().consume(22).consume(23);
        // ... then push one item more than there is room for.
        let _overfull = folder.consume(24);
        unreachable!("folder does not complete")
    });
}

/// Promises 5 items but only produces 2. Does not do any
/// splits at all.
#[test]
#[should_panic(expected = "expected 5 total writes, but got 2")]
fn produce_fewer_items() {
    let mut v = vec![];
    collect_with_consumer(&mut v, 5, |consumer| {
        consumer.into_folder().consume(22).consume(23).complete()
    });
}

// Both halves are written, but only the right folder is completed and
// returned. Hence, the collection vector is not fully initialized.
#[test]
#[should_panic(expected = "expected 4 total writes, but got 2")]
fn left_produces_items_with_no_complete() {
    let mut v = vec![];
    collect_with_consumer(&mut v, 4, |consumer| {
        let (left_consumer, right_consumer, _) = consumer.split_at(2);
        let left_folder = left_consumer.into_folder();
        let right_folder = right_consumer.into_folder();
        // Kept alive until the closure ends, but never completed.
        let _left_folder = left_folder.consume(0).consume(1);
        right_folder.consume(2).consume(3).complete()
    });
}

// Both halves are written, but only the left folder is completed and
// returned. Hence, the collection vector is not fully initialized.
#[test]
#[should_panic(expected = "expected 4 total writes, but got 2")]
fn right_produces_items_with_no_complete() {
    let mut v = vec![];
    collect_with_consumer(&mut v, 4, |consumer| {
        let (left_consumer, right_consumer, _) = consumer.split_at(2);
        let left_folder = left_consumer.into_folder();
        let right_folder = right_consumer.into_folder();
        let left_folder = left_folder.consume(0).consume(1);
        // Kept alive until the closure ends, but never completed.
        let _right_folder = right_folder.consume(2).consume(3);
        left_folder.complete()
    });
}

// The scope function panics instead of completing its folder. The vector
// must stay empty and every created element must have been dropped.
#[test]
fn produces_items_with_no_complete() {
    let counter = DropCounter::default();
    let mut v = vec![];
    let panic_result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
        collect_with_consumer(&mut v, 2, |consumer| {
            let _folder = consumer
                .into_folder()
                .consume(counter.element())
                .consume(counter.element());
            panic!("folder does not complete");
        });
    }));
    assert!(v.is_empty());
    assert_is_panic_with_message(&panic_result, "folder does not complete");
    counter.assert_drop_count();
}

// The left consumer produces too many items while the right
// consumer produces correct number.
#[test]
#[should_panic(expected = "too many values")]
fn left_produces_too_many_items() {
    let mut v = vec![];
    collect_with_consumer(&mut v, 4, |consumer| {
        let (left_consumer, right_consumer, _) = consumer.split_at(2);
        let left_folder = left_consumer.into_folder();
        let right_folder = right_consumer.into_folder();
        // Three items for a left half that only has two slots.
        let _left_folder = left_folder.consume(0).consume(1).consume(2);
        let _ = right_folder.consume(2).consume(3).complete();
        unreachable!("folder does not complete");
    });
}

// The right consumer produces too many items while the left
// consumer produces correct number.
#[test]
#[should_panic(expected = "too many values")]
fn right_produces_too_many_items() {
    let mut v = vec![];
    collect_with_consumer(&mut v, 4, |consumer| {
        let (left_consumer, right_consumer, _) = consumer.split_at(2);
        let left_folder = left_consumer.into_folder();
        let right_folder = right_consumer.into_folder();
        let left_folder = left_folder.consume(0).consume(1);
        // Three items for a right half that only has two slots.
        let _right_folder = right_folder.consume(2).consume(3).consume(4);
        let _ = left_folder.complete();
        unreachable!("folder does not complete");
    });
}

// The left consumer produces fewer items while the right
// consumer produces correct number.
#[test]
#[should_panic(expected = "expected 4 total writes, but got 1")]
fn left_produces_fewer_items() {
    let mut v = vec![];
    collect_with_consumer(&mut v, 4, |consumer| {
        let reducer = consumer.to_reducer();
        let (left_consumer, right_consumer, _) = consumer.split_at(2);
        let left_folder = left_consumer.into_folder();
        let right_folder = right_consumer.into_folder();
        // The left half stops one item short, so the two results are not
        // adjacent when they get reduced.
        let left_result = left_folder.consume(0).complete();
        let right_result = right_folder.consume(2).consume(3).complete();
        reducer.reduce(left_result, right_result)
    });
}

// Both halves produce the correct number of items, but the right
// result is discarded and only the left one is returned.
#[test]
#[should_panic(expected = "expected 4 total writes, but got 2")]
fn only_left_result() {
    let mut v = vec![];
    collect_with_consumer(&mut v, 4, |consumer| {
        let (left_consumer, right_consumer, _) = consumer.split_at(2);
        let left_folder = left_consumer.into_folder();
        let right_folder = right_consumer.into_folder();
        let left_result = left_folder.consume(0).consume(1).complete();
        let _ = right_folder.consume(2).consume(3).complete();
        left_result
    });
}

// Both halves produce the correct number of items, but the left
// result is discarded and only the right one is returned.
#[test]
#[should_panic(expected = "expected 4 total writes, but got 2")]
fn only_right_result() {
    let mut v = vec![];
    collect_with_consumer(&mut v, 4, |consumer| {
        let (left_consumer, right_consumer, _) = consumer.split_at(2);
        let left_folder = left_consumer.into_folder();
        let right_folder = right_consumer.into_folder();
        let _ = left_folder.consume(0).consume(1).complete();
        right_folder.consume(2).consume(3).complete()
    });
}

// The left and right consumer produce the correct number but reduce
// in the wrong order.
#[test]
#[should_panic(expected = "expected 4 total writes, but got 2")]
fn reducer_does_not_preserve_order() {
    let mut v = vec![];
    collect_with_consumer(&mut v, 4, |consumer| {
        let reducer = consumer.to_reducer();
        let (left_consumer, right_consumer, _) = consumer.split_at(2);
        let left_folder = left_consumer.into_folder();
        let right_folder = right_consumer.into_folder();
        let left_result = left_folder.consume(0).consume(1).complete();
        let right_result = right_folder.consume(2).consume(3).complete();
        // Arguments deliberately swapped: right first, then left.
        reducer.reduce(right_result, left_result)
    });
}

// The right consumer produces fewer items while the left
// consumer produces correct number.
#[test]
#[should_panic(expected = "expected 4 total writes, but got 3")]
fn right_produces_fewer_items() {
    let mut v = vec![];
    collect_with_consumer(&mut v, 4, |consumer| {
        let reducer = consumer.to_reducer();
        let (left_consumer, right_consumer, _) = consumer.split_at(2);
        let left_folder = left_consumer.into_folder();
        let right_folder = right_consumer.into_folder();
        let left_result = left_folder.consume(0).consume(1).complete();
        // The right half stops one item short.
        let right_result = right_folder.consume(2).complete();
        reducer.reduce(left_result, right_result)
    });
}

// The left consumer panics and the right stops short, like `panic_fuse()`.
// We should get the left panic without finishing `collect_with_consumer`.
#[test]
#[should_panic(expected = "left consumer panic")]
fn left_panics() {
    let mut v = vec![];
    collect_with_consumer(&mut v, 4, |consumer| {
        let reducer = consumer.to_reducer();
        let (left_consumer, right_consumer, _) = consumer.split_at(2);
        let (left_result, right_result) = join(
            || {
                let _left_folder = left_consumer.into_folder().consume(0);
                panic!("left consumer panic");
            },
            || {
                // early return
                right_consumer.into_folder().consume(2).complete()
            },
        );
        reducer.reduce(left_result, right_result)
    });
    unreachable!();
}

// The right consumer panics and the left stops short, like `panic_fuse()`.
// We should get the right panic without finishing `collect_with_consumer`.
#[test]
#[should_panic(expected = "right consumer panic")]
fn right_panics() {
    let mut v = vec![];
    collect_with_consumer(&mut v, 4, |consumer| {
        let reducer = consumer.to_reducer();
        let (left_consumer, right_consumer, _) = consumer.split_at(2);
        let (left_result, right_result) = join(
            || {
                // early return
                left_consumer.into_folder().consume(0).complete()
            },
            || {
                let _right_folder = right_consumer.into_folder().consume(2);
                panic!("right consumer panic");
            },
        );
        reducer.reduce(left_result, right_result)
    });
    unreachable!();
}

// The left consumer produces fewer items while the right
// consumer produces correct number; check that created elements are dropped
#[test]
fn left_produces_fewer_items_drops() {
    let counter = DropCounter::default();
    let mut v = vec![];
    let panic_result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
        collect_with_consumer(&mut v, 4, |consumer| {
            let reducer = consumer.to_reducer();
            let (left_consumer, right_consumer, _) = consumer.split_at(2);
            let left_folder = left_consumer.into_folder();
            let right_folder = right_consumer.into_folder();
            let left_result = left_folder.consume(counter.element()).complete();
            let right_result = right_folder
                .consume(counter.element())
                .consume(counter.element())
                .complete();
            reducer.reduce(left_result, right_result)
        });
    }));
    assert!(v.is_empty());
    assert_is_panic_with_message(&panic_result, "expected 4 total writes, but got 1");
    counter.assert_drop_count();
}

/// Hands out elements and keeps two tallies: how many were created and
/// how many of them have been dropped again.
#[derive(Default)]
struct DropCounter {
    created: AtomicUsize,
    dropped: AtomicUsize,
}

/// Element handed out by `DropCounter`; bumps the referenced counter
/// when it is dropped.
struct Element<'a>(&'a AtomicUsize);

impl DropCounter {
    /// Number of elements handed out so far.
    fn created(&self) -> usize {
        self.created.load(Ordering::SeqCst)
    }

    /// Number of handed-out elements that have been dropped so far.
    fn dropped(&self) -> usize {
        self.dropped.load(Ordering::SeqCst)
    }

    /// Creates a new element that reports its drop to this counter.
    fn element(&self) -> Element<'_> {
        self.created.fetch_add(1, Ordering::SeqCst);
        Element(&self.dropped)
    }

    /// Panics unless every element created so far has been dropped.
    fn assert_drop_count(&self) {
        let (created, dropped) = (self.created(), self.dropped());
        assert_eq!(
            created, dropped,
            "Expected {} dropped elements, but found {}",
            created, dropped
        );
    }
}

impl Drop for Element<'_> {
    fn drop(&mut self) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Checks that `result`, as returned by `catch_unwind`, is a panic whose
/// message contains `expected`; panics with an explanation otherwise.
fn assert_is_panic_with_message<T>(result: &ThreadResult<T>, expected: &str)
where
    T: fmt::Debug,
{
    let error = match result {
        Ok(value) => panic!(
            "assertion failure: Expected panic, got successful {:?}",
            value
        ),
        Err(error) => error,
    };

    // A panic payload carrying a message is either a `&'static str` or a
    // `String`; anything else is reported as an unknown value.
    let static_message = error.downcast_ref::<&'static str>().copied();
    let owned_message = error.downcast_ref::<String>().map(String::as_str);

    match static_message.or(owned_message) {
        // assertion passes
        Some(message) if message.contains(expected) => {}
        Some(message) => panic!(
            "assertion failure: Expected {:?}, but found panic with {:?}",
            expected, message
        ),
        None => panic!(
            "assertion failure: Expected {:?}, but found panic with unknown value",
            expected
        ),
    }
}