diff options
Diffstat (limited to 'third_party/rust/ws/examples')
-rw-r--r-- | third_party/rust/ws/examples/autobahn-client.rs | 76 | ||||
-rw-r--r-- | third_party/rust/ws/examples/autobahn-server.rs | 25 | ||||
-rw-r--r-- | third_party/rust/ws/examples/bench-server.rs | 16 | ||||
-rw-r--r-- | third_party/rust/ws/examples/bench.rs | 79 | ||||
-rw-r--r-- | third_party/rust/ws/examples/channel.rs | 156 | ||||
-rw-r--r-- | third_party/rust/ws/examples/cli.rs | 180 | ||||
-rw-r--r-- | third_party/rust/ws/examples/client.rs | 33 | ||||
-rw-r--r-- | third_party/rust/ws/examples/external_shutdown.rs | 40 | ||||
-rw-r--r-- | third_party/rust/ws/examples/html_chat.rs | 66 | ||||
-rw-r--r-- | third_party/rust/ws/examples/peer2peer.rs | 96 | ||||
-rw-r--r-- | third_party/rust/ws/examples/pong.rs | 130 | ||||
-rw-r--r-- | third_party/rust/ws/examples/remote_addr.rs | 25 | ||||
-rw-r--r-- | third_party/rust/ws/examples/router.rs | 141 | ||||
-rw-r--r-- | third_party/rust/ws/examples/server.rs | 26 | ||||
-rw-r--r-- | third_party/rust/ws/examples/shared.rs | 58 | ||||
-rw-r--r-- | third_party/rust/ws/examples/ssl-server.rs | 122 | ||||
-rw-r--r-- | third_party/rust/ws/examples/threaded.rs | 56 | ||||
-rw-r--r-- | third_party/rust/ws/examples/unsafe-ssl-client.rs | 69 |
18 files changed, 1394 insertions, 0 deletions
diff --git a/third_party/rust/ws/examples/autobahn-client.rs b/third_party/rust/ws/examples/autobahn-client.rs new file mode 100644 index 0000000000..110592d7a6 --- /dev/null +++ b/third_party/rust/ws/examples/autobahn-client.rs @@ -0,0 +1,76 @@ +/// WebSocket client used for testing against the Autobahn Test Suite +extern crate ws; + +use std::cell::Cell; +use std::rc::Rc; +use ws::{connect, CloseCode, Message, Result}; + +#[cfg(feature = "permessage-deflate")] +use ws::deflate::DeflateHandler; + +const AGENT: &str = "WS-RS"; + +#[cfg(not(feature = "permessage-deflate"))] +fn main() { + let total = get_case_count().unwrap(); + let mut case_id = 1; + + while case_id <= total { + let case_url = format!( + "ws://127.0.0.1:9001/runCase?case={}&agent={}", + case_id, AGENT + ); + + connect(case_url, |out| move |msg| out.send(msg)).unwrap(); + + case_id += 1 + } + + update_reports().unwrap(); +} + +#[cfg(feature = "permessage-deflate")] +fn main() { + let total = get_case_count().unwrap(); + let mut case_id = 1; + + while case_id <= total { + let case_url = format!( + "ws://127.0.0.1:9001/runCase?case={}&agent={}", + case_id, AGENT + ); + + connect(case_url, |out| { + DeflateHandler::new(move |msg| out.send(msg)) + }).unwrap(); + + case_id += 1 + } + + update_reports().unwrap(); +} + +fn get_case_count() -> Result<u32> { + // sadly we need to use a Cell because we need to set the total, and RC is immutable + let total = Rc::new(Cell::new(0)); + + connect("ws://127.0.0.1:9001/getCaseCount", |out| { + let my_total = total.clone(); + + move |msg: Message| { + let count = msg.as_text()?; + + my_total.set(count.parse::<u32>().unwrap()); + + out.close(CloseCode::Normal) + } + })?; + + Ok(total.get()) +} + +fn update_reports() -> Result<()> { + let report_url = format!("ws://127.0.0.1:9001/updateReports?agent={}", AGENT); + + connect(report_url, |out| move |_| out.close(CloseCode::Normal)) +} diff --git a/third_party/rust/ws/examples/autobahn-server.rs b/third_party/rust/ws/examples/autobahn-server.rs new file mode 100644 index 0000000000..c0a9a0009b --- /dev/null +++ b/third_party/rust/ws/examples/autobahn-server.rs @@ -0,0 +1,25 @@ +extern crate env_logger; +/// WebSocket server used for testing against the Autobahn Test Suite. This is basically the server +/// example without printing output or comments. +extern crate ws; + +#[cfg(feature = "permessage-deflate")] +use ws::deflate::DeflateHandler; + +#[cfg(not(feature = "permessage-deflate"))] +fn main() { + env_logger::init(); + + ws::listen("127.0.0.1:3012", |out| { + move |msg| out.send(msg) + }).unwrap() +} + +#[cfg(feature = "permessage-deflate")] +fn main() { + env_logger::init(); + + ws::listen("127.0.0.1:3012", |out| { + DeflateHandler::new(move |msg| out.send(msg)) + }).unwrap(); +} diff --git a/third_party/rust/ws/examples/bench-server.rs b/third_party/rust/ws/examples/bench-server.rs new file mode 100644 index 0000000000..bccfe66fb6 --- /dev/null +++ b/third_party/rust/ws/examples/bench-server.rs @@ -0,0 +1,16 @@ +/// WebSocket server used for testing the bench example. +extern crate ws; + +use ws::{Builder, Sender, Settings}; + +fn main() { + Builder::new() + .with_settings(Settings { + max_connections: 10_000, + ..Settings::default() + }) + .build(|out: Sender| move |msg| out.send(msg)) + .unwrap() + .listen("127.0.0.1:3012") + .unwrap(); +} diff --git a/third_party/rust/ws/examples/bench.rs b/third_party/rust/ws/examples/bench.rs new file mode 100644 index 0000000000..427f84147f --- /dev/null +++ b/third_party/rust/ws/examples/bench.rs @@ -0,0 +1,79 @@ +extern crate env_logger; +extern crate time; +extern crate url; +/// A simple, but immature, benchmark client for destroying other WebSocket frameworks and proving +/// WS-RS's performance excellence. ;) +/// Make sure you allow for enough connections in your OS (e.g. ulimit -Sn 10000). +extern crate ws; + +// Try this against node for some fun + +// TODO: Separate this example into a separate lib +// TODO: num threads, num connections per thread, num concurrent connections per thread, num +// messages per connection, length of message, text or binary + +use ws::{Builder, CloseCode, Handler, Handshake, Message, Result, Sender, Settings}; + +const CONNECTIONS: usize = 10_000; // simultaneous +const MESSAGES: u32 = 10; +static MESSAGE: &'static str = "TEST TEST TEST TEST TEST TEST TEST TEST"; + +fn main() { + env_logger::init(); + + let url = url::Url::parse("ws://127.0.0.1:3012").unwrap(); + + struct Connection { + out: Sender, + count: u32, + time: u64, + total: u64, + } + + impl Handler for Connection { + fn on_open(&mut self, _: Handshake) -> Result<()> { + self.out.send(MESSAGE)?; + self.count += 1; + self.time = time::precise_time_ns(); + Ok(()) + } + + fn on_message(&mut self, msg: Message) -> Result<()> { + assert_eq!(msg.as_text().unwrap(), MESSAGE); + if self.count > MESSAGES { + self.out.close(CloseCode::Normal)?; + } else { + self.out.send(MESSAGE)?; + let time = time::precise_time_ns(); + // println!("time {}", time -self.time); + self.total += time - self.time; + self.count += 1; + self.time = time; + } + Ok(()) + } + } + + let mut ws = Builder::new() + .with_settings(Settings { + max_connections: CONNECTIONS, + ..Settings::default() + }) + .build(|out| Connection { + out, + count: 0, + time: 0, + total: 0, + }) + .unwrap(); + + for _ in 0..CONNECTIONS { + ws.connect(url.clone()).unwrap(); + } + let start = time::precise_time_ns(); + ws.run().unwrap(); + println!( + "Total time. {}", + (time::precise_time_ns() - start) / 1_000_000 + ) +} diff --git a/third_party/rust/ws/examples/channel.rs b/third_party/rust/ws/examples/channel.rs new file mode 100644 index 0000000000..fb21901297 --- /dev/null +++ b/third_party/rust/ws/examples/channel.rs @@ -0,0 +1,156 @@ +extern crate env_logger; +/// An example of using channels to transfer data between three parts of some system. +/// +/// A WebSocket server echoes data back to a client and tees that data to a logging system. +/// A WebSocket client sends some data do the server. +/// A worker thread stores data as a log and sends that data back to the main program when the +/// WebSocket server has finished receiving data. +/// +/// This example demonstrates how to use threads, channels, and WebSocket handlers to create a +/// complex system from simple, composable parts. +extern crate ws; + +use std::sync::mpsc::Sender as ThreadOut; +use std::sync::mpsc::channel; +use std::thread; +use std::thread::sleep; +use std::time::Duration; + +use ws::{connect, listen, CloseCode, Handler, Handshake, Message, Result, Sender}; + +fn main() { + // Setup logging + env_logger::init(); + + // Data to be sent across WebSockets and channels + let data = vec![1, 2, 3, 4, 5]; + let (final_in, final_out) = channel(); + let (log_in, log_out) = channel(); + + // WebSocket connection handler for the server connection + struct Server { + ws: Sender, + log: ThreadOut<String>, + } + + impl Handler for Server { + fn on_message(&mut self, msg: Message) -> Result<()> { + println!("Server got message '{}'. ", msg); + + // log it + self.log.send(msg.to_string()).unwrap(); + + // echo it back + self.ws.send(msg) + } + + fn on_close(&mut self, _: CloseCode, _: &str) { + self.ws.shutdown().unwrap() + } + } + + // Server thread + let server = thread::Builder::new() + .name("server".to_owned()) + .spawn(move || { + listen("127.0.0.1:3012", |out| { + Server { + ws: out, + // we need to clone the channel because + // in theory, there could be many active connections + log: log_in.clone(), + } + }).unwrap() + }) + .unwrap(); + + // Give the server a little time to get going + sleep(Duration::from_millis(10)); + + // WebSocket connection handler for the client connection + struct Client { + out: Sender, + ind: usize, + data: Vec<u32>, + } + + impl Client { + // Core business logic for client, keeping it DRY + fn increment(&mut self) -> Result<()> { + if let Some(num) = self.data.get(self.ind) { + // Advance the index + self.ind += 1; + + // Send the number to the server + self.out.send(num.to_string()) + } else { + // All of the data has been sent, let's close + self.out.close(CloseCode::Normal) + } + } + } + + impl Handler for Client { + fn on_open(&mut self, _: Handshake) -> Result<()> { + self.increment() + } + + fn on_message(&mut self, msg: Message) -> Result<()> { + println!("Client got message '{}'. ", msg); + self.increment() + } + } + + // We need to clone the data into the client, making two versions we will compare for + // consistency later + let client_data = data.clone(); + + // Client thread + let client = thread::Builder::new() + .name("client".to_owned()) + .spawn(move || { + connect("ws://127.0.0.1:3012", |out| { + Client { + out, + ind: 0, + // we need to clone again because + // in theory, there could be many client connections sending off the data + data: client_data.clone(), + } + }).unwrap() + }) + .unwrap(); + + // Logger thread + let logger = thread::Builder::new() + .name("logger".to_owned()) + .spawn(move || { + // Make a new vector to store the numbers + let mut log: Vec<u32> = Vec::new(); + + // Receive data and push it to the log, this only works if we have one WebSocket + // connection, otherwise the log would have data from all connections. But for our example, + // we know we only have one :) + while let Ok(string) = log_out.recv() { + println!("Logger is storing {}", string); + log.push(string.parse().unwrap()); + } + + println!("Logger sending final log result."); + final_in.send(log).unwrap(); + }) + .unwrap(); + + // Wait for the worker threads to finish what they are doing + let _ = server.join(); + let _ = client.join(); + let _ = logger.join(); + + // Get the result from the logger and check that it is correct + let final_data = final_out.recv().unwrap(); + println!("In: {:?}", data); + println!("Out: {:?}", final_data); + assert_eq!(final_data, data); + + println!("All done.") +} diff --git a/third_party/rust/ws/examples/cli.rs b/third_party/rust/ws/examples/cli.rs new file mode 100644 index 0000000000..80ebf8cc6b --- /dev/null +++ b/third_party/rust/ws/examples/cli.rs @@ -0,0 +1,180 @@ +extern crate clap; +extern crate env_logger; +extern crate term; +/// Run this cli like this: +/// cargo run --example server +/// cargo run --example cli -- ws://127.0.0.1:3012 +extern crate ws; + +use std::io; +use std::io::prelude::*; +use std::sync::mpsc::Sender as TSender; +use std::sync::mpsc::channel; +use std::thread; + +use clap::{App, Arg}; +use ws::{connect, CloseCode, Error, ErrorKind, Handler, Handshake, Message, Result, Sender}; + +fn main() { + // Setup logging + env_logger::init(); + + // setup command line arguments + let matches = App::new("WS Command Line Client") + .version("1.1") + .author("Jason Housley <housleyjk@gmail.com>") + .about("Connect to a WebSocket and send messages from the command line.") + .arg( + Arg::with_name("URL") + .help("The URL of the WebSocket server.") + .required(true) + .index(1), + ) + .get_matches(); + + let url = matches.value_of("URL").unwrap().to_string(); + + let (tx, rx) = channel(); + + // Run client thread with channel to give it's WebSocket message sender back to us + let client = thread::spawn(move || { + println!("Connecting to {}", url); + connect(url, |sender| Client { + ws_out: sender, + thread_out: tx.clone(), + }).unwrap(); + }); + + if let Ok(Event::Connect(sender)) = rx.recv() { + // If we were able to connect, print the instructions + instructions(); + + // Main loop + loop { + // Get user input + let mut input = String::new(); + io::stdin().read_line(&mut input).unwrap(); + + if let Ok(Event::Disconnect) = rx.try_recv() { + break; + } + + if input.starts_with("/h") { + // Show help + instructions() + } else if input.starts_with("/c") { + // If the close arguments are good, close the connection + let args: Vec<&str> = input.split(' ').collect(); + if args.len() == 1 { + // Simple close + println!("Closing normally, please wait..."); + sender.close(CloseCode::Normal).unwrap(); + } else if args.len() == 2 { + // Close with a specific code + if let Ok(code) = args[1].trim().parse::<u16>() { + let code = CloseCode::from(code); + println!("Closing with code: {:?}, please wait...", code); + sender.close(code).unwrap(); + } else { + display(&format!("Unable to parse {} as close code.", args[1])); + // Keep accepting input if the close arguments are invalid + continue; + } + } else { + // Close with a code and a reason + if let Ok(code) = args[1].trim().parse::<u16>() { + let code = CloseCode::from(code); + let reason = args[2..].join(" "); + println!( + "Closing with code: {:?} and reason: {}, please wait...", + code, + reason.trim() + ); + sender + .close_with_reason(code, reason.trim().to_string()) + .unwrap(); + } else { + display(&format!("Unable to parse {} as close code.", args[1])); + // Keep accepting input if the close arguments are invalid + continue; + } + } + break; + } else { + // Send the message + display(&format!(">>> {}", input.trim())); + sender.send(input.trim()).unwrap(); + } + } + } + + // Ensure the client has a chance to finish up + client.join().unwrap(); +} + +fn display(string: &str) { + let mut view = term::stdout().unwrap(); + view.carriage_return().unwrap(); + view.delete_line().unwrap(); + println!("{}", string); + print!("?> "); + io::stdout().flush().unwrap(); +} + +fn instructions() { + println!("Type /close [code] [reason] to close the connection."); + println!("Type /help to show these instructions."); + println!("Other input will be sent as messages.\n"); + print!("?> "); + io::stdout().flush().unwrap(); +} + +struct Client { + ws_out: Sender, + thread_out: TSender<Event>, +} + +impl Handler for Client { + fn on_open(&mut self, _: Handshake) -> Result<()> { + self.thread_out + .send(Event::Connect(self.ws_out.clone())) + .map_err(|err| { + Error::new( + ErrorKind::Internal, + format!("Unable to communicate between threads: {:?}.", err), + ) + }) + } + + fn on_message(&mut self, msg: Message) -> Result<()> { + display(&format!("<<< {}", msg)); + Ok(()) + } + + fn on_close(&mut self, code: CloseCode, reason: &str) { + if reason.is_empty() { + display(&format!( + "<<< Closing<({:?})>\nHit any key to end session.", + code + )); + } else { + display(&format!( + "<<< Closing<({:?}) {}>\nHit any key to end session.", + code, reason + )); + } + + if let Err(err) = self.thread_out.send(Event::Disconnect) { + display(&format!("{:?}", err)) + } + } + + fn on_error(&mut self, err: Error) { + display(&format!("<<< Error<{:?}>", err)) + } +} + +enum Event { + Connect(Sender), + Disconnect, +} diff --git a/third_party/rust/ws/examples/client.rs b/third_party/rust/ws/examples/client.rs new file mode 100644 index 0000000000..5390af3a23 --- /dev/null +++ b/third_party/rust/ws/examples/client.rs @@ -0,0 +1,33 @@ +extern crate env_logger; +/// Simple WebSocket client with error handling. It is not necessary to setup logging, but doing +/// so will allow you to see more details about the connection by using the RUST_LOG env variable. +extern crate ws; + +use ws::{connect, CloseCode}; + +fn main() { + // Setup logging + env_logger::init(); + + // Connect to the url and call the closure + if let Err(error) = connect("ws://127.0.0.1:3012", |out| { + // Queue a message to be sent when the WebSocket is open + if out.send("Hello WebSocket").is_err() { + println!("Websocket couldn't queue an initial message.") + } else { + println!("Client sent message 'Hello WebSocket'. ") + } + + // The handler needs to take ownership of out, so we use move + move |msg| { + // Handle messages received on this connection + println!("Client got message '{}'. ", msg); + + // Close the connection + out.close(CloseCode::Normal) + } + }) { + // Inform the user of failure + println!("Failed to create WebSocket due to: {:?}", error); + } +} diff --git a/third_party/rust/ws/examples/external_shutdown.rs b/third_party/rust/ws/examples/external_shutdown.rs new file mode 100644 index 0000000000..9e6f009ecd --- /dev/null +++ b/third_party/rust/ws/examples/external_shutdown.rs @@ -0,0 +1,40 @@ +extern crate ws; + +use std::sync::mpsc::channel; +use std::thread; +use std::time::Duration; + +fn main() { + let (tx, rx) = channel(); + + let socket = ws::Builder::new() + .build(move |out: ws::Sender| { + // When we get a connection, send a handle to the parent thread + tx.send(out).unwrap(); + + // Dummy message handler + move |_| { + println!("Message handler called."); + Ok(()) + } + }) + .unwrap(); + + let handle = socket.broadcaster(); + + let t = thread::spawn(move || { + socket.listen("127.0.0.1:3012").unwrap(); + }); + + // Wait for 5 seconds only for incoming connections; + thread::sleep(Duration::from_millis(5000)); + + if rx.try_recv().is_err() { + // shutdown the server from the outside + handle.shutdown().unwrap(); + println!("Shutting down server because no connections were established."); + } + + // Let the server finish up (whether it's waiting for new connections or going down) + t.join().unwrap(); +} diff --git a/third_party/rust/ws/examples/html_chat.rs b/third_party/rust/ws/examples/html_chat.rs new file mode 100644 index 0000000000..7b7059b83a --- /dev/null +++ b/third_party/rust/ws/examples/html_chat.rs @@ -0,0 +1,66 @@ +/// An example of a chat web application server +extern crate ws; +use ws::{listen, Handler, Message, Request, Response, Result, Sender}; + +// This can be read from a file +static INDEX_HTML: &'static [u8] = br#" +<!DOCTYPE html> +<html> + <head> + <meta charset="utf-8"> + </head> + <body> + <pre id="messages"></pre> + <form id="form"> + <input type="text" id="msg"> + <input type="submit" value="Send"> + </form> + <script> + var socket = new WebSocket("ws://" + window.location.host + "/ws"); + socket.onmessage = function (event) { + var messages = document.getElementById("messages"); + messages.append(event.data + "\n"); + }; + var form = document.getElementById("form"); + form.addEventListener('submit', function (event) { + event.preventDefault(); + var input = document.getElementById("msg"); + socket.send(input.value); + input.value = ""; + }); + </script> + </body> +</html> + "#; + +// Server web application handler +struct Server { + out: Sender, +} + +impl Handler for Server { + // + fn on_request(&mut self, req: &Request) -> Result<(Response)> { + // Using multiple handlers is better (see router example) + match req.resource() { + // The default trait implementation + "/ws" => Response::from_request(req), + + // Create a custom response + "/" => Ok(Response::new(200, "OK", INDEX_HTML.to_vec())), + + _ => Ok(Response::new(404, "Not Found", b"404 - Not Found".to_vec())), + } + } + + // Handle messages recieved in the websocket (in this case, only on /ws) + fn on_message(&mut self, msg: Message) -> Result<()> { + // Broadcast to all connections + self.out.broadcast(msg) + } +} + +fn main() { + // Listen on an address and call the closure for each connection + listen("127.0.0.1:8000", |out| Server { out }).unwrap() +} diff --git a/third_party/rust/ws/examples/peer2peer.rs b/third_party/rust/ws/examples/peer2peer.rs new file mode 100644 index 0000000000..076c3ac757 --- /dev/null +++ b/third_party/rust/ws/examples/peer2peer.rs @@ -0,0 +1,96 @@ +extern crate clap; +extern crate env_logger; +extern crate url; +/// An example of a client-server-agnostic WebSocket that takes input from stdin and sends that +/// input to all other peers. +/// +/// For example, to create a network like this: +/// +/// 3013 ---- 3012 ---- 3014 +/// \ | +/// \ | +/// \ | +/// \ | +/// \ | +/// \ | +/// \ | +/// 3015 +/// +/// Run these commands in separate processes +/// ./peer2peer +/// ./peer2peer --server localhost:3013 ws://localhost:3012 +/// ./peer2peer --server localhost:3014 ws://localhost:3012 +/// ./peer2peer --server localhost:3015 ws://localhost:3012 ws://localhost:3013 +/// +/// Stdin on 3012 will be sent to all other peers +/// Stdin on 3013 will be sent to 3012 and 3015 +/// Stdin on 3014 will be sent to 3012 only +/// Stdin on 3015 will be sent to 3012 and 2013 +extern crate ws; +#[macro_use] +extern crate log; + +use std::io; +use std::io::prelude::*; +use std::thread; + +use clap::{App, Arg}; + +fn main() { + // Setup logging + env_logger::init(); + + // Parse command line arguments + let matches = App::new("Simple Peer 2 Peer") + .version("1.0") + .author("Jason Housley <housleyjk@gmail.com>") + .about("Connect to other peers and listen for incoming connections.") + .arg( + Arg::with_name("server") + .short("s") + .long("server") + .value_name("SERVER") + .help("Set the address to listen for new connections."), + ) + .arg( + Arg::with_name("PEER") + .help("A WebSocket URL to attempt to connect to at start.") + .multiple(true), + ) + .get_matches(); + + // Get address of this peer + let my_addr = matches.value_of("server").unwrap_or("localhost:3012"); + + // Create simple websocket that just prints out messages + let mut me = ws::WebSocket::new(|_| { + move |msg| { + info!("Peer {} got message: {}", my_addr, msg); + Ok(()) + } + }).unwrap(); + + // Get a sender for ALL connections to the websocket + let broacaster = me.broadcaster(); + + // Setup thread for listening to stdin and sending messages to connections + let input = thread::spawn(move || { + let stdin = io::stdin(); + for line in stdin.lock().lines() { + // Send a message to all connections regardless of + // how those connections were established + broacaster.send(line.unwrap()).unwrap(); + } + }); + + // Connect to any existing peers specified on the cli + if let Some(peers) = matches.values_of("PEER") { + for peer in peers { + me.connect(url::Url::parse(peer).unwrap()).unwrap(); + } + } + + // Run the websocket + me.listen(my_addr).unwrap(); + input.join().unwrap(); +} diff --git a/third_party/rust/ws/examples/pong.rs b/third_party/rust/ws/examples/pong.rs new file mode 100644 index 0000000000..5c9c62c2e0 --- /dev/null +++ b/third_party/rust/ws/examples/pong.rs @@ -0,0 +1,130 @@ +extern crate env_logger; +extern crate mio_extras; +extern crate time; +/// An example demonstrating how to send and recieve a custom ping/pong frame. +extern crate ws; + +use std::str::from_utf8; + +use mio_extras::timer::Timeout; + +use ws::util::Token; +use ws::{listen, CloseCode, Error, ErrorKind, Frame, Handler, Handshake, Message, OpCode, Result, + Sender}; + +const PING: Token = Token(1); +const EXPIRE: Token = Token(2); + +fn main() { + // Setup logging + env_logger::init(); + + // Run the WebSocket + listen("127.0.0.1:3012", |out| Server { + out, + ping_timeout: None, + expire_timeout: None, + }).unwrap(); +} + +// Server WebSocket handler +struct Server { + out: Sender, + ping_timeout: Option<Timeout>, + expire_timeout: Option<Timeout>, +} + +impl Handler for Server { + fn on_open(&mut self, _: Handshake) -> Result<()> { + // schedule a timeout to send a ping every 5 seconds + self.out.timeout(5_000, PING)?; + // schedule a timeout to close the connection if there is no activity for 30 seconds + self.out.timeout(30_000, EXPIRE) + } + + fn on_message(&mut self, msg: Message) -> Result<()> { + println!("Server got message '{}'. ", msg); + self.out.send(msg) + } + + fn on_close(&mut self, code: CloseCode, reason: &str) { + println!("WebSocket closing for ({:?}) {}", code, reason); + + // NOTE: This code demonstrates cleaning up timeouts + if let Some(t) = self.ping_timeout.take() { + self.out.cancel(t).unwrap(); + } + if let Some(t) = self.expire_timeout.take() { + self.out.cancel(t).unwrap(); + } + + println!("Shutting down server after first connection closes."); + self.out.shutdown().unwrap(); + } + + fn on_error(&mut self, err: Error) { + // Shutdown on any error + println!("Shutting down server for error: {}", err); + self.out.shutdown().unwrap(); + } + + fn on_timeout(&mut self, event: Token) -> Result<()> { + match event { + // PING timeout has occured, send a ping and reschedule + PING => { + self.out.ping(time::precise_time_ns().to_string().into())?; + self.ping_timeout.take(); + self.out.timeout(5_000, PING) + } + // EXPIRE timeout has occured, this means that the connection is inactive, let's close + EXPIRE => self.out.close(CloseCode::Away), + // No other timeouts are possible + _ => Err(Error::new( + ErrorKind::Internal, + "Invalid timeout token encountered!", + )), + } + } + + fn on_new_timeout(&mut self, event: Token, timeout: Timeout) -> Result<()> { + // Cancel the old timeout and replace. + if event == EXPIRE { + if let Some(t) = self.expire_timeout.take() { + self.out.cancel(t)? + } + self.expire_timeout = Some(timeout) + } else { + // This ensures there is only one ping timeout at a time + if let Some(t) = self.ping_timeout.take() { + self.out.cancel(t)? + } + self.ping_timeout = Some(timeout) + } + + Ok(()) + } + + fn on_frame(&mut self, frame: Frame) -> Result<Option<Frame>> { + // If the frame is a pong, print the round-trip time. + // The pong should contain data from out ping, but it isn't guaranteed to. + if frame.opcode() == OpCode::Pong { + if let Ok(pong) = from_utf8(frame.payload())?.parse::<u64>() { + let now = time::precise_time_ns(); + println!("RTT is {:.3}ms.", (now - pong) as f64 / 1_000_000f64); + } else { + println!("Received bad pong."); + } + } + + // Some activity has occured, so reset the expiration + self.out.timeout(30_000, EXPIRE)?; + + // Run default frame validation + DefaultHandler.on_frame(frame) + } +} + +// For accessing the default handler implementation +struct DefaultHandler; + +impl Handler for DefaultHandler {} diff --git a/third_party/rust/ws/examples/remote_addr.rs b/third_party/rust/ws/examples/remote_addr.rs new file mode 100644 index 0000000000..d32e17af25 --- /dev/null +++ b/third_party/rust/ws/examples/remote_addr.rs @@ -0,0 +1,25 @@ +/// Example showing how to obtain the ip address of the client, where possible. +extern crate ws; + +struct Server { + ws: ws::Sender, +} + +impl ws::Handler for Server { + fn on_open(&mut self, shake: ws::Handshake) -> ws::Result<()> { + if let Some(ip_addr) = shake.remote_addr()? { + println!("Connection opened from {}.", ip_addr) + } else { + println!("Unable to obtain client's IP address.") + } + Ok(()) + } + + fn on_message(&mut self, _: ws::Message) -> ws::Result<()> { + self.ws.close(ws::CloseCode::Normal) + } +} + +fn main() { + ws::listen("127.0.0.1:3012", |out| Server { ws: out }).unwrap() +} diff --git a/third_party/rust/ws/examples/router.rs b/third_party/rust/ws/examples/router.rs new file mode 100644 index 0000000000..8e699298e9 --- /dev/null +++ b/third_party/rust/ws/examples/router.rs @@ -0,0 +1,141 @@ +extern crate env_logger; +/// WebSocket server using trait objects to route +/// to an infinitely extensible number of handlers +extern crate ws; + +// A WebSocket handler that routes connections to different boxed handlers by resource +struct Router { + sender: ws::Sender, + inner: Box<ws::Handler>, +} + +impl ws::Handler for Router { + fn on_request(&mut self, req: &ws::Request) -> ws::Result<(ws::Response)> { + // Clone the sender so that we can move it into the child handler + let out = self.sender.clone(); + + match req.resource() { + "/echo" => self.inner = Box::new(Echo { ws: out }), + + // Route to a data handler + "/data/one" => { + self.inner = Box::new(Data { + ws: out, + data: vec!["one", "two", "three", "four", "five"], + }) + } + + // Route to another data handler + "/data/two" => { + self.inner = Box::new(Data { + ws: out, + data: vec!["いち", "二", "さん", "四", "ご"], + }) + } + + // Use a closure as the child handler + "/closure" => { + self.inner = Box::new(move |msg: ws::Message| { + println!("Got a message on a closure handler: {}", msg); + out.close_with_reason(ws::CloseCode::Error, "Not Implemented.") + }) + } + + // Use the default child handler, NotFound + _ => (), + } + + // Delegate to the child handler + self.inner.on_request(req) + } + + // Pass through any other methods that should be delegated to the child. + // + // You could probably use a macro for this if you have many different + // routers or were building some sort of routing framework. + + fn on_shutdown(&mut self) { + self.inner.on_shutdown() + } + + fn on_open(&mut self, shake: ws::Handshake) -> ws::Result<()> { + self.inner.on_open(shake) + } + + fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> { + self.inner.on_message(msg) + } + + fn on_close(&mut self, code: ws::CloseCode, reason: &str) { + self.inner.on_close(code, reason) + } + + fn on_error(&mut self, err: ws::Error) { + self.inner.on_error(err); + } +} + +// This handler returns a 404 response to all handshake requests +struct NotFound; + +impl ws::Handler for NotFound { + fn on_request(&mut self, req: &ws::Request) -> ws::Result<(ws::Response)> { + // This handler responds to all requests with a 404 + let mut res = ws::Response::from_request(req)?; + res.set_status(404); + res.set_reason("Not Found"); + Ok(res) + } +} + +// This handler simply echoes all messages back to the client +struct Echo { + ws: ws::Sender, +} + +impl ws::Handler for Echo { + fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> { + println!("Echo handler received a message: {}", msg); + self.ws.send(msg) + } +} + +// This handler sends some data to the client and then terminates the connection on the first +// message received, presumably confirming receipt of the data +struct Data { + ws: ws::Sender, + data: Vec<&'static str>, +} + +impl ws::Handler for Data { + fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> { + for msg in &self.data { + self.ws.send(*msg)? + } + Ok(()) + } + + fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> { + println!("Data handler received a message: {}", msg); + println!("Data handler going down."); + self.ws.close(ws::CloseCode::Normal) + } +} + +fn main() { + env_logger::init(); + + // Listen on an address and call the closure for each connection + if let Err(error) = ws::listen("127.0.0.1:3012", |out| { + // Use our router as the handler to route the new connection + Router { + sender: out, + // Default to returning a 404 when the route doesn't match. + // You could default to any handler here. + inner: Box::new(NotFound), + } + }) { + // Inform the user of failure + println!("Failed to create WebSocket due to {:?}", error); + } +} diff --git a/third_party/rust/ws/examples/server.rs b/third_party/rust/ws/examples/server.rs new file mode 100644 index 0000000000..fd9f5ae671 --- /dev/null +++ b/third_party/rust/ws/examples/server.rs @@ -0,0 +1,26 @@ +extern crate env_logger; +/// Simple WebSocket server with error handling. It is not necessary to setup logging, but doing +/// so will allow you to see more details about the connection by using the RUST_LOG env variable. +extern crate ws; + +use ws::listen; + +fn main() { + // Setup logging + env_logger::init(); + + // Listen on an address and call the closure for each connection + if let Err(error) = listen("127.0.0.1:3012", |out| { + // The handler needs to take ownership of out, so we use move + move |msg| { + // Handle messages received on this connection + println!("Server got message '{}'. ", msg); + + // Use the out channel to send messages back + out.send(msg) + } + }) { + // Inform the user of failure + println!("Failed to create WebSocket due to {:?}", error); + } +} diff --git a/third_party/rust/ws/examples/shared.rs b/third_party/rust/ws/examples/shared.rs new file mode 100644 index 0000000000..e466f5ed46 --- /dev/null +++ b/third_party/rust/ws/examples/shared.rs @@ -0,0 +1,58 @@ +extern crate env_logger; +extern crate url; +/// A single-threaded client + server example showing how flexible closure handlers can be for +/// trivial applications. +extern crate ws; + +use ws::{Sender, WebSocket}; + +fn main() { + // Setup logging + env_logger::init(); + + // A variable to distinguish the two halves + let mut name = "Client"; + + // Create a WebSocket with a closure as the factory + let mut ws = WebSocket::new(|output: Sender| { + // The first connection is named Client + if name == "Client" { + println!("{} sending 'Hello Websocket' ", name); + output.send("Hello Websocket").unwrap(); + } + + // The closure handler needs to take ownership of output + let handler = move |msg| { + println!("{} got '{}' ", name, msg); + + // If we are the server, + if name == "Server" { + println!("{} sending 'How are you?' ", name); + + // send the message back + output.send("How are you?") + } else { + // otherwise, we are the client and will shutdown the WebSocket + output.shutdown() + } + }; + + // The next connection this factory makes will be named Server + name = "Server"; + + // We must return the handler + handler + }).unwrap(); + + // Url for the client + let url = url::Url::parse("ws://127.0.0.1:3012").unwrap(); + + // Queue a WebSocket connection to the url + ws.connect(url).unwrap(); + + // Start listening for incoming conections + ws.listen("127.0.0.1:3012").unwrap(); + + // The WebSocket has shutdown + println!("All done.") +} diff --git a/third_party/rust/ws/examples/ssl-server.rs b/third_party/rust/ws/examples/ssl-server.rs new file mode 100644 index 0000000000..ca788513e7 --- /dev/null +++ b/third_party/rust/ws/examples/ssl-server.rs @@ -0,0 +1,122 @@ +extern crate clap; +extern crate env_logger; +#[cfg(feature = "ssl")] +extern crate openssl; +/// WebSocket server to demonstrate ssl encryption within an a websocket server. +/// +/// The resulting executable takes three arguments: +/// ADDR - The address to listen for incoming connections (e.g. 127.0.0:3012) +/// CERT - The path to the cert PEM (e.g. snakeoil.crt) +/// KEY - The path to the key PEM (e.g. snakeoil.key) +/// +/// For more details concerning setting up the SSL context, see rust-openssl docs. +extern crate ws; + +#[cfg(feature = "ssl")] +use std::fs::File; +#[cfg(feature = "ssl")] +use std::io::Read; +#[cfg(feature = "ssl")] +use std::rc::Rc; + +#[cfg(feature = "ssl")] +use openssl::pkey::PKey; +#[cfg(feature = "ssl")] +use openssl::ssl::{SslAcceptor, SslMethod, SslStream}; +#[cfg(feature = "ssl")] +use openssl::x509::X509; + +#[cfg(feature = "ssl")] +use ws::util::TcpStream; + +#[cfg(feature = "ssl")] +struct Server { + out: ws::Sender, + ssl: Rc<SslAcceptor>, +} + +#[cfg(feature = "ssl")] +impl ws::Handler for Server { + fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> { + self.out.send(msg) // simple echo + } + + fn upgrade_ssl_server(&mut self, sock: TcpStream) -> ws::Result<SslStream<TcpStream>> { + self.ssl.accept(sock).map_err(From::from) + } +} + +#[cfg(feature = "ssl")] +fn main() { + // Setup logging + env_logger::init(); + + // setup command line arguments + let matches = clap::App::new("WS-RS SSL Server Configuration") + .version("1.0") + .author("Jason Housley <housleyjk@gmail.com>") + .about("Establish a WebSocket server that encrypts and decrypts messages.") + .arg( + clap::Arg::with_name("ADDR") + .help("Address on which to bind the server.") + .required(true) + .index(1), + ) + .arg( + clap::Arg::with_name("CERT") + .help("Path to the SSL certificate.") + .required(true) + .index(2), + ) + .arg( + clap::Arg::with_name("KEY") + .help("Path to the SSL certificate key.") + .required(true) + .index(3), + ) + .get_matches(); + + let cert = { + let data = read_file(matches.value_of("CERT").unwrap()).unwrap(); + X509::from_pem(data.as_ref()).unwrap() + }; + + let pkey = { + let data = read_file(matches.value_of("KEY").unwrap()).unwrap(); + PKey::private_key_from_pem(data.as_ref()).unwrap() + }; + + let acceptor = Rc::new({ + let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); + builder.set_private_key(&pkey).unwrap(); + builder.set_certificate(&cert).unwrap(); + + builder.build() + }); + + ws::Builder::new() + .with_settings(ws::Settings { + encrypt_server: true, + ..ws::Settings::default() + }) + .build(|out: ws::Sender| Server { + out: out, + ssl: acceptor.clone(), + }) + .unwrap() + .listen(matches.value_of("ADDR").unwrap()) + .unwrap(); +} + +#[cfg(feature = "ssl")] +fn read_file(name: &str) -> std::io::Result<Vec<u8>> { + let mut file = File::open(name)?; + let mut buf = Vec::new(); + file.read_to_end(&mut buf)?; + Ok(buf) +} + +#[cfg(not(feature = "ssl"))] +fn main() { + println!("SSL feature is not enabled.") +} diff --git a/third_party/rust/ws/examples/threaded.rs b/third_party/rust/ws/examples/threaded.rs new file mode 100644 index 0000000000..35cf0002fc --- /dev/null +++ b/third_party/rust/ws/examples/threaded.rs @@ -0,0 +1,56 @@ +extern crate env_logger; +/// A thread-based client + server example. It also demonstrates using a struct as a WebSocket +/// handler to implement more handler methods than a closure handler allows. +extern crate ws; + +use std::thread; +use std::thread::sleep; +use std::time::Duration; + +use ws::{connect, listen, CloseCode, Handler, Message, Result, Sender}; + +fn main() { + // Setup logging + env_logger::init(); + + // Server WebSocket handler + struct Server { + out: Sender, + } + + impl Handler for Server { + fn on_message(&mut self, msg: Message) -> Result<()> { + println!("Server got message '{}'. ", msg); + self.out.send(msg) + } + + fn on_close(&mut self, code: CloseCode, reason: &str) { + println!("WebSocket closing for ({:?}) {}", code, reason); + println!("Shutting down server after first connection closes."); + self.out.shutdown().unwrap(); + } + } + + // Server thread + let server = thread::spawn(move || listen("127.0.0.1:3012", |out| Server { out }).unwrap()); + + // Give the server a little time to get going + sleep(Duration::from_millis(10)); + + // Client thread + let client = thread::spawn(move || { + connect("ws://127.0.0.1:3012", |out| { + out.send("Hello WebSocket").unwrap(); + + move |msg| { + println!("Client got message '{}'. ", msg); + out.close(CloseCode::Normal) + } + }).unwrap() + }); + + let _ = server.join(); + let _ = client.join(); + + println!("All done.") +} diff --git a/third_party/rust/ws/examples/unsafe-ssl-client.rs b/third_party/rust/ws/examples/unsafe-ssl-client.rs new file mode 100644 index 0000000000..c055cac546 --- /dev/null +++ b/third_party/rust/ws/examples/unsafe-ssl-client.rs @@ -0,0 +1,69 @@ +extern crate env_logger; +#[cfg(feature = "ssl")] +extern crate openssl; +extern crate url; +extern crate ws; + +#[cfg(feature = "ssl")] +use openssl::ssl::{SslConnector, SslMethod, SslStream, SslVerifyMode}; +#[cfg(feature = "ssl")] +use ws::util::TcpStream; + +#[cfg(feature = "ssl")] +struct Client { + out: ws::Sender, +} + +#[cfg(feature = "ssl")] +impl ws::Handler for Client { + fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> { + println!("msg = {}", msg); + self.out.close(ws::CloseCode::Normal) + } + + fn upgrade_ssl_client( + &mut self, + sock: TcpStream, + _: &url::Url, + ) -> ws::Result<SslStream<TcpStream>> { + let mut builder = SslConnector::builder(SslMethod::tls()).map_err(|e| { + ws::Error::new( + ws::ErrorKind::Internal, + format!("Failed to upgrade client to SSL: {}", e), + ) + })?; + builder.set_verify(SslVerifyMode::empty()); + + let connector = builder.build(); + connector + .configure() + .unwrap() + .use_server_name_indication(false) + .verify_hostname(false) + .connect("", sock) + .map_err(From::from) + } +} + +#[cfg(feature = "ssl")] +fn main() { + // Setup logging + env_logger::init(); + + if let Err(error) = ws::connect("wss://localhost:3443/api/websocket", |out| { + if let Err(_) = out.send("Hello WebSocket") { + println!("Websocket couldn't queue an initial message.") + } else { + println!("Client sent message 'Hello WebSocket'. ") + } + + Client { out } + }) { + println!("Failed to create WebSocket due to: {:?}", error); + } +} + +#[cfg(not(feature = "ssl"))] +fn main() { + println!("SSL feature is not enabled.") +} |