diff options
Diffstat (limited to 'third_party/rust/tokio-0.1.22/examples/chat.rs')
-rw-r--r-- | third_party/rust/tokio-0.1.22/examples/chat.rs | 473 |
1 files changed, 473 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.22/examples/chat.rs b/third_party/rust/tokio-0.1.22/examples/chat.rs new file mode 100644 index 0000000000..b21432afa2 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/examples/chat.rs @@ -0,0 +1,473 @@ +//! A chat server that broadcasts a message to all connections. +//! +//! This example is explicitly more verbose than it has to be. This is to +//! illustrate more concepts. +//! +//! A chat server for telnet clients. After a telnet client connects, the first +//! line should contain the client's name. After that, all lines sent by a +//! client are broadcasted to all other connected clients. +//! +//! Because the client is telnet, lines are delimited by "\r\n". +//! +//! You can test this out by running: +//! +//! cargo run --example chat +//! +//! And then in another terminal run: +//! +//! telnet localhost 6142 +//! +//! You can run the `telnet` command in any number of additional windows. +//! +//! You can run the second command in multiple windows and then chat between the +//! two, seeing the messages from the other client as they're received. For all +//! connected clients they'll all join the same room and see everyone else's +//! messages. + +#![deny(warnings)] + +extern crate tokio; +#[macro_use] +extern crate futures; +extern crate bytes; + +use bytes::{BufMut, Bytes, BytesMut}; +use futures::future::{self, Either}; +use futures::sync::mpsc; +use tokio::io; +use tokio::net::{TcpListener, TcpStream}; +use tokio::prelude::*; + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; + +/// Shorthand for the transmit half of the message channel. +type Tx = mpsc::UnboundedSender<Bytes>; + +/// Shorthand for the receive half of the message channel. +type Rx = mpsc::UnboundedReceiver<Bytes>; + +/// Data that is shared between all peers in the chat server. +/// +/// This is the set of `Tx` handles for all connected clients. Whenever a +/// message is received from a client, it is broadcasted to all peers by +/// iterating over the `peers` entries and sending a copy of the message on each +/// `Tx`. +struct Shared { + peers: HashMap<SocketAddr, Tx>, +} + +/// The state for each connected client. +struct Peer { + /// Name of the peer. + /// + /// When a client connects, the first line sent is treated as the client's + /// name (like alice or bob). The name is used to preface all messages that + /// arrive from the client so that we can simulate a real chat server: + /// + /// ```text + /// alice: Hello everyone. + /// bob: Welcome to telnet chat! + /// ``` + name: BytesMut, + + /// The TCP socket wrapped with the `Lines` codec, defined below. + /// + /// This handles sending and receiving data on the socket. When using + /// `Lines`, we can work at the line level instead of having to manage the + /// raw byte operations. + lines: Lines, + + /// Handle to the shared chat state. + /// + /// This is used to broadcast messages read off the socket to all connected + /// peers. + state: Arc<Mutex<Shared>>, + + /// Receive half of the message channel. + /// + /// This is used to receive messages from peers. When a message is received + /// off of this `Rx`, it will be written to the socket. + rx: Rx, + + /// Client socket address. + /// + /// The socket address is used as the key in the `peers` HashMap. The + /// address is saved so that the `Peer` drop implementation can clean up its + /// entry. + addr: SocketAddr, +} + +/// Line based codec +/// +/// This decorates a socket and presents a line based read / write interface. +/// +/// As a user of `Lines`, we can focus on working at the line level. So, we send +/// and receive values that represent entire lines. The `Lines` codec will +/// handle the encoding and decoding as well as reading from and writing to the +/// socket. +#[derive(Debug)] +struct Lines { + /// The TCP socket. + socket: TcpStream, + + /// Buffer used when reading from the socket. Data is not returned from this + /// buffer until an entire line has been read. + rd: BytesMut, + + /// Buffer used to stage data before writing it to the socket. + wr: BytesMut, +} + +impl Shared { + /// Create a new, empty, instance of `Shared`. + fn new() -> Self { + Shared { + peers: HashMap::new(), + } + } +} + +impl Peer { + /// Create a new instance of `Peer`. + fn new(name: BytesMut, state: Arc<Mutex<Shared>>, lines: Lines) -> Peer { + // Get the client socket address + let addr = lines.socket.peer_addr().unwrap(); + + // Create a channel for this peer + let (tx, rx) = mpsc::unbounded(); + + // Add an entry for this `Peer` in the shared state map. + state.lock().unwrap().peers.insert(addr, tx); + + Peer { + name, + lines, + state, + rx, + addr, + } + } +} + +/// This is where a connected client is managed. +/// +/// A `Peer` is also a future representing completely processing the client. +/// +/// When a `Peer` is created, the first line (representing the client's name) +/// has already been read. When the socket closes, the `Peer` future completes. +/// +/// While processing, the peer future implementation will: +/// +/// 1) Receive messages on its message channel and write them to the socket. +/// 2) Receive messages from the socket and broadcast them to all peers. +/// +impl Future for Peer { + type Item = (); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(), io::Error> { + // Tokio (and futures) use cooperative scheduling without any + // preemption. If a task never yields execution back to the executor, + // then other tasks may be starved. + // + // To deal with this, robust applications should not have any unbounded + // loops. In this example, we will read at most `LINES_PER_TICK` lines + // from the client on each tick. + // + // If the limit is hit, the current task is notified, informing the + // executor to schedule the task again asap. + const LINES_PER_TICK: usize = 10; + + // Receive all messages from peers. + for i in 0..LINES_PER_TICK { + // Polling an `UnboundedReceiver` cannot fail, so `unwrap` here is + // safe. + match self.rx.poll().unwrap() { + Async::Ready(Some(v)) => { + // Buffer the line. Once all lines are buffered, they will + // be flushed to the socket (right below). + self.lines.buffer(&v); + + // If this is the last iteration, the loop will break even + // though there could still be lines to read. Because we did + // not reach `Async::NotReady`, we have to notify ourselves + // in order to tell the executor to schedule the task again. + if i + 1 == LINES_PER_TICK { + task::current().notify(); + } + } + _ => break, + } + } + + // Flush the write buffer to the socket + let _ = self.lines.poll_flush()?; + + // Read new lines from the socket + while let Async::Ready(line) = self.lines.poll()? { + println!("Received line ({:?}) : {:?}", self.name, line); + + if let Some(message) = line { + // Append the peer's name to the front of the line: + let mut line = self.name.clone(); + line.extend_from_slice(b": "); + line.extend_from_slice(&message); + line.extend_from_slice(b"\r\n"); + + // We're using `Bytes`, which allows zero-copy clones (by + // storing the data in an Arc internally). + // + // However, before cloning, we must freeze the data. This + // converts it from mutable -> immutable, allowing zero copy + // cloning. + let line = line.freeze(); + + // Now, send the line to all other peers + for (addr, tx) in &self.state.lock().unwrap().peers { + // Don't send the message to ourselves + if *addr != self.addr { + // The send only fails if the rx half has been dropped, + // however this is impossible as the `tx` half will be + // removed from the map before the `rx` is dropped. + tx.unbounded_send(line.clone()).unwrap(); + } + } + } else { + // EOF was reached. The remote client has disconnected. There is + // nothing more to do. + return Ok(Async::Ready(())); + } + } + + // As always, it is important to not just return `NotReady` without + // ensuring an inner future also returned `NotReady`. + // + // We know we got a `NotReady` from either `self.rx` or `self.lines`, so + // the contract is respected. + Ok(Async::NotReady) + } +} + +impl Drop for Peer { + fn drop(&mut self) { + self.state.lock().unwrap().peers.remove(&self.addr); + } +} + +impl Lines { + /// Create a new `Lines` codec backed by the socket + fn new(socket: TcpStream) -> Self { + Lines { + socket, + rd: BytesMut::new(), + wr: BytesMut::new(), + } + } + + /// Buffer a line. + /// + /// This writes the line to an internal buffer. Calls to `poll_flush` will + /// attempt to flush this buffer to the socket. + fn buffer(&mut self, line: &[u8]) { + // Ensure the buffer has capacity. Ideally this would not be unbounded, + // but to keep the example simple, we will not limit this. + self.wr.reserve(line.len()); + + // Push the line onto the end of the write buffer. + // + // The `put` function is from the `BufMut` trait. + self.wr.put(line); + } + + /// Flush the write buffer to the socket + fn poll_flush(&mut self) -> Poll<(), io::Error> { + // As long as there is buffered data to write, try to write it. + while !self.wr.is_empty() { + // Try to write some bytes to the socket + let n = try_ready!(self.socket.poll_write(&self.wr)); + + // As long as the wr is not empty, a successful write should + // never write 0 bytes. + assert!(n > 0); + + // This discards the first `n` bytes of the buffer. + let _ = self.wr.split_to(n); + } + + Ok(Async::Ready(())) + } + + /// Read data from the socket. + /// + /// This only returns `Ready` when the socket has closed. + fn fill_read_buf(&mut self) -> Poll<(), io::Error> { + loop { + // Ensure the read buffer has capacity. + // + // This might result in an internal allocation. + self.rd.reserve(1024); + + // Read data into the buffer. + let n = try_ready!(self.socket.read_buf(&mut self.rd)); + + if n == 0 { + return Ok(Async::Ready(())); + } + } + } +} + +impl Stream for Lines { + type Item = BytesMut; + type Error = io::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + // First, read any new data that might have been received off the socket + let sock_closed = self.fill_read_buf()?.is_ready(); + + // Now, try finding lines + let pos = self + .rd + .windows(2) + .enumerate() + .find(|&(_, bytes)| bytes == b"\r\n") + .map(|(i, _)| i); + + if let Some(pos) = pos { + // Remove the line from the read buffer and set it to `line`. + let mut line = self.rd.split_to(pos + 2); + + // Drop the trailing \r\n + line.split_off(pos); + + // Return the line + return Ok(Async::Ready(Some(line))); + } + + if sock_closed { + Ok(Async::Ready(None)) + } else { + Ok(Async::NotReady) + } + } +} + +/// Spawn a task to manage the socket. +/// +/// This will read the first line from the socket to identify the client, then +/// add the client to the set of connected peers in the chat service. +fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) { + // Wrap the socket with the `Lines` codec that we wrote above. + // + // By doing this, we can operate at the line level instead of doing raw byte + // manipulation. + let lines = Lines::new(socket); + + // The first line is treated as the client's name. The client is not added + // to the set of connected peers until this line is received. + // + // We use the `into_future` combinator to extract the first item from the + // lines stream. `into_future` takes a `Stream` and converts it to a future + // of `(first, rest)` where `rest` is the original stream instance. + let connection = lines + .into_future() + // `into_future` doesn't have the right error type, so map the error to + // make it work. + .map_err(|(e, _)| e) + // Process the first received line as the client's name. + .and_then(|(name, lines)| { + // If `name` is `None`, then the client disconnected without + // actually sending a line of data. + // + // Since the connection is closed, there is no further work that we + // need to do. So, we just terminate processing by returning + // `future::ok()`. + // + // The problem is that only a single future type can be returned + // from a combinator closure, but we want to return both + // `future::ok()` and `Peer` (below). + // + // This is a common problem, so the `futures` crate solves this by + // providing the `Either` helper enum that allows creating a single + // return type that covers two concrete future types. + let name = match name { + Some(name) => name, + None => { + // The remote client closed the connection without sending + // any data. + return Either::A(future::ok(())); + } + }; + + println!("`{:?}` is joining the chat", name); + + // Create the peer. + // + // This is also a future that processes the connection, only + // completing when the socket closes. + let peer = Peer::new(name, state, lines); + + // Wrap `peer` with `Either::B` to make the return type fit. + Either::B(peer) + }) + // Task futures have an error of type `()`, this ensures we handle the + // error. We do this by printing the error to STDOUT. + .map_err(|e| { + println!("connection error = {:?}", e); + }); + + // Spawn the task. Internally, this submits the task to a thread pool. + tokio::spawn(connection); +} + +pub fn main() -> Result<(), Box<std::error::Error>> { + // Create the shared state. This is how all the peers communicate. + // + // The server task will hold a handle to this. For every new client, the + // `state` handle is cloned and passed into the task that processes the + // client connection. + let state = Arc::new(Mutex::new(Shared::new())); + + let addr = "127.0.0.1:6142".parse()?; + + // Bind a TCP listener to the socket address. + // + // Note that this is the Tokio TcpListener, which is fully async. + let listener = TcpListener::bind(&addr)?; + + // The server task asynchronously iterates over and processes each + // incoming connection. + let server = listener + .incoming() + .for_each(move |socket| { + // Spawn a task to process the connection + process(socket, state.clone()); + Ok(()) + }) + .map_err(|err| { + // All tasks must have an `Error` type of `()`. This forces error + // handling and helps avoid silencing failures. + // + // In our example, we are only going to log the error to STDOUT. + println!("accept error = {:?}", err); + }); + + println!("server running on localhost:6142"); + + // Start the Tokio runtime. + // + // The Tokio is a pre-configured "out of the box" runtime for building + // asynchronous applications. It includes both a reactor and a task + // scheduler. This means applications are multithreaded by default. + // + // This function blocks until the runtime reaches an idle state. Idle is + // defined as all spawned tasks have completed and all I/O resources (TCP + // sockets in our case) have been dropped. + // + // In our example, we have not defined a shutdown strategy, so this will + // block until `ctrl-c` is pressed at the terminal. + tokio::run(server); + Ok(()) +} |