From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- .../Transports/Client/TBufferedClientTransport.cs | 206 ++++++++++++++++++ .../Transports/Client/TFramedClientTransport.cs | 201 +++++++++++++++++ .../Transports/Client/THttpClientTransport.cs | 227 ++++++++++++++++++++ .../Client/TMemoryBufferClientTransport.cs | 97 +++++++++ .../Transports/Client/TNamedPipeClientTransport.cs | 95 +++++++++ .../Transports/Client/TSocketClientTransport.cs | 139 ++++++++++++ .../Transports/Client/TStreamClientTransport.cs | 110 ++++++++++ .../Transports/Client/TTlsSocketClientTransport.cs | 237 +++++++++++++++++++++ .../Transports/Server/THttpServerTransport.cs | 98 +++++++++ .../Transports/Server/TNamedPipeServerTransport.cs | 191 +++++++++++++++++ .../Transports/Server/TServerFramedTransport.cs | 150 +++++++++++++ .../Transports/Server/TServerSocketTransport.cs | 174 +++++++++++++++ .../Transports/Server/TTlsServerSocketTransport.cs | 177 +++++++++++++++ .../netcore/Thrift/Transports/TClientTransport.cs | 179 ++++++++++++++++ .../netcore/Thrift/Transports/TServerTransport.cs | 54 +++++ .../Thrift/Transports/TTransportException.cs | 58 +++++ .../netcore/Thrift/Transports/TTransportFactory.cs | 35 +++ 17 files changed, 2428 insertions(+) create mode 100644 src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TBufferedClientTransport.cs create mode 100644 src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TFramedClientTransport.cs create mode 100644 src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/THttpClientTransport.cs create mode 100644 src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TMemoryBufferClientTransport.cs create mode 100644 src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TNamedPipeClientTransport.cs create mode 100644 src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TSocketClientTransport.cs create mode 100644 src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TStreamClientTransport.cs create mode 100644 src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TTlsSocketClientTransport.cs create mode 100644 src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/THttpServerTransport.cs create mode 100644 src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TNamedPipeServerTransport.cs create mode 100644 src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TServerFramedTransport.cs create mode 100644 src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TServerSocketTransport.cs create mode 100644 src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TTlsServerSocketTransport.cs create mode 100644 src/jaegertracing/thrift/lib/netcore/Thrift/Transports/TClientTransport.cs create mode 100644 src/jaegertracing/thrift/lib/netcore/Thrift/Transports/TServerTransport.cs create mode 100644 src/jaegertracing/thrift/lib/netcore/Thrift/Transports/TTransportException.cs create mode 100644 src/jaegertracing/thrift/lib/netcore/Thrift/Transports/TTransportFactory.cs (limited to 'src/jaegertracing/thrift/lib/netcore/Thrift/Transports') diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TBufferedClientTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TBufferedClientTransport.cs new file mode 100644 index 000000000..761f1ac78 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TBufferedClientTransport.cs @@ -0,0 +1,206 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transports.Client +{ + // ReSharper disable once InconsistentNaming + public class TBufferedClientTransport : TClientTransport + { + private readonly int _bufSize; + private readonly MemoryStream _inputBuffer = new MemoryStream(0); + private readonly MemoryStream _outputBuffer = new MemoryStream(0); + private readonly TClientTransport _transport; + private bool _isDisposed; + + //TODO: should support only specified input transport? + public TBufferedClientTransport(TClientTransport transport, int bufSize = 1024) + { + if (bufSize <= 0) + { + throw new ArgumentOutOfRangeException(nameof(bufSize), "Buffer size must be a positive number."); + } + + _transport = transport ?? throw new ArgumentNullException(nameof(transport)); + _bufSize = bufSize; + } + + public TClientTransport UnderlyingTransport + { + get + { + CheckNotDisposed(); + + return _transport; + } + } + + public override bool IsOpen => !_isDisposed && _transport.IsOpen; + + public override async Task OpenAsync(CancellationToken cancellationToken) + { + CheckNotDisposed(); + + await _transport.OpenAsync(cancellationToken); + } + + public override void Close() + { + CheckNotDisposed(); + + _transport.Close(); + } + + public override async Task ReadAsync(byte[] buffer, int offset, int length, + CancellationToken cancellationToken) + { + //TODO: investigate how it should work correctly + CheckNotDisposed(); + + ValidateBufferArgs(buffer, offset, length); + + if (!IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + if (_inputBuffer.Capacity < _bufSize) + { + _inputBuffer.Capacity = _bufSize; + } + + var got = await _inputBuffer.ReadAsync(buffer, offset, length, cancellationToken); + if (got > 0) + { + return got; + } + + _inputBuffer.Seek(0, SeekOrigin.Begin); + _inputBuffer.SetLength(_inputBuffer.Capacity); + + ArraySegment bufSegment; + _inputBuffer.TryGetBuffer(out bufSegment); + + // investigate + var filled = await _transport.ReadAsync(bufSegment.Array, 0, (int) _inputBuffer.Length, cancellationToken); + _inputBuffer.SetLength(filled); + + if (filled == 0) + { + return 0; + } + + return await ReadAsync(buffer, offset, length, cancellationToken); + } + + public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + CheckNotDisposed(); + + ValidateBufferArgs(buffer, offset, length); + + if (!IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + // Relative offset from "off" argument + var writtenCount = 0; + if (_outputBuffer.Length > 0) + { + var capa = (int) (_outputBuffer.Capacity - _outputBuffer.Length); + var writeSize = capa <= length ? capa : length; + await _outputBuffer.WriteAsync(buffer, offset, writeSize, cancellationToken); + + writtenCount += writeSize; + if (writeSize == capa) + { + //ArraySegment bufSegment; + //_outputBuffer.TryGetBuffer(out bufSegment); + var data = _outputBuffer.ToArray(); + //await _transport.WriteAsync(bufSegment.Array, cancellationToken); + await _transport.WriteAsync(data, cancellationToken); + _outputBuffer.SetLength(0); + } + } + + while (length - writtenCount >= _bufSize) + { + await _transport.WriteAsync(buffer, offset + writtenCount, _bufSize, cancellationToken); + writtenCount += _bufSize; + } + + var remain = length - writtenCount; + if (remain > 0) + { + if (_outputBuffer.Capacity < _bufSize) + { + _outputBuffer.Capacity = _bufSize; + } + await _outputBuffer.WriteAsync(buffer, offset + writtenCount, remain, cancellationToken); + } + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + CheckNotDisposed(); + + if (!IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + if (_outputBuffer.Length > 0) + { + //ArraySegment bufSegment; + var data = _outputBuffer.ToArray(); // TryGetBuffer(out bufSegment); + + await _transport.WriteAsync(data /*bufSegment.Array*/, cancellationToken); + _outputBuffer.SetLength(0); + } + + await _transport.FlushAsync(cancellationToken); + } + + private void CheckNotDisposed() + { + if (_isDisposed) + { + throw new ObjectDisposedException(nameof(_transport)); + } + } + + // IDisposable + protected override void Dispose(bool disposing) + { + if (!_isDisposed) + { + if (disposing) + { + _inputBuffer?.Dispose(); + _outputBuffer?.Dispose(); + _transport?.Dispose(); + } + } + _isDisposed = true; + } + } +} \ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TFramedClientTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TFramedClientTransport.cs new file mode 100644 index 000000000..d11bb959a --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TFramedClientTransport.cs @@ -0,0 +1,201 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transports.Client +{ + //TODO: check for correct implementation + + // ReSharper disable once InconsistentNaming + public class TFramedClientTransport : TClientTransport + { + private const int HeaderSize = 4; + private readonly byte[] _headerBuf = new byte[HeaderSize]; + private readonly MemoryStream _readBuffer = new MemoryStream(1024); + private readonly TClientTransport _transport; + private readonly MemoryStream _writeBuffer = new MemoryStream(1024); + + private bool _isDisposed; + + public TFramedClientTransport(TClientTransport transport) + { + _transport = transport ?? throw new ArgumentNullException(nameof(transport)); + + InitWriteBuffer(); + } + + public override bool IsOpen => !_isDisposed && _transport.IsOpen; + + public override async Task OpenAsync(CancellationToken cancellationToken) + { + CheckNotDisposed(); + + await _transport.OpenAsync(cancellationToken); + } + + public override void Close() + { + CheckNotDisposed(); + + _transport.Close(); + } + + public override async Task ReadAsync(byte[] buffer, int offset, int length, + CancellationToken cancellationToken) + { + CheckNotDisposed(); + + ValidateBufferArgs(buffer, offset, length); + + if (!IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + var got = await _readBuffer.ReadAsync(buffer, offset, length, cancellationToken); + if (got > 0) + { + return got; + } + + // Read another frame of data + await ReadFrameAsync(cancellationToken); + + return await _readBuffer.ReadAsync(buffer, offset, length, cancellationToken); + } + + private async Task ReadFrameAsync(CancellationToken cancellationToken) + { + await _transport.ReadAllAsync(_headerBuf, 0, HeaderSize, cancellationToken); + + var size = DecodeFrameSize(_headerBuf); + + _readBuffer.SetLength(size); + _readBuffer.Seek(0, SeekOrigin.Begin); + + ArraySegment bufSegment; + _readBuffer.TryGetBuffer(out bufSegment); + + var buff = bufSegment.Array; + + await _transport.ReadAllAsync(buff, 0, size, cancellationToken); + } + + public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + CheckNotDisposed(); + + ValidateBufferArgs(buffer, offset, length); + + if (!IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + if (_writeBuffer.Length + length > int.MaxValue) + { + await FlushAsync(cancellationToken); + } + + await _writeBuffer.WriteAsync(buffer, offset, length, cancellationToken); + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + CheckNotDisposed(); + + if (!IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + //ArraySegment bufSegment; + //_writeBuffer.TryGetBuffer(out bufSegment); + //var buf = bufSegment.Array; + var buf = _writeBuffer.ToArray(); + + //var len = (int)_writeBuffer.Length; + var dataLen = (int) _writeBuffer.Length - HeaderSize; + if (dataLen < 0) + { + throw new InvalidOperationException(); // logic error actually + } + + // Inject message header into the reserved buffer space + EncodeFrameSize(dataLen, buf); + + // Send the entire message at once + await _transport.WriteAsync(buf, cancellationToken); + + InitWriteBuffer(); + + await _transport.FlushAsync(cancellationToken); + } + + private void InitWriteBuffer() + { + // Reserve space for message header to be put right before sending it out + _writeBuffer.SetLength(HeaderSize); + _writeBuffer.Seek(0, SeekOrigin.End); + } + + private static void EncodeFrameSize(int frameSize, byte[] buf) + { + buf[0] = (byte) (0xff & (frameSize >> 24)); + buf[1] = (byte) (0xff & (frameSize >> 16)); + buf[2] = (byte) (0xff & (frameSize >> 8)); + buf[3] = (byte) (0xff & (frameSize)); + } + + private static int DecodeFrameSize(byte[] buf) + { + return + ((buf[0] & 0xff) << 24) | + ((buf[1] & 0xff) << 16) | + ((buf[2] & 0xff) << 8) | + (buf[3] & 0xff); + } + + + private void CheckNotDisposed() + { + if (_isDisposed) + { + throw new ObjectDisposedException("TFramedClientTransport"); + } + } + + // IDisposable + protected override void Dispose(bool disposing) + { + if (!_isDisposed) + { + if (disposing) + { + _readBuffer?.Dispose(); + _writeBuffer?.Dispose(); + _transport?.Dispose(); + } + } + _isDisposed = true; + } + } +} \ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/THttpClientTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/THttpClientTransport.cs new file mode 100644 index 000000000..8bce9e4a0 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/THttpClientTransport.cs @@ -0,0 +1,227 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Security.Cryptography.X509Certificates; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transports.Client +{ + // ReSharper disable once InconsistentNaming + public class THttpClientTransport : TClientTransport + { + private readonly X509Certificate[] _certificates; + private readonly Uri _uri; + + // Timeouts in milliseconds + private int _connectTimeout = 30000; + private HttpClient _httpClient; + private Stream _inputStream; + + private bool _isDisposed; + private MemoryStream _outputStream = new MemoryStream(); + + public THttpClientTransport(Uri u, IDictionary customHeaders) + : this(u, Enumerable.Empty(), customHeaders) + { + } + + public THttpClientTransport(Uri u, IEnumerable certificates, + IDictionary customHeaders) + { + _uri = u; + _certificates = (certificates ?? Enumerable.Empty()).ToArray(); + CustomHeaders = customHeaders; + + // due to current bug with performance of Dispose in netcore https://github.com/dotnet/corefx/issues/8809 + // this can be switched to default way (create client->use->dispose per flush) later + _httpClient = CreateClient(); + } + + public IDictionary CustomHeaders { get; } + + public int ConnectTimeout + { + set { _connectTimeout = value; } + get { return _connectTimeout; } + } + + public override bool IsOpen => true; + + public override async Task OpenAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override void Close() + { + if (_inputStream != null) + { + _inputStream.Dispose(); + _inputStream = null; + } + + if (_outputStream != null) + { + _outputStream.Dispose(); + _outputStream = null; + } + + if (_httpClient != null) + { + _httpClient.Dispose(); + _httpClient = null; + } + } + + public override async Task ReadAsync(byte[] buffer, int offset, int length, + CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled(cancellationToken); + } + + if (_inputStream == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No request has been sent"); + } + + try + { + var ret = await _inputStream.ReadAsync(buffer, offset, length, cancellationToken); + + if (ret == -1) + { + throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "No more data available"); + } + + return ret; + } + catch (IOException iox) + { + throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString()); + } + } + + public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + + await _outputStream.WriteAsync(buffer, offset, length, cancellationToken); + } + + private HttpClient CreateClient() + { + var handler = new HttpClientHandler(); + handler.ClientCertificates.AddRange(_certificates); + + var httpClient = new HttpClient(handler); + + if (_connectTimeout > 0) + { + httpClient.Timeout = TimeSpan.FromMilliseconds(_connectTimeout); + } + + httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/x-thrift")); + httpClient.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("THttpClientTransport", "0.13.0")); + + if (CustomHeaders != null) + { + foreach (var item in CustomHeaders) + { + httpClient.DefaultRequestHeaders.Add(item.Key, item.Value); + } + } + + return httpClient; + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + try + { + try + { + if (_outputStream.CanSeek) + { + _outputStream.Seek(0, SeekOrigin.Begin); + } + + using (var outStream = new StreamContent(_outputStream)) + { + var msg = await _httpClient.PostAsync(_uri, outStream, cancellationToken); + + msg.EnsureSuccessStatusCode(); + + if (_inputStream != null) + { + _inputStream.Dispose(); + _inputStream = null; + } + + _inputStream = await msg.Content.ReadAsStreamAsync(); + if (_inputStream.CanSeek) + { + _inputStream.Seek(0, SeekOrigin.Begin); + } + } + } + catch (IOException iox) + { + throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString()); + } + catch (HttpRequestException wx) + { + throw new TTransportException(TTransportException.ExceptionType.Unknown, + "Couldn't connect to server: " + wx); + } + } + finally + { + _outputStream = new MemoryStream(); + } + } + + // IDisposable + protected override void Dispose(bool disposing) + { + if (!_isDisposed) + { + if (disposing) + { + _inputStream?.Dispose(); + _outputStream?.Dispose(); + _httpClient?.Dispose(); + } + } + _isDisposed = true; + } + } +} diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TMemoryBufferClientTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TMemoryBufferClientTransport.cs new file mode 100644 index 000000000..46a55a64a --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TMemoryBufferClientTransport.cs @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transports.Client +{ + // ReSharper disable once InconsistentNaming + public class TMemoryBufferClientTransport : TClientTransport + { + private readonly MemoryStream _byteStream; + private bool _isDisposed; + + public TMemoryBufferClientTransport() + { + _byteStream = new MemoryStream(); + } + + public TMemoryBufferClientTransport(byte[] buf) + { + _byteStream = new MemoryStream(buf); + } + + public override bool IsOpen => true; + + public override async Task OpenAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override void Close() + { + /** do nothing **/ + } + + public override async Task ReadAsync(byte[] buffer, int offset, int length, + CancellationToken cancellationToken) + { + return await _byteStream.ReadAsync(buffer, offset, length, cancellationToken); + } + + public override async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken) + { + await _byteStream.WriteAsync(buffer, 0, buffer.Length, cancellationToken); + } + + public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + await _byteStream.WriteAsync(buffer, offset, length, cancellationToken); + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public byte[] GetBuffer() + { + return _byteStream.ToArray(); + } + + // IDisposable + protected override void Dispose(bool disposing) + { + if (!_isDisposed) + { + if (disposing) + { + _byteStream?.Dispose(); + } + } + _isDisposed = true; + } + } +} \ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TNamedPipeClientTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TNamedPipeClientTransport.cs new file mode 100644 index 000000000..f5e4baf4a --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TNamedPipeClientTransport.cs @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.IO.Pipes; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transports.Client +{ + // ReSharper disable once InconsistentNaming + public class TNamedPipeClientTransport : TClientTransport + { + private NamedPipeClientStream _client; + + public TNamedPipeClientTransport(string pipe) : this(".", pipe) + { + } + + public TNamedPipeClientTransport(string server, string pipe) + { + var serverName = string.IsNullOrWhiteSpace(server) ? server : "."; + + _client = new NamedPipeClientStream(serverName, pipe, PipeDirection.InOut, PipeOptions.None); + } + + public override bool IsOpen => _client != null && _client.IsConnected; + + public override async Task OpenAsync(CancellationToken cancellationToken) + { + if (IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.AlreadyOpen); + } + + await _client.ConnectAsync(cancellationToken); + } + + public override void Close() + { + if (_client != null) + { + _client.Dispose(); + _client = null; + } + } + + public override async Task ReadAsync(byte[] buffer, int offset, int length, + CancellationToken cancellationToken) + { + if (_client == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + return await _client.ReadAsync(buffer, offset, length, cancellationToken); + } + + public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + if (_client == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + await _client.WriteAsync(buffer, offset, length, cancellationToken); + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + protected override void Dispose(bool disposing) + { + _client.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TSocketClientTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TSocketClientTransport.cs new file mode 100644 index 000000000..e769d1421 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TSocketClientTransport.cs @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transports.Client +{ + // ReSharper disable once InconsistentNaming + public class TSocketClientTransport : TStreamClientTransport + { + private bool _isDisposed; + + public TSocketClientTransport(TcpClient client) + { + TcpClient = client ?? throw new ArgumentNullException(nameof(client)); + + if (IsOpen) + { + InputStream = client.GetStream(); + OutputStream = client.GetStream(); + } + } + + public TSocketClientTransport(IPAddress host, int port) + : this(host, port, 0) + { + } + + public TSocketClientTransport(IPAddress host, int port, int timeout) + { + Host = host; + Port = port; + + TcpClient = new TcpClient(); + TcpClient.ReceiveTimeout = TcpClient.SendTimeout = timeout; + TcpClient.Client.NoDelay = true; + } + + public TcpClient TcpClient { get; private set; } + public IPAddress Host { get; } + public int Port { get; } + + public int Timeout + { + set + { + if (TcpClient != null) + { + TcpClient.ReceiveTimeout = TcpClient.SendTimeout = value; + } + } + } + + public override bool IsOpen + { + get + { + if (TcpClient == null) + { + return false; + } + + return TcpClient.Connected; + } + } + + public override async Task OpenAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + + if (IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.AlreadyOpen, "Socket already connected"); + } + + if (Port <= 0) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen, "Cannot open without port"); + } + + if (TcpClient == null) + { + throw new InvalidOperationException("Invalid or not initialized tcp client"); + } + + await TcpClient.ConnectAsync(Host, Port); + + InputStream = TcpClient.GetStream(); + OutputStream = TcpClient.GetStream(); + } + + public override void Close() + { + base.Close(); + + if (TcpClient != null) + { + TcpClient.Dispose(); + TcpClient = null; + } + } + + // IDisposable + protected override void Dispose(bool disposing) + { + if (!_isDisposed) + { + if (disposing) + { + TcpClient?.Dispose(); + + base.Dispose(disposing); + } + } + _isDisposed = true; + } + } +} \ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TStreamClientTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TStreamClientTransport.cs new file mode 100644 index 000000000..be2011e83 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TStreamClientTransport.cs @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transports.Client +{ + // ReSharper disable once InconsistentNaming + public class TStreamClientTransport : TClientTransport + { + private bool _isDisposed; + + protected TStreamClientTransport() + { + } + + public TStreamClientTransport(Stream inputStream, Stream outputStream) + { + InputStream = inputStream; + OutputStream = outputStream; + } + + protected Stream OutputStream { get; set; } + + protected Stream InputStream { get; set; } + + public override bool IsOpen => true; + + public override async Task OpenAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override void Close() + { + if (InputStream != null) + { + InputStream.Dispose(); + InputStream = null; + } + + if (OutputStream != null) + { + OutputStream.Dispose(); + OutputStream = null; + } + } + + public override async Task ReadAsync(byte[] buffer, int offset, int length, + CancellationToken cancellationToken) + { + if (InputStream == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen, + "Cannot read from null inputstream"); + } + + return await InputStream.ReadAsync(buffer, offset, length, cancellationToken); + } + + public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + if (OutputStream == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen, + "Cannot write to null outputstream"); + } + + await OutputStream.WriteAsync(buffer, offset, length, cancellationToken); + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + await OutputStream.FlushAsync(cancellationToken); + } + + // IDisposable + protected override void Dispose(bool disposing) + { + if (!_isDisposed) + { + if (disposing) + { + InputStream?.Dispose(); + OutputStream?.Dispose(); + } + } + _isDisposed = true; + } + } +} \ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TTlsSocketClientTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TTlsSocketClientTransport.cs new file mode 100644 index 000000000..c8be4ede1 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TTlsSocketClientTransport.cs @@ -0,0 +1,237 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Net; +using System.Net.Security; +using System.Net.Sockets; +using System.Security.Authentication; +using System.Security.Cryptography.X509Certificates; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transports.Client +{ + //TODO: check for correct work + + // ReSharper disable once InconsistentNaming + public class TTlsSocketClientTransport : TStreamClientTransport + { + private readonly X509Certificate2 _certificate; + private readonly RemoteCertificateValidationCallback _certValidator; + private readonly IPAddress _host; + private readonly bool _isServer; + private readonly LocalCertificateSelectionCallback _localCertificateSelectionCallback; + private readonly int _port; + private readonly SslProtocols _sslProtocols; + private TcpClient _client; + private SslStream _secureStream; + private int _timeout; + + public TTlsSocketClientTransport(TcpClient client, X509Certificate2 certificate, bool isServer = false, + RemoteCertificateValidationCallback certValidator = null, + LocalCertificateSelectionCallback localCertificateSelectionCallback = null, + SslProtocols sslProtocols = SslProtocols.Tls12) + { + _client = client; + _certificate = certificate; + _certValidator = certValidator; + _localCertificateSelectionCallback = localCertificateSelectionCallback; + _sslProtocols = sslProtocols; + _isServer = isServer; + + if (isServer && certificate == null) + { + throw new ArgumentException("TTlsSocketClientTransport needs certificate to be used for server", + nameof(certificate)); + } + + if (IsOpen) + { + InputStream = client.GetStream(); + OutputStream = client.GetStream(); + } + } + + public TTlsSocketClientTransport(IPAddress host, int port, string certificatePath, + RemoteCertificateValidationCallback certValidator = null, + LocalCertificateSelectionCallback localCertificateSelectionCallback = null, + SslProtocols sslProtocols = SslProtocols.Tls12) + : this(host, port, 0, + new X509Certificate2(certificatePath), + certValidator, + localCertificateSelectionCallback, + sslProtocols) + { + } + + public TTlsSocketClientTransport(IPAddress host, int port, + X509Certificate2 certificate = null, + RemoteCertificateValidationCallback certValidator = null, + LocalCertificateSelectionCallback localCertificateSelectionCallback = null, + SslProtocols sslProtocols = SslProtocols.Tls12) + : this(host, port, 0, + certificate, + certValidator, + localCertificateSelectionCallback, + sslProtocols) + { + } + + public TTlsSocketClientTransport(IPAddress host, int port, int timeout, + X509Certificate2 certificate, + RemoteCertificateValidationCallback certValidator = null, + LocalCertificateSelectionCallback localCertificateSelectionCallback = null, + SslProtocols sslProtocols = SslProtocols.Tls12) + { + _host = host; + _port = port; + _timeout = timeout; + _certificate = certificate; + _certValidator = certValidator; + _localCertificateSelectionCallback = localCertificateSelectionCallback; + _sslProtocols = sslProtocols; + + InitSocket(); + } + + public int Timeout + { + set { _client.ReceiveTimeout = _client.SendTimeout = _timeout = value; } + } + + public TcpClient TcpClient => _client; + + public IPAddress Host => _host; + + public int Port => _port; + + public override bool IsOpen + { + get + { + if (_client == null) + { + return false; + } + + return _client.Connected; + } + } + + private void InitSocket() + { + _client = new TcpClient(); + _client.ReceiveTimeout = _client.SendTimeout = _timeout; + _client.Client.NoDelay = true; + } + + private bool DefaultCertificateValidator(object sender, X509Certificate certificate, X509Chain chain, + SslPolicyErrors sslValidationErrors) + { + return sslValidationErrors == SslPolicyErrors.None; + } + + public override async Task OpenAsync(CancellationToken cancellationToken) + { + if (IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.AlreadyOpen, "Socket already connected"); + } + + if (_host == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen, "Cannot open null host"); + } + + if (_port <= 0) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen, "Cannot open without port"); + } + + if (_client == null) + { + InitSocket(); + } + + if (_client != null) + { + await _client.ConnectAsync(_host, _port); + await SetupTlsAsync(); + } + } + + public async Task SetupTlsAsync() + { + var validator = _certValidator ?? DefaultCertificateValidator; + + if (_localCertificateSelectionCallback != null) + { + _secureStream = new SslStream(_client.GetStream(), false, validator, _localCertificateSelectionCallback); + } + else + { + _secureStream = new SslStream(_client.GetStream(), false, validator); + } + + try + { + if (_isServer) + { + // Server authentication + await + _secureStream.AuthenticateAsServerAsync(_certificate, _certValidator != null, _sslProtocols, + true); + } + else + { + // Client authentication + var certs = _certificate != null + ? new X509CertificateCollection {_certificate} + : new X509CertificateCollection(); + + var targetHost = _host.ToString(); + await _secureStream.AuthenticateAsClientAsync(targetHost, certs, _sslProtocols, true); + } + } + catch (Exception) + { + Close(); + throw; + } + + InputStream = _secureStream; + OutputStream = _secureStream; + } + + public override void Close() + { + base.Close(); + if (_client != null) + { + _client.Dispose(); + _client = null; + } + + if (_secureStream != null) + { + _secureStream.Dispose(); + _secureStream = null; + } + } + } +} \ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/THttpServerTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/THttpServerTransport.cs new file mode 100644 index 000000000..032063a37 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/THttpServerTransport.cs @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Http; +using Microsoft.Extensions.Logging; +using Thrift.Protocols; +using Thrift.Transports.Client; + +namespace Thrift.Transports.Server +{ + // ReSharper disable once InconsistentNaming + public class THttpServerTransport + { + protected const string ContentType = "application/x-thrift"; + private readonly ILogger _logger; + private readonly RequestDelegate _next; + protected Encoding Encoding = Encoding.UTF8; + + protected ITProtocolFactory InputProtocolFactory; + protected ITProtocolFactory OutputProtocolFactory; + + protected ITAsyncProcessor Processor; + + public THttpServerTransport(ITAsyncProcessor processor, RequestDelegate next, ILoggerFactory loggerFactory) + : this(processor, new TBinaryProtocol.Factory(), next, loggerFactory) + { + } + + public THttpServerTransport(ITAsyncProcessor processor, ITProtocolFactory protocolFactory, RequestDelegate next, + ILoggerFactory loggerFactory) + : this(processor, protocolFactory, protocolFactory, next, loggerFactory) + { + } + + public THttpServerTransport(ITAsyncProcessor processor, ITProtocolFactory inputProtocolFactory, + ITProtocolFactory outputProtocolFactory, RequestDelegate next, ILoggerFactory loggerFactory) + { + if (loggerFactory == null) + { + throw new ArgumentNullException(nameof(loggerFactory)); + } + + Processor = processor ?? throw new ArgumentNullException(nameof(processor)); + InputProtocolFactory = inputProtocolFactory ?? throw new ArgumentNullException(nameof(inputProtocolFactory)); + OutputProtocolFactory = outputProtocolFactory ?? throw new ArgumentNullException(nameof(outputProtocolFactory)); + + _next = next; + _logger = loggerFactory.CreateLogger(); + } + + public async Task Invoke(HttpContext context) + { + context.Response.ContentType = ContentType; + await ProcessRequestAsync(context, context.RequestAborted); //TODO: check for correct logic + } + + public async Task ProcessRequestAsync(HttpContext context, CancellationToken cancellationToken) + { + var transport = new TStreamClientTransport(context.Request.Body, context.Response.Body); + + try + { + var input = InputProtocolFactory.GetProtocol(transport); + var output = OutputProtocolFactory.GetProtocol(transport); + + while (await Processor.ProcessAsync(input, output, cancellationToken)) + { + } + } + catch (TTransportException) + { + // Client died, just move on + } + finally + { + transport.Close(); + } + } + } +} \ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TNamedPipeServerTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TNamedPipeServerTransport.cs new file mode 100644 index 000000000..186786ed2 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TNamedPipeServerTransport.cs @@ -0,0 +1,191 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.IO.Pipes; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transports.Server +{ + // ReSharper disable once InconsistentNaming + public class TNamedPipeServerTransport : TServerTransport + { + /// + /// This is the address of the Pipe on the localhost. + /// + private readonly string _pipeAddress; + + private bool _asyncMode = true; + private volatile bool _isPending = true; + + private NamedPipeServerStream _stream = null; + + public TNamedPipeServerTransport(string pipeAddress) + { + _pipeAddress = pipeAddress; + } + + public override void Listen() + { + // nothing to do here + } + + public override void Close() + { + if (_stream != null) + { + try + { + //TODO: check for disconection + _stream.Disconnect(); + _stream.Dispose(); + } + finally + { + _stream = null; + _isPending = false; + } + } + } + + public override bool IsClientPending() + { + return _isPending; + } + + private void EnsurePipeInstance() + { + if (_stream == null) + { + var direction = PipeDirection.InOut; + var maxconn = 254; + var mode = PipeTransmissionMode.Byte; + var options = _asyncMode ? PipeOptions.Asynchronous : PipeOptions.None; + var inbuf = 4096; + var outbuf = 4096; + // TODO: security + + try + { + _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf); + } + catch (NotImplementedException) // Mono still does not support async, fallback to sync + { + if (_asyncMode) + { + options &= (~PipeOptions.Asynchronous); + _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, + outbuf); + _asyncMode = false; + } + else + { + throw; + } + } + } + } + + protected override async Task AcceptImplementationAsync(CancellationToken cancellationToken) + { + try + { + EnsurePipeInstance(); + + await _stream.WaitForConnectionAsync(cancellationToken); + + var trans = new ServerTransport(_stream); + _stream = null; // pass ownership to ServerTransport + + //_isPending = false; + + return trans; + } + catch (TTransportException) + { + Close(); + throw; + } + catch (Exception e) + { + Close(); + throw new TTransportException(TTransportException.ExceptionType.NotOpen, e.Message); + } + } + + private class ServerTransport : TClientTransport + { + private readonly NamedPipeServerStream _stream; + + public ServerTransport(NamedPipeServerStream stream) + { + _stream = stream; + } + + public override bool IsOpen => _stream != null && _stream.IsConnected; + + public override async Task OpenAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override void Close() + { + _stream?.Dispose(); + } + + public override async Task ReadAsync(byte[] buffer, int offset, int length, + CancellationToken cancellationToken) + { + if (_stream == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + return await _stream.ReadAsync(buffer, offset, length, cancellationToken); + } + + public override async Task WriteAsync(byte[] buffer, int offset, int length, + CancellationToken cancellationToken) + { + if (_stream == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + await _stream.WriteAsync(buffer, offset, length, cancellationToken); + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + protected override void Dispose(bool disposing) + { + _stream?.Dispose(); + } + } + } +} \ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TServerFramedTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TServerFramedTransport.cs new file mode 100644 index 000000000..0b86e9ebb --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TServerFramedTransport.cs @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using Thrift.Transports.Client; + +namespace Thrift.Transports.Server +{ + // ReSharper disable once InconsistentNaming + public class TServerFramedTransport : TServerTransport + { + private readonly int _clientTimeout; + private readonly int _port; + private TcpListener _server; + + public TServerFramedTransport(TcpListener listener) + : this(listener, 0) + { + } + + public TServerFramedTransport(TcpListener listener, int clientTimeout) + { + _server = listener; + _clientTimeout = clientTimeout; + } + + public TServerFramedTransport(int port) + : this(port, 0) + { + } + + public TServerFramedTransport(int port, int clientTimeout) + { + _port = port; + _clientTimeout = clientTimeout; + try + { + // Make server socket + _server = new TcpListener(IPAddress.Any, _port); + _server.Server.NoDelay = true; + } + catch (Exception) + { + _server = null; + throw new TTransportException("Could not create ServerSocket on port " + port + "."); + } + } + + public override void Listen() + { + // Make sure not to block on accept + if (_server != null) + { + try + { + _server.Start(); + } + catch (SocketException sx) + { + throw new TTransportException("Could not accept on listening socket: " + sx.Message); + } + } + } + + public override bool IsClientPending() + { + return _server.Pending(); + } + + protected override async Task AcceptImplementationAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled(cancellationToken); + } + + if (_server == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No underlying server socket."); + } + + try + { + TFramedClientTransport tSocketTransport = null; + var tcpClient = await _server.AcceptTcpClientAsync(); + + try + { + tSocketTransport = new TFramedClientTransport(new TSocketClientTransport(tcpClient) + { + Timeout = _clientTimeout + }); + + return tSocketTransport; + } + catch (Exception) + { + if (tSocketTransport != null) + { + tSocketTransport.Dispose(); + } + else // Otherwise, clean it up ourselves. + { + ((IDisposable) tcpClient).Dispose(); + } + + throw; + } + } + catch (Exception ex) + { + throw new TTransportException(ex.ToString()); + } + } + + public override void Close() + { + if (_server != null) + { + try + { + _server.Stop(); + } + catch (Exception ex) + { + throw new TTransportException("WARNING: Could not close server socket: " + ex); + } + _server = null; + } + } + } +} \ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TServerSocketTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TServerSocketTransport.cs new file mode 100644 index 000000000..3a9d8a17d --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TServerSocketTransport.cs @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using Thrift.Transports.Client; + +namespace Thrift.Transports.Server +{ + // ReSharper disable once InconsistentNaming + public class TServerSocketTransport : TServerTransport + { + private readonly int _clientTimeout; + private readonly int _port; + private readonly bool _useBufferedSockets; + private readonly bool _useFramedTransport; + private TcpListener _server; + + public TServerSocketTransport(TcpListener listener) + : this(listener, 0) + { + } + + public TServerSocketTransport(TcpListener listener, int clientTimeout) + { + _server = listener; + _clientTimeout = clientTimeout; + } + + public TServerSocketTransport(int port) + : this(port, 0) + { + } + + public TServerSocketTransport(int port, int clientTimeout) + : this(port, clientTimeout, false) + { + } + + public TServerSocketTransport(int port, int clientTimeout, bool useBufferedSockets): + this(port, clientTimeout, useBufferedSockets, false) + { + } + + public TServerSocketTransport(int port, int clientTimeout, bool useBufferedSockets, bool useFramedTransport) + { + _port = port; + _clientTimeout = clientTimeout; + _useBufferedSockets = useBufferedSockets; + _useFramedTransport = useFramedTransport; + try + { + // Make server socket + _server = new TcpListener(IPAddress.Any, _port); + _server.Server.NoDelay = true; + } + catch (Exception) + { + _server = null; + throw new TTransportException("Could not create ServerSocket on port " + port + "."); + } + } + + public override void Listen() + { + // Make sure not to block on accept + if (_server != null) + { + try + { + _server.Start(); + } + catch (SocketException sx) + { + throw new TTransportException("Could not accept on listening socket: " + sx.Message); + } + } + } + + public override bool IsClientPending() + { + return _server.Pending(); + } + + protected override async Task AcceptImplementationAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled(cancellationToken); + } + + if (_server == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No underlying server socket."); + } + + try + { + TClientTransport tSocketTransport = null; + var tcpClient = await _server.AcceptTcpClientAsync(); + + try + { + tSocketTransport = new TSocketClientTransport(tcpClient) + { + Timeout = _clientTimeout + }; + + if (_useBufferedSockets) + { + tSocketTransport = new TBufferedClientTransport(tSocketTransport); + } + + if (_useFramedTransport) + { + tSocketTransport = new TFramedClientTransport(tSocketTransport); + } + + return tSocketTransport; + } + catch (Exception) + { + if (tSocketTransport != null) + { + tSocketTransport.Dispose(); + } + else // Otherwise, clean it up ourselves. + { + ((IDisposable) tcpClient).Dispose(); + } + + throw; + } + } + catch (Exception ex) + { + throw new TTransportException(ex.ToString()); + } + } + + public override void Close() + { + if (_server != null) + { + try + { + _server.Stop(); + } + catch (Exception ex) + { + throw new TTransportException("WARNING: Could not close server socket: " + ex); + } + _server = null; + } + } + } +} \ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TTlsServerSocketTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TTlsServerSocketTransport.cs new file mode 100644 index 000000000..759feeddd --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TTlsServerSocketTransport.cs @@ -0,0 +1,177 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Net; +using System.Net.Security; +using System.Net.Sockets; +using System.Security.Authentication; +using System.Security.Cryptography.X509Certificates; +using System.Threading; +using System.Threading.Tasks; +using Thrift.Transports.Client; + +namespace Thrift.Transports.Server +{ + // ReSharper disable once InconsistentNaming + public class TTlsServerSocketTransport : TServerTransport + { + private readonly RemoteCertificateValidationCallback _clientCertValidator; + private readonly int _clientTimeout = 0; + private readonly LocalCertificateSelectionCallback _localCertificateSelectionCallback; + private readonly int _port; + private readonly X509Certificate2 _serverCertificate; + private readonly SslProtocols _sslProtocols; + private readonly bool _useBufferedSockets; + private readonly bool _useFramedTransport; + private TcpListener _server; + + public TTlsServerSocketTransport(int port, X509Certificate2 certificate) + : this(port, false, certificate) + { + } + + public TTlsServerSocketTransport( + int port, + bool useBufferedSockets, + X509Certificate2 certificate, + RemoteCertificateValidationCallback clientCertValidator = null, + LocalCertificateSelectionCallback localCertificateSelectionCallback = null, + SslProtocols sslProtocols = SslProtocols.Tls12) + : this(port, useBufferedSockets, false, certificate, + clientCertValidator, localCertificateSelectionCallback, sslProtocols) + { + } + + public TTlsServerSocketTransport( + int port, + bool useBufferedSockets, + bool useFramedTransport, + X509Certificate2 certificate, + RemoteCertificateValidationCallback clientCertValidator = null, + LocalCertificateSelectionCallback localCertificateSelectionCallback = null, + SslProtocols sslProtocols = SslProtocols.Tls12) + { + if (!certificate.HasPrivateKey) + { + throw new TTransportException(TTransportException.ExceptionType.Unknown, + "Your server-certificate needs to have a private key"); + } + + _port = port; + _serverCertificate = certificate; + _useBufferedSockets = useBufferedSockets; + _useFramedTransport = useFramedTransport; + _clientCertValidator = clientCertValidator; + _localCertificateSelectionCallback = localCertificateSelectionCallback; + _sslProtocols = sslProtocols; + + try + { + // Create server socket + _server = new TcpListener(IPAddress.Any, _port); + _server.Server.NoDelay = true; + } + catch (Exception) + { + _server = null; + throw new TTransportException($"Could not create ServerSocket on port {port}."); + } + } + + public override void Listen() + { + // Make sure accept is not blocking + if (_server != null) + { + try + { + _server.Start(); + } + catch (SocketException sx) + { + throw new TTransportException($"Could not accept on listening socket: {sx.Message}"); + } + } + } + + public override bool IsClientPending() + { + return _server.Pending(); + } + + protected override async Task AcceptImplementationAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled(cancellationToken); + } + + if (_server == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No underlying server socket."); + } + + try + { + var client = await _server.AcceptTcpClientAsync(); + client.SendTimeout = client.ReceiveTimeout = _clientTimeout; + + //wrap the client in an SSL Socket passing in the SSL cert + var tTlsSocket = new TTlsSocketClientTransport(client, _serverCertificate, true, _clientCertValidator, + _localCertificateSelectionCallback, _sslProtocols); + + await tTlsSocket.SetupTlsAsync(); + + TClientTransport trans = tTlsSocket; + + if (_useBufferedSockets) + { + trans = new TBufferedClientTransport(trans); + } + + if (_useFramedTransport) + { + trans = new TFramedClientTransport(trans); + } + + return trans; + } + catch (Exception ex) + { + throw new TTransportException(ex.ToString()); + } + } + + public override void Close() + { + if (_server != null) + { + try + { + _server.Stop(); + } + catch (Exception ex) + { + throw new TTransportException($"WARNING: Could not close server socket: {ex}"); + } + + _server = null; + } + } + } +} \ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/TClientTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/TClientTransport.cs new file mode 100644 index 000000000..0dd96cb36 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/TClientTransport.cs @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transports +{ + //TODO: think about client info + // ReSharper disable once InconsistentNaming + public abstract class TClientTransport : IDisposable + { + //TODO: think how to avoid peek byte + private readonly byte[] _peekBuffer = new byte[1]; + private bool _hasPeekByte; + public abstract bool IsOpen { get; } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + public async Task PeekAsync(CancellationToken cancellationToken) + { + //If we already have a byte read but not consumed, do nothing. + if (_hasPeekByte) + { + return true; + } + + //If transport closed we can't peek. + if (!IsOpen) + { + return false; + } + + //Try to read one byte. If succeeds we will need to store it for the next read. + try + { + var bytes = await ReadAsync(_peekBuffer, 0, 1, cancellationToken); + if (bytes == 0) + { + return false; + } + } + catch (IOException) + { + return false; + } + + _hasPeekByte = true; + return true; + } + + public virtual async Task OpenAsync() + { + await OpenAsync(CancellationToken.None); + } + + public abstract Task OpenAsync(CancellationToken cancellationToken); + + public abstract void Close(); + + protected static void ValidateBufferArgs(byte[] buffer, int offset, int length) + { + if (buffer == null) + { + throw new ArgumentNullException(nameof(buffer)); + } + + if (offset < 0) + { + throw new ArgumentOutOfRangeException(nameof(offset), "Buffer offset is smaller than zero."); + } + + if (length < 0) + { + throw new ArgumentOutOfRangeException(nameof(length), "Buffer length is smaller than zero."); + } + + if (offset + length > buffer.Length) + { + throw new ArgumentOutOfRangeException(nameof(buffer), "Not enough data."); + } + } + + public virtual async Task ReadAsync(byte[] buffer, int offset, int length) + { + return await ReadAsync(buffer, offset, length, CancellationToken.None); + } + + public abstract Task ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken); + + public virtual async Task ReadAllAsync(byte[] buffer, int offset, int length) + { + return await ReadAllAsync(buffer, offset, length, CancellationToken.None); + } + + public virtual async Task ReadAllAsync(byte[] buffer, int offset, int length, + CancellationToken cancellationToken) + { + ValidateBufferArgs(buffer, offset, length); + + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled(cancellationToken); + } + + var retrieved = 0; + + //If we previously peeked a byte, we need to use that first. + if (_hasPeekByte) + { + buffer[offset + retrieved++] = _peekBuffer[0]; + _hasPeekByte = false; + } + + while (retrieved < length) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled(cancellationToken); + } + + var returnedCount = await ReadAsync(buffer, offset + retrieved, length - retrieved, cancellationToken); + if (returnedCount <= 0) + { + throw new TTransportException(TTransportException.ExceptionType.EndOfFile, + "Cannot read, Remote side has closed"); + } + retrieved += returnedCount; + } + return retrieved; + } + + public virtual async Task WriteAsync(byte[] buffer) + { + await WriteAsync(buffer, CancellationToken.None); + } + + public virtual async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken) + { + await WriteAsync(buffer, 0, buffer.Length, CancellationToken.None); + } + + public virtual async Task WriteAsync(byte[] buffer, int offset, int length) + { + await WriteAsync(buffer, offset, length, CancellationToken.None); + } + + public abstract Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken); + + public virtual async Task FlushAsync() + { + await FlushAsync(CancellationToken.None); + } + + public abstract Task FlushAsync(CancellationToken cancellationToken); + + protected abstract void Dispose(bool disposing); + } +} \ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/TServerTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/TServerTransport.cs new file mode 100644 index 000000000..0d45a55f9 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/TServerTransport.cs @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transports +{ + // ReSharper disable once InconsistentNaming + public abstract class TServerTransport + { + public abstract void Listen(); + public abstract void Close(); + public abstract bool IsClientPending(); + + protected virtual async Task AcceptImplementationAsync() + { + return await AcceptImplementationAsync(CancellationToken.None); + } + + protected abstract Task AcceptImplementationAsync(CancellationToken cancellationToken); + + public async Task AcceptAsync() + { + return await AcceptAsync(CancellationToken.None); + } + + public async Task AcceptAsync(CancellationToken cancellationToken) + { + var transport = await AcceptImplementationAsync(cancellationToken); + + if (transport == null) + { + throw new TTransportException($"{nameof(AcceptImplementationAsync)} should not return null"); + } + + return transport; + } + } +} \ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/TTransportException.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/TTransportException.cs new file mode 100644 index 000000000..b7c42e33a --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/TTransportException.cs @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace Thrift.Transports +{ + // ReSharper disable once InconsistentNaming + public class TTransportException : TException + { + public enum ExceptionType + { + Unknown, + NotOpen, + AlreadyOpen, + TimedOut, + EndOfFile, + Interrupted + } + + protected ExceptionType ExType; + + public TTransportException() + { + } + + public TTransportException(ExceptionType exType) + : this() + { + ExType = exType; + } + + public TTransportException(ExceptionType exType, string message) + : base(message) + { + ExType = exType; + } + + public TTransportException(string message) + : base(message) + { + } + + public ExceptionType Type => ExType; + } +} \ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/TTransportFactory.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/TTransportFactory.cs new file mode 100644 index 000000000..26c3cc471 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/TTransportFactory.cs @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace Thrift.Transports +{ + /// + /// From Mark Slee & Aditya Agarwal of Facebook: + /// Factory class used to create wrapped instance of Transports. + /// This is used primarily in servers, which get Transports from + /// a ServerTransport and then may want to mutate them (i.e. create + /// a BufferedTransport from the underlying base transport) + /// + // ReSharper disable once InconsistentNaming + public class TTransportFactory + { + public virtual TClientTransport GetTransport(TClientTransport trans) + { + return trans; + } + } +} \ No newline at end of file -- cgit v1.2.3