summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-0.1.11/examples/connect.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-0.1.11/examples/connect.rs')
-rw-r--r--third_party/rust/tokio-0.1.11/examples/connect.rs245
1 files changed, 245 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.11/examples/connect.rs b/third_party/rust/tokio-0.1.11/examples/connect.rs
new file mode 100644
index 0000000000..fa3824c4a1
--- /dev/null
+++ b/third_party/rust/tokio-0.1.11/examples/connect.rs
@@ -0,0 +1,245 @@
+//! An example of hooking up stdin/stdout to either a TCP or UDP stream.
+//!
+//! This example will connect to a socket address specified in the argument list
+//! and then forward all data read on stdin to the server, printing out all data
+//! received on stdout. An optional `--udp` argument can be passed to specify
+//! that the connection should be made over UDP instead of TCP, translating each
+//! line entered on stdin to a UDP packet to be sent to the remote address.
+//!
+//! Note that this is not currently optimized for performance, especially
+//! around buffer management. Rather it's intended to show an example of
+//! working with a client.
+//!
+//! This example can be quite useful when interacting with the other examples in
+//! this repository! Many of them recommend running this as a simple "hook up
+//! stdin/stdout to a server" to get up and running.
+
+#![deny(warnings)]
+
+extern crate tokio;
+extern crate tokio_io;
+extern crate futures;
+extern crate bytes;
+
+use std::env;
+use std::io::{self, Read, Write};
+use std::net::SocketAddr;
+use std::thread;
+
+use tokio::prelude::*;
+use futures::sync::mpsc;
+
+fn main() {
+ // Determine if we're going to run in TCP or UDP mode
+ let mut args = env::args().skip(1).collect::<Vec<_>>();
+ let tcp = match args.iter().position(|a| a == "--udp") {
+ Some(i) => {
+ args.remove(i);
+ false
+ }
+ None => true,
+ };
+
+ // Parse what address we're going to connect to
+ let addr = args.first().unwrap_or_else(|| {
+ panic!("this program requires at least one argument")
+ });
+ let addr = addr.parse::<SocketAddr>().unwrap();
+
+ // Right now Tokio doesn't support a handle to stdin running on the event
+ // loop, so we farm out that work to a separate thread. This thread will
+ // read data (with blocking I/O) from stdin and then send it to the event
+ // loop over a standard futures channel.
+ let (stdin_tx, stdin_rx) = mpsc::channel(0);
+ thread::spawn(|| read_stdin(stdin_tx));
+ let stdin_rx = stdin_rx.map_err(|_| panic!()); // errors not possible on rx
+
+ // Now that we've got our stdin read we either set up our TCP connection or
+ // our UDP connection to get a stream of bytes we're going to emit to
+ // stdout.
+ let stdout = if tcp {
+ tcp::connect(&addr, Box::new(stdin_rx))
+ } else {
+ udp::connect(&addr, Box::new(stdin_rx))
+ };
+
+ // And now with our stream of bytes to write to stdout, we execute that in
+ // the event loop! Note that this is doing blocking I/O to emit data to
+ // stdout, and in general it's a no-no to do that sort of work on the event
+ // loop. In this case, though, we know it's ok as the event loop isn't
+ // otherwise running anything useful.
+ let mut out = io::stdout();
+
+ tokio::run({
+ stdout
+ .for_each(move |chunk| {
+ out.write_all(&chunk)
+ })
+ .map_err(|e| println!("error reading stdout; error = {:?}", e))
+ });
+}
+
+mod codec {
+ use std::io;
+ use bytes::{BufMut, BytesMut};
+ use tokio::codec::{Encoder, Decoder};
+
+ /// A simple `Codec` implementation that just ships bytes around.
+ ///
+ /// This type is used for "framing" a TCP/UDP stream of bytes but it's really
+ /// just a convenient method for us to work with streams/sinks for now.
+ /// This'll just take any data read and interpret it as a "frame" and
+ /// conversely just shove data into the output location without looking at
+ /// it.
+ pub struct Bytes;
+
+ impl Decoder for Bytes {
+ type Item = BytesMut;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> {
+ if buf.len() > 0 {
+ let len = buf.len();
+ Ok(Some(buf.split_to(len)))
+ } else {
+ Ok(None)
+ }
+ }
+ }
+
+ impl Encoder for Bytes {
+ type Item = Vec<u8>;
+ type Error = io::Error;
+
+ fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> {
+ buf.put(&data[..]);
+ Ok(())
+ }
+ }
+}
+
+mod tcp {
+ use tokio;
+ use tokio::net::TcpStream;
+ use tokio::prelude::*;
+ use tokio::codec::Decoder;
+
+ use bytes::BytesMut;
+ use codec::Bytes;
+
+ use std::io;
+ use std::net::SocketAddr;
+
+ pub fn connect(addr: &SocketAddr,
+ stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
+ -> Box<Stream<Item = BytesMut, Error = io::Error> + Send>
+ {
+ let tcp = TcpStream::connect(addr);
+
+ // After the TCP connection has been established, we set up our client
+ // to start forwarding data.
+ //
+ // First we use the `Io::framed` method with a simple implementation of
+ // a `Codec` (listed below) that just ships bytes around. We then split
+ // that in two to work with the stream and sink separately.
+ //
+ // Half of the work we're going to do is to take all data we receive on
+ // `stdin` and send that along the TCP stream (`sink`). The second half
+ // is to take all the data we receive (`stream`) and then write that to
+ // stdout. We'll be passing this handle back out from this method.
+ //
+ // You'll also note that we *spawn* the work to read stdin and write it
+ // to the TCP stream. This is done to ensure that happens concurrently
+ // with us reading data from the stream.
+ Box::new(tcp.map(move |stream| {
+ let (sink, stream) = Bytes.framed(stream).split();
+
+ tokio::spawn(stdin.forward(sink).then(|result| {
+ if let Err(e) = result {
+ panic!("failed to write to socket: {}", e)
+ }
+ Ok(())
+ }));
+
+ stream
+ }).flatten_stream())
+ }
+}
+
+mod udp {
+ use std::io;
+ use std::net::SocketAddr;
+
+ use tokio;
+ use tokio::net::{UdpSocket, UdpFramed};
+ use tokio::prelude::*;
+ use bytes::BytesMut;
+
+ use codec::Bytes;
+
+ pub fn connect(&addr: &SocketAddr,
+ stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
+ -> Box<Stream<Item = BytesMut, Error = io::Error> + Send>
+ {
+ // We'll bind our UDP socket to a local IP/port, but for now we
+ // basically let the OS pick both of those.
+ let addr_to_bind = if addr.ip().is_ipv4() {
+ "0.0.0.0:0".parse().unwrap()
+ } else {
+ "[::]:0".parse().unwrap()
+ };
+ let udp = UdpSocket::bind(&addr_to_bind)
+ .expect("failed to bind socket");
+
+ // Like above with TCP we use an instance of `Bytes` codec to transform
+ // this UDP socket into a framed sink/stream which operates over
+ // discrete values. In this case we're working with *pairs* of socket
+ // addresses and byte buffers.
+ let (sink, stream) = UdpFramed::new(udp, Bytes).split();
+
+ // All bytes from `stdin` will go to the `addr` specified in our
+ // argument list. Like with TCP this is spawned concurrently
+ let forward_stdin = stdin.map(move |chunk| {
+ (chunk, addr)
+ }).forward(sink).then(|result| {
+ if let Err(e) = result {
+ panic!("failed to write to socket: {}", e)
+ }
+ Ok(())
+ });
+
+ // With UDP we could receive data from any source, so filter out
+ // anything coming from a different address
+ let receive = stream.filter_map(move |(chunk, src)| {
+ if src == addr {
+ Some(chunk.into())
+ } else {
+ None
+ }
+ });
+
+ Box::new(future::lazy(|| {
+ tokio::spawn(forward_stdin);
+ future::ok(receive)
+ }).flatten_stream())
+ }
+}
+
+// Our helper method which will read data from stdin and send it along the
+// sender provided.
+fn read_stdin(mut tx: mpsc::Sender<Vec<u8>>) {
+ let mut stdin = io::stdin();
+ loop {
+ let mut buf = vec![0; 1024];
+ let n = match stdin.read(&mut buf) {
+ Err(_) |
+ Ok(0) => break,
+ Ok(n) => n,
+ };
+ buf.truncate(n);
+ tx = match tx.send(buf).wait() {
+ Ok(tx) => tx,
+ Err(_) => break,
+ };
+ }
+}