1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
|
//! A chat server that broadcasts a message to all connections.
//!
//! This example is explicitly more verbose than it has to be. This is to
//! illustrate more concepts.
//!
//! A chat server for telnet clients. After a telnet client connects, the first
//! line should contain the client's name. After that, all lines sent by a
//! client are broadcast to all other connected clients.
//!
//! Because the client is telnet, lines are delimited by "\r\n".
//!
//! You can test this out by running:
//!
//! cargo run --example chat
//!
//! And then in another terminal run:
//!
//! telnet localhost 6142
//!
//! You can run the `telnet` command in any number of additional windows.
//!
//! You can run the second command in multiple windows and then chat between the
//! two, seeing the messages from the other client as they're received. For all
//! connected clients they'll all join the same room and see everyone else's
//! messages.
#![deny(warnings)]
extern crate tokio;
#[macro_use]
extern crate futures;
extern crate bytes;
use bytes::{BufMut, Bytes, BytesMut};
use futures::future::{self, Either};
use futures::sync::mpsc;
use tokio::io;
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
/// Shorthand for the transmit half of the message channel.
///
/// One `Tx` per connected client is stored in `Shared::peers`; sending a
/// cloned `Bytes` message on each entry is what implements the broadcast.
type Tx = mpsc::UnboundedSender<Bytes>;

/// Shorthand for the receive half of the message channel.
///
/// Each `Peer` owns its `Rx` and writes every message received on it out to
/// its own socket.
type Rx = mpsc::UnboundedReceiver<Bytes>;
/// Data that is shared between all peers in the chat server.
///
/// This is the set of `Tx` handles for all connected clients, keyed by each
/// client's socket address. Whenever a message is received from a client, it
/// is broadcast to all peers by iterating over the `peers` entries and
/// sending a copy of the message on each `Tx`.
struct Shared {
    /// Transmit handles for every connected client, keyed by peer address.
    /// Entries are inserted by `Peer::new` and removed by `Peer`'s `Drop`.
    peers: HashMap<SocketAddr, Tx>,
}
/// The state for each connected client.
struct Peer {
    /// Name of the peer.
    ///
    /// When a client connects, the first line sent is treated as the client's
    /// name (like alice or bob). The name is used to preface all messages that
    /// arrive from the client so that we can simulate a real chat server:
    ///
    /// ```text
    /// alice: Hello everyone.
    /// bob: Welcome to telnet chat!
    /// ```
    name: BytesMut,

    /// The TCP socket wrapped with the `Lines` codec, defined below.
    ///
    /// This handles sending and receiving data on the socket. When using
    /// `Lines`, we can work at the line level instead of having to manage the
    /// raw byte operations.
    lines: Lines,

    /// Handle to the shared chat state.
    ///
    /// This is used to broadcast messages read off the socket to all connected
    /// peers, and to remove this peer's entry when it is dropped.
    state: Arc<Mutex<Shared>>,

    /// Receive half of the message channel.
    ///
    /// This is used to receive messages from peers. When a message is received
    /// off of this `Rx`, it will be written to the socket.
    rx: Rx,

    /// Client socket address.
    ///
    /// The socket address is used as the key in the `peers` HashMap. The
    /// address is saved so that the `Peer` drop implementation can clean up its
    /// entry.
    addr: SocketAddr,
}
/// Line based codec
///
/// This decorates a socket and presents a line based read / write interface.
///
/// As a user of `Lines`, we can focus on working at the line level. So, we send
/// and receive values that represent entire lines. The `Lines` codec will
/// handle the encoding and decoding as well as reading from and writing to the
/// socket.
#[derive(Debug)]
struct Lines {
    /// The TCP socket.
    socket: TcpStream,

    /// Buffer used when reading from the socket. Data is not returned from
    /// this buffer until an entire "\r\n"-terminated line has been read.
    rd: BytesMut,

    /// Buffer used to stage data before writing it to the socket; filled by
    /// `buffer` and drained by `poll_flush`.
    wr: BytesMut,
}
impl Shared {
/// Create a new, empty, instance of `Shared`.
fn new() -> Self {
Shared {
peers: HashMap::new(),
}
}
}
impl Peer {
    /// Create a new instance of `Peer`.
    ///
    /// Registers this peer's `Tx` in the shared state so other peers can
    /// broadcast to it; the matching cleanup happens in `Peer`'s `Drop`.
    fn new(name: BytesMut, state: Arc<Mutex<Shared>>, lines: Lines) -> Peer {
        // Get the client socket address. It doubles as this peer's key in the
        // shared `peers` map. `expect` (rather than a bare `unwrap`) states
        // the invariant should the call ever fail on a dying connection.
        let addr = lines
            .socket
            .peer_addr()
            .expect("connected TcpStream must have a peer address");

        // Create a channel for this peer.
        let (tx, rx) = mpsc::unbounded();

        // Add an entry for this `Peer` in the shared state map.
        state.lock().unwrap().peers.insert(addr, tx);

        Peer {
            name,
            lines,
            state,
            rx,
            addr,
        }
    }
}
/// This is where a connected client is managed.
///
/// A `Peer` is also a future representing completely processing the client.
///
/// When a `Peer` is created, the first line (representing the client's name)
/// has already been read. When the socket closes, the `Peer` future completes.
///
/// While processing, the peer future implementation will:
///
/// 1) Receive messages on its message channel and write them to the socket.
/// 2) Receive messages from the socket and broadcast them to all peers.
///
impl Future for Peer {
    type Item = ();
    type Error = io::Error;

