//! A chat server that broadcasts a message to all connections. //! //! This is a line-based server which accepts connections, reads lines from //! those connections, and broadcasts the lines to all other connected clients. //! //! This example is similar to chat.rs, but uses combinators and a much more //! functional style. //! //! Because we are here running the reactor/executor on the same thread instead //! of a threadpool, we can avoid full synchronization with Arc + Mutex and use //! Rc + RefCell instead. The max performance is however limited to a CPU HW //! thread. //! //! You can test this out by running: //! //! cargo run --example chat-combinator-current-thread //! //! And then in another window run: //! //! cargo run --example connect 127.0.0.1:8080 //! //! You can run the second command in multiple windows and then chat between the //! two, seeing the messages from the other client as they're received. For all //! connected clients they'll all join the same room and see everyone else's //! messages. #![deny(warnings)] extern crate futures; extern crate tokio; use tokio::io; use tokio::net::TcpListener; use tokio::prelude::*; use tokio::runtime::current_thread::{Runtime, TaskExecutor}; use std::cell::RefCell; use std::collections::HashMap; use std::env; use std::io::BufReader; use std::iter; use std::rc::Rc; fn main() -> Result<(), Box> { let mut runtime = Runtime::new().unwrap(); // Create the TCP listener we'll accept connections on. let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse()?; let socket = TcpListener::bind(&addr)?; println!("Listening on: {}", addr); // This is running on the Tokio current_thread runtime, so it will be single- // threaded. The `Rc>` allows state to be shared across the tasks. let connections = Rc::new(RefCell::new(HashMap::new())); // The server task asynchronously iterates over and processes each incoming // connection. let srv = socket .incoming() .map_err(|e| { println!("failed to accept socket; error = {:?}", e); e }) .for_each(move |stream| { // The client's socket address let addr = stream.peer_addr()?; println!("New Connection: {}", addr); // Split the TcpStream into two separate handles. One handle for reading // and one handle for writing. This lets us use separate tasks for // reading and writing. let (reader, writer) = stream.split(); // Create a channel for our stream, which other sockets will use to // send us messages. Then register our address with the stream to send // data to us. let (tx, rx) = futures::sync::mpsc::unbounded(); let mut conns = connections.borrow_mut(); conns.insert(addr, tx); // Define here what we do for the actual I/O. That is, read a bunch of // lines from the socket and dispatch them while we also write any lines // from other sockets. let connections_inner = connections.clone(); let reader = BufReader::new(reader); // Model the read portion of this socket by mapping an infinite // iterator to each line off the socket. This "loop" is then // terminated with an error once we hit EOF on the socket. let iter = stream::iter_ok::<_, io::Error>(iter::repeat(())); let socket_reader = iter.fold(reader, move |reader, _| { // Read a line off the socket, failing if we're at EOF let line = io::read_until(reader, b'\n', Vec::new()); let line = line.and_then(|(reader, vec)| { if vec.len() == 0 { Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe")) } else { Ok((reader, vec)) } }); // Convert the bytes we read into a string, and then send that // string to all other connected clients. let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec))); // Move the connection state into the closure below. let connections = connections_inner.clone(); line.map(move |(reader, message)| { println!("{}: {:?}", addr, message); let mut conns = connections.borrow_mut(); if let Ok(msg) = message { // For each open connection except the sender, send the // string via the channel. let iter = conns .iter_mut() .filter(|&(&k, _)| k != addr) .map(|(_, v)| v); for tx in iter { tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap(); } } else { let tx = conns.get_mut(&addr).unwrap(); tx.unbounded_send("You didn't send valid UTF-8.".to_string()) .unwrap(); } reader }) }); // Whenever we receive a string on the Receiver, we write it to // `WriteHalf`. let socket_writer = rx.fold(writer, |writer, msg| { let amt = io::write_all(writer, msg.into_bytes()); let amt = amt.map(|(writer, _)| writer); amt.map_err(|_| ()) }); // Now that we've got futures representing each half of the socket, we // use the `select` combinator to wait for either half to be done to // tear down the other. Then we spawn off the result. let connections = connections.clone(); let socket_reader = socket_reader.map_err(|_| ()); let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ())); // Spawn locally a task to process the connection TaskExecutor::current() .spawn_local(Box::new(connection.then(move |_| { let mut conns = connections.borrow_mut(); conns.remove(&addr); println!("Connection {} closed.", addr); Ok(()) }))) .unwrap(); Ok(()) }) .map_err(|err| println!("error occurred: {:?}", err)); // Spawn srv itself runtime.spawn(srv); // Execute server runtime.run().unwrap(); Ok(()) }