diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/futures-0.1.31/tests/sink.rs | |
parent | Initial commit. (diff) | |
download | firefox-upstream.tar.xz firefox-upstream.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | third_party/rust/futures-0.1.31/tests/sink.rs | 446 |
1 files changed, 446 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/tests/sink.rs b/third_party/rust/futures-0.1.31/tests/sink.rs new file mode 100644 index 0000000000..460dbdf20c --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/sink.rs @@ -0,0 +1,446 @@ +#![allow(bare_trait_objects, unknown_lints)] + +extern crate futures; + +use std::mem; +use std::sync::Arc; +use std::rc::Rc; +use std::cell::{Cell, RefCell}; +use std::sync::atomic::{Ordering, AtomicBool}; + +use futures::prelude::*; +use futures::future::ok; +use futures::stream; +use futures::sync::{oneshot, mpsc}; +use futures::task::{self, Task}; +use futures::executor::{self, Notify}; +use futures::sink::SinkFromErr; + +mod support; +use support::*; + +#[test] +fn vec_sink() { + let mut v = Vec::new(); + assert_eq!(v.start_send(0), Ok(AsyncSink::Ready)); + assert_eq!(v.start_send(1), Ok(AsyncSink::Ready)); + assert_eq!(v, vec![0, 1]); + assert_done(move || v.flush(), Ok(vec![0, 1])); +} + +#[test] +fn send() { + let v = Vec::new(); + + let v = v.send(0).wait().unwrap(); + assert_eq!(v, vec![0]); + + let v = v.send(1).wait().unwrap(); + assert_eq!(v, vec![0, 1]); + + assert_done(move || v.send(2), + Ok(vec![0, 1, 2])); +} + +#[test] +fn send_all() { + let v = Vec::new(); + + let (v, _) = v.send_all(stream::iter_ok(vec![0, 1])).wait().unwrap(); + assert_eq!(v, vec![0, 1]); + + let (v, _) = v.send_all(stream::iter_ok(vec![2, 3])).wait().unwrap(); + assert_eq!(v, vec![0, 1, 2, 3]); + + assert_done( + move || v.send_all(stream::iter_ok(vec![4, 5])).map(|(v, _)| v), + Ok(vec![0, 1, 2, 3, 4, 5])); +} + +// An Unpark struct that records unpark events for inspection +struct Flag(pub AtomicBool); + +impl Flag { + fn new() -> Arc<Flag> { + Arc::new(Flag(AtomicBool::new(false))) + } + + fn get(&self) -> bool { + self.0.load(Ordering::SeqCst) + } + + fn set(&self, v: bool) { + self.0.store(v, Ordering::SeqCst) + } +} + +impl Notify for Flag { + fn notify(&self, _id: usize) { + self.set(true) + } +} + +// Sends a value on an i32 channel sink +struct StartSendFut<S: Sink>(Option<S>, Option<S::SinkItem>); + +impl<S: Sink> StartSendFut<S> { + fn new(sink: S, item: S::SinkItem) -> StartSendFut<S> { + StartSendFut(Some(sink), Some(item)) + } +} + +impl<S: Sink> Future for StartSendFut<S> { + type Item = S; + type Error = S::SinkError; + + fn poll(&mut self) -> Poll<S, S::SinkError> { + match self.0.as_mut().unwrap().start_send(self.1.take().unwrap())? { + AsyncSink::Ready => Ok(Async::Ready(self.0.take().unwrap())), + AsyncSink::NotReady(item) => { + self.1 = Some(item); + Ok(Async::NotReady) + } + } + + } +} + +#[test] +// Test that `start_send` on an `mpsc` channel does indeed block when the +// channel is full +fn mpsc_blocking_start_send() { + let (mut tx, mut rx) = mpsc::channel::<i32>(0); + + futures::future::lazy(|| { + assert_eq!(tx.start_send(0).unwrap(), AsyncSink::Ready); + + let flag = Flag::new(); + let mut task = executor::spawn(StartSendFut::new(tx, 1)); + + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + assert!(!flag.get()); + sassert_next(&mut rx, 0); + assert!(flag.get()); + flag.set(false); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_ready()); + assert!(!flag.get()); + sassert_next(&mut rx, 1); + + Ok::<(), ()>(()) + }).wait().unwrap(); +} + +#[test] +// test `flush` by using `with` to make the first insertion into a sink block +// until a oneshot is completed +fn with_flush() { + let (tx, rx) = oneshot::channel(); + let mut block = Box::new(rx) as Box<Future<Item = _, Error = _>>; + let mut sink = Vec::new().with(|elem| { + mem::replace(&mut block, Box::new(ok(()))) + .map(move |_| elem + 1).map_err(|_| -> () { panic!() }) + }); + + assert_eq!(sink.start_send(0), Ok(AsyncSink::Ready)); + + let flag = Flag::new(); + let mut task = executor::spawn(sink.flush()); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + tx.send(()).unwrap(); + assert!(flag.get()); + + let sink = match task.poll_future_notify(&flag, 0).unwrap() { + Async::Ready(sink) => sink, + _ => panic!() + }; + + assert_eq!(sink.send(1).wait().unwrap().get_ref(), &[1, 2]); +} + +#[test] +// test simple use of with to change data +fn with_as_map() { + let sink = Vec::new().with(|item| -> Result<i32, ()> { + Ok(item * 2) + }); + let sink = sink.send(0).wait().unwrap(); + let sink = sink.send(1).wait().unwrap(); + let sink = sink.send(2).wait().unwrap(); + assert_eq!(sink.get_ref(), &[0, 2, 4]); +} + +#[test] +// test simple use of with_flat_map +fn with_flat_map() { + let sink = Vec::new().with_flat_map(|item| { + stream::iter_ok(vec![item; item]) + }); + let sink = sink.send(0).wait().unwrap(); + let sink = sink.send(1).wait().unwrap(); + let sink = sink.send(2).wait().unwrap(); + let sink = sink.send(3).wait().unwrap(); + assert_eq!(sink.get_ref(), &[1,2,2,3,3,3]); +} + +// Immediately accepts all requests to start pushing, but completion is managed +// by manually flushing +struct ManualFlush<T> { + data: Vec<T>, + waiting_tasks: Vec<Task>, +} + +impl<T> Sink for ManualFlush<T> { + type SinkItem = Option<T>; // Pass None to flush + type SinkError = (); + + fn start_send(&mut self, op: Option<T>) -> StartSend<Option<T>, ()> { + if let Some(item) = op { + self.data.push(item); + } else { + self.force_flush(); + } + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), ()> { + if self.data.is_empty() { + Ok(Async::Ready(())) + } else { + self.waiting_tasks.push(task::current()); + Ok(Async::NotReady) + } + } + + fn close(&mut self) -> Poll<(), ()> { + Ok(().into()) + } +} + +impl<T> ManualFlush<T> { + fn new() -> ManualFlush<T> { + ManualFlush { + data: Vec::new(), + waiting_tasks: Vec::new() + } + } + + fn force_flush(&mut self) -> Vec<T> { + for task in self.waiting_tasks.drain(..) { + task.notify() + } + mem::replace(&mut self.data, Vec::new()) + } +} + +#[test] +// test that the `with` sink doesn't require the underlying sink to flush, +// but doesn't claim to be flushed until the underlying sink is +fn with_flush_propagate() { + let mut sink = ManualFlush::new().with(|x| -> Result<Option<i32>, ()> { Ok(x) }); + assert_eq!(sink.start_send(Some(0)).unwrap(), AsyncSink::Ready); + assert_eq!(sink.start_send(Some(1)).unwrap(), AsyncSink::Ready); + + let flag = Flag::new(); + let mut task = executor::spawn(sink.flush()); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + assert!(!flag.get()); + assert_eq!(task.get_mut().get_mut().get_mut().force_flush(), vec![0, 1]); + assert!(flag.get()); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_ready()); +} + +#[test] +// test that a buffer is a no-nop around a sink that always accepts sends +fn buffer_noop() { + let sink = Vec::new().buffer(0); + let sink = sink.send(0).wait().unwrap(); + let sink = sink.send(1).wait().unwrap(); + assert_eq!(sink.get_ref(), &[0, 1]); + + let sink = Vec::new().buffer(1); + let sink = sink.send(0).wait().unwrap(); + let sink = sink.send(1).wait().unwrap(); + assert_eq!(sink.get_ref(), &[0, 1]); +} + +struct ManualAllow<T> { + data: Vec<T>, + allow: Rc<Allow>, +} + +struct Allow { + flag: Cell<bool>, + tasks: RefCell<Vec<Task>>, +} + +impl Allow { + fn new() -> Allow { + Allow { + flag: Cell::new(false), + tasks: RefCell::new(Vec::new()), + } + } + + fn check(&self) -> bool { + if self.flag.get() { + true + } else { + self.tasks.borrow_mut().push(task::current()); + false + } + } + + fn start(&self) { + self.flag.set(true); + let mut tasks = self.tasks.borrow_mut(); + for task in tasks.drain(..) { + task.notify(); + } + } +} + +impl<T> Sink for ManualAllow<T> { + type SinkItem = T; + type SinkError = (); + + fn start_send(&mut self, item: T) -> StartSend<T, ()> { + if self.allow.check() { + self.data.push(item); + Ok(AsyncSink::Ready) + } else { + Ok(AsyncSink::NotReady(item)) + } + } + + fn poll_complete(&mut self) -> Poll<(), ()> { + Ok(Async::Ready(())) + } + + fn close(&mut self) -> Poll<(), ()> { + Ok(().into()) + } +} + +fn manual_allow<T>() -> (ManualAllow<T>, Rc<Allow>) { + let allow = Rc::new(Allow::new()); + let manual_allow = ManualAllow { + data: Vec::new(), + allow: allow.clone(), + }; + (manual_allow, allow) +} + +#[test] +// test basic buffer functionality, including both filling up to capacity, +// and writing out when the underlying sink is ready +fn buffer() { + let (sink, allow) = manual_allow::<i32>(); + let sink = sink.buffer(2); + + let sink = StartSendFut::new(sink, 0).wait().unwrap(); + let sink = StartSendFut::new(sink, 1).wait().unwrap(); + + let flag = Flag::new(); + let mut task = executor::spawn(sink.send(2)); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + assert!(!flag.get()); + allow.start(); + assert!(flag.get()); + match task.poll_future_notify(&flag, 0).unwrap() { + Async::Ready(sink) => { + assert_eq!(sink.get_ref().data, vec![0, 1, 2]); + } + _ => panic!() + } +} + +#[test] +fn fanout_smoke() { + let sink1 = Vec::new(); + let sink2 = Vec::new(); + let sink = sink1.fanout(sink2); + let stream = futures::stream::iter_ok(vec![1,2,3]); + let (sink, _) = sink.send_all(stream).wait().unwrap(); + let (sink1, sink2) = sink.into_inner(); + assert_eq!(sink1, vec![1,2,3]); + assert_eq!(sink2, vec![1,2,3]); +} + +#[test] +fn fanout_backpressure() { + let (left_send, left_recv) = mpsc::channel(0); + let (right_send, right_recv) = mpsc::channel(0); + let sink = left_send.fanout(right_send); + + let sink = StartSendFut::new(sink, 0).wait().unwrap(); + let sink = StartSendFut::new(sink, 1).wait().unwrap(); + + let flag = Flag::new(); + let mut task = executor::spawn(sink.send(2)); + assert!(!flag.get()); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + let (item, left_recv) = left_recv.into_future().wait().unwrap(); + assert_eq!(item, Some(0)); + assert!(flag.get()); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + let (item, right_recv) = right_recv.into_future().wait().unwrap(); + assert_eq!(item, Some(0)); + assert!(flag.get()); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + let (item, left_recv) = left_recv.into_future().wait().unwrap(); + assert_eq!(item, Some(1)); + assert!(flag.get()); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + let (item, right_recv) = right_recv.into_future().wait().unwrap(); + assert_eq!(item, Some(1)); + assert!(flag.get()); + let (item, left_recv) = left_recv.into_future().wait().unwrap(); + assert_eq!(item, Some(2)); + assert!(flag.get()); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + let (item, right_recv) = right_recv.into_future().wait().unwrap(); + assert_eq!(item, Some(2)); + match task.poll_future_notify(&flag, 0).unwrap() { + Async::Ready(_) => { + }, + _ => panic!() + }; + // make sure receivers live until end of test to prevent send errors + drop(left_recv); + drop(right_recv); +} + +#[test] +fn map_err() { + { + let (tx, _rx) = mpsc::channel(1); + let mut tx = tx.sink_map_err(|_| ()); + assert_eq!(tx.start_send(()), Ok(AsyncSink::Ready)); + assert_eq!(tx.poll_complete(), Ok(Async::Ready(()))); + } + + let tx = mpsc::channel(0).0; + assert_eq!(tx.sink_map_err(|_| ()).start_send(()), Err(())); +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +struct FromErrTest; + +impl<T> From<mpsc::SendError<T>> for FromErrTest { + fn from(_: mpsc::SendError<T>) -> FromErrTest { + FromErrTest + } +} + +#[test] +fn from_err() { + { + let (tx, _rx) = mpsc::channel(1); + let mut tx: SinkFromErr<mpsc::Sender<()>, FromErrTest> = tx.sink_from_err(); + assert_eq!(tx.start_send(()), Ok(AsyncSink::Ready)); + assert_eq!(tx.poll_complete(), Ok(Async::Ready(()))); + } + + let tx = mpsc::channel(0).0; + assert_eq!(tx.sink_from_err().start_send(()), Err(FromErrTest)); +} |