summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/lib/rs/src/transport/mem.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/jaegertracing/thrift/lib/rs/src/transport/mem.rs')
-rw-r--r--src/jaegertracing/thrift/lib/rs/src/transport/mem.rs385
1 files changed, 385 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/rs/src/transport/mem.rs b/src/jaegertracing/thrift/lib/rs/src/transport/mem.rs
new file mode 100644
index 000000000..82c4b579f
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/rs/src/transport/mem.rs
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cmp;
+use std::io;
+use std::sync::{Arc, Mutex};
+
+use super::{ReadHalf, TIoChannel, WriteHalf};
+
+/// In-memory read and write channel with fixed-size read and write buffers.
+///
+/// On a `write` bytes are written to the internal write buffer. Writes are no
+/// longer accepted once this buffer is full. Callers must `empty_write_buffer()`
+/// before subsequent writes are accepted.
+///
+/// You can set readable bytes in the internal read buffer by filling it with
+/// `set_readable_bytes(...)`. Callers can then read until the buffer is
+/// depleted. No further reads are accepted until the internal read buffer is
+/// replenished again.
+#[derive(Debug)]
+pub struct TBufferChannel {
+ read: Arc<Mutex<ReadData>>,
+ write: Arc<Mutex<WriteData>>,
+}
+
+#[derive(Debug)]
+struct ReadData {
+ buf: Box<[u8]>,
+ pos: usize,
+ idx: usize,
+ cap: usize,
+}
+
+#[derive(Debug)]
+struct WriteData {
+ buf: Box<[u8]>,
+ pos: usize,
+ cap: usize,
+}
+
+impl TBufferChannel {
+ /// Constructs a new, empty `TBufferChannel` with the given
+ /// read buffer capacity and write buffer capacity.
+ pub fn with_capacity(read_capacity: usize, write_capacity: usize) -> TBufferChannel {
+ TBufferChannel {
+ read: Arc::new(Mutex::new(ReadData {
+ buf: vec![0; read_capacity].into_boxed_slice(),
+ idx: 0,
+ pos: 0,
+ cap: read_capacity,
+ })),
+ write: Arc::new(Mutex::new(WriteData {
+ buf: vec![0; write_capacity].into_boxed_slice(),
+ pos: 0,
+ cap: write_capacity,
+ })),
+ }
+ }
+
+ /// Return a copy of the bytes held by the internal read buffer.
+ /// Returns an empty vector if no readable bytes are present.
+ pub fn read_bytes(&self) -> Vec<u8> {
+ let rdata = self.read.as_ref().lock().unwrap();
+ let mut buf = vec![0u8; rdata.idx];
+ buf.copy_from_slice(&rdata.buf[..rdata.idx]);
+ buf
+ }
+
+ // FIXME: do I really need this API call?
+ // FIXME: should this simply reset to the last set of readable bytes?
+ /// Reset the number of readable bytes to zero.
+ ///
+ /// Subsequent calls to `read` will return nothing.
+ pub fn empty_read_buffer(&mut self) {
+ let mut rdata = self.read.as_ref().lock().unwrap();
+ rdata.pos = 0;
+ rdata.idx = 0;
+ }
+
+ /// Copy bytes from the source buffer `buf` into the internal read buffer,
+ /// overwriting any existing bytes. Returns the number of bytes copied,
+ /// which is `min(buf.len(), internal_read_buf.len())`.
+ pub fn set_readable_bytes(&mut self, buf: &[u8]) -> usize {
+ self.empty_read_buffer();
+ let mut rdata = self.read.as_ref().lock().unwrap();
+ let max_bytes = cmp::min(rdata.cap, buf.len());
+ rdata.buf[..max_bytes].clone_from_slice(&buf[..max_bytes]);
+ rdata.idx = max_bytes;
+ max_bytes
+ }
+
+ /// Return a copy of the bytes held by the internal write buffer.
+ /// Returns an empty vector if no bytes were written.
+ pub fn write_bytes(&self) -> Vec<u8> {
+ let wdata = self.write.as_ref().lock().unwrap();
+ let mut buf = vec![0u8; wdata.pos];
+ buf.copy_from_slice(&wdata.buf[..wdata.pos]);
+ buf
+ }
+
+ /// Resets the internal write buffer, making it seem like no bytes were
+ /// written. Calling `write_buffer` after this returns an empty vector.
+ pub fn empty_write_buffer(&mut self) {
+ let mut wdata = self.write.as_ref().lock().unwrap();
+ wdata.pos = 0;
+ }
+
+ /// Overwrites the contents of the read buffer with the contents of the
+ /// write buffer. The write buffer is emptied after this operation.
+ pub fn copy_write_buffer_to_read_buffer(&mut self) {
+ // FIXME: redo this entire method
+ let buf = {
+ let wdata = self.write.as_ref().lock().unwrap();
+ let b = &wdata.buf[..wdata.pos];
+ let mut b_ret = vec![0; b.len()];
+ b_ret.copy_from_slice(b);
+ b_ret
+ };
+
+ let bytes_copied = self.set_readable_bytes(&buf);
+ assert_eq!(bytes_copied, buf.len());
+
+ self.empty_write_buffer();
+ }
+}
+
+impl TIoChannel for TBufferChannel {
+ fn split(self) -> ::Result<(ReadHalf<Self>, WriteHalf<Self>)>
+ where
+ Self: Sized,
+ {
+ Ok((
+ ReadHalf {
+ handle: TBufferChannel {
+ read: self.read.clone(),
+ write: self.write.clone(),
+ },
+ },
+ WriteHalf {
+ handle: TBufferChannel {
+ read: self.read.clone(),
+ write: self.write.clone(),
+ },
+ },
+ ))
+ }
+}
+
+impl io::Read for TBufferChannel {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ let mut rdata = self.read.as_ref().lock().unwrap();
+ let nread = cmp::min(buf.len(), rdata.idx - rdata.pos);
+ buf[..nread].clone_from_slice(&rdata.buf[rdata.pos..rdata.pos + nread]);
+ rdata.pos += nread;
+ Ok(nread)
+ }
+}
+
+impl io::Write for TBufferChannel {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ let mut wdata = self.write.as_ref().lock().unwrap();
+ let nwrite = cmp::min(buf.len(), wdata.cap - wdata.pos);
+ let (start, end) = (wdata.pos, wdata.pos + nwrite);
+ wdata.buf[start..end].clone_from_slice(&buf[..nwrite]);
+ wdata.pos += nwrite;
+ Ok(nwrite)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(()) // nothing to do on flush
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::io::{Read, Write};
+
+ use super::TBufferChannel;
+
+ #[test]
+ fn must_empty_write_buffer() {
+ let mut t = TBufferChannel::with_capacity(0, 1);
+
+ let bytes_to_write: [u8; 1] = [0x01];
+ let result = t.write(&bytes_to_write);
+ assert_eq!(result.unwrap(), 1);
+ assert_eq!(&t.write_bytes(), &bytes_to_write);
+
+ t.empty_write_buffer();
+ assert_eq!(t.write_bytes().len(), 0);
+ }
+
+ #[test]
+ fn must_accept_writes_after_buffer_emptied() {
+ let mut t = TBufferChannel::with_capacity(0, 2);
+
+ let bytes_to_write: [u8; 2] = [0x01, 0x02];
+
+ // first write (all bytes written)
+ let result = t.write(&bytes_to_write);
+ assert_eq!(result.unwrap(), 2);
+ assert_eq!(&t.write_bytes(), &bytes_to_write);
+
+ // try write again (nothing should be written)
+ let result = t.write(&bytes_to_write);
+ assert_eq!(result.unwrap(), 0);
+ assert_eq!(&t.write_bytes(), &bytes_to_write); // still the same as before
+
+ // now reset the buffer
+ t.empty_write_buffer();
+ assert_eq!(t.write_bytes().len(), 0);
+
+ // now try write again - the write should succeed
+ let result = t.write(&bytes_to_write);
+ assert_eq!(result.unwrap(), 2);
+ assert_eq!(&t.write_bytes(), &bytes_to_write);
+ }
+
+ #[test]
+ fn must_accept_multiple_writes_until_buffer_is_full() {
+ let mut t = TBufferChannel::with_capacity(0, 10);
+
+ // first write (all bytes written)
+ let bytes_to_write_0: [u8; 2] = [0x01, 0x41];
+ let write_0_result = t.write(&bytes_to_write_0);
+ assert_eq!(write_0_result.unwrap(), 2);
+ assert_eq!(t.write_bytes(), &bytes_to_write_0);
+
+ // second write (all bytes written, starting at index 2)
+ let bytes_to_write_1: [u8; 7] = [0x24, 0x41, 0x32, 0x33, 0x11, 0x98, 0xAF];
+ let write_1_result = t.write(&bytes_to_write_1);
+ assert_eq!(write_1_result.unwrap(), 7);
+ assert_eq!(&t.write_bytes()[2..], &bytes_to_write_1);
+
+ // third write (only 1 byte written - that's all we have space for)
+ let bytes_to_write_2: [u8; 3] = [0xBF, 0xDA, 0x98];
+ let write_2_result = t.write(&bytes_to_write_2);
+ assert_eq!(write_2_result.unwrap(), 1);
+ assert_eq!(&t.write_bytes()[9..], &bytes_to_write_2[0..1]); // how does this syntax work?!
+
+ // fourth write (no writes are accepted)
+ let bytes_to_write_3: [u8; 3] = [0xBF, 0xAA, 0xFD];
+ let write_3_result = t.write(&bytes_to_write_3);
+ assert_eq!(write_3_result.unwrap(), 0);
+
+ // check the full write buffer
+ let mut expected: Vec<u8> = Vec::with_capacity(10);
+ expected.extend_from_slice(&bytes_to_write_0);
+ expected.extend_from_slice(&bytes_to_write_1);
+ expected.extend_from_slice(&bytes_to_write_2[0..1]);
+ assert_eq!(t.write_bytes(), &expected[..]);
+ }
+
+ #[test]
+ fn must_empty_read_buffer() {
+ let mut t = TBufferChannel::with_capacity(1, 0);
+
+ let bytes_to_read: [u8; 1] = [0x01];
+ let result = t.set_readable_bytes(&bytes_to_read);
+ assert_eq!(result, 1);
+ assert_eq!(t.read_bytes(), &bytes_to_read);
+
+ t.empty_read_buffer();
+ assert_eq!(t.read_bytes().len(), 0);
+ }
+
+ #[test]
+ fn must_allow_readable_bytes_to_be_set_after_read_buffer_emptied() {
+ let mut t = TBufferChannel::with_capacity(1, 0);
+
+ let bytes_to_read_0: [u8; 1] = [0x01];
+ let result = t.set_readable_bytes(&bytes_to_read_0);
+ assert_eq!(result, 1);
+ assert_eq!(t.read_bytes(), &bytes_to_read_0);
+
+ t.empty_read_buffer();
+ assert_eq!(t.read_bytes().len(), 0);
+
+ let bytes_to_read_1: [u8; 1] = [0x02];
+ let result = t.set_readable_bytes(&bytes_to_read_1);
+ assert_eq!(result, 1);
+ assert_eq!(t.read_bytes(), &bytes_to_read_1);
+ }
+
+ #[test]
+ fn must_accept_multiple_reads_until_all_bytes_read() {
+ let mut t = TBufferChannel::with_capacity(10, 0);
+
+ let readable_bytes: [u8; 10] = [0xFF, 0xEE, 0xDD, 0xCC, 0xBB, 0x00, 0x1A, 0x2B, 0x3C, 0x4D];
+
+ // check that we're able to set the bytes to be read
+ let result = t.set_readable_bytes(&readable_bytes);
+ assert_eq!(result, 10);
+ assert_eq!(t.read_bytes(), &readable_bytes);
+
+ // first read
+ let mut read_buf_0 = vec![0; 5];
+ let read_result = t.read(&mut read_buf_0);
+ assert_eq!(read_result.unwrap(), 5);
+ assert_eq!(read_buf_0.as_slice(), &(readable_bytes[0..5]));
+
+ // second read
+ let mut read_buf_1 = vec![0; 4];
+ let read_result = t.read(&mut read_buf_1);
+ assert_eq!(read_result.unwrap(), 4);
+ assert_eq!(read_buf_1.as_slice(), &(readable_bytes[5..9]));
+
+ // third read (only 1 byte remains to be read)
+ let mut read_buf_2 = vec![0; 3];
+ let read_result = t.read(&mut read_buf_2);
+ assert_eq!(read_result.unwrap(), 1);
+ read_buf_2.truncate(1); // FIXME: does the caller have to do this?
+ assert_eq!(read_buf_2.as_slice(), &(readable_bytes[9..]));
+
+ // fourth read (nothing should be readable)
+ let mut read_buf_3 = vec![0; 10];
+ let read_result = t.read(&mut read_buf_3);
+ assert_eq!(read_result.unwrap(), 0);
+ read_buf_3.truncate(0);
+
+ // check that all the bytes we received match the original (again!)
+ let mut bytes_read = Vec::with_capacity(10);
+ bytes_read.extend_from_slice(&read_buf_0);
+ bytes_read.extend_from_slice(&read_buf_1);
+ bytes_read.extend_from_slice(&read_buf_2);
+ bytes_read.extend_from_slice(&read_buf_3);
+ assert_eq!(&bytes_read, &readable_bytes);
+ }
+
+ #[test]
+ fn must_allow_reads_to_succeed_after_read_buffer_replenished() {
+ let mut t = TBufferChannel::with_capacity(3, 0);
+
+ let readable_bytes_0: [u8; 3] = [0x02, 0xAB, 0x33];
+
+ // check that we're able to set the bytes to be read
+ let result = t.set_readable_bytes(&readable_bytes_0);
+ assert_eq!(result, 3);
+ assert_eq!(t.read_bytes(), &readable_bytes_0);
+
+ let mut read_buf = vec![0; 4];
+
+ // drain the read buffer
+ let read_result = t.read(&mut read_buf);
+ assert_eq!(read_result.unwrap(), 3);
+ assert_eq!(t.read_bytes(), &read_buf[0..3]);
+
+ // check that a subsequent read fails
+ let read_result = t.read(&mut read_buf);
+ assert_eq!(read_result.unwrap(), 0);
+
+ // we don't modify the read buffer on failure
+ let mut expected_bytes = Vec::with_capacity(4);
+ expected_bytes.extend_from_slice(&readable_bytes_0);
+ expected_bytes.push(0x00);
+ assert_eq!(&read_buf, &expected_bytes);
+
+ // replenish the read buffer again
+ let readable_bytes_1: [u8; 2] = [0x91, 0xAA];
+
+ // check that we're able to set the bytes to be read
+ let result = t.set_readable_bytes(&readable_bytes_1);
+ assert_eq!(result, 2);
+ assert_eq!(t.read_bytes(), &readable_bytes_1);
+
+ // read again
+ let read_result = t.read(&mut read_buf);
+ assert_eq!(read_result.unwrap(), 2);
+ assert_eq!(t.read_bytes(), &read_buf[0..2]);
+ }
+}