From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- .../thrift/lib/netstd/Thrift/Server/TServer.cs | 87 ++++++ .../netstd/Thrift/Server/TServerEventHandler.cs | 54 ++++ .../lib/netstd/Thrift/Server/TSimpleAsyncServer.cs | 230 ++++++++++++++++ .../netstd/Thrift/Server/TThreadPoolAsyncServer.cs | 297 +++++++++++++++++++++ 4 files changed, 668 insertions(+) create mode 100644 src/jaegertracing/thrift/lib/netstd/Thrift/Server/TServer.cs create mode 100644 src/jaegertracing/thrift/lib/netstd/Thrift/Server/TServerEventHandler.cs create mode 100644 src/jaegertracing/thrift/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs create mode 100644 src/jaegertracing/thrift/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs (limited to 'src/jaegertracing/thrift/lib/netstd/Thrift/Server') diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TServer.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TServer.cs new file mode 100644 index 000000000..f40f2b7e7 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TServer.cs @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Thrift.Protocol; +using Thrift.Transport; +using Thrift.Processor; + +namespace Thrift.Server +{ + // ReSharper disable once InconsistentNaming + public abstract class TServer + { + protected readonly ILogger Logger; + protected TProtocolFactory InputProtocolFactory; + protected TTransportFactory InputTransportFactory; + protected ITProcessorFactory ProcessorFactory; + protected TProtocolFactory OutputProtocolFactory; + protected TTransportFactory OutputTransportFactory; + + protected TServerEventHandler ServerEventHandler; + protected TServerTransport ServerTransport; + + protected TServer(ITProcessorFactory processorFactory, TServerTransport serverTransport, + TTransportFactory inputTransportFactory, TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, TProtocolFactory outputProtocolFactory, + ILogger logger = null) + { + ProcessorFactory = processorFactory ?? throw new ArgumentNullException(nameof(processorFactory)); + ServerTransport = serverTransport; + InputTransportFactory = inputTransportFactory ?? new TTransportFactory(); + OutputTransportFactory = outputTransportFactory ?? new TTransportFactory(); + InputProtocolFactory = inputProtocolFactory ?? throw new ArgumentNullException(nameof(inputProtocolFactory)); + OutputProtocolFactory = outputProtocolFactory ?? throw new ArgumentNullException(nameof(outputProtocolFactory)); + Logger = logger; // null is absolutely legal + } + + public void SetEventHandler(TServerEventHandler seh) + { + ServerEventHandler = seh; + } + + public TServerEventHandler GetEventHandler() + { + return ServerEventHandler; + } + + // Log delegation? deprecated, use ILogger + protected void LogError( string msg) + { + if (Logger != null) + Logger.LogError(msg); + } + + public abstract void Stop(); + + public virtual void Start() + { + // do nothing + } + + public virtual async Task ServeAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TServerEventHandler.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TServerEventHandler.cs new file mode 100644 index 000000000..69314efd6 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TServerEventHandler.cs @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Threading; +using System.Threading.Tasks; +using Thrift.Protocol; +using Thrift.Transport; + +namespace Thrift.Server +{ + //TODO: replacement by event? + + /// + /// Interface implemented by server users to handle events from the server + /// + // ReSharper disable once InconsistentNaming + public interface TServerEventHandler + { + /// + /// Called before the server begins */ + /// + Task PreServeAsync(CancellationToken cancellationToken); + + /// + /// Called when a new client has connected and is about to being processing */ + /// + Task CreateContextAsync(TProtocol input, TProtocol output, CancellationToken cancellationToken); + + /// + /// Called when a client has finished request-handling to delete server context */ + /// + Task DeleteContextAsync(object serverContext, TProtocol input, TProtocol output, + CancellationToken cancellationToken); + + /// + /// Called when a client is about to call the processor */ + /// + Task ProcessContextAsync(object serverContext, TTransport transport, CancellationToken cancellationToken); + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs new file mode 100644 index 000000000..bdaa3489c --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs @@ -0,0 +1,230 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Thrift.Protocol; +using Thrift.Processor; +using Thrift.Transport; + +namespace Thrift.Server +{ + //TODO: unhandled exceptions, etc. + + // ReSharper disable once InconsistentNaming + public class TSimpleAsyncServer : TServer + { + private readonly int _clientWaitingDelay; + private volatile Task _serverTask; + + public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory, + TServerTransport serverTransport, + TTransportFactory inputTransportFactory, + TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + ILogger logger, + int clientWaitingDelay = 10) + : base(itProcessorFactory, + serverTransport, + inputTransportFactory, + outputTransportFactory, + inputProtocolFactory, + outputProtocolFactory, + logger) + { + _clientWaitingDelay = clientWaitingDelay; + } + + public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory, + TServerTransport serverTransport, + TTransportFactory inputTransportFactory, + TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + ILoggerFactory loggerFactory, + int clientWaitingDelay = 10) + : this(itProcessorFactory, + serverTransport, + inputTransportFactory, + outputTransportFactory, + inputProtocolFactory, + outputProtocolFactory, + loggerFactory.CreateLogger()) + { + } + + public TSimpleAsyncServer(ITAsyncProcessor processor, + TServerTransport serverTransport, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + ILoggerFactory loggerFactory, + int clientWaitingDelay = 10) + : this(new TSingletonProcessorFactory(processor), + serverTransport, + null, // defaults to TTransportFactory() + null, // defaults to TTransportFactory() + inputProtocolFactory, + outputProtocolFactory, + loggerFactory.CreateLogger(nameof(TSimpleAsyncServer)), + clientWaitingDelay) + { + } + + public override async Task ServeAsync(CancellationToken cancellationToken) + { + try + { + // cancelation token + _serverTask = Task.Factory.StartNew(() => StartListening(cancellationToken), TaskCreationOptions.LongRunning); + await _serverTask; + } + catch (Exception ex) + { + Logger.LogError(ex.ToString()); + } + } + + private async Task StartListening(CancellationToken cancellationToken) + { + ServerTransport.Listen(); + + Logger.LogTrace("Started listening at server"); + + if (ServerEventHandler != null) + { + await ServerEventHandler.PreServeAsync(cancellationToken); + } + + while (!cancellationToken.IsCancellationRequested) + { + if (ServerTransport.IsClientPending()) + { + Logger.LogTrace("Waiting for client connection"); + + try + { + var client = await ServerTransport.AcceptAsync(cancellationToken); + await Task.Factory.StartNew(() => Execute(client, cancellationToken), cancellationToken); + } + catch (TTransportException ttx) + { + Logger.LogTrace($"Transport exception: {ttx}"); + + if (ttx.Type != TTransportException.ExceptionType.Interrupted) + { + Logger.LogError(ttx.ToString()); + } + } + } + else + { + try + { + await Task.Delay(TimeSpan.FromMilliseconds(_clientWaitingDelay), cancellationToken); + } + catch (TaskCanceledException) { } + } + } + + ServerTransport.Close(); + + Logger.LogTrace("Completed listening at server"); + } + + public override void Stop() + { + } + + private async Task Execute(TTransport client, CancellationToken cancellationToken) + { + Logger.LogTrace("Started client request processing"); + + var processor = ProcessorFactory.GetAsyncProcessor(client, this); + + TTransport inputTransport = null; + TTransport outputTransport = null; + TProtocol inputProtocol = null; + TProtocol outputProtocol = null; + object connectionContext = null; + + try + { + try + { + inputTransport = InputTransportFactory.GetTransport(client); + outputTransport = OutputTransportFactory.GetTransport(client); + inputProtocol = InputProtocolFactory.GetProtocol(inputTransport); + outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport); + + if (ServerEventHandler != null) + { + connectionContext = await ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken); + } + + while (!cancellationToken.IsCancellationRequested) + { + if (!await inputTransport.PeekAsync(cancellationToken)) + { + break; + } + + if (ServerEventHandler != null) + { + await ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken); + } + + if (!await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken)) + { + break; + } + } + } + catch (TTransportException ttx) + { + Logger.LogTrace($"Transport exception: {ttx}"); + } + catch (Exception x) + { + Logger.LogError($"Error: {x}"); + } + + if (ServerEventHandler != null) + { + await ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken); + } + + } + finally + { + //Close transports + inputTransport?.Close(); + outputTransport?.Close(); + + // disposable stuff should be disposed + inputProtocol?.Dispose(); + outputProtocol?.Dispose(); + inputTransport?.Dispose(); + outputTransport?.Dispose(); + } + + Logger.LogTrace("Completed client request processing"); + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs new file mode 100644 index 000000000..20e659d3a --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs @@ -0,0 +1,297 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * Contains some contributions under the Thrift Software License. + * Please see doc/old-thrift-license.txt in the Thrift distribution for + * details. + */ + +using System; +using System.Threading; +using Thrift.Protocol; +using Thrift.Transport; +using Thrift.Processor; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; + +namespace Thrift.Server +{ + /// + /// Server that uses C# built-in ThreadPool to spawn threads when handling requests. + /// + public class TThreadPoolAsyncServer : TServer + { + private const int DEFAULT_MIN_THREADS = -1; // use .NET ThreadPool defaults + private const int DEFAULT_MAX_THREADS = -1; // use .NET ThreadPool defaults + private volatile bool stop = false; + + private CancellationToken ServerCancellationToken; + + public struct Configuration + { + public int MinWorkerThreads; + public int MaxWorkerThreads; + public int MinIOThreads; + public int MaxIOThreads; + + public Configuration(int min = DEFAULT_MIN_THREADS, int max = DEFAULT_MAX_THREADS) + { + MinWorkerThreads = min; + MaxWorkerThreads = max; + MinIOThreads = min; + MaxIOThreads = max; + } + + public Configuration(int minWork, int maxWork, int minIO, int maxIO) + { + MinWorkerThreads = minWork; + MaxWorkerThreads = maxWork; + MinIOThreads = minIO; + MaxIOThreads = maxIO; + } + } + + public TThreadPoolAsyncServer(ITAsyncProcessor processor, TServerTransport serverTransport, ILogger logger = null) + : this(new TSingletonProcessorFactory(processor), serverTransport, + null, null, // defaults to TTransportFactory() + new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), + new Configuration(), logger) + { + } + + public TThreadPoolAsyncServer(ITAsyncProcessor processor, + TServerTransport serverTransport, + TTransportFactory transportFactory, + TProtocolFactory protocolFactory) + : this(new TSingletonProcessorFactory(processor), serverTransport, + transportFactory, transportFactory, + protocolFactory, protocolFactory, + new Configuration()) + { + } + + public TThreadPoolAsyncServer(ITProcessorFactory processorFactory, + TServerTransport serverTransport, + TTransportFactory transportFactory, + TProtocolFactory protocolFactory) + : this(processorFactory, serverTransport, + transportFactory, transportFactory, + protocolFactory, protocolFactory, + new Configuration()) + { + } + + public TThreadPoolAsyncServer(ITProcessorFactory processorFactory, + TServerTransport serverTransport, + TTransportFactory inputTransportFactory, + TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + int minThreadPoolThreads, int maxThreadPoolThreads, ILogger logger= null) + : this(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory, + new Configuration(minThreadPoolThreads, maxThreadPoolThreads), + logger) + { + } + + public TThreadPoolAsyncServer(ITProcessorFactory processorFactory, + TServerTransport serverTransport, + TTransportFactory inputTransportFactory, + TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + Configuration threadConfig, + ILogger logger = null) + : base(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory, logger) + { + lock (typeof(TThreadPoolAsyncServer)) + { + if ((threadConfig.MaxWorkerThreads > 0) || (threadConfig.MaxIOThreads > 0)) + { + int work, comm; + ThreadPool.GetMaxThreads(out work, out comm); + if (threadConfig.MaxWorkerThreads > 0) + work = threadConfig.MaxWorkerThreads; + if (threadConfig.MaxIOThreads > 0) + comm = threadConfig.MaxIOThreads; + if (!ThreadPool.SetMaxThreads(work, comm)) + throw new Exception("Error: could not SetMaxThreads in ThreadPool"); + } + + if ((threadConfig.MinWorkerThreads > 0) || (threadConfig.MinIOThreads > 0)) + { + int work, comm; + ThreadPool.GetMinThreads(out work, out comm); + if (threadConfig.MinWorkerThreads > 0) + work = threadConfig.MinWorkerThreads; + if (threadConfig.MinIOThreads > 0) + comm = threadConfig.MinIOThreads; + if (!ThreadPool.SetMinThreads(work, comm)) + throw new Exception("Error: could not SetMinThreads in ThreadPool"); + } + } + } + + + /// + /// Use new ThreadPool thread for each new client connection. + /// + public override async Task ServeAsync(CancellationToken cancellationToken) + { + ServerCancellationToken = cancellationToken; + try + { + try + { + ServerTransport.Listen(); + } + catch (TTransportException ttx) + { + LogError("Error, could not listen on ServerTransport: " + ttx); + return; + } + + //Fire the preServe server event when server is up but before any client connections + if (ServerEventHandler != null) + await ServerEventHandler.PreServeAsync(cancellationToken); + + while (!stop) + { + int failureCount = 0; + try + { + TTransport client = await ServerTransport.AcceptAsync(cancellationToken); + ThreadPool.QueueUserWorkItem(this.Execute, client); + } + catch (TTransportException ttx) + { + if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted) + { + ++failureCount; + LogError(ttx.ToString()); + } + + } + } + + if (stop) + { + try + { + ServerTransport.Close(); + } + catch (TTransportException ttx) + { + LogError("TServerTransport failed on close: " + ttx.Message); + } + stop = false; + } + + } + finally + { + ServerCancellationToken = default(CancellationToken); + } + } + + /// + /// Loops on processing a client forever + /// threadContext will be a TTransport instance + /// + /// + private void Execute(object threadContext) + { + var cancellationToken = ServerCancellationToken; + + using (TTransport client = (TTransport)threadContext) + { + ITAsyncProcessor processor = ProcessorFactory.GetAsyncProcessor(client, this); + TTransport inputTransport = null; + TTransport outputTransport = null; + TProtocol inputProtocol = null; + TProtocol outputProtocol = null; + object connectionContext = null; + try + { + try + { + inputTransport = InputTransportFactory.GetTransport(client); + outputTransport = OutputTransportFactory.GetTransport(client); + inputProtocol = InputProtocolFactory.GetProtocol(inputTransport); + outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport); + + //Recover event handler (if any) and fire createContext server event when a client connects + if (ServerEventHandler != null) + connectionContext = ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken).Result; + + //Process client requests until client disconnects + while (!stop) + { + if (! inputTransport.PeekAsync(cancellationToken).Result) + break; + + //Fire processContext server event + //N.B. This is the pattern implemented in C++ and the event fires provisionally. + //That is to say it may be many minutes between the event firing and the client request + //actually arriving or the client may hang up without ever makeing a request. + if (ServerEventHandler != null) + ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken).Wait(); + //Process client request (blocks until transport is readable) + if (!processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken).Result) + break; + } + } + catch (TTransportException) + { + //Usually a client disconnect, expected + } + catch (Exception x) + { + //Unexpected + LogError("Error: " + x); + } + + //Fire deleteContext server event after client disconnects + if (ServerEventHandler != null) + ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken).Wait(); + + } + finally + { + //Close transports + inputTransport?.Close(); + outputTransport?.Close(); + + // disposable stuff should be disposed + inputProtocol?.Dispose(); + outputProtocol?.Dispose(); + inputTransport?.Dispose(); + outputTransport?.Dispose(); + } + } + } + + public override void Stop() + { + stop = true; + ServerTransport?.Close(); + } + } +} -- cgit v1.2.3