    fn poll(&mut self) -> Poll<(), io::Error> {
        // Tokio (and futures) use cooperative scheduling without any
        // preemption. If a task never yields execution back to the executor,
        // then other tasks may be starved.
        //
        // To deal with this, robust applications should not have any unbounded
        // loops. In this example, we process at most `LINES_PER_TICK` lines in
        // each direction on each tick.
        //
        // If the limit is hit, the current task is notified, informing the
        // executor to schedule the task again asap.
        const LINES_PER_TICK: usize = 10;

        // Receive all messages from peers.
        for i in 0..LINES_PER_TICK {
            // Polling an `UnboundedReceiver` cannot fail, so `unwrap` here is
            // safe.
            match self.rx.poll().unwrap() {
                Async::Ready(Some(v)) => {
                    // Buffer the line. Once all lines are buffered, they will
                    // be flushed to the socket (right below).
                    self.lines.buffer(&v);

                    // If this is the last iteration, the loop will break even
                    // though there could still be lines to read. Because we did
                    // not reach `Async::NotReady`, we have to notify ourselves
                    // in order to tell the executor to schedule the task again.
                    if i + 1 == LINES_PER_TICK {
                        task::current().notify();
                    }
                }
                _ => break,
            }
        }

        // Flush the write buffer to the socket
        let _ = self.lines.poll_flush()?;

        // Read new lines from the socket.
        //
        // FIX: this loop was previously an unbounded `while let`, so a client
        // flooding lines faster than we drain them could starve every other
        // task on the executor — exactly what the `LINES_PER_TICK` comment
        // above warns against. Apply the same bound + self-notify pattern.
        for i in 0..LINES_PER_TICK {
            let line = match self.lines.poll()? {
                Async::Ready(line) => line,
                Async::NotReady => break,
            };

            println!("Received line ({:?}) : {:?}", self.name, line);

            let message = match line {
                Some(message) => message,
                None => {
                    // EOF was reached. The remote client has disconnected.
                    // There is nothing more to do.
                    return Ok(Async::Ready(()));
                }
            };

            // Append the peer's name to the front of the line:
            let mut line = self.name.clone();
            line.extend_from_slice(b": ");
            line.extend_from_slice(&message);
            line.extend_from_slice(b"\r\n");

            // We're using `Bytes`, which allows zero-copy clones (by
            // storing the data in an Arc internally).
            //
            // However, before cloning, we must freeze the data. This
            // converts it from mutable -> immutable, allowing zero copy
            // cloning.
            let line = line.freeze();

            // Now, send the line to all other peers
            for (addr, tx) in &self.state.lock().unwrap().peers {
                // Don't send the message to ourselves
                if *addr != self.addr {
                    // The send only fails if the rx half has been dropped,
                    // however this is impossible as the `tx` half will be
                    // removed from the map before the `rx` is dropped.
                    tx.unbounded_send(line.clone()).unwrap();
                }
            }

            // We hit the read limit without seeing `NotReady`; schedule this
            // task again so any remaining buffered lines get processed.
            if i + 1 == LINES_PER_TICK {
                task::current().notify();
            }
        }

        // As always, it is important to not just return `NotReady` without
        // ensuring an inner future also returned `NotReady` — or, when a
        // limit was hit above, without having notified the task ourselves.
        Ok(Async::NotReady)
    }
}
impl Drop for Peer {
    fn drop(&mut self) {
        // Remove this peer's `Tx` so other peers stop broadcasting to it.
        //
        // FIX: the previous `lock().unwrap()` could panic here if the mutex
        // was poisoned — and a panic inside `drop` while the thread is
        // already unwinding from another panic aborts the whole process.
        // Cleanup is best-effort: if the shared map is poisoned, the server
        // state is already broken and skipping removal is the safer option.
        if let Ok(mut shared) = self.state.lock() {
            shared.peers.remove(&self.addr);
        }
    }
}
impl Lines {
    /// Wrap `socket` in a new line-oriented codec with empty buffers.
    fn new(socket: TcpStream) -> Self {
        Lines {
            socket,
            rd: BytesMut::new(),
            wr: BytesMut::new(),
        }
    }

    /// Queue a line in the internal write buffer.
    ///
    /// Nothing touches the socket here; `poll_flush` drains the buffer.
    fn buffer(&mut self, line: &[u8]) {
        // Grow the buffer as needed. Ideally this would be bounded, but the
        // example keeps it simple. `put` comes from the `BufMut` trait.
        self.wr.reserve(line.len());
        self.wr.put(line);
    }

