From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- .../netstd/Thrift/Server/TThreadPoolAsyncServer.cs | 297 +++++++++++++++++++++ 1 file changed, 297 insertions(+) create mode 100644 src/jaegertracing/thrift/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs (limited to 'src/jaegertracing/thrift/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs') diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs new file mode 100644 index 000000000..20e659d3a --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs @@ -0,0 +1,297 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * Contains some contributions under the Thrift Software License. + * Please see doc/old-thrift-license.txt in the Thrift distribution for + * details. + */ + +using System; +using System.Threading; +using Thrift.Protocol; +using Thrift.Transport; +using Thrift.Processor; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; + +namespace Thrift.Server +{ + /// + /// Server that uses C# built-in ThreadPool to spawn threads when handling requests. + /// + public class TThreadPoolAsyncServer : TServer + { + private const int DEFAULT_MIN_THREADS = -1; // use .NET ThreadPool defaults + private const int DEFAULT_MAX_THREADS = -1; // use .NET ThreadPool defaults + private volatile bool stop = false; + + private CancellationToken ServerCancellationToken; + + public struct Configuration + { + public int MinWorkerThreads; + public int MaxWorkerThreads; + public int MinIOThreads; + public int MaxIOThreads; + + public Configuration(int min = DEFAULT_MIN_THREADS, int max = DEFAULT_MAX_THREADS) + { + MinWorkerThreads = min; + MaxWorkerThreads = max; + MinIOThreads = min; + MaxIOThreads = max; + } + + public Configuration(int minWork, int maxWork, int minIO, int maxIO) + { + MinWorkerThreads = minWork; + MaxWorkerThreads = maxWork; + MinIOThreads = minIO; + MaxIOThreads = maxIO; + } + } + + public TThreadPoolAsyncServer(ITAsyncProcessor processor, TServerTransport serverTransport, ILogger logger = null) + : this(new TSingletonProcessorFactory(processor), serverTransport, + null, null, // defaults to TTransportFactory() + new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), + new Configuration(), logger) + { + } + + public TThreadPoolAsyncServer(ITAsyncProcessor processor, + TServerTransport serverTransport, + TTransportFactory transportFactory, + TProtocolFactory protocolFactory) + : this(new TSingletonProcessorFactory(processor), serverTransport, + transportFactory, transportFactory, + protocolFactory, protocolFactory, + new Configuration()) + { + } + + public TThreadPoolAsyncServer(ITProcessorFactory processorFactory, + TServerTransport serverTransport, + TTransportFactory transportFactory, + TProtocolFactory protocolFactory) + : this(processorFactory, serverTransport, + transportFactory, transportFactory, + protocolFactory, protocolFactory, + new Configuration()) + { + } + + public TThreadPoolAsyncServer(ITProcessorFactory processorFactory, + TServerTransport serverTransport, + TTransportFactory inputTransportFactory, + TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + int minThreadPoolThreads, int maxThreadPoolThreads, ILogger logger= null) + : this(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory, + new Configuration(minThreadPoolThreads, maxThreadPoolThreads), + logger) + { + } + + public TThreadPoolAsyncServer(ITProcessorFactory processorFactory, + TServerTransport serverTransport, + TTransportFactory inputTransportFactory, + TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + Configuration threadConfig, + ILogger logger = null) + : base(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory, logger) + { + lock (typeof(TThreadPoolAsyncServer)) + { + if ((threadConfig.MaxWorkerThreads > 0) || (threadConfig.MaxIOThreads > 0)) + { + int work, comm; + ThreadPool.GetMaxThreads(out work, out comm); + if (threadConfig.MaxWorkerThreads > 0) + work = threadConfig.MaxWorkerThreads; + if (threadConfig.MaxIOThreads > 0) + comm = threadConfig.MaxIOThreads; + if (!ThreadPool.SetMaxThreads(work, comm)) + throw new Exception("Error: could not SetMaxThreads in ThreadPool"); + } + + if ((threadConfig.MinWorkerThreads > 0) || (threadConfig.MinIOThreads > 0)) + { + int work, comm; + ThreadPool.GetMinThreads(out work, out comm); + if (threadConfig.MinWorkerThreads > 0) + work = threadConfig.MinWorkerThreads; + if (threadConfig.MinIOThreads > 0) + comm = threadConfig.MinIOThreads; + if (!ThreadPool.SetMinThreads(work, comm)) + throw new Exception("Error: could not SetMinThreads in ThreadPool"); + } + } + } + + + /// + /// Use new ThreadPool thread for each new client connection. + /// + public override async Task ServeAsync(CancellationToken cancellationToken) + { + ServerCancellationToken = cancellationToken; + try + { + try + { + ServerTransport.Listen(); + } + catch (TTransportException ttx) + { + LogError("Error, could not listen on ServerTransport: " + ttx); + return; + } + + //Fire the preServe server event when server is up but before any client connections + if (ServerEventHandler != null) + await ServerEventHandler.PreServeAsync(cancellationToken); + + while (!stop) + { + int failureCount = 0; + try + { + TTransport client = await ServerTransport.AcceptAsync(cancellationToken); + ThreadPool.QueueUserWorkItem(this.Execute, client); + } + catch (TTransportException ttx) + { + if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted) + { + ++failureCount; + LogError(ttx.ToString()); + } + + } + } + + if (stop) + { + try + { + ServerTransport.Close(); + } + catch (TTransportException ttx) + { + LogError("TServerTransport failed on close: " + ttx.Message); + } + stop = false; + } + + } + finally + { + ServerCancellationToken = default(CancellationToken); + } + } + + /// + /// Loops on processing a client forever + /// threadContext will be a TTransport instance + /// + /// + private void Execute(object threadContext) + { + var cancellationToken = ServerCancellationToken; + + using (TTransport client = (TTransport)threadContext) + { + ITAsyncProcessor processor = ProcessorFactory.GetAsyncProcessor(client, this); + TTransport inputTransport = null; + TTransport outputTransport = null; + TProtocol inputProtocol = null; + TProtocol outputProtocol = null; + object connectionContext = null; + try + { + try + { + inputTransport = InputTransportFactory.GetTransport(client); + outputTransport = OutputTransportFactory.GetTransport(client); + inputProtocol = InputProtocolFactory.GetProtocol(inputTransport); + outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport); + + //Recover event handler (if any) and fire createContext server event when a client connects + if (ServerEventHandler != null) + connectionContext = ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken).Result; + + //Process client requests until client disconnects + while (!stop) + { + if (! inputTransport.PeekAsync(cancellationToken).Result) + break; + + //Fire processContext server event + //N.B. This is the pattern implemented in C++ and the event fires provisionally. + //That is to say it may be many minutes between the event firing and the client request + //actually arriving or the client may hang up without ever makeing a request. + if (ServerEventHandler != null) + ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken).Wait(); + //Process client request (blocks until transport is readable) + if (!processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken).Result) + break; + } + } + catch (TTransportException) + { + //Usually a client disconnect, expected + } + catch (Exception x) + { + //Unexpected + LogError("Error: " + x); + } + + //Fire deleteContext server event after client disconnects + if (ServerEventHandler != null) + ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken).Wait(); + + } + finally + { + //Close transports + inputTransport?.Close(); + outputTransport?.Close(); + + // disposable stuff should be disposed + inputProtocol?.Dispose(); + outputProtocol?.Dispose(); + inputTransport?.Dispose(); + outputTransport?.Dispose(); + } + } + } + + public override void Stop() + { + stop = true; + ServerTransport?.Close(); + } + } +} -- cgit v1.2.3