summaryrefslogtreecommitdiffstats
path: root/vendor/tokio/tests/support
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/tokio/tests/support')
-rw-r--r--vendor/tokio/tests/support/io_vec.rs45
-rw-r--r--vendor/tokio/tests/support/mpsc_stream.rs42
-rw-r--r--vendor/tokio/tests/support/signal.rs7
3 files changed, 94 insertions, 0 deletions
diff --git a/vendor/tokio/tests/support/io_vec.rs b/vendor/tokio/tests/support/io_vec.rs
new file mode 100644
index 000000000..4ea47c748
--- /dev/null
+++ b/vendor/tokio/tests/support/io_vec.rs
@@ -0,0 +1,45 @@
+use std::io::IoSlice;
+use std::ops::Deref;
+use std::slice;
+
+pub struct IoBufs<'a, 'b>(&'b mut [IoSlice<'a>]);
+
+impl<'a, 'b> IoBufs<'a, 'b> {
+ pub fn new(slices: &'b mut [IoSlice<'a>]) -> Self {
+ IoBufs(slices)
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.0.is_empty()
+ }
+
+ pub fn advance(mut self, n: usize) -> IoBufs<'a, 'b> {
+ let mut to_remove = 0;
+ let mut remaining_len = n;
+ for slice in self.0.iter() {
+ if remaining_len < slice.len() {
+ break;
+ } else {
+ remaining_len -= slice.len();
+ to_remove += 1;
+ }
+ }
+ self.0 = self.0.split_at_mut(to_remove).1;
+ if let Some(slice) = self.0.first_mut() {
+ let tail = &slice[remaining_len..];
+ // Safety: recasts slice to the original lifetime
+ let tail = unsafe { slice::from_raw_parts(tail.as_ptr(), tail.len()) };
+ *slice = IoSlice::new(tail);
+ } else if remaining_len != 0 {
+ panic!("advance past the end of the slice vector");
+ }
+ self
+ }
+}
+
+impl<'a, 'b> Deref for IoBufs<'a, 'b> {
+ type Target = [IoSlice<'a>];
+ fn deref(&self) -> &[IoSlice<'a>] {
+ self.0
+ }
+}
diff --git a/vendor/tokio/tests/support/mpsc_stream.rs b/vendor/tokio/tests/support/mpsc_stream.rs
new file mode 100644
index 000000000..aa385a39d
--- /dev/null
+++ b/vendor/tokio/tests/support/mpsc_stream.rs
@@ -0,0 +1,42 @@
+#![allow(dead_code)]
+
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
+use tokio_stream::Stream;
+
+struct UnboundedStream<T> {
+ recv: UnboundedReceiver<T>,
+}
+impl<T> Stream for UnboundedStream<T> {
+ type Item = T;
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ Pin::into_inner(self).recv.poll_recv(cx)
+ }
+}
+
+pub fn unbounded_channel_stream<T: Unpin>() -> (UnboundedSender<T>, impl Stream<Item = T>) {
+ let (tx, rx) = mpsc::unbounded_channel();
+
+ let stream = UnboundedStream { recv: rx };
+
+ (tx, stream)
+}
+
+struct BoundedStream<T> {
+ recv: Receiver<T>,
+}
+impl<T> Stream for BoundedStream<T> {
+ type Item = T;
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ Pin::into_inner(self).recv.poll_recv(cx)
+ }
+}
+
+pub fn channel_stream<T: Unpin>(size: usize) -> (Sender<T>, impl Stream<Item = T>) {
+ let (tx, rx) = mpsc::channel(size);
+
+ let stream = BoundedStream { recv: rx };
+
+ (tx, stream)
+}
diff --git a/vendor/tokio/tests/support/signal.rs b/vendor/tokio/tests/support/signal.rs
new file mode 100644
index 000000000..ea0605876
--- /dev/null
+++ b/vendor/tokio/tests/support/signal.rs
@@ -0,0 +1,7 @@
+pub fn send_signal(signal: libc::c_int) {
+ use libc::{getpid, kill};
+
+ unsafe {
+ assert_eq!(kill(getpid(), signal), 0);
+ }
+}