use futures_util::{Stream, StreamExt}; use std::collections::HashMap; use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, Mutex, }; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; use warp::{sse::Event, Filter}; #[tokio::main] async fn main() { pretty_env_logger::init(); // Keep track of all connected users, key is usize, value // is an event stream sender. let users = Arc::new(Mutex::new(HashMap::new())); // Turn our "state" into a new Filter... let users = warp::any().map(move || users.clone()); // POST /chat -> send message let chat_send = warp::path("chat") .and(warp::post()) .and(warp::path::param::()) .and(warp::body::content_length_limit(500)) .and( warp::body::bytes().and_then(|body: bytes::Bytes| async move { std::str::from_utf8(&body) .map(String::from) .map_err(|_e| warp::reject::custom(NotUtf8)) }), ) .and(users.clone()) .map(|my_id, msg, users| { user_message(my_id, msg, &users); warp::reply() }); // GET /chat -> messages stream let chat_recv = warp::path("chat").and(warp::get()).and(users).map(|users| { // reply using server-sent events let stream = user_connected(users); warp::sse::reply(warp::sse::keep_alive().stream(stream)) }); // GET / -> index html let index = warp::path::end().map(|| { warp::http::Response::builder() .header("content-type", "text/html; charset=utf-8") .body(INDEX_HTML) }); let routes = index.or(chat_recv).or(chat_send); warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; } /// Our global unique user id counter. static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1); /// Message variants. #[derive(Debug)] enum Message { UserId(usize), Reply(String), } #[derive(Debug)] struct NotUtf8; impl warp::reject::Reject for NotUtf8 {} /// Our state of currently connected users. /// /// - Key is their id /// - Value is a sender of `Message` type Users = Arc>>>; fn user_connected(users: Users) -> impl Stream> + Send + 'static { // Use a counter to assign a new unique ID for this user. let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed); eprintln!("new chat user: {}", my_id); // Use an unbounded channel to handle buffering and flushing of messages // to the event source... let (tx, rx) = mpsc::unbounded_channel(); let rx = UnboundedReceiverStream::new(rx); tx.send(Message::UserId(my_id)) // rx is right above, so this cannot fail .unwrap(); // Save the sender in our list of connected users. users.lock().unwrap().insert(my_id, tx); // Convert messages into Server-Sent Events and return resulting stream. rx.map(|msg| match msg { Message::UserId(my_id) => Ok(Event::default().event("user").data(my_id.to_string())), Message::Reply(reply) => Ok(Event::default().data(reply)), }) } fn user_message(my_id: usize, msg: String, users: &Users) { let new_msg = format!(": {}", my_id, msg); // New message from this user, send it to everyone else (except same uid)... // // We use `retain` instead of a for loop so that we can reap any user that // appears to have disconnected. users.lock().unwrap().retain(|uid, tx| { if my_id == *uid { // don't send to same user, but do retain true } else { // If not `is_ok`, the SSE stream is gone, and so don't retain tx.send(Message::Reply(new_msg.clone())).is_ok() } }); } static INDEX_HTML: &str = r#" Warp Chat

warp chat

Connecting...

"#;