use mio::{Events, Poll, PollOpt, Ready, Token}; use mio::net::UdpSocket; use bytes::{Buf, RingBuf, SliceBuf, MutBuf}; use std::io::ErrorKind; use std::str; use std::time; use localhost; use iovec::IoVec; const LISTENER: Token = Token(0); const SENDER: Token = Token(1); pub struct UdpHandlerSendRecv { tx: UdpSocket, rx: UdpSocket, msg: &'static str, buf: SliceBuf<'static>, rx_buf: RingBuf, connected: bool, shutdown: bool, } impl UdpHandlerSendRecv { fn new(tx: UdpSocket, rx: UdpSocket, connected: bool, msg : &'static str) -> UdpHandlerSendRecv { UdpHandlerSendRecv { tx, rx, msg, buf: SliceBuf::wrap(msg.as_bytes()), rx_buf: RingBuf::new(1024), connected, shutdown: false, } } } fn assert_send() { } fn assert_sync() { } #[cfg(test)] fn test_send_recv_udp(tx: UdpSocket, rx: UdpSocket, connected: bool) { debug!("Starting TEST_UDP_SOCKETS"); let poll = Poll::new().unwrap(); assert_send::(); assert_sync::(); // ensure that the sockets are non-blocking let mut buf = [0; 128]; assert_eq!(ErrorKind::WouldBlock, rx.recv_from(&mut buf).unwrap_err().kind()); info!("Registering SENDER"); poll.register(&tx, SENDER, Ready::writable(), PollOpt::edge()).unwrap(); info!("Registering LISTENER"); poll.register(&rx, LISTENER, Ready::readable(), PollOpt::edge()).unwrap(); let mut events = Events::with_capacity(1024); info!("Starting event loop to test with..."); let mut handler = UdpHandlerSendRecv::new(tx, rx, connected, "hello world"); while !handler.shutdown { poll.poll(&mut events, None).unwrap(); for event in &events { if event.readiness().is_readable() { if let LISTENER = event.token() { debug!("We are receiving a datagram now..."); let cnt = unsafe { if !handler.connected { handler.rx.recv_from(handler.rx_buf.mut_bytes()).unwrap().0 } else { handler.rx.recv(handler.rx_buf.mut_bytes()).unwrap() } }; unsafe { MutBuf::advance(&mut handler.rx_buf, cnt); } assert!(str::from_utf8(handler.rx_buf.bytes()).unwrap() == handler.msg); handler.shutdown = true; } } if event.readiness().is_writable() { if let SENDER = event.token() { let cnt = if !handler.connected { let addr = handler.rx.local_addr().unwrap(); handler.tx.send_to(handler.buf.bytes(), &addr).unwrap() } else { handler.tx.send(handler.buf.bytes()).unwrap() }; handler.buf.advance(cnt); } } } } } /// Returns the sender and the receiver fn connected_sockets() -> (UdpSocket, UdpSocket) { let addr = localhost(); let any = localhost(); let tx = UdpSocket::bind(&any).unwrap(); let rx = UdpSocket::bind(&addr).unwrap(); let tx_addr = tx.local_addr().unwrap(); let rx_addr = rx.local_addr().unwrap(); assert!(tx.connect(rx_addr).is_ok()); assert!(rx.connect(tx_addr).is_ok()); (tx, rx) } #[test] pub fn test_udp_socket() { let addr = localhost(); let any = localhost(); let tx = UdpSocket::bind(&any).unwrap(); let rx = UdpSocket::bind(&addr).unwrap(); test_send_recv_udp(tx, rx, false); } #[test] pub fn test_udp_socket_send_recv() { let (tx, rx) = connected_sockets(); test_send_recv_udp(tx, rx, true); } #[test] pub fn test_udp_socket_discard() { let addr = localhost(); let any = localhost(); let outside = localhost(); let tx = UdpSocket::bind(&any).unwrap(); let rx = UdpSocket::bind(&addr).unwrap(); let udp_outside = UdpSocket::bind(&outside).unwrap(); let tx_addr = tx.local_addr().unwrap(); let rx_addr = rx.local_addr().unwrap(); assert!(tx.connect(rx_addr).is_ok()); assert!(udp_outside.connect(rx_addr).is_ok()); assert!(rx.connect(tx_addr).is_ok()); let poll = Poll::new().unwrap(); let r = udp_outside.send(b"hello world"); assert!(r.is_ok() || r.unwrap_err().kind() == ErrorKind::WouldBlock); poll.register(&rx, LISTENER, Ready::readable(), PollOpt::edge()).unwrap(); poll.register(&tx, SENDER, Ready::writable(), PollOpt::edge()).unwrap(); let mut events = Events::with_capacity(1024); poll.poll(&mut events, Some(time::Duration::from_secs(5))).unwrap(); for event in &events { if event.readiness().is_readable() { if let LISTENER = event.token() { assert!(false, "Expected to no receive a packet but got something") } } } } #[cfg(all(unix, not(target_os = "fuchsia")))] #[test] pub fn test_udp_socket_send_recv_bufs() { let (tx, rx) = connected_sockets(); let poll = Poll::new().unwrap(); poll.register(&tx, SENDER, Ready::writable(), PollOpt::edge()) .unwrap(); poll.register(&rx, LISTENER, Ready::readable(), PollOpt::edge()) .unwrap(); let mut events = Events::with_capacity(1024); let data = b"hello, world"; let write_bufs: Vec<_> = vec![b"hello, " as &[u8], b"world"] .into_iter() .flat_map(IoVec::from_bytes) .collect(); let (a, b, c) = ( &mut [0u8; 4] as &mut [u8], &mut [0u8; 6] as &mut [u8], &mut [0u8; 8] as &mut [u8], ); let mut read_bufs: Vec<_> = vec![a, b, c] .into_iter() .flat_map(IoVec::from_bytes_mut) .collect(); let times = 5; let mut rtimes = 0; let mut wtimes = 0; 'outer: loop { poll.poll(&mut events, None).unwrap(); for event in &events { if event.readiness().is_readable() { if let LISTENER = event.token() { loop { let cnt = match rx.recv_bufs(read_bufs.as_mut()) { Ok(cnt) => cnt, Err(ref e) if e.kind() == ErrorKind::WouldBlock => break, Err(e) => panic!("read error {}", e), }; assert_eq!(cnt, data.len()); let res: Vec = read_bufs .iter() .flat_map(|buf| buf.iter()) .cloned() .collect(); assert_eq!(&res[..cnt], &data[..cnt]); rtimes += 1; if rtimes == times { break 'outer; } } } } if event.readiness().is_writable() { if let SENDER = event.token() { while wtimes < times { let cnt = match tx.send_bufs(write_bufs.as_slice()) { Ok(cnt) => cnt, Err(ref e) if e.kind() == ErrorKind::WouldBlock => break, Err(e) => panic!("write error {}", e), }; assert_eq!(cnt, data.len()); wtimes += 1; } } } } } }