65 lines
1.8 KiB
Rust
65 lines
1.8 KiB
Rust
#![warn(rust_2018_idioms)]
|
|
|
|
use std::pin::Pin;
|
|
use std::task::{Context, Poll};
|
|
use tokio::io::{AsyncRead, ReadBuf};
|
|
use tokio_stream::StreamExt;
|
|
|
|
/// Test reader that produces at most `remaining` zero bytes and then returns an error.
///
/// Each read yields at most 31 bytes.
struct Reader {
    /// Number of zero bytes still to be produced before reads start failing.
    remaining: usize,
}
|
|
|
|
impl AsyncRead for Reader {
|
|
fn poll_read(
|
|
self: Pin<&mut Self>,
|
|
_cx: &mut Context<'_>,
|
|
buf: &mut ReadBuf<'_>,
|
|
) -> Poll<std::io::Result<()>> {
|
|
let this = Pin::into_inner(self);
|
|
assert_ne!(buf.remaining(), 0);
|
|
if this.remaining > 0 {
|
|
let n = std::cmp::min(this.remaining, buf.remaining());
|
|
let n = std::cmp::min(n, 31);
|
|
for x in &mut buf.initialize_unfilled_to(n)[..n] {
|
|
*x = 0;
|
|
}
|
|
buf.advance(n);
|
|
this.remaining -= n;
|
|
Poll::Ready(Ok(()))
|
|
} else {
|
|
Poll::Ready(Err(std::io::Error::from_raw_os_error(22)))
|
|
}
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn correct_behavior_on_errors() {
|
|
let reader = Reader { remaining: 8000 };
|
|
let mut stream = tokio_util::io::ReaderStream::new(reader);
|
|
let mut zeros_received = 0;
|
|
let mut had_error = false;
|
|
loop {
|
|
let item = stream.next().await.unwrap();
|
|
println!("{:?}", item);
|
|
match item {
|
|
Ok(bytes) => {
|
|
let bytes = &*bytes;
|
|
for byte in bytes {
|
|
assert_eq!(*byte, 0);
|
|
zeros_received += 1;
|
|
}
|
|
}
|
|
Err(_) => {
|
|
assert!(!had_error);
|
|
had_error = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
assert!(had_error);
|
|
assert_eq!(zeros_received, 8000);
|
|
assert!(stream.next().await.is_none());
|
|
}
|