diff options
Diffstat (limited to '')
-rw-r--r-- | src/jaegertracing/thrift/lib/rs/src/protocol/multiplexed.rs | 239 |
1 files changed, 239 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/rs/src/protocol/multiplexed.rs b/src/jaegertracing/thrift/lib/rs/src/protocol/multiplexed.rs new file mode 100644 index 000000000..aaee44f73 --- /dev/null +++ b/src/jaegertracing/thrift/lib/rs/src/protocol/multiplexed.rs @@ -0,0 +1,239 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{ + TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TMessageType, + TOutputProtocol, TSetIdentifier, TStructIdentifier, +}; + +/// `TOutputProtocol` that prefixes the service name to all outgoing Thrift +/// messages. +/// +/// A `TMultiplexedOutputProtocol` should be used when multiple Thrift services +/// send messages over a single I/O channel. By prefixing service identifiers +/// to outgoing messages receivers are able to demux them and route them to the +/// appropriate service processor. Rust receivers must use a `TMultiplexedProcessor` +/// to process incoming messages, while other languages must use their +/// corresponding multiplexed processor implementations. +/// +/// For example, given a service `TestService` and a service call `test_call`, +/// this implementation would identify messages as originating from +/// `TestService:test_call`. +/// +/// # Examples +/// +/// Create and use a `TMultiplexedOutputProtocol`. +/// +/// ```no_run +/// use thrift::protocol::{TMessageIdentifier, TMessageType, TOutputProtocol}; +/// use thrift::protocol::{TBinaryOutputProtocol, TMultiplexedOutputProtocol}; +/// use thrift::transport::TTcpChannel; +/// +/// let mut channel = TTcpChannel::new(); +/// channel.open("localhost:9090").unwrap(); +/// +/// let protocol = TBinaryOutputProtocol::new(channel, true); +/// let mut protocol = TMultiplexedOutputProtocol::new("service_name", protocol); +/// +/// let ident = TMessageIdentifier::new("svc_call", TMessageType::Call, 1); +/// protocol.write_message_begin(&ident).unwrap(); +/// ``` +#[derive(Debug)] +pub struct TMultiplexedOutputProtocol<P> +where + P: TOutputProtocol, +{ + service_name: String, + inner: P, +} + +impl<P> TMultiplexedOutputProtocol<P> +where + P: TOutputProtocol, +{ + /// Create a `TMultiplexedOutputProtocol` that identifies outgoing messages + /// as originating from a service named `service_name` and sends them over + /// the `wrapped` `TOutputProtocol`. Outgoing messages are encoded and sent + /// by `wrapped`, not by this instance. + pub fn new(service_name: &str, wrapped: P) -> TMultiplexedOutputProtocol<P> { + TMultiplexedOutputProtocol { + service_name: service_name.to_owned(), + inner: wrapped, + } + } +} + +// FIXME: avoid passthrough methods +impl<P> TOutputProtocol for TMultiplexedOutputProtocol<P> +where + P: TOutputProtocol, +{ + fn write_message_begin(&mut self, identifier: &TMessageIdentifier) -> ::Result<()> { + match identifier.message_type { + // FIXME: is there a better way to override identifier here? + TMessageType::Call | TMessageType::OneWay => { + let identifier = TMessageIdentifier { + name: format!("{}:{}", self.service_name, identifier.name), + ..*identifier + }; + self.inner.write_message_begin(&identifier) + } + _ => self.inner.write_message_begin(identifier), + } + } + + fn write_message_end(&mut self) -> ::Result<()> { + self.inner.write_message_end() + } + + fn write_struct_begin(&mut self, identifier: &TStructIdentifier) -> ::Result<()> { + self.inner.write_struct_begin(identifier) + } + + fn write_struct_end(&mut self) -> ::Result<()> { + self.inner.write_struct_end() + } + + fn write_field_begin(&mut self, identifier: &TFieldIdentifier) -> ::Result<()> { + self.inner.write_field_begin(identifier) + } + + fn write_field_end(&mut self) -> ::Result<()> { + self.inner.write_field_end() + } + + fn write_field_stop(&mut self) -> ::Result<()> { + self.inner.write_field_stop() + } + + fn write_bytes(&mut self, b: &[u8]) -> ::Result<()> { + self.inner.write_bytes(b) + } + + fn write_bool(&mut self, b: bool) -> ::Result<()> { + self.inner.write_bool(b) + } + + fn write_i8(&mut self, i: i8) -> ::Result<()> { + self.inner.write_i8(i) + } + + fn write_i16(&mut self, i: i16) -> ::Result<()> { + self.inner.write_i16(i) + } + + fn write_i32(&mut self, i: i32) -> ::Result<()> { + self.inner.write_i32(i) + } + + fn write_i64(&mut self, i: i64) -> ::Result<()> { + self.inner.write_i64(i) + } + + fn write_double(&mut self, d: f64) -> ::Result<()> { + self.inner.write_double(d) + } + + fn write_string(&mut self, s: &str) -> ::Result<()> { + self.inner.write_string(s) + } + + fn write_list_begin(&mut self, identifier: &TListIdentifier) -> ::Result<()> { + self.inner.write_list_begin(identifier) + } + + fn write_list_end(&mut self) -> ::Result<()> { + self.inner.write_list_end() + } + + fn write_set_begin(&mut self, identifier: &TSetIdentifier) -> ::Result<()> { + self.inner.write_set_begin(identifier) + } + + fn write_set_end(&mut self) -> ::Result<()> { + self.inner.write_set_end() + } + + fn write_map_begin(&mut self, identifier: &TMapIdentifier) -> ::Result<()> { + self.inner.write_map_begin(identifier) + } + + fn write_map_end(&mut self) -> ::Result<()> { + self.inner.write_map_end() + } + + fn flush(&mut self) -> ::Result<()> { + self.inner.flush() + } + + // utility + // + + fn write_byte(&mut self, b: u8) -> ::Result<()> { + self.inner.write_byte(b) + } +} + +#[cfg(test)] +mod tests { + + use protocol::{TBinaryOutputProtocol, TMessageIdentifier, TMessageType, TOutputProtocol}; + use transport::{TBufferChannel, TIoChannel, WriteHalf}; + + use super::*; + + #[test] + fn must_write_message_begin_with_prefixed_service_name() { + let mut o_prot = test_objects(); + + let ident = TMessageIdentifier::new("bar", TMessageType::Call, 2); + assert_success!(o_prot.write_message_begin(&ident)); + + #[cfg_attr(rustfmt, rustfmt::skip)] + let expected: [u8; 19] = [ + 0x80, + 0x01, /* protocol identifier */ + 0x00, + 0x01, /* message type */ + 0x00, + 0x00, + 0x00, + 0x07, + 0x66, + 0x6F, + 0x6F, /* "foo" */ + 0x3A, /* ":" */ + 0x62, + 0x61, + 0x72, /* "bar" */ + 0x00, + 0x00, + 0x00, + 0x02 /* sequence number */, + ]; + + assert_eq!(o_prot.inner.transport.write_bytes(), expected); + } + + fn test_objects() -> TMultiplexedOutputProtocol<TBinaryOutputProtocol<WriteHalf<TBufferChannel>>> + { + let c = TBufferChannel::with_capacity(40, 40); + let (_, w_chan) = c.split().unwrap(); + let prot = TBinaryOutputProtocol::new(w_chan, true); + TMultiplexedOutputProtocol::new("foo", prot) + } +} |