diff options
Diffstat (limited to 'src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TBufferedClientTransport.cs')
-rw-r--r-- | src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TBufferedClientTransport.cs | 206 |
1 files changed, 206 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TBufferedClientTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TBufferedClientTransport.cs new file mode 100644 index 000000000..761f1ac78 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TBufferedClientTransport.cs @@ -0,0 +1,206 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transports.Client +{ + // ReSharper disable once InconsistentNaming + public class TBufferedClientTransport : TClientTransport + { + private readonly int _bufSize; + private readonly MemoryStream _inputBuffer = new MemoryStream(0); + private readonly MemoryStream _outputBuffer = new MemoryStream(0); + private readonly TClientTransport _transport; + private bool _isDisposed; + + //TODO: should support only specified input transport? + public TBufferedClientTransport(TClientTransport transport, int bufSize = 1024) + { + if (bufSize <= 0) + { + throw new ArgumentOutOfRangeException(nameof(bufSize), "Buffer size must be a positive number."); + } + + _transport = transport ?? throw new ArgumentNullException(nameof(transport)); + _bufSize = bufSize; + } + + public TClientTransport UnderlyingTransport + { + get + { + CheckNotDisposed(); + + return _transport; + } + } + + public override bool IsOpen => !_isDisposed && _transport.IsOpen; + + public override async Task OpenAsync(CancellationToken cancellationToken) + { + CheckNotDisposed(); + + await _transport.OpenAsync(cancellationToken); + } + + public override void Close() + { + CheckNotDisposed(); + + _transport.Close(); + } + + public override async Task<int> ReadAsync(byte[] buffer, int offset, int length, + CancellationToken cancellationToken) + { + //TODO: investigate how it should work correctly + CheckNotDisposed(); + + ValidateBufferArgs(buffer, offset, length); + + if (!IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + if (_inputBuffer.Capacity < _bufSize) + { + _inputBuffer.Capacity = _bufSize; + } + + var got = await _inputBuffer.ReadAsync(buffer, offset, length, cancellationToken); + if (got > 0) + { + return got; + } + + _inputBuffer.Seek(0, SeekOrigin.Begin); + _inputBuffer.SetLength(_inputBuffer.Capacity); + + ArraySegment<byte> bufSegment; + _inputBuffer.TryGetBuffer(out bufSegment); + + // investigate + var filled = await _transport.ReadAsync(bufSegment.Array, 0, (int) _inputBuffer.Length, cancellationToken); + _inputBuffer.SetLength(filled); + + if (filled == 0) + { + return 0; + } + + return await ReadAsync(buffer, offset, length, cancellationToken); + } + + public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + CheckNotDisposed(); + + ValidateBufferArgs(buffer, offset, length); + + if (!IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + // Relative offset from "off" argument + var writtenCount = 0; + if (_outputBuffer.Length > 0) + { + var capa = (int) (_outputBuffer.Capacity - _outputBuffer.Length); + var writeSize = capa <= length ? capa : length; + await _outputBuffer.WriteAsync(buffer, offset, writeSize, cancellationToken); + + writtenCount += writeSize; + if (writeSize == capa) + { + //ArraySegment<byte> bufSegment; + //_outputBuffer.TryGetBuffer(out bufSegment); + var data = _outputBuffer.ToArray(); + //await _transport.WriteAsync(bufSegment.Array, cancellationToken); + await _transport.WriteAsync(data, cancellationToken); + _outputBuffer.SetLength(0); + } + } + + while (length - writtenCount >= _bufSize) + { + await _transport.WriteAsync(buffer, offset + writtenCount, _bufSize, cancellationToken); + writtenCount += _bufSize; + } + + var remain = length - writtenCount; + if (remain > 0) + { + if (_outputBuffer.Capacity < _bufSize) + { + _outputBuffer.Capacity = _bufSize; + } + await _outputBuffer.WriteAsync(buffer, offset + writtenCount, remain, cancellationToken); + } + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + CheckNotDisposed(); + + if (!IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + if (_outputBuffer.Length > 0) + { + //ArraySegment<byte> bufSegment; + var data = _outputBuffer.ToArray(); // TryGetBuffer(out bufSegment); + + await _transport.WriteAsync(data /*bufSegment.Array*/, cancellationToken); + _outputBuffer.SetLength(0); + } + + await _transport.FlushAsync(cancellationToken); + } + + private void CheckNotDisposed() + { + if (_isDisposed) + { + throw new ObjectDisposedException(nameof(_transport)); + } + } + + // IDisposable + protected override void Dispose(bool disposing) + { + if (!_isDisposed) + { + if (disposing) + { + _inputBuffer?.Dispose(); + _outputBuffer?.Dispose(); + _transport?.Dispose(); + } + } + _isDisposed = true; + } + } +}
\ No newline at end of file |