summaryrefslogtreecommitdiffstats
path: root/third_party/rust/warp/examples/sse_chat.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
commit36d22d82aa202bb199967e9512281e9a53db42c9 (patch)
tree105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/warp/examples/sse_chat.rs
parentInitial commit. (diff)
downloadfirefox-esr-upstream.tar.xz
firefox-esr-upstream.zip
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/warp/examples/sse_chat.rs')
-rw-r--r--third_party/rust/warp/examples/sse_chat.rs163
1 files changed, 163 insertions, 0 deletions
diff --git a/third_party/rust/warp/examples/sse_chat.rs b/third_party/rust/warp/examples/sse_chat.rs
new file mode 100644
index 0000000000..6e064b1824
--- /dev/null
+++ b/third_party/rust/warp/examples/sse_chat.rs
@@ -0,0 +1,163 @@
+use futures_util::{Stream, StreamExt};
+use std::collections::HashMap;
+use std::sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc, Mutex,
+};
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::UnboundedReceiverStream;
+use warp::{sse::Event, Filter};
+
+#[tokio::main]
+async fn main() {
+ pretty_env_logger::init();
+
+ // Keep track of all connected users, key is usize, value
+ // is an event stream sender.
+ let users = Arc::new(Mutex::new(HashMap::new()));
+ // Turn our "state" into a new Filter...
+ let users = warp::any().map(move || users.clone());
+
+ // POST /chat -> send message
+ let chat_send = warp::path("chat")
+ .and(warp::post())
+ .and(warp::path::param::<usize>())
+ .and(warp::body::content_length_limit(500))
+ .and(
+ warp::body::bytes().and_then(|body: bytes::Bytes| async move {
+ std::str::from_utf8(&body)
+ .map(String::from)
+ .map_err(|_e| warp::reject::custom(NotUtf8))
+ }),
+ )
+ .and(users.clone())
+ .map(|my_id, msg, users| {
+ user_message(my_id, msg, &users);
+ warp::reply()
+ });
+
+ // GET /chat -> messages stream
+ let chat_recv = warp::path("chat").and(warp::get()).and(users).map(|users| {
+ // reply using server-sent events
+ let stream = user_connected(users);
+ warp::sse::reply(warp::sse::keep_alive().stream(stream))
+ });
+
+ // GET / -> index html
+ let index = warp::path::end().map(|| {
+ warp::http::Response::builder()
+ .header("content-type", "text/html; charset=utf-8")
+ .body(INDEX_HTML)
+ });
+
+ let routes = index.or(chat_recv).or(chat_send);
+
+ warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
+}
+
+/// Our global unique user id counter.
+static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
+
+/// Message variants.
+#[derive(Debug)]
+enum Message {
+ UserId(usize),
+ Reply(String),
+}
+
+#[derive(Debug)]
+struct NotUtf8;
+impl warp::reject::Reject for NotUtf8 {}
+
+/// Our state of currently connected users.
+///
+/// - Key is their id
+/// - Value is a sender of `Message`
+type Users = Arc<Mutex<HashMap<usize, mpsc::UnboundedSender<Message>>>>;
+
+fn user_connected(users: Users) -> impl Stream<Item = Result<Event, warp::Error>> + Send + 'static {
+ // Use a counter to assign a new unique ID for this user.
+ let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
+
+ eprintln!("new chat user: {}", my_id);
+
+ // Use an unbounded channel to handle buffering and flushing of messages
+ // to the event source...
+ let (tx, rx) = mpsc::unbounded_channel();
+ let rx = UnboundedReceiverStream::new(rx);
+
+ tx.send(Message::UserId(my_id))
+ // rx is right above, so this cannot fail
+ .unwrap();
+
+ // Save the sender in our list of connected users.
+ users.lock().unwrap().insert(my_id, tx);
+
+ // Convert messages into Server-Sent Events and return resulting stream.
+ rx.map(|msg| match msg {
+ Message::UserId(my_id) => Ok(Event::default().event("user").data(my_id.to_string())),
+ Message::Reply(reply) => Ok(Event::default().data(reply)),
+ })
+}
+
+fn user_message(my_id: usize, msg: String, users: &Users) {
+ let new_msg = format!("<User#{}>: {}", my_id, msg);
+
+ // New message from this user, send it to everyone else (except same uid)...
+ //
+ // We use `retain` instead of a for loop so that we can reap any user that
+ // appears to have disconnected.
+ users.lock().unwrap().retain(|uid, tx| {
+ if my_id == *uid {
+ // don't send to same user, but do retain
+ true
+ } else {
+ // If not `is_ok`, the SSE stream is gone, and so don't retain
+ tx.send(Message::Reply(new_msg.clone())).is_ok()
+ }
+ });
+}
+
+static INDEX_HTML: &str = r#"
+<!DOCTYPE html>
+<html>
+ <head>
+ <title>Warp Chat</title>
+ </head>
+ <body>
+ <h1>warp chat</h1>
+ <div id="chat">
+ <p><em>Connecting...</em></p>
+ </div>
+ <input type="text" id="text" />
+ <button type="button" id="send">Send</button>
+ <script type="text/javascript">
+ var uri = 'http://' + location.host + '/chat';
+ var sse = new EventSource(uri);
+ function message(data) {
+ var line = document.createElement('p');
+ line.innerText = data;
+ chat.appendChild(line);
+ }
+ sse.onopen = function() {
+ chat.innerHTML = "<p><em>Connected!</em></p>";
+ }
+ var user_id;
+ sse.addEventListener("user", function(msg) {
+ user_id = msg.data;
+ });
+ sse.onmessage = function(msg) {
+ message(msg.data);
+ };
+ send.onclick = function() {
+ var msg = text.value;
+ var xhr = new XMLHttpRequest();
+ xhr.open("POST", uri + '/' + user_id, true);
+ xhr.send(msg);
+ text.value = '';
+ message('<You>: ' + msg);
+ };
+ </script>
+ </body>
+</html>
+"#;