use futures::channel::{mpsc, oneshot}; use futures::executor::block_on; use futures::future::{self, poll_fn, Future, FutureExt, TryFutureExt}; use futures::never::Never; use futures::ready; use futures::sink::{self, Sink, SinkErrInto, SinkExt}; use futures::stream::{self, Stream, StreamExt}; use futures::task::{self, ArcWake, Context, Poll, Waker}; use futures_test::task::panic_context; use std::cell::{Cell, RefCell}; use std::collections::VecDeque; use std::fmt; use std::mem; use std::pin::Pin; use std::rc::Rc; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; fn sassert_next(s: &mut S, item: S::Item) where S: Stream + Unpin, S::Item: Eq + fmt::Debug, { match s.poll_next_unpin(&mut panic_context()) { Poll::Ready(None) => panic!("stream is at its end"), Poll::Ready(Some(e)) => assert_eq!(e, item), Poll::Pending => panic!("stream wasn't ready"), } } fn unwrap(x: Poll>) -> T { match x { Poll::Ready(Ok(x)) => x, Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"), Poll::Pending => panic!("Poll::Pending"), } } // An Unpark struct that records unpark events for inspection struct Flag(AtomicBool); impl Flag { fn new() -> Arc { Arc::new(Self(AtomicBool::new(false))) } fn take(&self) -> bool { self.0.swap(false, Ordering::SeqCst) } fn set(&self, v: bool) { self.0.store(v, Ordering::SeqCst) } } impl ArcWake for Flag { fn wake_by_ref(arc_self: &Arc) { arc_self.set(true) } } fn flag_cx(f: F) -> R where F: FnOnce(Arc, &mut Context<'_>) -> R, { let flag = Flag::new(); let waker = task::waker_ref(&flag); let cx = &mut Context::from_waker(&waker); f(flag.clone(), cx) } // Sends a value on an i32 channel sink struct StartSendFut + Unpin, Item: Unpin>(Option, Option); impl + Unpin, Item: Unpin> StartSendFut { fn new(sink: S, item: Item) -> Self { Self(Some(sink), Some(item)) } } impl + Unpin, Item: Unpin> Future for StartSendFut { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let Self(inner, item) = self.get_mut(); { let mut inner = inner.as_mut().unwrap(); ready!(Pin::new(&mut inner).poll_ready(cx))?; Pin::new(&mut inner).start_send(item.take().unwrap())?; } Poll::Ready(Ok(inner.take().unwrap())) } } // Immediately accepts all requests to start pushing, but completion is managed // by manually flushing struct ManualFlush { data: Vec, waiting_tasks: Vec, } impl Sink> for ManualFlush { type Error = (); fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn start_send(mut self: Pin<&mut Self>, item: Option) -> Result<(), Self::Error> { if let Some(item) = item { self.data.push(item); } else { self.force_flush(); } Ok(()) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if self.data.is_empty() { Poll::Ready(Ok(())) } else { self.waiting_tasks.push(cx.waker().clone()); Poll::Pending } } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.poll_flush(cx) } } impl ManualFlush { fn new() -> Self { Self { data: Vec::new(), waiting_tasks: Vec::new() } } fn force_flush(&mut self) -> Vec { for task in self.waiting_tasks.drain(..) { task.wake() } mem::take(&mut self.data) } } struct ManualAllow { data: Vec, allow: Rc, } struct Allow { flag: Cell, tasks: RefCell>, } impl Allow { fn new() -> Self { Self { flag: Cell::new(false), tasks: RefCell::new(Vec::new()) } } fn check(&self, cx: &mut Context<'_>) -> bool { if self.flag.get() { true } else { self.tasks.borrow_mut().push(cx.waker().clone()); false } } fn start(&self) { self.flag.set(true); let mut tasks = self.tasks.borrow_mut(); for task in tasks.drain(..) { task.wake(); } } } impl Sink for ManualAllow { type Error = (); fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if self.allow.check(cx) { Poll::Ready(Ok(())) } else { Poll::Pending } } fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { self.data.push(item); Ok(()) } fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } fn manual_allow() -> (ManualAllow, Rc) { let allow = Rc::new(Allow::new()); let manual_allow = ManualAllow { data: Vec::new(), allow: allow.clone() }; (manual_allow, allow) } #[test] fn either_sink() { let mut s = if true { Vec::::new().left_sink() } else { VecDeque::::new().right_sink() }; Pin::new(&mut s).start_send(0).unwrap(); } #[test] fn vec_sink() { let mut v = Vec::new(); Pin::new(&mut v).start_send(0).unwrap(); Pin::new(&mut v).start_send(1).unwrap(); assert_eq!(v, vec![0, 1]); block_on(v.flush()).unwrap(); assert_eq!(v, vec![0, 1]); } #[test] fn vecdeque_sink() { let mut deque = VecDeque::new(); Pin::new(&mut deque).start_send(2).unwrap(); Pin::new(&mut deque).start_send(3).unwrap(); assert_eq!(deque.pop_front(), Some(2)); assert_eq!(deque.pop_front(), Some(3)); assert_eq!(deque.pop_front(), None); } #[test] fn send() { let mut v = Vec::new(); block_on(v.send(0)).unwrap(); assert_eq!(v, vec![0]); block_on(v.send(1)).unwrap(); assert_eq!(v, vec![0, 1]); block_on(v.send(2)).unwrap(); assert_eq!(v, vec![0, 1, 2]); } #[test] fn send_all() { let mut v = Vec::new(); block_on(v.send_all(&mut stream::iter(vec![0, 1]).map(Ok))).unwrap(); assert_eq!(v, vec![0, 1]); block_on(v.send_all(&mut stream::iter(vec![2, 3]).map(Ok))).unwrap(); assert_eq!(v, vec![0, 1, 2, 3]); block_on(v.send_all(&mut stream::iter(vec![4, 5]).map(Ok))).unwrap(); assert_eq!(v, vec![0, 1, 2, 3, 4, 5]); } // Test that `start_send` on an `mpsc` channel does indeed block when the // channel is full #[test] fn mpsc_blocking_start_send() { let (mut tx, mut rx) = mpsc::channel::(0); block_on(future::lazy(|_| { tx.start_send(0).unwrap(); flag_cx(|flag, cx| { let mut task = StartSendFut::new(tx, 1); assert!(task.poll_unpin(cx).is_pending()); assert!(!flag.take()); sassert_next(&mut rx, 0); assert!(flag.take()); unwrap(task.poll_unpin(cx)); assert!(!flag.take()); sassert_next(&mut rx, 1); }) })); } // test `flush` by using `with` to make the first insertion into a sink block // until a oneshot is completed #[test] fn with_flush() { let (tx, rx) = oneshot::channel(); let mut block = rx.boxed(); let mut sink = Vec::new().with(|elem| { mem::replace(&mut block, future::ok(()).boxed()) .map_ok(move |()| elem + 1) .map_err(|_| -> Never { panic!() }) }); assert_eq!(Pin::new(&mut sink).start_send(0).ok(), Some(())); flag_cx(|flag, cx| { let mut task = sink.flush(); assert!(task.poll_unpin(cx).is_pending()); tx.send(()).unwrap(); assert!(flag.take()); unwrap(task.poll_unpin(cx)); block_on(sink.send(1)).unwrap(); assert_eq!(sink.get_ref(), &[1, 2]); }) } // test simple use of with to change data #[test] fn with_as_map() { let mut sink = Vec::new().with(|item| future::ok::(item * 2)); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); block_on(sink.send(2)).unwrap(); assert_eq!(sink.get_ref(), &[0, 2, 4]); } // test simple use of with_flat_map #[test] fn with_flat_map() { let mut sink = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok)); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); block_on(sink.send(2)).unwrap(); block_on(sink.send(3)).unwrap(); assert_eq!(sink.get_ref(), &[1, 2, 2, 3, 3, 3]); } // Check that `with` propagates `poll_ready` to the inner sink. // Regression test for the issue #1834. #[test] fn with_propagates_poll_ready() { let (tx, mut rx) = mpsc::channel::(0); let mut tx = tx.with(|item: i32| future::ok::(item + 10)); block_on(future::lazy(|_| { flag_cx(|flag, cx| { let mut tx = Pin::new(&mut tx); // Should be ready for the first item. assert_eq!(tx.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); assert_eq!(tx.as_mut().start_send(0), Ok(())); // Should be ready for the second item only after the first one is received. assert_eq!(tx.as_mut().poll_ready(cx), Poll::Pending); assert!(!flag.take()); sassert_next(&mut rx, 10); assert!(flag.take()); assert_eq!(tx.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); assert_eq!(tx.as_mut().start_send(1), Ok(())); }) })); } // test that the `with` sink doesn't require the underlying sink to flush, // but doesn't claim to be flushed until the underlying sink is #[test] fn with_flush_propagate() { let mut sink = ManualFlush::new().with(future::ok::, ()>); flag_cx(|flag, cx| { unwrap(Pin::new(&mut sink).poll_ready(cx)); Pin::new(&mut sink).start_send(Some(0)).unwrap(); unwrap(Pin::new(&mut sink).poll_ready(cx)); Pin::new(&mut sink).start_send(Some(1)).unwrap(); { let mut task = sink.flush(); assert!(task.poll_unpin(cx).is_pending()); assert!(!flag.take()); } assert_eq!(sink.get_mut().force_flush(), vec![0, 1]); assert!(flag.take()); unwrap(sink.flush().poll_unpin(cx)); }) } // test that `Clone` is implemented on `with` sinks #[test] fn with_implements_clone() { let (mut tx, rx) = mpsc::channel(5); { let mut is_positive = tx.clone().with(|item| future::ok::(item > 0)); let mut is_long = tx.clone().with(|item: &str| future::ok::(item.len() > 5)); block_on(is_positive.clone().send(-1)).unwrap(); block_on(is_long.clone().send("123456")).unwrap(); block_on(is_long.send("123")).unwrap(); block_on(is_positive.send(1)).unwrap(); } block_on(tx.send(false)).unwrap(); block_on(tx.close()).unwrap(); assert_eq!(block_on(rx.collect::>()), vec![false, true, false, true, false]); } // test that a buffer is a no-nop around a sink that always accepts sends #[test] fn buffer_noop() { let mut sink = Vec::new().buffer(0); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); assert_eq!(sink.get_ref(), &[0, 1]); let mut sink = Vec::new().buffer(1); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); assert_eq!(sink.get_ref(), &[0, 1]); } // test basic buffer functionality, including both filling up to capacity, // and writing out when the underlying sink is ready #[test] fn buffer() { let (sink, allow) = manual_allow::(); let sink = sink.buffer(2); let sink = block_on(StartSendFut::new(sink, 0)).unwrap(); let mut sink = block_on(StartSendFut::new(sink, 1)).unwrap(); flag_cx(|flag, cx| { let mut task = sink.send(2); assert!(task.poll_unpin(cx).is_pending()); assert!(!flag.take()); allow.start(); assert!(flag.take()); unwrap(task.poll_unpin(cx)); assert_eq!(sink.get_ref().data, vec![0, 1, 2]); }) } #[test] fn fanout_smoke() { let sink1 = Vec::new(); let sink2 = Vec::new(); let mut sink = sink1.fanout(sink2); block_on(sink.send_all(&mut stream::iter(vec![1, 2, 3]).map(Ok))).unwrap(); let (sink1, sink2) = sink.into_inner(); assert_eq!(sink1, vec![1, 2, 3]); assert_eq!(sink2, vec![1, 2, 3]); } #[test] fn fanout_backpressure() { let (left_send, mut left_recv) = mpsc::channel(0); let (right_send, mut right_recv) = mpsc::channel(0); let sink = left_send.fanout(right_send); let mut sink = block_on(StartSendFut::new(sink, 0)).unwrap(); flag_cx(|flag, cx| { let mut task = sink.send(2); assert!(!flag.take()); assert!(task.poll_unpin(cx).is_pending()); assert_eq!(block_on(left_recv.next()), Some(0)); assert!(flag.take()); assert!(task.poll_unpin(cx).is_pending()); assert_eq!(block_on(right_recv.next()), Some(0)); assert!(flag.take()); assert!(task.poll_unpin(cx).is_pending()); assert_eq!(block_on(left_recv.next()), Some(2)); assert!(flag.take()); assert!(task.poll_unpin(cx).is_pending()); assert_eq!(block_on(right_recv.next()), Some(2)); assert!(flag.take()); unwrap(task.poll_unpin(cx)); // make sure receivers live until end of test to prevent send errors drop(left_recv); drop(right_recv); }) } #[test] fn sink_map_err() { { let cx = &mut panic_context(); let (tx, _rx) = mpsc::channel(1); let mut tx = tx.sink_map_err(|_| ()); assert_eq!(Pin::new(&mut tx).start_send(()), Ok(())); assert_eq!(Pin::new(&mut tx).poll_flush(cx), Poll::Ready(Ok(()))); } let tx = mpsc::channel(0).0; assert_eq!(Pin::new(&mut tx.sink_map_err(|_| ())).start_send(()), Err(())); } #[test] fn sink_unfold() { block_on(poll_fn(|cx| { let (tx, mut rx) = mpsc::channel(1); let unfold = sink::unfold((), |(), i: i32| { let mut tx = tx.clone(); async move { tx.send(i).await.unwrap(); Ok::<_, String>(()) } }); futures::pin_mut!(unfold); assert_eq!(unfold.as_mut().start_send(1), Ok(())); assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Ready(Ok(()))); assert_eq!(rx.try_next().unwrap(), Some(1)); assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); assert_eq!(unfold.as_mut().start_send(2), Ok(())); assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); assert_eq!(unfold.as_mut().start_send(3), Ok(())); assert_eq!(rx.try_next().unwrap(), Some(2)); assert!(rx.try_next().is_err()); assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); assert_eq!(unfold.as_mut().start_send(4), Ok(())); assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Pending); // Channel full assert_eq!(rx.try_next().unwrap(), Some(3)); assert_eq!(rx.try_next().unwrap(), Some(4)); Poll::Ready(()) })) } #[test] fn err_into() { #[derive(Copy, Clone, Debug, PartialEq, Eq)] struct ErrIntoTest; impl From for ErrIntoTest { fn from(_: mpsc::SendError) -> Self { Self } } { let cx = &mut panic_context(); let (tx, _rx) = mpsc::channel(1); let mut tx: SinkErrInto, _, ErrIntoTest> = tx.sink_err_into(); assert_eq!(Pin::new(&mut tx).start_send(()), Ok(())); assert_eq!(Pin::new(&mut tx).poll_flush(cx), Poll::Ready(Ok(()))); } let tx = mpsc::channel(0).0; assert_eq!(Pin::new(&mut tx.sink_err_into()).start_send(()), Err(ErrIntoTest)); }