diff options
Diffstat (limited to '')
5 files changed, 790 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/THttpServerTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/THttpServerTransport.cs new file mode 100644 index 000000000..032063a37 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/THttpServerTransport.cs @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Http; +using Microsoft.Extensions.Logging; +using Thrift.Protocols; +using Thrift.Transports.Client; + +namespace Thrift.Transports.Server +{ + // ReSharper disable once InconsistentNaming + public class THttpServerTransport + { + protected const string ContentType = "application/x-thrift"; + private readonly ILogger _logger; + private readonly RequestDelegate _next; + protected Encoding Encoding = Encoding.UTF8; + + protected ITProtocolFactory InputProtocolFactory; + protected ITProtocolFactory OutputProtocolFactory; + + protected ITAsyncProcessor Processor; + + public THttpServerTransport(ITAsyncProcessor processor, RequestDelegate next, ILoggerFactory loggerFactory) + : this(processor, new TBinaryProtocol.Factory(), next, loggerFactory) + { + } + + public THttpServerTransport(ITAsyncProcessor processor, ITProtocolFactory protocolFactory, RequestDelegate next, + ILoggerFactory loggerFactory) + : this(processor, protocolFactory, protocolFactory, next, loggerFactory) + { + } + + public THttpServerTransport(ITAsyncProcessor processor, ITProtocolFactory inputProtocolFactory, + ITProtocolFactory outputProtocolFactory, RequestDelegate next, ILoggerFactory loggerFactory) + { + if (loggerFactory == null) + { + throw new ArgumentNullException(nameof(loggerFactory)); + } + + Processor = processor ?? throw new ArgumentNullException(nameof(processor)); + InputProtocolFactory = inputProtocolFactory ?? throw new ArgumentNullException(nameof(inputProtocolFactory)); + OutputProtocolFactory = outputProtocolFactory ?? throw new ArgumentNullException(nameof(outputProtocolFactory)); + + _next = next; + _logger = loggerFactory.CreateLogger<THttpServerTransport>(); + } + + public async Task Invoke(HttpContext context) + { + context.Response.ContentType = ContentType; + await ProcessRequestAsync(context, context.RequestAborted); //TODO: check for correct logic + } + + public async Task ProcessRequestAsync(HttpContext context, CancellationToken cancellationToken) + { + var transport = new TStreamClientTransport(context.Request.Body, context.Response.Body); + + try + { + var input = InputProtocolFactory.GetProtocol(transport); + var output = OutputProtocolFactory.GetProtocol(transport); + + while (await Processor.ProcessAsync(input, output, cancellationToken)) + { + } + } + catch (TTransportException) + { + // Client died, just move on + } + finally + { + transport.Close(); + } + } + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TNamedPipeServerTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TNamedPipeServerTransport.cs new file mode 100644 index 000000000..186786ed2 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TNamedPipeServerTransport.cs @@ -0,0 +1,191 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.IO.Pipes; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transports.Server +{ + // ReSharper disable once InconsistentNaming + public class TNamedPipeServerTransport : TServerTransport + { + /// <summary> + /// This is the address of the Pipe on the localhost. + /// </summary> + private readonly string _pipeAddress; + + private bool _asyncMode = true; + private volatile bool _isPending = true; + + private NamedPipeServerStream _stream = null; + + public TNamedPipeServerTransport(string pipeAddress) + { + _pipeAddress = pipeAddress; + } + + public override void Listen() + { + // nothing to do here + } + + public override void Close() + { + if (_stream != null) + { + try + { + //TODO: check for disconection + _stream.Disconnect(); + _stream.Dispose(); + } + finally + { + _stream = null; + _isPending = false; + } + } + } + + public override bool IsClientPending() + { + return _isPending; + } + + private void EnsurePipeInstance() + { + if (_stream == null) + { + var direction = PipeDirection.InOut; + var maxconn = 254; + var mode = PipeTransmissionMode.Byte; + var options = _asyncMode ? PipeOptions.Asynchronous : PipeOptions.None; + var inbuf = 4096; + var outbuf = 4096; + // TODO: security + + try + { + _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf); + } + catch (NotImplementedException) // Mono still does not support async, fallback to sync + { + if (_asyncMode) + { + options &= (~PipeOptions.Asynchronous); + _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, + outbuf); + _asyncMode = false; + } + else + { + throw; + } + } + } + } + + protected override async Task<TClientTransport> AcceptImplementationAsync(CancellationToken cancellationToken) + { + try + { + EnsurePipeInstance(); + + await _stream.WaitForConnectionAsync(cancellationToken); + + var trans = new ServerTransport(_stream); + _stream = null; // pass ownership to ServerTransport + + //_isPending = false; + + return trans; + } + catch (TTransportException) + { + Close(); + throw; + } + catch (Exception e) + { + Close(); + throw new TTransportException(TTransportException.ExceptionType.NotOpen, e.Message); + } + } + + private class ServerTransport : TClientTransport + { + private readonly NamedPipeServerStream _stream; + + public ServerTransport(NamedPipeServerStream stream) + { + _stream = stream; + } + + public override bool IsOpen => _stream != null && _stream.IsConnected; + + public override async Task OpenAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override void Close() + { + _stream?.Dispose(); + } + + public override async Task<int> ReadAsync(byte[] buffer, int offset, int length, + CancellationToken cancellationToken) + { + if (_stream == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + return await _stream.ReadAsync(buffer, offset, length, cancellationToken); + } + + public override async Task WriteAsync(byte[] buffer, int offset, int length, + CancellationToken cancellationToken) + { + if (_stream == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + await _stream.WriteAsync(buffer, offset, length, cancellationToken); + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + protected override void Dispose(bool disposing) + { + _stream?.Dispose(); + } + } + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TServerFramedTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TServerFramedTransport.cs new file mode 100644 index 000000000..0b86e9ebb --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TServerFramedTransport.cs @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using Thrift.Transports.Client; + +namespace Thrift.Transports.Server +{ + // ReSharper disable once InconsistentNaming + public class TServerFramedTransport : TServerTransport + { + private readonly int _clientTimeout; + private readonly int _port; + private TcpListener _server; + + public TServerFramedTransport(TcpListener listener) + : this(listener, 0) + { + } + + public TServerFramedTransport(TcpListener listener, int clientTimeout) + { + _server = listener; + _clientTimeout = clientTimeout; + } + + public TServerFramedTransport(int port) + : this(port, 0) + { + } + + public TServerFramedTransport(int port, int clientTimeout) + { + _port = port; + _clientTimeout = clientTimeout; + try + { + // Make server socket + _server = new TcpListener(IPAddress.Any, _port); + _server.Server.NoDelay = true; + } + catch (Exception) + { + _server = null; + throw new TTransportException("Could not create ServerSocket on port " + port + "."); + } + } + + public override void Listen() + { + // Make sure not to block on accept + if (_server != null) + { + try + { + _server.Start(); + } + catch (SocketException sx) + { + throw new TTransportException("Could not accept on listening socket: " + sx.Message); + } + } + } + + public override bool IsClientPending() + { + return _server.Pending(); + } + + protected override async Task<TClientTransport> AcceptImplementationAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<TClientTransport>(cancellationToken); + } + + if (_server == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No underlying server socket."); + } + + try + { + TFramedClientTransport tSocketTransport = null; + var tcpClient = await _server.AcceptTcpClientAsync(); + + try + { + tSocketTransport = new TFramedClientTransport(new TSocketClientTransport(tcpClient) + { + Timeout = _clientTimeout + }); + + return tSocketTransport; + } + catch (Exception) + { + if (tSocketTransport != null) + { + tSocketTransport.Dispose(); + } + else // Otherwise, clean it up ourselves. + { + ((IDisposable) tcpClient).Dispose(); + } + + throw; + } + } + catch (Exception ex) + { + throw new TTransportException(ex.ToString()); + } + } + + public override void Close() + { + if (_server != null) + { + try + { + _server.Stop(); + } + catch (Exception ex) + { + throw new TTransportException("WARNING: Could not close server socket: " + ex); + } + _server = null; + } + } + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TServerSocketTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TServerSocketTransport.cs new file mode 100644 index 000000000..3a9d8a17d --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TServerSocketTransport.cs @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using Thrift.Transports.Client; + +namespace Thrift.Transports.Server +{ + // ReSharper disable once InconsistentNaming + public class TServerSocketTransport : TServerTransport + { + private readonly int _clientTimeout; + private readonly int _port; + private readonly bool _useBufferedSockets; + private readonly bool _useFramedTransport; + private TcpListener _server; + + public TServerSocketTransport(TcpListener listener) + : this(listener, 0) + { + } + + public TServerSocketTransport(TcpListener listener, int clientTimeout) + { + _server = listener; + _clientTimeout = clientTimeout; + } + + public TServerSocketTransport(int port) + : this(port, 0) + { + } + + public TServerSocketTransport(int port, int clientTimeout) + : this(port, clientTimeout, false) + { + } + + public TServerSocketTransport(int port, int clientTimeout, bool useBufferedSockets): + this(port, clientTimeout, useBufferedSockets, false) + { + } + + public TServerSocketTransport(int port, int clientTimeout, bool useBufferedSockets, bool useFramedTransport) + { + _port = port; + _clientTimeout = clientTimeout; + _useBufferedSockets = useBufferedSockets; + _useFramedTransport = useFramedTransport; + try + { + // Make server socket + _server = new TcpListener(IPAddress.Any, _port); + _server.Server.NoDelay = true; + } + catch (Exception) + { + _server = null; + throw new TTransportException("Could not create ServerSocket on port " + port + "."); + } + } + + public override void Listen() + { + // Make sure not to block on accept + if (_server != null) + { + try + { + _server.Start(); + } + catch (SocketException sx) + { + throw new TTransportException("Could not accept on listening socket: " + sx.Message); + } + } + } + + public override bool IsClientPending() + { + return _server.Pending(); + } + + protected override async Task<TClientTransport> AcceptImplementationAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<TClientTransport>(cancellationToken); + } + + if (_server == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No underlying server socket."); + } + + try + { + TClientTransport tSocketTransport = null; + var tcpClient = await _server.AcceptTcpClientAsync(); + + try + { + tSocketTransport = new TSocketClientTransport(tcpClient) + { + Timeout = _clientTimeout + }; + + if (_useBufferedSockets) + { + tSocketTransport = new TBufferedClientTransport(tSocketTransport); + } + + if (_useFramedTransport) + { + tSocketTransport = new TFramedClientTransport(tSocketTransport); + } + + return tSocketTransport; + } + catch (Exception) + { + if (tSocketTransport != null) + { + tSocketTransport.Dispose(); + } + else // Otherwise, clean it up ourselves. + { + ((IDisposable) tcpClient).Dispose(); + } + + throw; + } + } + catch (Exception ex) + { + throw new TTransportException(ex.ToString()); + } + } + + public override void Close() + { + if (_server != null) + { + try + { + _server.Stop(); + } + catch (Exception ex) + { + throw new TTransportException("WARNING: Could not close server socket: " + ex); + } + _server = null; + } + } + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TTlsServerSocketTransport.cs b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TTlsServerSocketTransport.cs new file mode 100644 index 000000000..759feeddd --- /dev/null +++ b/src/jaegertracing/thrift/lib/netcore/Thrift/Transports/Server/TTlsServerSocketTransport.cs @@ -0,0 +1,177 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Net; +using System.Net.Security; +using System.Net.Sockets; +using System.Security.Authentication; +using System.Security.Cryptography.X509Certificates; +using System.Threading; +using System.Threading.Tasks; +using Thrift.Transports.Client; + +namespace Thrift.Transports.Server +{ + // ReSharper disable once InconsistentNaming + public class TTlsServerSocketTransport : TServerTransport + { + private readonly RemoteCertificateValidationCallback _clientCertValidator; + private readonly int _clientTimeout = 0; + private readonly LocalCertificateSelectionCallback _localCertificateSelectionCallback; + private readonly int _port; + private readonly X509Certificate2 _serverCertificate; + private readonly SslProtocols _sslProtocols; + private readonly bool _useBufferedSockets; + private readonly bool _useFramedTransport; + private TcpListener _server; + + public TTlsServerSocketTransport(int port, X509Certificate2 certificate) + : this(port, false, certificate) + { + } + + public TTlsServerSocketTransport( + int port, + bool useBufferedSockets, + X509Certificate2 certificate, + RemoteCertificateValidationCallback clientCertValidator = null, + LocalCertificateSelectionCallback localCertificateSelectionCallback = null, + SslProtocols sslProtocols = SslProtocols.Tls12) + : this(port, useBufferedSockets, false, certificate, + clientCertValidator, localCertificateSelectionCallback, sslProtocols) + { + } + + public TTlsServerSocketTransport( + int port, + bool useBufferedSockets, + bool useFramedTransport, + X509Certificate2 certificate, + RemoteCertificateValidationCallback clientCertValidator = null, + LocalCertificateSelectionCallback localCertificateSelectionCallback = null, + SslProtocols sslProtocols = SslProtocols.Tls12) + { + if (!certificate.HasPrivateKey) + { + throw new TTransportException(TTransportException.ExceptionType.Unknown, + "Your server-certificate needs to have a private key"); + } + + _port = port; + _serverCertificate = certificate; + _useBufferedSockets = useBufferedSockets; + _useFramedTransport = useFramedTransport; + _clientCertValidator = clientCertValidator; + _localCertificateSelectionCallback = localCertificateSelectionCallback; + _sslProtocols = sslProtocols; + + try + { + // Create server socket + _server = new TcpListener(IPAddress.Any, _port); + _server.Server.NoDelay = true; + } + catch (Exception) + { + _server = null; + throw new TTransportException($"Could not create ServerSocket on port {port}."); + } + } + + public override void Listen() + { + // Make sure accept is not blocking + if (_server != null) + { + try + { + _server.Start(); + } + catch (SocketException sx) + { + throw new TTransportException($"Could not accept on listening socket: {sx.Message}"); + } + } + } + + public override bool IsClientPending() + { + return _server.Pending(); + } + + protected override async Task<TClientTransport> AcceptImplementationAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<TClientTransport>(cancellationToken); + } + + if (_server == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No underlying server socket."); + } + + try + { + var client = await _server.AcceptTcpClientAsync(); + client.SendTimeout = client.ReceiveTimeout = _clientTimeout; + + //wrap the client in an SSL Socket passing in the SSL cert + var tTlsSocket = new TTlsSocketClientTransport(client, _serverCertificate, true, _clientCertValidator, + _localCertificateSelectionCallback, _sslProtocols); + + await tTlsSocket.SetupTlsAsync(); + + TClientTransport trans = tTlsSocket; + + if (_useBufferedSockets) + { + trans = new TBufferedClientTransport(trans); + } + + if (_useFramedTransport) + { + trans = new TFramedClientTransport(trans); + } + + return trans; + } + catch (Exception ex) + { + throw new TTransportException(ex.ToString()); + } + } + + public override void Close() + { + if (_server != null) + { + try + { + _server.Stop(); + } + catch (Exception ex) + { + throw new TTransportException($"WARNING: Could not close server socket: {ex}"); + } + + _server = null; + } + } + } +}
\ No newline at end of file |