diff options
Diffstat (limited to 'src/jaegertracing/thrift/lib/rs/src/transport/socket.rs')
-rw-r--r-- | src/jaegertracing/thrift/lib/rs/src/transport/socket.rs | 168 |
1 files changed, 168 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/rs/src/transport/socket.rs b/src/jaegertracing/thrift/lib/rs/src/transport/socket.rs new file mode 100644 index 000000000..0bef67bed --- /dev/null +++ b/src/jaegertracing/thrift/lib/rs/src/transport/socket.rs @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::convert::From; +use std::io; +use std::io::{ErrorKind, Read, Write}; +use std::net::{Shutdown, TcpStream}; + +use super::{ReadHalf, TIoChannel, WriteHalf}; +use {new_transport_error, TransportErrorKind}; + +/// Bidirectional TCP/IP channel. +/// +/// # Examples +/// +/// Create a `TTcpChannel`. +/// +/// ```no_run +/// use std::io::{Read, Write}; +/// use thrift::transport::TTcpChannel; +/// +/// let mut c = TTcpChannel::new(); +/// c.open("localhost:9090").unwrap(); +/// +/// let mut buf = vec![0u8; 4]; +/// c.read(&mut buf).unwrap(); +/// c.write(&vec![0, 1, 2]).unwrap(); +/// ``` +/// +/// Create a `TTcpChannel` by wrapping an existing `TcpStream`. +/// +/// ```no_run +/// use std::io::{Read, Write}; +/// use std::net::TcpStream; +/// use thrift::transport::TTcpChannel; +/// +/// let stream = TcpStream::connect("127.0.0.1:9189").unwrap(); +/// +/// // no need to call c.open() since we've already connected above +/// let mut c = TTcpChannel::with_stream(stream); +/// +/// let mut buf = vec![0u8; 4]; +/// c.read(&mut buf).unwrap(); +/// c.write(&vec![0, 1, 2]).unwrap(); +/// ``` +#[derive(Debug, Default)] +pub struct TTcpChannel { + stream: Option<TcpStream>, +} + +impl TTcpChannel { + /// Create an uninitialized `TTcpChannel`. + /// + /// The returned instance must be opened using `TTcpChannel::open(...)` + /// before it can be used. + pub fn new() -> TTcpChannel { + TTcpChannel { stream: None } + } + + /// Create a `TTcpChannel` that wraps an existing `TcpStream`. + /// + /// The passed-in stream is assumed to have been opened before being wrapped + /// by the created `TTcpChannel` instance. + pub fn with_stream(stream: TcpStream) -> TTcpChannel { + TTcpChannel { + stream: Some(stream), + } + } + + /// Connect to `remote_address`, which should have the form `host:port`. + pub fn open(&mut self, remote_address: &str) -> ::Result<()> { + if self.stream.is_some() { + Err(new_transport_error( + TransportErrorKind::AlreadyOpen, + "tcp connection previously opened", + )) + } else { + match TcpStream::connect(&remote_address) { + Ok(s) => { + self.stream = Some(s); + Ok(()) + } + Err(e) => Err(From::from(e)), + } + } + } + + /// Shut down this channel. + /// + /// Both send and receive halves are closed, and this instance can no + /// longer be used to communicate with another endpoint. + pub fn close(&mut self) -> ::Result<()> { + self.if_set(|s| s.shutdown(Shutdown::Both)) + .map_err(From::from) + } + + fn if_set<F, T>(&mut self, mut stream_operation: F) -> io::Result<T> + where + F: FnMut(&mut TcpStream) -> io::Result<T>, + { + if let Some(ref mut s) = self.stream { + stream_operation(s) + } else { + Err(io::Error::new( + ErrorKind::NotConnected, + "tcp endpoint not connected", + )) + } + } +} + +impl TIoChannel for TTcpChannel { + fn split(self) -> ::Result<(ReadHalf<Self>, WriteHalf<Self>)> + where + Self: Sized, + { + let mut s = self; + + s.stream + .as_mut() + .and_then(|s| s.try_clone().ok()) + .map(|cloned| { + let read_half = ReadHalf::new(TTcpChannel { + stream: s.stream.take(), + }); + let write_half = WriteHalf::new(TTcpChannel { + stream: Some(cloned), + }); + (read_half, write_half) + }) + .ok_or_else(|| { + new_transport_error( + TransportErrorKind::Unknown, + "cannot clone underlying tcp stream", + ) + }) + } +} + +impl Read for TTcpChannel { + fn read(&mut self, b: &mut [u8]) -> io::Result<usize> { + self.if_set(|s| s.read(b)) + } +} + +impl Write for TTcpChannel { + fn write(&mut self, b: &[u8]) -> io::Result<usize> { + self.if_set(|s| s.write(b)) + } + + fn flush(&mut self) -> io::Result<()> { + self.if_set(|s| s.flush()) + } +} |