1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
//! A proxy that forwards data to another server and forwards that server's
//! responses back to clients.
//!
//! Because the Tokio runtime uses a thread pool, each TCP connection is
//! processed concurrently with all other TCP connections across multiple
//! threads.
//!
//! You can showcase this by running this in one terminal:
//!
//! cargo run --example proxy
//!
//! This in another terminal
//!
//! cargo run --example echo
//!
//! And finally this in another terminal
//!
//! cargo run --example connect 127.0.0.1:8081
//!
//! This final terminal will connect to our proxy, which will in turn connect to
//! the echo server, and you'll be able to see data flowing between them.
#![deny(warnings)]
extern crate tokio;
use std::env;
use std::io::{self, Read, Write};
use std::net::{Shutdown, SocketAddr};
use std::sync::{Arc, Mutex};
use tokio::io::{copy, shutdown};
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
fn main() -> Result<(), Box<std::error::Error>> {
let listen_addr = env::args().nth(1).unwrap_or("127.0.0.1:8081".to_string());
let listen_addr = listen_addr.parse::<SocketAddr>()?;
let server_addr = env::args().nth(2).unwrap_or("127.0.0.1:8080".to_string());
let server_addr = server_addr.parse::<SocketAddr>()?;
// Create a TCP listener which will listen for incoming connections.
let socket = TcpListener::bind(&listen_addr)?;
println!("Listening on: {}", listen_addr);
println!("Proxying to: {}", server_addr);
let done = socket
.incoming()
.map_err(|e| println!("error accepting socket; error = {:?}", e))
.for_each(move |client| {
let server = TcpStream::connect(&server_addr);
let amounts = server.and_then(move |server| {
// Create separate read/write handles for the TCP clients that we're
// proxying data between. Note that typically you'd use
// `AsyncRead::split` for this operation, but we want our writer
// handles to have a custom implementation of `shutdown` which
// actually calls `TcpStream::shutdown` to ensure that EOF is
// transmitted properly across the proxied connection.
//
// As a result, we wrap up our client/server manually in arcs and
// use the impls below on our custom `MyTcpStream` type.
let client_reader = MyTcpStream(Arc::new(Mutex::new(client)));
let client_writer = client_reader.clone();
let server_reader = MyTcpStream(Arc::new(Mutex::new(server)));
let server_writer = server_reader.clone();
// Copy the data (in parallel) between the client and the server.
// After the copy is done we indicate to the remote side that we've
// finished by shutting down the connection.
let client_to_server = copy(client_reader, server_writer)
.and_then(|(n, _, server_writer)| shutdown(server_writer).map(move |_| n));
let server_to_client = copy(server_reader, client_writer)
.and_then(|(n, _, client_writer)| shutdown(client_writer).map(move |_| n));
client_to_server.join(server_to_client)
});
let msg = amounts
.map(move |(from_client, from_server)| {
println!(
"client wrote {} bytes and received {} bytes",
from_client, from_server
);
})
.map_err(|e| {
// Don't panic. Maybe the client just disconnected too soon.
println!("error: {}", e);
});
tokio::spawn(msg);
Ok(())
});
tokio::run(done);
Ok(())
}
// This is a custom type used to have a custom implementation of the
// `AsyncWrite::shutdown` method which actually calls `TcpStream::shutdown` to
// notify the remote end that we're done writing.
#[derive(Clone)]
struct MyTcpStream(Arc<Mutex<TcpStream>>);
impl Read for MyTcpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.0.lock().unwrap().read(buf)
}
}
impl Write for MyTcpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.lock().unwrap().write(buf)
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl AsyncRead for MyTcpStream {}
impl AsyncWrite for MyTcpStream {
fn shutdown(&mut self) -> Poll<(), io::Error> {
self.0.lock().unwrap().shutdown(Shutdown::Write)?;
Ok(().into())
}
}
|