summaryrefslogtreecommitdiffstats
path: root/third_party/rust/warp/examples/websockets_chat.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/warp/examples/websockets_chat.rs')
-rw-r--r--third_party/rust/warp/examples/websockets_chat.rs175
1 files changed, 175 insertions, 0 deletions
diff --git a/third_party/rust/warp/examples/websockets_chat.rs b/third_party/rust/warp/examples/websockets_chat.rs
new file mode 100644
index 0000000000..21e2286f6f
--- /dev/null
+++ b/third_party/rust/warp/examples/websockets_chat.rs
@@ -0,0 +1,175 @@
+// #![deny(warnings)]
+use std::collections::HashMap;
+use std::sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+};
+
+use futures_util::{SinkExt, StreamExt, TryFutureExt};
+use tokio::sync::{mpsc, RwLock};
+use tokio_stream::wrappers::UnboundedReceiverStream;
+use warp::ws::{Message, WebSocket};
+use warp::Filter;
+
+/// Our global unique user id counter.
+static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
+
+/// Our state of currently connected users.
+///
+/// - Key is their id
+/// - Value is a sender of `warp::ws::Message`
+type Users = Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Message>>>>;
+
+#[tokio::main]
+async fn main() {
+ pretty_env_logger::init();
+
+ // Keep track of all connected users, key is usize, value
+ // is a websocket sender.
+ let users = Users::default();
+ // Turn our "state" into a new Filter...
+ let users = warp::any().map(move || users.clone());
+
+ // GET /chat -> websocket upgrade
+ let chat = warp::path("chat")
+ // The `ws()` filter will prepare Websocket handshake...
+ .and(warp::ws())
+ .and(users)
+ .map(|ws: warp::ws::Ws, users| {
+ // This will call our function if the handshake succeeds.
+ ws.on_upgrade(move |socket| user_connected(socket, users))
+ });
+
+ // GET / -> index html
+ let index = warp::path::end().map(|| warp::reply::html(INDEX_HTML));
+
+ let routes = index.or(chat);
+
+ warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
+}
+
+async fn user_connected(ws: WebSocket, users: Users) {
+ // Use a counter to assign a new unique ID for this user.
+ let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
+
+ eprintln!("new chat user: {}", my_id);
+
+ // Split the socket into a sender and receive of messages.
+ let (mut user_ws_tx, mut user_ws_rx) = ws.split();
+
+ // Use an unbounded channel to handle buffering and flushing of messages
+ // to the websocket...
+ let (tx, rx) = mpsc::unbounded_channel();
+ let mut rx = UnboundedReceiverStream::new(rx);
+
+ tokio::task::spawn(async move {
+ while let Some(message) = rx.next().await {
+ user_ws_tx
+ .send(message)
+ .unwrap_or_else(|e| {
+ eprintln!("websocket send error: {}", e);
+ })
+ .await;
+ }
+ });
+
+ // Save the sender in our list of connected users.
+ users.write().await.insert(my_id, tx);
+
+ // Return a `Future` that is basically a state machine managing
+ // this specific user's connection.
+
+ // Every time the user sends a message, broadcast it to
+ // all other users...
+ while let Some(result) = user_ws_rx.next().await {
+ let msg = match result {
+ Ok(msg) => msg,
+ Err(e) => {
+ eprintln!("websocket error(uid={}): {}", my_id, e);
+ break;
+ }
+ };
+ user_message(my_id, msg, &users).await;
+ }
+
+ // user_ws_rx stream will keep processing as long as the user stays
+ // connected. Once they disconnect, then...
+ user_disconnected(my_id, &users).await;
+}
+
+async fn user_message(my_id: usize, msg: Message, users: &Users) {
+ // Skip any non-Text messages...
+ let msg = if let Ok(s) = msg.to_str() {
+ s
+ } else {
+ return;
+ };
+
+ let new_msg = format!("<User#{}>: {}", my_id, msg);
+
+ // New message from this user, send it to everyone else (except same uid)...
+ for (&uid, tx) in users.read().await.iter() {
+ if my_id != uid {
+ if let Err(_disconnected) = tx.send(Message::text(new_msg.clone())) {
+ // The tx is disconnected, our `user_disconnected` code
+ // should be happening in another task, nothing more to
+ // do here.
+ }
+ }
+ }
+}
+
+async fn user_disconnected(my_id: usize, users: &Users) {
+ eprintln!("good bye user: {}", my_id);
+
+ // Stream closed up, so remove from the user list
+ users.write().await.remove(&my_id);
+}
+
+static INDEX_HTML: &str = r#"<!DOCTYPE html>
+<html lang="en">
+ <head>
+ <title>Warp Chat</title>
+ </head>
+ <body>
+ <h1>Warp chat</h1>
+ <div id="chat">
+ <p><em>Connecting...</em></p>
+ </div>
+ <input type="text" id="text" />
+ <button type="button" id="send">Send</button>
+ <script type="text/javascript">
+ const chat = document.getElementById('chat');
+ const text = document.getElementById('text');
+ const uri = 'ws://' + location.host + '/chat';
+ const ws = new WebSocket(uri);
+
+ function message(data) {
+ const line = document.createElement('p');
+ line.innerText = data;
+ chat.appendChild(line);
+ }
+
+ ws.onopen = function() {
+ chat.innerHTML = '<p><em>Connected!</em></p>';
+ };
+
+ ws.onmessage = function(msg) {
+ message(msg.data);
+ };
+
+ ws.onclose = function() {
+ chat.getElementsByTagName('em')[0].innerText = 'Disconnected!';
+ };
+
+ send.onclick = function() {
+ const msg = text.value;
+ ws.send(msg);
+ text.value = '';
+
+ message('<You>: ' + msg);
+ };
+ </script>
+ </body>
+</html>
+"#;