diff options
Diffstat (limited to 'third_party/rust/mio-extras/test/test_poll_channel.rs')
-rw-r--r-- | third_party/rust/mio-extras/test/test_poll_channel.rs | 338 |
1 files changed, 338 insertions, 0 deletions
diff --git a/third_party/rust/mio-extras/test/test_poll_channel.rs b/third_party/rust/mio-extras/test/test_poll_channel.rs new file mode 100644 index 0000000000..2091f65cb4 --- /dev/null +++ b/third_party/rust/mio-extras/test/test_poll_channel.rs @@ -0,0 +1,338 @@ +use expect_events; +use mio::event::Event; +use mio::{Events, Poll, PollOpt, Ready, Token}; +use mio_extras::channel; +use std::sync::mpsc::TryRecvError; +use std::thread; +use std::time::Duration; + +#[test] +pub fn test_poll_channel_edge() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let (tx, rx) = channel::channel(); + + poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge()) + .unwrap(); + + // Wait, but nothing should happen + let num = poll.poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); + + // Push the value + tx.send("hello").unwrap(); + + // Polling will contain the event + let num = poll.poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(1, num); + + let event = events.iter().next().unwrap(); + assert_eq!(event.token(), Token(123)); + assert_eq!(event.readiness(), Ready::readable()); + + // Poll again and there should be no events + let num = poll.poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); + + // Read the value + assert_eq!("hello", rx.try_recv().unwrap()); + + // Poll again, nothing + let num = poll.poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); + + // Push a value + tx.send("goodbye").unwrap(); + + // Have an event + let num = poll.poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(1, num); + + let event = events.iter().next().unwrap(); + assert_eq!(event.token(), Token(123)); + assert_eq!(event.readiness(), Ready::readable()); + + // Read the value + rx.try_recv().unwrap(); + + // Drop the sender half + drop(tx); + + let num = poll.poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(1, num); + + let event = events.iter().next().unwrap(); + assert_eq!(event.token(), Token(123)); + assert_eq!(event.readiness(), Ready::readable()); + + match rx.try_recv() { + Err(TryRecvError::Disconnected) => {} + no => panic!("unexpected value {:?}", no), + } +} + +#[test] +pub fn test_poll_channel_oneshot() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let (tx, rx) = channel::channel(); + + poll.register( + &rx, + Token(123), + Ready::readable(), + PollOpt::edge() | PollOpt::oneshot(), + ).unwrap(); + + // Wait, but nothing should happen + let num = poll.poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); + + // Push the value + tx.send("hello").unwrap(); + + // Polling will contain the event + let num = poll.poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(1, num); + + let event = events.iter().next().unwrap(); + assert_eq!(event.token(), Token(123)); + assert_eq!(event.readiness(), Ready::readable()); + + // Poll again and there should be no events + let num = poll.poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); + + // Read the value + assert_eq!("hello", rx.try_recv().unwrap()); + + // Poll again, nothing + let num = poll.poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); + + // Push a value + tx.send("goodbye").unwrap(); + + // Poll again, nothing + let num = poll.poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); + + // Reregistering will re-trigger the notification + for _ in 0..3 { + poll.reregister( + &rx, + Token(123), + Ready::readable(), + PollOpt::edge() | PollOpt::oneshot(), + ).unwrap(); + + // Have an event + let num = poll.poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(1, num); + + let event = events.iter().next().unwrap(); + assert_eq!(event.token(), Token(123)); + assert_eq!(event.readiness(), Ready::readable()); + } + + // Get the value + assert_eq!("goodbye", rx.try_recv().unwrap()); + + poll.reregister( + &rx, + Token(123), + Ready::readable(), + PollOpt::edge() | PollOpt::oneshot(), + ).unwrap(); + + // Have an event + let num = poll.poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); + + poll.reregister( + &rx, + Token(123), + Ready::readable(), + PollOpt::edge() | PollOpt::oneshot(), + ).unwrap(); + + // Have an event + let num = poll.poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); +} + +#[test] +pub fn test_poll_channel_level() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let (tx, rx) = channel::channel(); + + poll.register(&rx, Token(123), Ready::readable(), PollOpt::level()) + .unwrap(); + + // Wait, but nothing should happen + let num = poll.poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); + + // Push the value + tx.send("hello").unwrap(); + + // Polling will contain the event + for i in 0..5 { + let num = poll.poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert!(1 == num, "actually got {} on iteration {}", num, i); + + let event = events.iter().next().unwrap(); + assert_eq!(event.token(), Token(123)); + assert_eq!(event.readiness(), Ready::readable()); + } + + // Read the value + assert_eq!("hello", rx.try_recv().unwrap()); + + // Wait, but nothing should happen + let num = poll.poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); +} + +#[test] +pub fn test_poll_channel_writable() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let (tx, rx) = channel::channel(); + + poll.register(&rx, Token(123), Ready::writable(), PollOpt::edge()) + .unwrap(); + + // Wait, but nothing should happen + let num = poll.poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); + + // Push the value + tx.send("hello").unwrap(); + + // Wait, but nothing should happen + let num = poll.poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); +} + +#[test] +pub fn test_dropping_receive_before_poll() { + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let (tx, rx) = channel::channel(); + + poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge()) + .unwrap(); + + // Push the value + tx.send("hello").unwrap(); + + // Drop the receive end + drop(rx); + + // Wait, but nothing should happen + let num = poll.poll(&mut events, Some(Duration::from_millis(300))) + .unwrap(); + assert_eq!(0, num); +} + +#[test] +pub fn test_mixing_channel_with_socket() { + use mio::net::{TcpListener, TcpStream}; + + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + let (tx, rx) = channel::channel(); + + // Create the listener + let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + + // Register the listener with `Poll` + poll.register(&l, Token(0), Ready::readable(), PollOpt::edge()) + .unwrap(); + poll.register(&rx, Token(1), Ready::readable(), PollOpt::edge()) + .unwrap(); + + // Push a value onto the channel + tx.send("hello").unwrap(); + + // Connect a TCP socket + let s1 = TcpStream::connect(&l.local_addr().unwrap()).unwrap(); + + // Register the socket + poll.register(&s1, Token(2), Ready::readable(), PollOpt::edge()) + .unwrap(); + + // Sleep a bit to ensure it arrives at dest + thread::sleep(Duration::from_millis(250)); + + expect_events( + &poll, + &mut events, + 2, + vec![ + Event::new(Ready::empty(), Token(0)), + Event::new(Ready::empty(), Token(1)), + ], + ); +} + +#[test] +pub fn test_sending_from_other_thread_while_polling() { + const ITERATIONS: usize = 20; + const THREADS: usize = 5; + + // Make sure to run multiple times + let poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(1024); + + for _ in 0..ITERATIONS { + let (tx, rx) = channel::channel(); + poll.register(&rx, Token(0), Ready::readable(), PollOpt::edge()) + .unwrap(); + + for _ in 0..THREADS { + let tx = tx.clone(); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(50)); + tx.send("ping").unwrap(); + }); + } + + let mut recv = 0; + + while recv < THREADS { + let num = poll.poll(&mut events, None).unwrap(); + + if num != 0 { + assert_eq!(1, num); + assert_eq!(events.iter().next().unwrap().token(), Token(0)); + + while let Ok(_) = rx.try_recv() { + recv += 1; + } + } + } + } +} |