diff options
Diffstat (limited to '')
-rw-r--r-- | src/jaegertracing/thrift/lib/csharp/src/Transport/TBufferedTransport.cs | 194 |
1 files changed, 194 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/csharp/src/Transport/TBufferedTransport.cs b/src/jaegertracing/thrift/lib/csharp/src/Transport/TBufferedTransport.cs new file mode 100644 index 000000000..887098810 --- /dev/null +++ b/src/jaegertracing/thrift/lib/csharp/src/Transport/TBufferedTransport.cs @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.IO; + +namespace Thrift.Transport +{ + public class TBufferedTransport : TTransport, IDisposable + { + private readonly int bufSize; + private readonly MemoryStream inputBuffer = new MemoryStream(0); + private readonly MemoryStream outputBuffer = new MemoryStream(0); + private readonly TTransport transport; + + public TBufferedTransport(TTransport transport, int bufSize = 1024) + { + if (transport == null) + throw new ArgumentNullException("transport"); + if (bufSize <= 0) + throw new ArgumentException("bufSize", "Buffer size must be a positive number."); + this.transport = transport; + this.bufSize = bufSize; + } + + public TTransport UnderlyingTransport + { + get + { + CheckNotDisposed(); + return transport; + } + } + + public override bool IsOpen + { + get + { + // We can legitimately throw here but be nice a bit. + // CheckNotDisposed(); + return !_IsDisposed && transport.IsOpen; + } + } + + public override void Open() + { + CheckNotDisposed(); + transport.Open(); + } + + public override void Close() + { + CheckNotDisposed(); + transport.Close(); + } + + public override int Read(byte[] buf, int off, int len) + { + CheckNotDisposed(); + ValidateBufferArgs(buf, off, len); + if (!IsOpen) + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + + if (inputBuffer.Capacity < bufSize) + inputBuffer.Capacity = bufSize; + + while (true) + { + int got = inputBuffer.Read(buf, off, len); + if (got > 0) + return got; + + inputBuffer.Seek(0, SeekOrigin.Begin); + inputBuffer.SetLength(inputBuffer.Capacity); + int filled = transport.Read(inputBuffer.GetBuffer(), 0, (int)inputBuffer.Length); + inputBuffer.SetLength(filled); + if (filled == 0) + return 0; + } + } + + public override void Write(byte[] buf, int off, int len) + { + CheckNotDisposed(); + ValidateBufferArgs(buf, off, len); + if (!IsOpen) + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + // Relative offset from "off" argument + int offset = 0; + if (outputBuffer.Length > 0) + { + int capa = (int)(outputBuffer.Capacity - outputBuffer.Length); + int writeSize = capa <= len ? capa : len; + outputBuffer.Write(buf, off, writeSize); + offset += writeSize; + if (writeSize == capa) + { + transport.Write(outputBuffer.GetBuffer(), 0, (int)outputBuffer.Length); + outputBuffer.SetLength(0); + } + } + while (len - offset >= bufSize) + { + transport.Write(buf, off + offset, bufSize); + offset += bufSize; + } + int remain = len - offset; + if (remain > 0) + { + if (outputBuffer.Capacity < bufSize) + outputBuffer.Capacity = bufSize; + outputBuffer.Write(buf, off + offset, remain); + } + } + + private void InternalFlush() + { + if (!IsOpen) + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + if (outputBuffer.Length > 0) + { + transport.Write(outputBuffer.GetBuffer(), 0, (int)outputBuffer.Length); + outputBuffer.SetLength(0); + } + } + + public override void Flush() + { + CheckNotDisposed(); + InternalFlush(); + + transport.Flush(); + } + + public override IAsyncResult BeginFlush(AsyncCallback callback, object state) + { + CheckNotDisposed(); + InternalFlush(); + + return transport.BeginFlush( callback, state); + } + + public override void EndFlush(IAsyncResult asyncResult) + { + transport.EndFlush( asyncResult); + } + + + + protected void CheckNotDisposed() + { + if (_IsDisposed) + throw new ObjectDisposedException("TBufferedTransport"); + } + + #region " IDisposable Support " + protected bool _IsDisposed { get; private set; } + + // IDisposable + protected override void Dispose(bool disposing) + { + if (!_IsDisposed) + { + if (disposing) + { + if (inputBuffer != null) + inputBuffer.Dispose(); + if (outputBuffer != null) + outputBuffer.Dispose(); + if (transport != null) + transport.Dispose(); + } + } + _IsDisposed = true; + } + #endregion + } +} |