summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TBufferedClientTransport.cs206
-rw-r--r--src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TFramedClientTransport.cs201
-rw-r--r--src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/THttpClientTransport.cs227
-rw-r--r--src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TMemoryBufferClientTransport.cs97
-rw-r--r--src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TNamedPipeClientTransport.cs95
-rw-r--r--src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TSocketClientTransport.cs139
-rw-r--r--src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TStreamClientTransport.cs110
-rw-r--r--src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TTlsSocketClientTransport.cs237
8 files changed, 1312 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TBufferedClientTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TBufferedClientTransport.cs
new file mode 100644
index 000000000..761f1ac78
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TBufferedClientTransport.cs
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation(ASF) under one
+// or more contributor license agreements.See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Thrift.Transports.Client
+{
+ // ReSharper disable once InconsistentNaming
+ public class TBufferedClientTransport : TClientTransport
+ {
+ private readonly int _bufSize;
+ private readonly MemoryStream _inputBuffer = new MemoryStream(0);
+ private readonly MemoryStream _outputBuffer = new MemoryStream(0);
+ private readonly TClientTransport _transport;
+ private bool _isDisposed;
+
+ //TODO: should support only specified input transport?
+ public TBufferedClientTransport(TClientTransport transport, int bufSize = 1024)
+ {
+ if (bufSize <= 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(bufSize), "Buffer size must be a positive number.");
+ }
+
+ _transport = transport ?? throw new ArgumentNullException(nameof(transport));
+ _bufSize = bufSize;
+ }
+
+ public TClientTransport UnderlyingTransport
+ {
+ get
+ {
+ CheckNotDisposed();
+
+ return _transport;
+ }
+ }
+
+ public override bool IsOpen => !_isDisposed && _transport.IsOpen;
+
+ public override async Task OpenAsync(CancellationToken cancellationToken)
+ {
+ CheckNotDisposed();
+
+ await _transport.OpenAsync(cancellationToken);
+ }
+
+ public override void Close()
+ {
+ CheckNotDisposed();
+
+ _transport.Close();
+ }
+
+ public override async Task<int> ReadAsync(byte[] buffer, int offset, int length,
+ CancellationToken cancellationToken)
+ {
+ //TODO: investigate how it should work correctly
+ CheckNotDisposed();
+
+ ValidateBufferArgs(buffer, offset, length);
+
+ if (!IsOpen)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.NotOpen);
+ }
+
+ if (_inputBuffer.Capacity < _bufSize)
+ {
+ _inputBuffer.Capacity = _bufSize;
+ }
+
+ var got = await _inputBuffer.ReadAsync(buffer, offset, length, cancellationToken);
+ if (got > 0)
+ {
+ return got;
+ }
+
+ _inputBuffer.Seek(0, SeekOrigin.Begin);
+ _inputBuffer.SetLength(_inputBuffer.Capacity);
+
+ ArraySegment<byte> bufSegment;
+ _inputBuffer.TryGetBuffer(out bufSegment);
+
+ // investigate
+ var filled = await _transport.ReadAsync(bufSegment.Array, 0, (int) _inputBuffer.Length, cancellationToken);
+ _inputBuffer.SetLength(filled);
+
+ if (filled == 0)
+ {
+ return 0;
+ }
+
+ return await ReadAsync(buffer, offset, length, cancellationToken);
+ }
+
+ public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
+ {
+ CheckNotDisposed();
+
+ ValidateBufferArgs(buffer, offset, length);
+
+ if (!IsOpen)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.NotOpen);
+ }
+
+ // Relative offset from "off" argument
+ var writtenCount = 0;
+ if (_outputBuffer.Length > 0)
+ {
+ var capa = (int) (_outputBuffer.Capacity - _outputBuffer.Length);
+ var writeSize = capa <= length ? capa : length;
+ await _outputBuffer.WriteAsync(buffer, offset, writeSize, cancellationToken);
+
+ writtenCount += writeSize;
+ if (writeSize == capa)
+ {
+ //ArraySegment<byte> bufSegment;
+ //_outputBuffer.TryGetBuffer(out bufSegment);
+ var data = _outputBuffer.ToArray();
+ //await _transport.WriteAsync(bufSegment.Array, cancellationToken);
+ await _transport.WriteAsync(data, cancellationToken);
+ _outputBuffer.SetLength(0);
+ }
+ }
+
+ while (length - writtenCount >= _bufSize)
+ {
+ await _transport.WriteAsync(buffer, offset + writtenCount, _bufSize, cancellationToken);
+ writtenCount += _bufSize;
+ }
+
+ var remain = length - writtenCount;
+ if (remain > 0)
+ {
+ if (_outputBuffer.Capacity < _bufSize)
+ {
+ _outputBuffer.Capacity = _bufSize;
+ }
+ await _outputBuffer.WriteAsync(buffer, offset + writtenCount, remain, cancellationToken);
+ }
+ }
+
+ public override async Task FlushAsync(CancellationToken cancellationToken)
+ {
+ CheckNotDisposed();
+
+ if (!IsOpen)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.NotOpen);
+ }
+
+ if (_outputBuffer.Length > 0)
+ {
+ //ArraySegment<byte> bufSegment;
+ var data = _outputBuffer.ToArray(); // TryGetBuffer(out bufSegment);
+
+ await _transport.WriteAsync(data /*bufSegment.Array*/, cancellationToken);
+ _outputBuffer.SetLength(0);
+ }
+
+ await _transport.FlushAsync(cancellationToken);
+ }
+
+ private void CheckNotDisposed()
+ {
+ if (_isDisposed)
+ {
+ throw new ObjectDisposedException(nameof(_transport));
+ }
+ }
+
+ // IDisposable
+ protected override void Dispose(bool disposing)
+ {
+ if (!_isDisposed)
+ {
+ if (disposing)
+ {
+ _inputBuffer?.Dispose();
+ _outputBuffer?.Dispose();
+ _transport?.Dispose();
+ }
+ }
+ _isDisposed = true;
+ }
+ }
+} \ No newline at end of file
diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TFramedClientTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TFramedClientTransport.cs
new file mode 100644
index 000000000..d11bb959a
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TFramedClientTransport.cs
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation(ASF) under one
+// or more contributor license agreements.See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Thrift.Transports.Client
+{
+ //TODO: check for correct implementation
+
+ // ReSharper disable once InconsistentNaming
+ public class TFramedClientTransport : TClientTransport
+ {
+ private const int HeaderSize = 4;
+ private readonly byte[] _headerBuf = new byte[HeaderSize];
+ private readonly MemoryStream _readBuffer = new MemoryStream(1024);
+ private readonly TClientTransport _transport;
+ private readonly MemoryStream _writeBuffer = new MemoryStream(1024);
+
+ private bool _isDisposed;
+
+ public TFramedClientTransport(TClientTransport transport)
+ {
+ _transport = transport ?? throw new ArgumentNullException(nameof(transport));
+
+ InitWriteBuffer();
+ }
+
+ public override bool IsOpen => !_isDisposed && _transport.IsOpen;
+
+ public override async Task OpenAsync(CancellationToken cancellationToken)
+ {
+ CheckNotDisposed();
+
+ await _transport.OpenAsync(cancellationToken);
+ }
+
+ public override void Close()
+ {
+ CheckNotDisposed();
+
+ _transport.Close();
+ }
+
+ public override async Task<int> ReadAsync(byte[] buffer, int offset, int length,
+ CancellationToken cancellationToken)
+ {
+ CheckNotDisposed();
+
+ ValidateBufferArgs(buffer, offset, length);
+
+ if (!IsOpen)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.NotOpen);
+ }
+
+ var got = await _readBuffer.ReadAsync(buffer, offset, length, cancellationToken);
+ if (got > 0)
+ {
+ return got;
+ }
+
+ // Read another frame of data
+ await ReadFrameAsync(cancellationToken);
+
+ return await _readBuffer.ReadAsync(buffer, offset, length, cancellationToken);
+ }
+
+ private async Task ReadFrameAsync(CancellationToken cancellationToken)
+ {
+ await _transport.ReadAllAsync(_headerBuf, 0, HeaderSize, cancellationToken);
+
+ var size = DecodeFrameSize(_headerBuf);
+
+ _readBuffer.SetLength(size);
+ _readBuffer.Seek(0, SeekOrigin.Begin);
+
+ ArraySegment<byte> bufSegment;
+ _readBuffer.TryGetBuffer(out bufSegment);
+
+ var buff = bufSegment.Array;
+
+ await _transport.ReadAllAsync(buff, 0, size, cancellationToken);
+ }
+
+ public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
+ {
+ CheckNotDisposed();
+
+ ValidateBufferArgs(buffer, offset, length);
+
+ if (!IsOpen)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.NotOpen);
+ }
+
+ if (_writeBuffer.Length + length > int.MaxValue)
+ {
+ await FlushAsync(cancellationToken);
+ }
+
+ await _writeBuffer.WriteAsync(buffer, offset, length, cancellationToken);
+ }
+
+ public override async Task FlushAsync(CancellationToken cancellationToken)
+ {
+ CheckNotDisposed();
+
+ if (!IsOpen)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.NotOpen);
+ }
+
+ //ArraySegment<byte> bufSegment;
+ //_writeBuffer.TryGetBuffer(out bufSegment);
+ //var buf = bufSegment.Array;
+ var buf = _writeBuffer.ToArray();
+
+ //var len = (int)_writeBuffer.Length;
+ var dataLen = (int) _writeBuffer.Length - HeaderSize;
+ if (dataLen < 0)
+ {
+ throw new InvalidOperationException(); // logic error actually
+ }
+
+ // Inject message header into the reserved buffer space
+ EncodeFrameSize(dataLen, buf);
+
+ // Send the entire message at once
+ await _transport.WriteAsync(buf, cancellationToken);
+
+ InitWriteBuffer();
+
+ await _transport.FlushAsync(cancellationToken);
+ }
+
+ private void InitWriteBuffer()
+ {
+ // Reserve space for message header to be put right before sending it out
+ _writeBuffer.SetLength(HeaderSize);
+ _writeBuffer.Seek(0, SeekOrigin.End);
+ }
+
+ private static void EncodeFrameSize(int frameSize, byte[] buf)
+ {
+ buf[0] = (byte) (0xff & (frameSize >> 24));
+ buf[1] = (byte) (0xff & (frameSize >> 16));
+ buf[2] = (byte) (0xff & (frameSize >> 8));
+ buf[3] = (byte) (0xff & (frameSize));
+ }
+
+ private static int DecodeFrameSize(byte[] buf)
+ {
+ return
+ ((buf[0] & 0xff) << 24) |
+ ((buf[1] & 0xff) << 16) |
+ ((buf[2] & 0xff) << 8) |
+ (buf[3] & 0xff);
+ }
+
+
+ private void CheckNotDisposed()
+ {
+ if (_isDisposed)
+ {
+ throw new ObjectDisposedException("TFramedClientTransport");
+ }
+ }
+
+ // IDisposable
+ protected override void Dispose(bool disposing)
+ {
+ if (!_isDisposed)
+ {
+ if (disposing)
+ {
+ _readBuffer?.Dispose();
+ _writeBuffer?.Dispose();
+ _transport?.Dispose();
+ }
+ }
+ _isDisposed = true;
+ }
+ }
+} \ No newline at end of file
diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/THttpClientTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/THttpClientTransport.cs
new file mode 100644
index 000000000..8bce9e4a0
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/THttpClientTransport.cs
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation(ASF) under one
+// or more contributor license agreements.See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Security.Cryptography.X509Certificates;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Thrift.Transports.Client
+{
+ // ReSharper disable once InconsistentNaming
+ public class THttpClientTransport : TClientTransport
+ {
+ private readonly X509Certificate[] _certificates;
+ private readonly Uri _uri;
+
+ // Timeouts in milliseconds
+ private int _connectTimeout = 30000;
+ private HttpClient _httpClient;
+ private Stream _inputStream;
+
+ private bool _isDisposed;
+ private MemoryStream _outputStream = new MemoryStream();
+
+ public THttpClientTransport(Uri u, IDictionary<string, string> customHeaders)
+ : this(u, Enumerable.Empty<X509Certificate>(), customHeaders)
+ {
+ }
+
+ public THttpClientTransport(Uri u, IEnumerable<X509Certificate> certificates,
+ IDictionary<string, string> customHeaders)
+ {
+ _uri = u;
+ _certificates = (certificates ?? Enumerable.Empty<X509Certificate>()).ToArray();
+ CustomHeaders = customHeaders;
+
+ // due to current bug with performance of Dispose in netcore https://github.com/dotnet/corefx/issues/8809
+ // this can be switched to default way (create client->use->dispose per flush) later
+ _httpClient = CreateClient();
+ }
+
+ public IDictionary<string, string> CustomHeaders { get; }
+
+ public int ConnectTimeout
+ {
+ set { _connectTimeout = value; }
+ get { return _connectTimeout; }
+ }
+
+ public override bool IsOpen => true;
+
+ public override async Task OpenAsync(CancellationToken cancellationToken)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ await Task.FromCanceled(cancellationToken);
+ }
+ }
+
+ public override void Close()
+ {
+ if (_inputStream != null)
+ {
+ _inputStream.Dispose();
+ _inputStream = null;
+ }
+
+ if (_outputStream != null)
+ {
+ _outputStream.Dispose();
+ _outputStream = null;
+ }
+
+ if (_httpClient != null)
+ {
+ _httpClient.Dispose();
+ _httpClient = null;
+ }
+ }
+
+ public override async Task<int> ReadAsync(byte[] buffer, int offset, int length,
+ CancellationToken cancellationToken)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return await Task.FromCanceled<int>(cancellationToken);
+ }
+
+ if (_inputStream == null)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No request has been sent");
+ }
+
+ try
+ {
+ var ret = await _inputStream.ReadAsync(buffer, offset, length, cancellationToken);
+
+ if (ret == -1)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "No more data available");
+ }
+
+ return ret;
+ }
+ catch (IOException iox)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString());
+ }
+ }
+
+ public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ await Task.FromCanceled(cancellationToken);
+ }
+
+ await _outputStream.WriteAsync(buffer, offset, length, cancellationToken);
+ }
+
+ private HttpClient CreateClient()
+ {
+ var handler = new HttpClientHandler();
+ handler.ClientCertificates.AddRange(_certificates);
+
+ var httpClient = new HttpClient(handler);
+
+ if (_connectTimeout > 0)
+ {
+ httpClient.Timeout = TimeSpan.FromMilliseconds(_connectTimeout);
+ }
+
+ httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/x-thrift"));
+ httpClient.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("THttpClientTransport", "0.13.0"));
+
+ if (CustomHeaders != null)
+ {
+ foreach (var item in CustomHeaders)
+ {
+ httpClient.DefaultRequestHeaders.Add(item.Key, item.Value);
+ }
+ }
+
+ return httpClient;
+ }
+
+ public override async Task FlushAsync(CancellationToken cancellationToken)
+ {
+ try
+ {
+ try
+ {
+ if (_outputStream.CanSeek)
+ {
+ _outputStream.Seek(0, SeekOrigin.Begin);
+ }
+
+ using (var outStream = new StreamContent(_outputStream))
+ {
+ var msg = await _httpClient.PostAsync(_uri, outStream, cancellationToken);
+
+ msg.EnsureSuccessStatusCode();
+
+ if (_inputStream != null)
+ {
+ _inputStream.Dispose();
+ _inputStream = null;
+ }
+
+ _inputStream = await msg.Content.ReadAsStreamAsync();
+ if (_inputStream.CanSeek)
+ {
+ _inputStream.Seek(0, SeekOrigin.Begin);
+ }
+ }
+ }
+ catch (IOException iox)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString());
+ }
+ catch (HttpRequestException wx)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.Unknown,
+ "Couldn't connect to server: " + wx);
+ }
+ }
+ finally
+ {
+ _outputStream = new MemoryStream();
+ }
+ }
+
+ // IDisposable
+ protected override void Dispose(bool disposing)
+ {
+ if (!_isDisposed)
+ {
+ if (disposing)
+ {
+ _inputStream?.Dispose();
+ _outputStream?.Dispose();
+ _httpClient?.Dispose();
+ }
+ }
+ _isDisposed = true;
+ }
+ }
+}
diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TMemoryBufferClientTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TMemoryBufferClientTransport.cs
new file mode 100644
index 000000000..46a55a64a
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TMemoryBufferClientTransport.cs
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation(ASF) under one
+// or more contributor license agreements.See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Thrift.Transports.Client
+{
+ // ReSharper disable once InconsistentNaming
+ public class TMemoryBufferClientTransport : TClientTransport
+ {
+ private readonly MemoryStream _byteStream;
+ private bool _isDisposed;
+
+ public TMemoryBufferClientTransport()
+ {
+ _byteStream = new MemoryStream();
+ }
+
+ public TMemoryBufferClientTransport(byte[] buf)
+ {
+ _byteStream = new MemoryStream(buf);
+ }
+
+ public override bool IsOpen => true;
+
+ public override async Task OpenAsync(CancellationToken cancellationToken)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ await Task.FromCanceled(cancellationToken);
+ }
+ }
+
+ public override void Close()
+ {
+ /** do nothing **/
+ }
+
+ public override async Task<int> ReadAsync(byte[] buffer, int offset, int length,
+ CancellationToken cancellationToken)
+ {
+ return await _byteStream.ReadAsync(buffer, offset, length, cancellationToken);
+ }
+
+ public override async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken)
+ {
+ await _byteStream.WriteAsync(buffer, 0, buffer.Length, cancellationToken);
+ }
+
+ public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
+ {
+ await _byteStream.WriteAsync(buffer, offset, length, cancellationToken);
+ }
+
+ public override async Task FlushAsync(CancellationToken cancellationToken)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ await Task.FromCanceled(cancellationToken);
+ }
+ }
+
+ public byte[] GetBuffer()
+ {
+ return _byteStream.ToArray();
+ }
+
+ // IDisposable
+ protected override void Dispose(bool disposing)
+ {
+ if (!_isDisposed)
+ {
+ if (disposing)
+ {
+ _byteStream?.Dispose();
+ }
+ }
+ _isDisposed = true;
+ }
+ }
+} \ No newline at end of file
diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TNamedPipeClientTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TNamedPipeClientTransport.cs
new file mode 100644
index 000000000..f5e4baf4a
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TNamedPipeClientTransport.cs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation(ASF) under one
+// or more contributor license agreements.See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.IO.Pipes;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Thrift.Transports.Client
+{
+ // ReSharper disable once InconsistentNaming
+ public class TNamedPipeClientTransport : TClientTransport
+ {
+ private NamedPipeClientStream _client;
+
+ public TNamedPipeClientTransport(string pipe) : this(".", pipe)
+ {
+ }
+
+ public TNamedPipeClientTransport(string server, string pipe)
+ {
+ var serverName = string.IsNullOrWhiteSpace(server) ? server : ".";
+
+ _client = new NamedPipeClientStream(serverName, pipe, PipeDirection.InOut, PipeOptions.None);
+ }
+
+ public override bool IsOpen => _client != null && _client.IsConnected;
+
+ public override async Task OpenAsync(CancellationToken cancellationToken)
+ {
+ if (IsOpen)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.AlreadyOpen);
+ }
+
+ await _client.ConnectAsync(cancellationToken);
+ }
+
+ public override void Close()
+ {
+ if (_client != null)
+ {
+ _client.Dispose();
+ _client = null;
+ }
+ }
+
+ public override async Task<int> ReadAsync(byte[] buffer, int offset, int length,
+ CancellationToken cancellationToken)
+ {
+ if (_client == null)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.NotOpen);
+ }
+
+ return await _client.ReadAsync(buffer, offset, length, cancellationToken);
+ }
+
+ public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
+ {
+ if (_client == null)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.NotOpen);
+ }
+
+ await _client.WriteAsync(buffer, offset, length, cancellationToken);
+ }
+
+ public override async Task FlushAsync(CancellationToken cancellationToken)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ await Task.FromCanceled(cancellationToken);
+ }
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ _client.Dispose();
+ }
+ }
+} \ No newline at end of file
diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TSocketClientTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TSocketClientTransport.cs
new file mode 100644
index 000000000..e769d1421
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TSocketClientTransport.cs
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation(ASF) under one
+// or more contributor license agreements.See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Thrift.Transports.Client
+{
+ // ReSharper disable once InconsistentNaming
+ public class TSocketClientTransport : TStreamClientTransport
+ {
+ private bool _isDisposed;
+
+ public TSocketClientTransport(TcpClient client)
+ {
+ TcpClient = client ?? throw new ArgumentNullException(nameof(client));
+
+ if (IsOpen)
+ {
+ InputStream = client.GetStream();
+ OutputStream = client.GetStream();
+ }
+ }
+
+ public TSocketClientTransport(IPAddress host, int port)
+ : this(host, port, 0)
+ {
+ }
+
+ public TSocketClientTransport(IPAddress host, int port, int timeout)
+ {
+ Host = host;
+ Port = port;
+
+ TcpClient = new TcpClient();
+ TcpClient.ReceiveTimeout = TcpClient.SendTimeout = timeout;
+ TcpClient.Client.NoDelay = true;
+ }
+
+ public TcpClient TcpClient { get; private set; }
+ public IPAddress Host { get; }
+ public int Port { get; }
+
+ public int Timeout
+ {
+ set
+ {
+ if (TcpClient != null)
+ {
+ TcpClient.ReceiveTimeout = TcpClient.SendTimeout = value;
+ }
+ }
+ }
+
+ public override bool IsOpen
+ {
+ get
+ {
+ if (TcpClient == null)
+ {
+ return false;
+ }
+
+ return TcpClient.Connected;
+ }
+ }
+
+ public override async Task OpenAsync(CancellationToken cancellationToken)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ await Task.FromCanceled(cancellationToken);
+ }
+
+ if (IsOpen)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.AlreadyOpen, "Socket already connected");
+ }
+
+ if (Port <= 0)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.NotOpen, "Cannot open without port");
+ }
+
+ if (TcpClient == null)
+ {
+ throw new InvalidOperationException("Invalid or not initialized tcp client");
+ }
+
+ await TcpClient.ConnectAsync(Host, Port);
+
+ InputStream = TcpClient.GetStream();
+ OutputStream = TcpClient.GetStream();
+ }
+
+ public override void Close()
+ {
+ base.Close();
+
+ if (TcpClient != null)
+ {
+ TcpClient.Dispose();
+ TcpClient = null;
+ }
+ }
+
+ // IDisposable
+ protected override void Dispose(bool disposing)
+ {
+ if (!_isDisposed)
+ {
+ if (disposing)
+ {
+ TcpClient?.Dispose();
+
+ base.Dispose(disposing);
+ }
+ }
+ _isDisposed = true;
+ }
+ }
+} \ No newline at end of file
diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TStreamClientTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TStreamClientTransport.cs
new file mode 100644
index 000000000..be2011e83
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TStreamClientTransport.cs
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation(ASF) under one
+// or more contributor license agreements.See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Thrift.Transports.Client
+{
+ // ReSharper disable once InconsistentNaming
+ public class TStreamClientTransport : TClientTransport
+ {
+ private bool _isDisposed;
+
+ protected TStreamClientTransport()
+ {
+ }
+
+ public TStreamClientTransport(Stream inputStream, Stream outputStream)
+ {
+ InputStream = inputStream;
+ OutputStream = outputStream;
+ }
+
+ protected Stream OutputStream { get; set; }
+
+ protected Stream InputStream { get; set; }
+
+ public override bool IsOpen => true;
+
+ public override async Task OpenAsync(CancellationToken cancellationToken)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ await Task.FromCanceled(cancellationToken);
+ }
+ }
+
+ public override void Close()
+ {
+ if (InputStream != null)
+ {
+ InputStream.Dispose();
+ InputStream = null;
+ }
+
+ if (OutputStream != null)
+ {
+ OutputStream.Dispose();
+ OutputStream = null;
+ }
+ }
+
+ public override async Task<int> ReadAsync(byte[] buffer, int offset, int length,
+ CancellationToken cancellationToken)
+ {
+ if (InputStream == null)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.NotOpen,
+ "Cannot read from null inputstream");
+ }
+
+ return await InputStream.ReadAsync(buffer, offset, length, cancellationToken);
+ }
+
+ public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
+ {
+ if (OutputStream == null)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.NotOpen,
+ "Cannot write to null outputstream");
+ }
+
+ await OutputStream.WriteAsync(buffer, offset, length, cancellationToken);
+ }
+
+ public override async Task FlushAsync(CancellationToken cancellationToken)
+ {
+ await OutputStream.FlushAsync(cancellationToken);
+ }
+
+ // IDisposable
+ protected override void Dispose(bool disposing)
+ {
+ if (!_isDisposed)
+ {
+ if (disposing)
+ {
+ InputStream?.Dispose();
+ OutputStream?.Dispose();
+ }
+ }
+ _isDisposed = true;
+ }
+ }
+} \ No newline at end of file
diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TTlsSocketClientTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TTlsSocketClientTransport.cs
new file mode 100644
index 000000000..c8be4ede1
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Client/TTlsSocketClientTransport.cs
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation(ASF) under one
+// or more contributor license agreements.See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Net;
+using System.Net.Security;
+using System.Net.Sockets;
+using System.Security.Authentication;
+using System.Security.Cryptography.X509Certificates;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Thrift.Transports.Client
+{
+ //TODO: check for correct work
+
+ // ReSharper disable once InconsistentNaming
+ public class TTlsSocketClientTransport : TStreamClientTransport
+ {
+ private readonly X509Certificate2 _certificate;
+ private readonly RemoteCertificateValidationCallback _certValidator;
+ private readonly IPAddress _host;
+ private readonly bool _isServer;
+ private readonly LocalCertificateSelectionCallback _localCertificateSelectionCallback;
+ private readonly int _port;
+ private readonly SslProtocols _sslProtocols;
+ private TcpClient _client;
+ private SslStream _secureStream;
+ private int _timeout;
+
+ public TTlsSocketClientTransport(TcpClient client, X509Certificate2 certificate, bool isServer = false,
+ RemoteCertificateValidationCallback certValidator = null,
+ LocalCertificateSelectionCallback localCertificateSelectionCallback = null,
+ SslProtocols sslProtocols = SslProtocols.Tls12)
+ {
+ _client = client;
+ _certificate = certificate;
+ _certValidator = certValidator;
+ _localCertificateSelectionCallback = localCertificateSelectionCallback;
+ _sslProtocols = sslProtocols;
+ _isServer = isServer;
+
+ if (isServer && certificate == null)
+ {
+ throw new ArgumentException("TTlsSocketClientTransport needs certificate to be used for server",
+ nameof(certificate));
+ }
+
+ if (IsOpen)
+ {
+ InputStream = client.GetStream();
+ OutputStream = client.GetStream();
+ }
+ }
+
+ public TTlsSocketClientTransport(IPAddress host, int port, string certificatePath,
+ RemoteCertificateValidationCallback certValidator = null,
+ LocalCertificateSelectionCallback localCertificateSelectionCallback = null,
+ SslProtocols sslProtocols = SslProtocols.Tls12)
+ : this(host, port, 0,
+ new X509Certificate2(certificatePath),
+ certValidator,
+ localCertificateSelectionCallback,
+ sslProtocols)
+ {
+ }
+
+ public TTlsSocketClientTransport(IPAddress host, int port,
+ X509Certificate2 certificate = null,
+ RemoteCertificateValidationCallback certValidator = null,
+ LocalCertificateSelectionCallback localCertificateSelectionCallback = null,
+ SslProtocols sslProtocols = SslProtocols.Tls12)
+ : this(host, port, 0,
+ certificate,
+ certValidator,
+ localCertificateSelectionCallback,
+ sslProtocols)
+ {
+ }
+
+ public TTlsSocketClientTransport(IPAddress host, int port, int timeout,
+ X509Certificate2 certificate,
+ RemoteCertificateValidationCallback certValidator = null,
+ LocalCertificateSelectionCallback localCertificateSelectionCallback = null,
+ SslProtocols sslProtocols = SslProtocols.Tls12)
+ {
+ _host = host;
+ _port = port;
+ _timeout = timeout;
+ _certificate = certificate;
+ _certValidator = certValidator;
+ _localCertificateSelectionCallback = localCertificateSelectionCallback;
+ _sslProtocols = sslProtocols;
+
+ InitSocket();
+ }
+
+ public int Timeout
+ {
+ set { _client.ReceiveTimeout = _client.SendTimeout = _timeout = value; }
+ }
+
+ public TcpClient TcpClient => _client;
+
+ public IPAddress Host => _host;
+
+ public int Port => _port;
+
+ public override bool IsOpen
+ {
+ get
+ {
+ if (_client == null)
+ {
+ return false;
+ }
+
+ return _client.Connected;
+ }
+ }
+
+ private void InitSocket()
+ {
+ _client = new TcpClient();
+ _client.ReceiveTimeout = _client.SendTimeout = _timeout;
+ _client.Client.NoDelay = true;
+ }
+
+ private bool DefaultCertificateValidator(object sender, X509Certificate certificate, X509Chain chain,
+ SslPolicyErrors sslValidationErrors)
+ {
+ return sslValidationErrors == SslPolicyErrors.None;
+ }
+
+ public override async Task OpenAsync(CancellationToken cancellationToken)
+ {
+ if (IsOpen)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.AlreadyOpen, "Socket already connected");
+ }
+
+ if (_host == null)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.NotOpen, "Cannot open null host");
+ }
+
+ if (_port <= 0)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.NotOpen, "Cannot open without port");
+ }
+
+ if (_client == null)
+ {
+ InitSocket();
+ }
+
+ if (_client != null)
+ {
+ await _client.ConnectAsync(_host, _port);
+ await SetupTlsAsync();
+ }
+ }
+
+ public async Task SetupTlsAsync()
+ {
+ var validator = _certValidator ?? DefaultCertificateValidator;
+
+ if (_localCertificateSelectionCallback != null)
+ {
+ _secureStream = new SslStream(_client.GetStream(), false, validator, _localCertificateSelectionCallback);
+ }
+ else
+ {
+ _secureStream = new SslStream(_client.GetStream(), false, validator);
+ }
+
+ try
+ {
+ if (_isServer)
+ {
+ // Server authentication
+ await
+ _secureStream.AuthenticateAsServerAsync(_certificate, _certValidator != null, _sslProtocols,
+ true);
+ }
+ else
+ {
+ // Client authentication
+ var certs = _certificate != null
+ ? new X509CertificateCollection {_certificate}
+ : new X509CertificateCollection();
+
+ var targetHost = _host.ToString();
+ await _secureStream.AuthenticateAsClientAsync(targetHost, certs, _sslProtocols, true);
+ }
+ }
+ catch (Exception)
+ {
+ Close();
+ throw;
+ }
+
+ InputStream = _secureStream;
+ OutputStream = _secureStream;
+ }
+
+ public override void Close()
+ {
+ base.Close();
+ if (_client != null)
+ {
+ _client.Dispose();
+ _client = null;
+ }
+
+ if (_secureStream != null)
+ {
+ _secureStream.Dispose();
+ _secureStream = null;
+ }
+ }
+ }
+} \ No newline at end of file