diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/jaegertracing/thrift/lib/netstd/Thrift/Processor | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
4 files changed, 237 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/ITAsyncProcessor.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/ITAsyncProcessor.cs new file mode 100644 index 000000000..f5b8d16e3 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/ITAsyncProcessor.cs @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Threading; +using System.Threading.Tasks; +using Thrift.Protocol; + +namespace Thrift.Processor +{ + public interface ITAsyncProcessor + { + Task<bool> ProcessAsync(TProtocol iprot, TProtocol oprot, CancellationToken cancellationToken = default(CancellationToken)); + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/ITProcessorFactory.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/ITProcessorFactory.cs new file mode 100644 index 000000000..e0fe3d0a8 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/ITProcessorFactory.cs @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Thrift.Server; +using Thrift.Transport; + +namespace Thrift.Processor +{ + // ReSharper disable once InconsistentNaming + public interface ITProcessorFactory + { + ITAsyncProcessor GetAsyncProcessor(TTransport trans, TServer baseServer = null); + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/TMultiplexedProcessor.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/TMultiplexedProcessor.cs new file mode 100644 index 000000000..e5aeaa6c7 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/TMultiplexedProcessor.cs @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using Thrift.Protocol; +using Thrift.Protocol.Entities; + +namespace Thrift.Processor +{ + // ReSharper disable once InconsistentNaming + public class TMultiplexedProcessor : ITAsyncProcessor + { + //TODO: Localization + + private readonly Dictionary<string, ITAsyncProcessor> _serviceProcessorMap = + new Dictionary<string, ITAsyncProcessor>(); + + public async Task<bool> ProcessAsync(TProtocol iprot, TProtocol oprot) + { + return await ProcessAsync(iprot, oprot, CancellationToken.None); + } + + public async Task<bool> ProcessAsync(TProtocol iprot, TProtocol oprot, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<bool>(cancellationToken); + } + + try + { + var message = await iprot.ReadMessageBeginAsync(cancellationToken); + + if ((message.Type != TMessageType.Call) && (message.Type != TMessageType.Oneway)) + { + await FailAsync(oprot, message, TApplicationException.ExceptionType.InvalidMessageType, + "Message exType CALL or ONEWAY expected", cancellationToken); + return false; + } + + // Extract the service name + var index = message.Name.IndexOf(TMultiplexedProtocol.Separator, StringComparison.Ordinal); + if (index < 0) + { + await FailAsync(oprot, message, TApplicationException.ExceptionType.InvalidProtocol, + $"Service name not found in message name: {message.Name}. Did you forget to use a TMultiplexProtocol in your client?", + cancellationToken); + return false; + } + + // Create a new TMessage, something that can be consumed by any TProtocol + var serviceName = message.Name.Substring(0, index); + ITAsyncProcessor actualProcessor; + if (!_serviceProcessorMap.TryGetValue(serviceName, out actualProcessor)) + { + await FailAsync(oprot, message, TApplicationException.ExceptionType.InternalError, + $"Service name not found: {serviceName}. Did you forget to call RegisterProcessor()?", + cancellationToken); + return false; + } + + // Create a new TMessage, removing the service name + var newMessage = new TMessage( + message.Name.Substring(serviceName.Length + TMultiplexedProtocol.Separator.Length), + message.Type, + message.SeqID); + + // Dispatch processing to the stored processor + return + await + actualProcessor.ProcessAsync(new StoredMessageProtocol(iprot, newMessage), oprot, + cancellationToken); + } + catch (IOException) + { + return false; // similar to all other processors + } + } + + public void RegisterProcessor(string serviceName, ITAsyncProcessor processor) + { + if (_serviceProcessorMap.ContainsKey(serviceName)) + { + throw new InvalidOperationException( + $"Processor map already contains processor with name: '{serviceName}'"); + } + + _serviceProcessorMap.Add(serviceName, processor); + } + + private async Task FailAsync(TProtocol oprot, TMessage message, TApplicationException.ExceptionType extype, + string etxt, CancellationToken cancellationToken) + { + var appex = new TApplicationException(extype, etxt); + + var newMessage = new TMessage(message.Name, TMessageType.Exception, message.SeqID); + + await oprot.WriteMessageBeginAsync(newMessage, cancellationToken); + await appex.WriteAsync(oprot, cancellationToken); + await oprot.WriteMessageEndAsync(cancellationToken); + await oprot.Transport.FlushAsync(cancellationToken); + } + + private class StoredMessageProtocol : TProtocolDecorator + { + readonly TMessage _msgBegin; + + public StoredMessageProtocol(TProtocol protocol, TMessage messageBegin) + : base(protocol) + { + _msgBegin = messageBegin; + } + + public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<TMessage>(cancellationToken); + } + + return _msgBegin; + } + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/TSingletonProcessorFactory.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/TSingletonProcessorFactory.cs new file mode 100644 index 000000000..97ecff65c --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/TSingletonProcessorFactory.cs @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Thrift.Server; +using Thrift.Transport; + +namespace Thrift.Processor +{ + // ReSharper disable once InconsistentNaming + public class TSingletonProcessorFactory : ITProcessorFactory + { + private readonly ITAsyncProcessor _asyncProcessor; + + public TSingletonProcessorFactory(ITAsyncProcessor asyncProcessor) + { + _asyncProcessor = asyncProcessor; + } + + public ITAsyncProcessor GetAsyncProcessor(TTransport trans, TServer baseServer = null) + { + return _asyncProcessor; + } + } +}
\ No newline at end of file |