diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/warp/examples/sse_chat.rs | |
parent | Initial commit. (diff) | |
download | firefox-esr-upstream.tar.xz firefox-esr-upstream.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/warp/examples/sse_chat.rs')
-rw-r--r-- | third_party/rust/warp/examples/sse_chat.rs | 163 |
1 files changed, 163 insertions, 0 deletions
diff --git a/third_party/rust/warp/examples/sse_chat.rs b/third_party/rust/warp/examples/sse_chat.rs new file mode 100644 index 0000000000..6e064b1824 --- /dev/null +++ b/third_party/rust/warp/examples/sse_chat.rs @@ -0,0 +1,163 @@ +use futures_util::{Stream, StreamExt}; +use std::collections::HashMap; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, +}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; +use warp::{sse::Event, Filter}; + +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + + // Keep track of all connected users, key is usize, value + // is an event stream sender. + let users = Arc::new(Mutex::new(HashMap::new())); + // Turn our "state" into a new Filter... + let users = warp::any().map(move || users.clone()); + + // POST /chat -> send message + let chat_send = warp::path("chat") + .and(warp::post()) + .and(warp::path::param::<usize>()) + .and(warp::body::content_length_limit(500)) + .and( + warp::body::bytes().and_then(|body: bytes::Bytes| async move { + std::str::from_utf8(&body) + .map(String::from) + .map_err(|_e| warp::reject::custom(NotUtf8)) + }), + ) + .and(users.clone()) + .map(|my_id, msg, users| { + user_message(my_id, msg, &users); + warp::reply() + }); + + // GET /chat -> messages stream + let chat_recv = warp::path("chat").and(warp::get()).and(users).map(|users| { + // reply using server-sent events + let stream = user_connected(users); + warp::sse::reply(warp::sse::keep_alive().stream(stream)) + }); + + // GET / -> index html + let index = warp::path::end().map(|| { + warp::http::Response::builder() + .header("content-type", "text/html; charset=utf-8") + .body(INDEX_HTML) + }); + + let routes = index.or(chat_recv).or(chat_send); + + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} + +/// Our global unique user id counter. +static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1); + +/// Message variants. +#[derive(Debug)] +enum Message { + UserId(usize), + Reply(String), +} + +#[derive(Debug)] +struct NotUtf8; +impl warp::reject::Reject for NotUtf8 {} + +/// Our state of currently connected users. +/// +/// - Key is their id +/// - Value is a sender of `Message` +type Users = Arc<Mutex<HashMap<usize, mpsc::UnboundedSender<Message>>>>; + +fn user_connected(users: Users) -> impl Stream<Item = Result<Event, warp::Error>> + Send + 'static { + // Use a counter to assign a new unique ID for this user. + let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed); + + eprintln!("new chat user: {}", my_id); + + // Use an unbounded channel to handle buffering and flushing of messages + // to the event source... + let (tx, rx) = mpsc::unbounded_channel(); + let rx = UnboundedReceiverStream::new(rx); + + tx.send(Message::UserId(my_id)) + // rx is right above, so this cannot fail + .unwrap(); + + // Save the sender in our list of connected users. + users.lock().unwrap().insert(my_id, tx); + + // Convert messages into Server-Sent Events and return resulting stream. + rx.map(|msg| match msg { + Message::UserId(my_id) => Ok(Event::default().event("user").data(my_id.to_string())), + Message::Reply(reply) => Ok(Event::default().data(reply)), + }) +} + +fn user_message(my_id: usize, msg: String, users: &Users) { + let new_msg = format!("<User#{}>: {}", my_id, msg); + + // New message from this user, send it to everyone else (except same uid)... + // + // We use `retain` instead of a for loop so that we can reap any user that + // appears to have disconnected. + users.lock().unwrap().retain(|uid, tx| { + if my_id == *uid { + // don't send to same user, but do retain + true + } else { + // If not `is_ok`, the SSE stream is gone, and so don't retain + tx.send(Message::Reply(new_msg.clone())).is_ok() + } + }); +} + +static INDEX_HTML: &str = r#" +<!DOCTYPE html> +<html> + <head> + <title>Warp Chat</title> + </head> + <body> + <h1>warp chat</h1> + <div id="chat"> + <p><em>Connecting...</em></p> + </div> + <input type="text" id="text" /> + <button type="button" id="send">Send</button> + <script type="text/javascript"> + var uri = 'http://' + location.host + '/chat'; + var sse = new EventSource(uri); + function message(data) { + var line = document.createElement('p'); + line.innerText = data; + chat.appendChild(line); + } + sse.onopen = function() { + chat.innerHTML = "<p><em>Connected!</em></p>"; + } + var user_id; + sse.addEventListener("user", function(msg) { + user_id = msg.data; + }); + sse.onmessage = function(msg) { + message(msg.data); + }; + send.onclick = function() { + var msg = text.value; + var xhr = new XMLHttpRequest(); + xhr.open("POST", uri + '/' + user_id, true); + xhr.send(msg); + text.value = ''; + message('<You>: ' + msg); + }; + </script> + </body> +</html> +"#; |