    /// Write as much of the buffered data as the socket will accept.
    fn poll_flush(&mut self) -> Poll<(), io::Error> {
        // Keep writing until the staging buffer is drained.
        while !self.wr.is_empty() {
            let written = match self.socket.poll_write(&self.wr)? {
                Async::Ready(n) => n,
                Async::NotReady => return Ok(Async::NotReady),
            };

            // A successful write against a non-empty buffer never reports
            // zero bytes.
            assert!(written > 0);

            // Discard the bytes that made it onto the socket.
            let _ = self.wr.split_to(written);
        }

        Ok(Async::Ready(()))
    }

    /// Pull any available bytes from the socket into the read buffer.
    ///
    /// Resolves to `Ready` only once the socket has been closed.
    fn fill_read_buf(&mut self) -> Poll<(), io::Error> {
        loop {
            // Make room for the next read; this may allocate internally.
            self.rd.reserve(1024);

            let n = match self.socket.read_buf(&mut self.rd)? {
                Async::Ready(n) => n,
                Async::NotReady => return Ok(Async::NotReady),
            };

            // A zero-byte read means the remote closed its end.
            if n == 0 {
                return Ok(Async::Ready(()));
            }
        }
    }
}
impl Stream for Lines {
    type Item = BytesMut;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        // Pull any pending bytes off the socket first. `Ready` from
        // `fill_read_buf` means the remote has closed the connection.
        let sock_closed = self.fill_read_buf()?.is_ready();

        // Scan the buffered bytes for a "\r\n" terminator.
        if let Some(pos) = self.rd.windows(2).position(|pair| pair == b"\r\n") {
            // Split the complete line (terminator included) out of the read
            // buffer, then truncate the trailing "\r\n" before yielding it.
            let mut line = self.rd.split_to(pos + 2);
            line.split_off(pos);
            return Ok(Async::Ready(Some(line)));
        }

        // No complete line is buffered. If the socket is gone the stream is
        // finished; otherwise wait for more data to arrive.
        if sock_closed {
            Ok(Async::Ready(None))
        } else {
            Ok(Async::NotReady)
        }
    }
}
/// Spawn a task to manage the socket.
///
/// This will read the first line from the socket to identify the client, then
/// add the client to the set of connected peers in the chat service.
fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
    // Operate at the line level instead of on raw bytes by wrapping the
    // socket in the `Lines` codec defined above.
    let lines = Lines::new(socket);

    // The first line is treated as the client's name; the client is not
    // registered in the shared state until it arrives. `into_future` turns
    // the stream into a future resolving to `(first_item, remaining_stream)`.
    let connection = lines
        .into_future()
        // `into_future`'s error type is a pair; keep only the I/O error so
        // the chain type-checks.
        .map_err(|(e, _)| e)
        // Process the first received line as the client's name.
        .and_then(|(name, lines)| {
            // A combinator closure must return a single concrete future
            // type, yet we want either "nothing left to do" or the
            // long-lived `Peer` future. The `Either` enum from `futures`
            // wraps both in one return type.
            let name = if let Some(name) = name {
                name
            } else {
                // The remote client closed the connection without sending
                // any data.
                return Either::A(future::ok(()));
            };

            println!("`{:?}` is joining the chat", name);

            // `Peer` is itself a future that processes this connection and
            // only completes when the socket closes.
            Either::B(Peer::new(name, state, lines))
        })
        // Spawned tasks must have `Error = ()`; handle errors by logging
        // them to STDOUT.
        .map_err(|e| {
            println!("connection error = {:?}", e);
        });

    // Submit the task to the runtime. Internally this hands it to a thread
    // pool.
    tokio::spawn(connection);
}
pub fn main() -> Result<(), Box<std::error::Error>> {
// Create the shared state. This is how all the peers communicate.
//
// The server task will hold a handle to this. For every new client, the
// `state` handle is cloned and passed into the task that processes the
// client connection.
let state = Arc::new(Mutex::new(Shared::new()));
let addr = "127.0.0.1:6142".parse()?;
// Bind a TCP listener to the socket address.
//
// Note that this is the Tokio TcpListener, which is fully async.
let listener = TcpListener::bind(&addr)?;
// The server task asynchronously iterates over and processes each
// incoming connection.
let server = listener
.incoming()
.for_each(move |socket| {
// Spawn a task to process the connection
process(socket, state.clone());
Ok(())
})
.map_err(|err| {
// All tasks must have an `Error` type of `()`. This forces error
// handling and helps avoid silencing failures.
//
// In our example, we are only going to log the error to STDOUT.
println!("accept error = {:?}", err);
});
println!("server running on localhost:6142");
// Start the Tokio runtime.
//
// The Tokio is a pre-configured "out of the box" runtime for building
// asynchronous applications. It includes both a reactor and a task
// scheduler. This means applications are multithreaded by default.
//
// This function blocks until the runtime reaches an idle state. Idle is
// defined as all spawned tasks have completed and all I/O resources (TCP
// sockets in our case) have been dropped.
//
// In our example, we have not defined a shutdown strategy, so this will
// block until `ctrl-c` is pressed at the terminal.
tokio::run(server);
Ok(())
}
|