From 36d22d82aa202bb199967e9512281e9a53db42c9 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 21:33:14 +0200 Subject: Adding upstream version 115.7.0esr. Signed-off-by: Daniel Baumann --- third_party/rust/warp/examples/sse_chat.rs | 163 +++++++++++++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 third_party/rust/warp/examples/sse_chat.rs (limited to 'third_party/rust/warp/examples/sse_chat.rs') diff --git a/third_party/rust/warp/examples/sse_chat.rs b/third_party/rust/warp/examples/sse_chat.rs new file mode 100644 index 0000000000..6e064b1824 --- /dev/null +++ b/third_party/rust/warp/examples/sse_chat.rs @@ -0,0 +1,163 @@ +use futures_util::{Stream, StreamExt}; +use std::collections::HashMap; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, +}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; +use warp::{sse::Event, Filter}; + +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + + // Keep track of all connected users, key is usize, value + // is an event stream sender. + let users = Arc::new(Mutex::new(HashMap::new())); + // Turn our "state" into a new Filter... + let users = warp::any().map(move || users.clone()); + + // POST /chat -> send message + let chat_send = warp::path("chat") + .and(warp::post()) + .and(warp::path::param::()) + .and(warp::body::content_length_limit(500)) + .and( + warp::body::bytes().and_then(|body: bytes::Bytes| async move { + std::str::from_utf8(&body) + .map(String::from) + .map_err(|_e| warp::reject::custom(NotUtf8)) + }), + ) + .and(users.clone()) + .map(|my_id, msg, users| { + user_message(my_id, msg, &users); + warp::reply() + }); + + // GET /chat -> messages stream + let chat_recv = warp::path("chat").and(warp::get()).and(users).map(|users| { + // reply using server-sent events + let stream = user_connected(users); + warp::sse::reply(warp::sse::keep_alive().stream(stream)) + }); + + // GET / -> index html + let index = warp::path::end().map(|| { + warp::http::Response::builder() + .header("content-type", "text/html; charset=utf-8") + .body(INDEX_HTML) + }); + + let routes = index.or(chat_recv).or(chat_send); + + warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; +} + +/// Our global unique user id counter. +static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1); + +/// Message variants. +#[derive(Debug)] +enum Message { + UserId(usize), + Reply(String), +} + +#[derive(Debug)] +struct NotUtf8; +impl warp::reject::Reject for NotUtf8 {} + +/// Our state of currently connected users. +/// +/// - Key is their id +/// - Value is a sender of `Message` +type Users = Arc>>>; + +fn user_connected(users: Users) -> impl Stream> + Send + 'static { + // Use a counter to assign a new unique ID for this user. + let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed); + + eprintln!("new chat user: {}", my_id); + + // Use an unbounded channel to handle buffering and flushing of messages + // to the event source... + let (tx, rx) = mpsc::unbounded_channel(); + let rx = UnboundedReceiverStream::new(rx); + + tx.send(Message::UserId(my_id)) + // rx is right above, so this cannot fail + .unwrap(); + + // Save the sender in our list of connected users. + users.lock().unwrap().insert(my_id, tx); + + // Convert messages into Server-Sent Events and return resulting stream. + rx.map(|msg| match msg { + Message::UserId(my_id) => Ok(Event::default().event("user").data(my_id.to_string())), + Message::Reply(reply) => Ok(Event::default().data(reply)), + }) +} + +fn user_message(my_id: usize, msg: String, users: &Users) { + let new_msg = format!(": {}", my_id, msg); + + // New message from this user, send it to everyone else (except same uid)... + // + // We use `retain` instead of a for loop so that we can reap any user that + // appears to have disconnected. + users.lock().unwrap().retain(|uid, tx| { + if my_id == *uid { + // don't send to same user, but do retain + true + } else { + // If not `is_ok`, the SSE stream is gone, and so don't retain + tx.send(Message::Reply(new_msg.clone())).is_ok() + } + }); +} + +static INDEX_HTML: &str = r#" + + + + Warp Chat + + +

warp chat

+
+

Connecting...

+
+ + + + + +"#; -- cgit v1.2.3