diff options
Diffstat (limited to 'third_party/rust/ws/src/io.rs')
-rw-r--r-- | third_party/rust/ws/src/io.rs | 985 |
1 files changed, 985 insertions, 0 deletions
diff --git a/third_party/rust/ws/src/io.rs b/third_party/rust/ws/src/io.rs new file mode 100644 index 0000000000..7739cdc472 --- /dev/null +++ b/third_party/rust/ws/src/io.rs @@ -0,0 +1,985 @@ +use std::borrow::Borrow; +use std::io::{Error as IoError, ErrorKind}; +use std::net::{SocketAddr, ToSocketAddrs}; +use std::time::Duration; +use std::usize; + +use mio; +use mio::tcp::{TcpListener, TcpStream}; +use mio::{Poll, PollOpt, Ready, Token}; +use mio_extras; + +use url::Url; + +#[cfg(feature = "native_tls")] +use native_tls::Error as SslError; + +use super::Settings; +use communication::{Command, Sender, Signal}; +use connection::Connection; +use factory::Factory; +use slab::Slab; +use result::{Error, Kind, Result}; + + +const QUEUE: Token = Token(usize::MAX - 3); +const TIMER: Token = Token(usize::MAX - 4); +pub const ALL: Token = Token(usize::MAX - 5); +const SYSTEM: Token = Token(usize::MAX - 6); + +type Conn<F> = Connection<<F as Factory>::Handler>; + +const MAX_EVENTS: usize = 1024; +const MESSAGES_PER_TICK: usize = 256; +const TIMER_TICK_MILLIS: u64 = 100; +const TIMER_WHEEL_SIZE: usize = 1024; +const TIMER_CAPACITY: usize = 65_536; + +#[cfg(not(windows))] +const CONNECTION_REFUSED: i32 = 111; +#[cfg(windows)] +const CONNECTION_REFUSED: i32 = 61; + +fn url_to_addrs(url: &Url) -> Result<Vec<SocketAddr>> { + let host = url.host_str(); + if host.is_none() || (url.scheme() != "ws" && url.scheme() != "wss") { + return Err(Error::new( + Kind::Internal, + format!("Not a valid websocket url: {}", url), + )); + } + let host = host.unwrap(); + + let port = url.port_or_known_default().unwrap_or(80); + let mut addrs = (&host[..], port) + .to_socket_addrs()? + .collect::<Vec<SocketAddr>>(); + addrs.dedup(); + Ok(addrs) +} + +enum State { + Active, + Inactive, +} + +impl State { + fn is_active(&self) -> bool { + match *self { + State::Active => true, + State::Inactive => false, + } + } +} + +#[derive(Debug, Clone, Copy)] +pub struct Timeout { + connection: Token, + event: Token, +} + +pub struct Handler<F> +where + F: Factory, +{ + listener: Option<TcpListener>, + connections: Slab<Conn<F>>, + factory: F, + settings: Settings, + state: State, + queue_tx: mio::channel::SyncSender<Command>, + queue_rx: mio::channel::Receiver<Command>, + timer: mio_extras::timer::Timer<Timeout>, + next_connection_id: u32, +} + +impl<F> Handler<F> +where + F: Factory, +{ + pub fn new(factory: F, settings: Settings) -> Handler<F> { + let (tx, rx) = mio::channel::sync_channel(settings.max_connections * settings.queue_size); + let timer = mio_extras::timer::Builder::default() + .tick_duration(Duration::from_millis(TIMER_TICK_MILLIS)) + .num_slots(TIMER_WHEEL_SIZE) + .capacity(TIMER_CAPACITY) + .build(); + Handler { + listener: None, + connections: Slab::with_capacity(settings.max_connections), + factory, + settings, + state: State::Inactive, + queue_tx: tx, + queue_rx: rx, + timer, + next_connection_id: 0, + } + } + + pub fn sender(&self) -> Sender { + Sender::new(ALL, self.queue_tx.clone(), 0) + } + + pub fn listen(&mut self, poll: &mut Poll, addr: &SocketAddr) -> Result<&mut Handler<F>> { + debug_assert!( + self.listener.is_none(), + "Attempted to listen for connections from two addresses on the same websocket." + ); + + let tcp = TcpListener::bind(addr)?; + // TODO: consider net2 in order to set reuse_addr + poll.register(&tcp, ALL, Ready::readable(), PollOpt::level())?; + self.listener = Some(tcp); + Ok(self) + } + + pub fn local_addr(&self) -> ::std::io::Result<SocketAddr> { + if let Some(ref listener) = self.listener { + listener.local_addr() + } else { + Err(IoError::new(ErrorKind::NotFound, "Not a listening socket")) + } + } + + #[cfg(any(feature = "ssl", feature = "nativetls"))] + pub fn connect(&mut self, poll: &mut Poll, url: Url) -> Result<()> { + let settings = self.settings; + + let (tok, addresses) = { + let (tok, entry, connection_id, handler) = + if self.connections.len() < settings.max_connections { + let entry = self.connections.vacant_entry(); + let tok = Token(entry.key()); + let connection_id = self.next_connection_id; + self.next_connection_id = self.next_connection_id.wrapping_add(1); + ( + tok, + entry, + connection_id, + self.factory.client_connected(Sender::new( + tok, + self.queue_tx.clone(), + connection_id, + )), + ) + } else { + return Err(Error::new( + Kind::Capacity, + "Unable to add another connection to the event loop.", + )); + }; + + let mut addresses = match url_to_addrs(&url) { + Ok(addresses) => addresses, + Err(err) => { + self.factory.connection_lost(handler); + return Err(err); + } + }; + + loop { + if let Some(addr) = addresses.pop() { + if let Ok(sock) = TcpStream::connect(&addr) { + if settings.tcp_nodelay { + sock.set_nodelay(true)? + } + addresses.push(addr); // Replace the first addr in case ssl fails and we fallback + entry.insert(Connection::new(tok, sock, handler, settings, connection_id)); + break; + } + } else { + self.factory.connection_lost(handler); + return Err(Error::new( + Kind::Internal, + format!("Unable to obtain any socket address for {}", url), + )); + } + } + + (tok, addresses) + }; + + let will_encrypt = url.scheme() == "wss"; + + if let Err(error) = self.connections[tok.into()].as_client(url, addresses) { + let handler = self.connections.remove(tok.into()).consume(); + self.factory.connection_lost(handler); + return Err(error); + } + + if will_encrypt { + while let Err(ssl_error) = self.connections[tok.into()].encrypt() { + match ssl_error.kind { + #[cfg(feature = "ssl")] + Kind::Ssl(ref inner_ssl_error) => { + if let Some(io_error) = inner_ssl_error.io_error() { + if let Some(errno) = io_error.raw_os_error() { + if errno == CONNECTION_REFUSED { + if let Err(reset_error) = self.connections[tok.into()].reset() { + trace!( + "Encountered error while trying to reset connection: {:?}", + reset_error + ); + } else { + continue; + } + } + } + } + } + #[cfg(feature = "nativetls")] + Kind::Ssl(_) => { + if let Err(reset_error) = self.connections[tok.into()].reset() { + trace!( + "Encountered error while trying to reset connection: {:?}", + reset_error + ); + } else { + continue; + } + } + _ => (), + } + self.connections[tok.into()].error(ssl_error); + // Allow socket to be registered anyway to await hangup + break; + } + } + + poll.register( + self.connections[tok.into()].socket(), + self.connections[tok.into()].token(), + self.connections[tok.into()].events(), + PollOpt::edge() | PollOpt::oneshot(), + ).map_err(Error::from) + .or_else(|err| { + error!( + "Encountered error while trying to build WebSocket connection: {}", + err + ); + let handler = self.connections.remove(tok.into()).consume(); + self.factory.connection_lost(handler); + Err(err) + }) + } + + #[cfg(not(any(feature = "ssl", feature = "nativetls")))] + pub fn connect(&mut self, poll: &mut Poll, url: Url) -> Result<()> { + let settings = self.settings; + + let (tok, addresses) = { + let (tok, entry, connection_id, handler) = + if self.connections.len() < settings.max_connections { + let entry = self.connections.vacant_entry(); + let tok = Token(entry.key()); + let connection_id = self.next_connection_id; + self.next_connection_id = self.next_connection_id.wrapping_add(1); + ( + tok, + entry, + connection_id, + self.factory.client_connected(Sender::new( + tok, + self.queue_tx.clone(), + connection_id, + )), + ) + } else { + return Err(Error::new( + Kind::Capacity, + "Unable to add another connection to the event loop.", + )); + }; + + let mut addresses = match url_to_addrs(&url) { + Ok(addresses) => addresses, + Err(err) => { + self.factory.connection_lost(handler); + return Err(err); + } + }; + + loop { + if let Some(addr) = addresses.pop() { + if let Ok(sock) = TcpStream::connect(&addr) { + if settings.tcp_nodelay { + sock.set_nodelay(true)? + } + entry.insert(Connection::new(tok, sock, handler, settings, connection_id)); + break; + } + } else { + self.factory.connection_lost(handler); + return Err(Error::new( + Kind::Internal, + format!("Unable to obtain any socket address for {}", url), + )); + } + } + + (tok, addresses) + }; + + if url.scheme() == "wss" { + let error = Error::new( + Kind::Protocol, + "The ssl feature is not enabled. Please enable it to use wss urls.", + ); + let handler = self.connections.remove(tok.into()).consume(); + self.factory.connection_lost(handler); + return Err(error); + } + + if let Err(error) = self.connections[tok.into()].as_client(url, addresses) { + let handler = self.connections.remove(tok.into()).consume(); + self.factory.connection_lost(handler); + return Err(error); + } + + poll.register( + self.connections[tok.into()].socket(), + self.connections[tok.into()].token(), + self.connections[tok.into()].events(), + PollOpt::edge() | PollOpt::oneshot(), + ).map_err(Error::from) + .or_else(|err| { + error!( + "Encountered error while trying to build WebSocket connection: {}", + err + ); + let handler = self.connections.remove(tok.into()).consume(); + self.factory.connection_lost(handler); + Err(err) + }) + } + + #[cfg(any(feature = "ssl", feature = "nativetls"))] + pub fn accept(&mut self, poll: &mut Poll, sock: TcpStream) -> Result<()> { + let factory = &mut self.factory; + let settings = self.settings; + + if settings.tcp_nodelay { + sock.set_nodelay(true)? + } + + let tok = { + if self.connections.len() < settings.max_connections { + let entry = self.connections.vacant_entry(); + let tok = Token(entry.key()); + let connection_id = self.next_connection_id; + self.next_connection_id = self.next_connection_id.wrapping_add(1); + let handler = factory.server_connected(Sender::new( + tok, + self.queue_tx.clone(), + connection_id, + )); + entry.insert(Connection::new(tok, sock, handler, settings, connection_id)); + tok + } else { + return Err(Error::new( + Kind::Capacity, + "Unable to add another connection to the event loop.", + )); + } + }; + + let conn = &mut self.connections[tok.into()]; + + conn.as_server()?; + if settings.encrypt_server { + conn.encrypt()? + } + + poll.register( + conn.socket(), + conn.token(), + conn.events(), + PollOpt::edge() | PollOpt::oneshot(), + ).map_err(Error::from) + .or_else(|err| { + error!( + "Encountered error while trying to build WebSocket connection: {}", + err + ); + conn.error(err); + if settings.panic_on_new_connection { + panic!("Encountered error while trying to build WebSocket connection."); + } + Ok(()) + }) + } + + #[cfg(not(any(feature = "ssl", feature = "nativetls")))] + pub fn accept(&mut self, poll: &mut Poll, sock: TcpStream) -> Result<()> { + let factory = &mut self.factory; + let settings = self.settings; + + if settings.tcp_nodelay { + sock.set_nodelay(true)? + } + + let tok = { + if self.connections.len() < settings.max_connections { + let entry = self.connections.vacant_entry(); + let tok = Token(entry.key()); + let connection_id = self.next_connection_id; + self.next_connection_id = self.next_connection_id.wrapping_add(1); + let handler = factory.server_connected(Sender::new( + tok, + self.queue_tx.clone(), + connection_id, + )); + entry.insert(Connection::new(tok, sock, handler, settings, connection_id)); + tok + } else { + return Err(Error::new( + Kind::Capacity, + "Unable to add another connection to the event loop.", + )); + } + }; + + let conn = &mut self.connections[tok.into()]; + + conn.as_server()?; + if settings.encrypt_server { + return Err(Error::new( + Kind::Protocol, + "The ssl feature is not enabled. Please enable it to use wss urls.", + )); + } + + poll.register( + conn.socket(), + conn.token(), + conn.events(), + PollOpt::edge() | PollOpt::oneshot(), + ).map_err(Error::from) + .or_else(|err| { + error!( + "Encountered error while trying to build WebSocket connection: {}", + err + ); + conn.error(err); + if settings.panic_on_new_connection { + panic!("Encountered error while trying to build WebSocket connection."); + } + Ok(()) + }) + } + + pub fn run(&mut self, poll: &mut Poll) -> Result<()> { + trace!("Running event loop"); + poll.register( + &self.queue_rx, + QUEUE, + Ready::readable(), + PollOpt::edge() | PollOpt::oneshot(), + )?; + poll.register(&self.timer, TIMER, Ready::readable(), PollOpt::edge())?; + + self.state = State::Active; + let result = self.event_loop(poll); + self.state = State::Inactive; + + result + .and(poll.deregister(&self.timer).map_err(Error::from)) + .and(poll.deregister(&self.queue_rx).map_err(Error::from)) + } + + #[inline] + fn event_loop(&mut self, poll: &mut Poll) -> Result<()> { + let mut events = mio::Events::with_capacity(MAX_EVENTS); + while self.state.is_active() { + trace!("Waiting for event"); + let nevents = match poll.poll(&mut events, None) { + Ok(nevents) => nevents, + Err(err) => { + if err.kind() == ErrorKind::Interrupted { + if self.settings.shutdown_on_interrupt { + error!("Websocket shutting down for interrupt."); + self.state = State::Inactive; + } else { + error!("Websocket received interrupt."); + } + 0 + } else { + return Err(Error::from(err)); + } + } + }; + trace!("Processing {} events", nevents); + + for i in 0..nevents { + let evt = events.get(i).unwrap(); + self.handle_event(poll, evt.token(), evt.kind()); + } + + self.check_count(); + } + Ok(()) + } + + #[inline] + fn schedule(&self, poll: &mut Poll, conn: &Conn<F>) -> Result<()> { + trace!( + "Scheduling connection to {} as {:?}", + conn.socket() + .peer_addr() + .map(|addr| addr.to_string()) + .unwrap_or_else(|_| "UNKNOWN".into()), + conn.events() + ); + poll.reregister( + conn.socket(), + conn.token(), + conn.events(), + PollOpt::edge() | PollOpt::oneshot(), + )?; + Ok(()) + } + + fn shutdown(&mut self) { + debug!("Received shutdown signal. WebSocket is attempting to shut down."); + for (_, conn) in self.connections.iter_mut() { + conn.shutdown(); + } + self.factory.on_shutdown(); + self.state = State::Inactive; + if self.settings.panic_on_shutdown { + panic!("Panicking on shutdown as per setting.") + } + } + + #[inline] + fn check_active(&mut self, poll: &mut Poll, active: bool, token: Token) { + // NOTE: Closing state only applies after a ws connection was successfully + // established. It's possible that we may go inactive while in a connecting + // state if the handshake fails. + if !active { + if let Ok(addr) = self.connections[token.into()].socket().peer_addr() { + debug!("WebSocket connection to {} disconnected.", addr); + } else { + trace!("WebSocket connection to token={:?} disconnected.", token); + } + let handler = self.connections.remove(token.into()).consume(); + self.factory.connection_lost(handler); + } else { + self.schedule(poll, &self.connections[token.into()]) + .or_else(|err| { + // This will be an io error, so disconnect will already be called + self.connections[token.into()].error(err); + let handler = self.connections.remove(token.into()).consume(); + self.factory.connection_lost(handler); + Ok::<(), Error>(()) + }) + .unwrap() + } + } + + #[inline] + fn is_client(&self) -> bool { + self.listener.is_none() + } + + #[inline] + fn check_count(&mut self) { + trace!("Active connections {:?}", self.connections.len()); + if self.connections.is_empty() { + if !self.state.is_active() { + debug!("Shutting down websocket server."); + } else if self.is_client() { + debug!("Shutting down websocket client."); + self.factory.on_shutdown(); + self.state = State::Inactive; + } + } + } + + fn handle_event(&mut self, poll: &mut Poll, token: Token, events: Ready) { + match token { + SYSTEM => { + debug_assert!(false, "System token used for io event. This is a bug!"); + error!("System token used for io event. This is a bug!"); + } + ALL => { + if events.is_readable() { + match self.listener + .as_ref() + .expect("No listener provided for server websocket connections") + .accept() + { + Ok((sock, addr)) => { + info!("Accepted a new tcp connection from {}.", addr); + if let Err(err) = self.accept(poll, sock) { + error!("Unable to build WebSocket connection {:?}", err); + if self.settings.panic_on_new_connection { + panic!("Unable to build WebSocket connection {:?}", err); + } + } + } + Err(err) => error!( + "Encountered an error {:?} while accepting tcp connection.", + err + ), + } + } + } + TIMER => while let Some(t) = self.timer.poll() { + self.handle_timeout(poll, t); + }, + QUEUE => { + for _ in 0..MESSAGES_PER_TICK { + match self.queue_rx.try_recv() { + Ok(cmd) => self.handle_queue(poll, cmd), + _ => break, + } + } + let _ = poll.reregister( + &self.queue_rx, + QUEUE, + Ready::readable(), + PollOpt::edge() | PollOpt::oneshot(), + ); + } + _ => { + let active = { + let conn_events = self.connections[token.into()].events(); + + if (events & conn_events).is_readable() { + if let Err(err) = self.connections[token.into()].read() { + trace!("Encountered error while reading: {}", err); + if let Kind::Io(ref err) = err.kind { + if let Some(errno) = err.raw_os_error() { + if errno == CONNECTION_REFUSED { + match self.connections[token.into()].reset() { + Ok(_) => { + poll.register( + self.connections[token.into()].socket(), + self.connections[token.into()].token(), + self.connections[token.into()].events(), + PollOpt::edge() | PollOpt::oneshot(), + ).or_else(|err| { + self.connections[token.into()] + .error(Error::from(err)); + let handler = self.connections + .remove(token.into()) + .consume(); + self.factory.connection_lost(handler); + Ok::<(), Error>(()) + }) + .unwrap(); + return; + } + Err(err) => { + trace!("Encountered error while trying to reset connection: {:?}", err); + } + } + } + } + } + // This will trigger disconnect if the connection is open + self.connections[token.into()].error(err) + } + } + + let conn_events = self.connections[token.into()].events(); + + if (events & conn_events).is_writable() { + if let Err(err) = self.connections[token.into()].write() { + trace!("Encountered error while writing: {}", err); + if let Kind::Io(ref err) = err.kind { + if let Some(errno) = err.raw_os_error() { + if errno == CONNECTION_REFUSED { + match self.connections[token.into()].reset() { + Ok(_) => { + poll.register( + self.connections[token.into()].socket(), + self.connections[token.into()].token(), + self.connections[token.into()].events(), + PollOpt::edge() | PollOpt::oneshot(), + ).or_else(|err| { + self.connections[token.into()] + .error(Error::from(err)); + let handler = self.connections + .remove(token.into()) + .consume(); + self.factory.connection_lost(handler); + Ok::<(), Error>(()) + }) + .unwrap(); + return; + } + Err(err) => { + trace!("Encountered error while trying to reset connection: {:?}", err); + } + } + } + } + } + // This will trigger disconnect if the connection is open + self.connections[token.into()].error(err) + } + } + + // connection events may have changed + self.connections[token.into()].events().is_readable() + || self.connections[token.into()].events().is_writable() + }; + + self.check_active(poll, active, token) + } + } + } + + fn handle_queue(&mut self, poll: &mut Poll, cmd: Command) { + match cmd.token() { + SYSTEM => { + // Scaffolding for system events such as internal timeouts + } + ALL => { + let mut dead = Vec::with_capacity(self.connections.len()); + + match cmd.into_signal() { + Signal::Message(msg) => { + trace!("Broadcasting message: {:?}", msg); + for (_, conn) in self.connections.iter_mut() { + if let Err(err) = conn.send_message(msg.clone()) { + dead.push((conn.token(), err)) + } + } + } + Signal::Close(code, reason) => { + trace!("Broadcasting close: {:?} - {}", code, reason); + for (_, conn) in self.connections.iter_mut() { + if let Err(err) = conn.send_close(code, reason.borrow()) { + dead.push((conn.token(), err)) + } + } + } + Signal::Ping(data) => { + trace!("Broadcasting ping"); + for (_, conn) in self.connections.iter_mut() { + if let Err(err) = conn.send_ping(data.clone()) { + dead.push((conn.token(), err)) + } + } + } + Signal::Pong(data) => { + trace!("Broadcasting pong"); + for (_, conn) in self.connections.iter_mut() { + if let Err(err) = conn.send_pong(data.clone()) { + dead.push((conn.token(), err)) + } + } + } + Signal::Connect(url) => { + if let Err(err) = self.connect(poll, url.clone()) { + if self.settings.panic_on_new_connection { + panic!("Unable to establish connection to {}: {:?}", url, err); + } + error!("Unable to establish connection to {}: {:?}", url, err); + } + return; + } + Signal::Shutdown => self.shutdown(), + Signal::Timeout { + delay, + token: event, + } => { + let timeout = self.timer.set_timeout( + Duration::from_millis(delay), + Timeout { + connection: ALL, + event, + }, + ); + for (_, conn) in self.connections.iter_mut() { + if let Err(err) = conn.new_timeout(event, timeout.clone()) { + conn.error(err); + } + } + return; + } + Signal::Cancel(timeout) => { + self.timer.cancel_timeout(&timeout); + return; + } + } + + for (_, conn) in self.connections.iter() { + if let Err(err) = self.schedule(poll, conn) { + dead.push((conn.token(), err)) + } + } + for (token, err) in dead { + // note the same connection may be called twice + self.connections[token.into()].error(err) + } + } + token => { + let connection_id = cmd.connection_id(); + match cmd.into_signal() { + Signal::Message(msg) => { + if let Some(conn) = self.connections.get_mut(token.into()) { + if conn.connection_id() == connection_id { + if let Err(err) = conn.send_message(msg) { + conn.error(err) + } + } else { + trace!("Connection disconnected while a message was waiting in the queue.") + } + } else { + trace!( + "Connection disconnected while a message was waiting in the queue." + ) + } + } + Signal::Close(code, reason) => { + if let Some(conn) = self.connections.get_mut(token.into()) { + if conn.connection_id() == connection_id { + if let Err(err) = conn.send_close(code, reason) { + conn.error(err) + } + } else { + trace!("Connection disconnected while close signal was waiting in the queue.") + } + } else { + trace!("Connection disconnected while close signal was waiting in the queue.") + } + } + Signal::Ping(data) => { + if let Some(conn) = self.connections.get_mut(token.into()) { + if conn.connection_id() == connection_id { + if let Err(err) = conn.send_ping(data) { + conn.error(err) + } + } else { + trace!("Connection disconnected while ping signal was waiting in the queue.") + } + } else { + trace!("Connection disconnected while ping signal was waiting in the queue.") + } + } + Signal::Pong(data) => { + if let Some(conn) = self.connections.get_mut(token.into()) { + if conn.connection_id() == connection_id { + if let Err(err) = conn.send_pong(data) { + conn.error(err) + } + } else { + trace!("Connection disconnected while pong signal was waiting in the queue.") + } + } else { + trace!("Connection disconnected while pong signal was waiting in the queue.") + } + } + Signal::Connect(url) => { + if let Err(err) = self.connect(poll, url.clone()) { + if let Some(conn) = self.connections.get_mut(token.into()) { + conn.error(err) + } else { + if self.settings.panic_on_new_connection { + panic!("Unable to establish connection to {}: {:?}", url, err); + } + error!("Unable to establish connection to {}: {:?}", url, err); + } + } + return; + } + Signal::Shutdown => self.shutdown(), + Signal::Timeout { + delay, + token: event, + } => { + let timeout = self.timer.set_timeout( + Duration::from_millis(delay), + Timeout { + connection: token, + event, + }, + ); + if let Some(conn) = self.connections.get_mut(token.into()) { + if let Err(err) = conn.new_timeout(event, timeout) { + conn.error(err) + } + } else { + trace!("Connection disconnected while pong signal was waiting in the queue.") + } + return; + } + Signal::Cancel(timeout) => { + self.timer.cancel_timeout(&timeout); + return; + } + } + + if self.connections.get(token.into()).is_some() { + if let Err(err) = self.schedule(poll, &self.connections[token.into()]) { + self.connections[token.into()].error(err) + } + } + } + } + } + + fn handle_timeout(&mut self, poll: &mut Poll, Timeout { connection, event }: Timeout) { + let active = { + if let Some(conn) = self.connections.get_mut(connection.into()) { + if let Err(err) = conn.timeout_triggered(event) { + conn.error(err) + } + + conn.events().is_readable() || conn.events().is_writable() + } else { + trace!("Connection disconnected while timeout was waiting."); + return; + } + }; + self.check_active(poll, active, connection); + } +} + +mod test { + #![allow(unused_imports, unused_variables, dead_code)] + use std::str::FromStr; + + use url::Url; + + use super::url_to_addrs; + use super::*; + use result::{Error, Kind}; + + #[test] + fn test_url_to_addrs() { + let ws_url = Url::from_str("ws://example.com?query=me").unwrap(); + let wss_url = Url::from_str("wss://example.com/suburl#fragment").unwrap(); + let bad_url = Url::from_str("http://howdy.bad.com").unwrap(); + let no_resolve = Url::from_str("ws://bad.elucitrans.com").unwrap(); + + assert!(url_to_addrs(&ws_url).is_ok()); + assert!(url_to_addrs(&ws_url).unwrap().len() > 0); + assert!(url_to_addrs(&wss_url).is_ok()); + assert!(url_to_addrs(&wss_url).unwrap().len() > 0); + + match url_to_addrs(&bad_url) { + Ok(_) => panic!("url_to_addrs accepts http urls."), + Err(Error { + kind: Kind::Internal, + details: _, + }) => (), // pass + err => panic!("{:?}", err), + } + + match url_to_addrs(&no_resolve) { + Ok(_) => panic!("url_to_addrs creates addresses for non-existent domains."), + Err(Error { + kind: Kind::Io(_), + details: _, + }) => (), // pass + err => panic!("{:?}", err), + } + } + +} |