diff options
Diffstat (limited to 'vendor/tokio/tests/support')
-rw-r--r-- | vendor/tokio/tests/support/io_vec.rs | 45 | ||||
-rw-r--r-- | vendor/tokio/tests/support/mpsc_stream.rs | 42 | ||||
-rw-r--r-- | vendor/tokio/tests/support/signal.rs | 7 |
3 files changed, 94 insertions, 0 deletions
diff --git a/vendor/tokio/tests/support/io_vec.rs b/vendor/tokio/tests/support/io_vec.rs new file mode 100644 index 000000000..4ea47c748 --- /dev/null +++ b/vendor/tokio/tests/support/io_vec.rs @@ -0,0 +1,45 @@ +use std::io::IoSlice; +use std::ops::Deref; +use std::slice; + +pub struct IoBufs<'a, 'b>(&'b mut [IoSlice<'a>]); + +impl<'a, 'b> IoBufs<'a, 'b> { + pub fn new(slices: &'b mut [IoSlice<'a>]) -> Self { + IoBufs(slices) + } + + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + pub fn advance(mut self, n: usize) -> IoBufs<'a, 'b> { + let mut to_remove = 0; + let mut remaining_len = n; + for slice in self.0.iter() { + if remaining_len < slice.len() { + break; + } else { + remaining_len -= slice.len(); + to_remove += 1; + } + } + self.0 = self.0.split_at_mut(to_remove).1; + if let Some(slice) = self.0.first_mut() { + let tail = &slice[remaining_len..]; + // Safety: recasts slice to the original lifetime + let tail = unsafe { slice::from_raw_parts(tail.as_ptr(), tail.len()) }; + *slice = IoSlice::new(tail); + } else if remaining_len != 0 { + panic!("advance past the end of the slice vector"); + } + self + } +} + +impl<'a, 'b> Deref for IoBufs<'a, 'b> { + type Target = [IoSlice<'a>]; + fn deref(&self) -> &[IoSlice<'a>] { + self.0 + } +} diff --git a/vendor/tokio/tests/support/mpsc_stream.rs b/vendor/tokio/tests/support/mpsc_stream.rs new file mode 100644 index 000000000..aa385a39d --- /dev/null +++ b/vendor/tokio/tests/support/mpsc_stream.rs @@ -0,0 +1,42 @@ +#![allow(dead_code)] + +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender}; +use tokio_stream::Stream; + +struct UnboundedStream<T> { + recv: UnboundedReceiver<T>, +} +impl<T> Stream for UnboundedStream<T> { + type Item = T; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + Pin::into_inner(self).recv.poll_recv(cx) + } +} + +pub fn unbounded_channel_stream<T: Unpin>() -> (UnboundedSender<T>, impl Stream<Item = T>) { + let (tx, rx) = mpsc::unbounded_channel(); + + let stream = UnboundedStream { recv: rx }; + + (tx, stream) +} + +struct BoundedStream<T> { + recv: Receiver<T>, +} +impl<T> Stream for BoundedStream<T> { + type Item = T; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + Pin::into_inner(self).recv.poll_recv(cx) + } +} + +pub fn channel_stream<T: Unpin>(size: usize) -> (Sender<T>, impl Stream<Item = T>) { + let (tx, rx) = mpsc::channel(size); + + let stream = BoundedStream { recv: rx }; + + (tx, stream) +} diff --git a/vendor/tokio/tests/support/signal.rs b/vendor/tokio/tests/support/signal.rs new file mode 100644 index 000000000..ea0605876 --- /dev/null +++ b/vendor/tokio/tests/support/signal.rs @@ -0,0 +1,7 @@ +pub fn send_signal(signal: libc::c_int) { + use libc::{getpid, kill}; + + unsafe { + assert_eq!(kill(getpid(), signal), 0); + } +} |