diff options
Diffstat (limited to 'src/jaegertracing/thrift/lib/netstd/Thrift')
54 files changed, 8084 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Collections/TCollections.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Collections/TCollections.cs new file mode 100644 index 000000000..147bfc7d3 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Collections/TCollections.cs @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Collections; + +namespace Thrift.Collections +{ + // ReSharper disable once InconsistentNaming + public class TCollections + { + /// <summary> + /// This will return true if the two collections are value-wise the same. + /// If the collection contains a collection, the collections will be compared using this method. + /// </summary> + public static bool Equals(IEnumerable first, IEnumerable second) + { + if (first == null && second == null) + { + return true; + } + + if (first == null || second == null) + { + return false; + } + + var fiter = first.GetEnumerator(); + var siter = second.GetEnumerator(); + + var fnext = fiter.MoveNext(); + var snext = siter.MoveNext(); + + while (fnext && snext) + { + var fenum = fiter.Current as IEnumerable; + var senum = siter.Current as IEnumerable; + + if (fenum != null && senum != null) + { + if (!Equals(fenum, senum)) + { + return false; + } + } + else if (fenum == null ^ senum == null) + { + return false; + } + else if (!Equals(fiter.Current, siter.Current)) + { + return false; + } + + fnext = fiter.MoveNext(); + snext = siter.MoveNext(); + } + + return fnext == snext; + } + + /// <summary> + /// This returns a hashcode based on the value of the enumerable. + /// </summary> + public static int GetHashCode(IEnumerable enumerable) + { + if (enumerable == null) + { + return 0; + } + + var hashcode = 0; + + foreach (var obj in enumerable) + { + var enum2 = obj as IEnumerable; + var objHash = enum2 == null ? obj.GetHashCode() : GetHashCode(enum2); + + unchecked + { + hashcode = (hashcode*397) ^ (objHash); + } + } + + return hashcode; + } + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Collections/THashSet.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Collections/THashSet.cs new file mode 100644 index 000000000..ffab57711 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Collections/THashSet.cs @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Collections; +using System.Collections.Generic; + +namespace Thrift.Collections +{ + // ReSharper disable once InconsistentNaming + public class THashSet<T> : ICollection<T> + { + private readonly HashSet<T> Items; + + public THashSet() + { + Items = new HashSet<T>(); + } + + public THashSet(int capacity) + { + // TODO: uncomment capacity when NET Standard also implements it + Items = new HashSet<T>(/*capacity*/); + } + + public int Count => Items.Count; + + public bool IsReadOnly => false; + + public void Add(T item) + { + Items.Add(item); + } + + public void Clear() + { + Items.Clear(); + } + + public bool Contains(T item) + { + return Items.Contains(item); + } + + public void CopyTo(T[] array, int arrayIndex) + { + Items.CopyTo(array, arrayIndex); + } + + public IEnumerator GetEnumerator() + { + return Items.GetEnumerator(); + } + + IEnumerator<T> IEnumerable<T>.GetEnumerator() + { + return ((IEnumerable<T>) Items).GetEnumerator(); + } + + public bool Remove(T item) + { + return Items.Remove(item); + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/ITAsyncProcessor.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/ITAsyncProcessor.cs new file mode 100644 index 000000000..f5b8d16e3 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/ITAsyncProcessor.cs @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Threading; +using System.Threading.Tasks; +using Thrift.Protocol; + +namespace Thrift.Processor +{ + public interface ITAsyncProcessor + { + Task<bool> ProcessAsync(TProtocol iprot, TProtocol oprot, CancellationToken cancellationToken = default(CancellationToken)); + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/ITProcessorFactory.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/ITProcessorFactory.cs new file mode 100644 index 000000000..e0fe3d0a8 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/ITProcessorFactory.cs @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Thrift.Server; +using Thrift.Transport; + +namespace Thrift.Processor +{ + // ReSharper disable once InconsistentNaming + public interface ITProcessorFactory + { + ITAsyncProcessor GetAsyncProcessor(TTransport trans, TServer baseServer = null); + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/TMultiplexedProcessor.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/TMultiplexedProcessor.cs new file mode 100644 index 000000000..e5aeaa6c7 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/TMultiplexedProcessor.cs @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using Thrift.Protocol; +using Thrift.Protocol.Entities; + +namespace Thrift.Processor +{ + // ReSharper disable once InconsistentNaming + public class TMultiplexedProcessor : ITAsyncProcessor + { + //TODO: Localization + + private readonly Dictionary<string, ITAsyncProcessor> _serviceProcessorMap = + new Dictionary<string, ITAsyncProcessor>(); + + public async Task<bool> ProcessAsync(TProtocol iprot, TProtocol oprot) + { + return await ProcessAsync(iprot, oprot, CancellationToken.None); + } + + public async Task<bool> ProcessAsync(TProtocol iprot, TProtocol oprot, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<bool>(cancellationToken); + } + + try + { + var message = await iprot.ReadMessageBeginAsync(cancellationToken); + + if ((message.Type != TMessageType.Call) && (message.Type != TMessageType.Oneway)) + { + await FailAsync(oprot, message, TApplicationException.ExceptionType.InvalidMessageType, + "Message exType CALL or ONEWAY expected", cancellationToken); + return false; + } + + // Extract the service name + var index = message.Name.IndexOf(TMultiplexedProtocol.Separator, StringComparison.Ordinal); + if (index < 0) + { + await FailAsync(oprot, message, TApplicationException.ExceptionType.InvalidProtocol, + $"Service name not found in message name: {message.Name}. Did you forget to use a TMultiplexProtocol in your client?", + cancellationToken); + return false; + } + + // Create a new TMessage, something that can be consumed by any TProtocol + var serviceName = message.Name.Substring(0, index); + ITAsyncProcessor actualProcessor; + if (!_serviceProcessorMap.TryGetValue(serviceName, out actualProcessor)) + { + await FailAsync(oprot, message, TApplicationException.ExceptionType.InternalError, + $"Service name not found: {serviceName}. Did you forget to call RegisterProcessor()?", + cancellationToken); + return false; + } + + // Create a new TMessage, removing the service name + var newMessage = new TMessage( + message.Name.Substring(serviceName.Length + TMultiplexedProtocol.Separator.Length), + message.Type, + message.SeqID); + + // Dispatch processing to the stored processor + return + await + actualProcessor.ProcessAsync(new StoredMessageProtocol(iprot, newMessage), oprot, + cancellationToken); + } + catch (IOException) + { + return false; // similar to all other processors + } + } + + public void RegisterProcessor(string serviceName, ITAsyncProcessor processor) + { + if (_serviceProcessorMap.ContainsKey(serviceName)) + { + throw new InvalidOperationException( + $"Processor map already contains processor with name: '{serviceName}'"); + } + + _serviceProcessorMap.Add(serviceName, processor); + } + + private async Task FailAsync(TProtocol oprot, TMessage message, TApplicationException.ExceptionType extype, + string etxt, CancellationToken cancellationToken) + { + var appex = new TApplicationException(extype, etxt); + + var newMessage = new TMessage(message.Name, TMessageType.Exception, message.SeqID); + + await oprot.WriteMessageBeginAsync(newMessage, cancellationToken); + await appex.WriteAsync(oprot, cancellationToken); + await oprot.WriteMessageEndAsync(cancellationToken); + await oprot.Transport.FlushAsync(cancellationToken); + } + + private class StoredMessageProtocol : TProtocolDecorator + { + readonly TMessage _msgBegin; + + public StoredMessageProtocol(TProtocol protocol, TMessage messageBegin) + : base(protocol) + { + _msgBegin = messageBegin; + } + + public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<TMessage>(cancellationToken); + } + + return _msgBegin; + } + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/TSingletonProcessorFactory.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/TSingletonProcessorFactory.cs new file mode 100644 index 000000000..97ecff65c --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Processor/TSingletonProcessorFactory.cs @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Thrift.Server; +using Thrift.Transport; + +namespace Thrift.Processor +{ + // ReSharper disable once InconsistentNaming + public class TSingletonProcessorFactory : ITProcessorFactory + { + private readonly ITAsyncProcessor _asyncProcessor; + + public TSingletonProcessorFactory(ITAsyncProcessor asyncProcessor) + { + _asyncProcessor = asyncProcessor; + } + + public ITAsyncProcessor GetAsyncProcessor(TTransport trans, TServer baseServer = null) + { + return _asyncProcessor; + } + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Properties/AssemblyInfo.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Properties/AssemblyInfo.cs new file mode 100644 index 000000000..597290de5 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Properties/AssemblyInfo.cs @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Reflection; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. + +[assembly: AssemblyTitle("Thrift")] +[assembly: AssemblyDescription("C# .NET Core bindings for the Apache Thrift RPC system")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("The Apache Software Foundation")] +[assembly: AssemblyProduct("Thrift")] +[assembly: AssemblyCopyright("The Apache Software Foundation")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] +//@TODO where to put License information? + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a exType in this assembly from +// COM, set the ComVisible attribute to true on that exType. + +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM + +[assembly: Guid("df3f8ef0-e0a3-4c86-a65b-8ec84e016b1d")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: + +[assembly: AssemblyVersion("0.13.0.0")] +[assembly: AssemblyFileVersion("0.13.0.0")] + diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TField.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TField.cs new file mode 100644 index 000000000..4e29bb5d4 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TField.cs @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace Thrift.Protocol.Entities +{ + // ReSharper disable once InconsistentNaming + public struct TField + { + public TField(string name, TType type, short id) + { + Name = name; + Type = type; + ID = id; + } + + public string Name { get; set; } + + public TType Type { get; set; } + + // ReSharper disable once InconsistentNaming - do not rename - it used for generation + public short ID { get; set; } + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TList.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TList.cs new file mode 100644 index 000000000..f59922564 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TList.cs @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace Thrift.Protocol.Entities +{ + // ReSharper disable once InconsistentNaming + public struct TList + { + public TList(TType elementType, int count) + { + ElementType = elementType; + Count = count; + } + + public TType ElementType { get; set; } + + public int Count { get; set; } + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TMap.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TMap.cs new file mode 100644 index 000000000..1efebe7a1 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TMap.cs @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace Thrift.Protocol.Entities +{ + // ReSharper disable once InconsistentNaming + public struct TMap + { + public TMap(TType keyType, TType valueType, int count) + { + KeyType = keyType; + ValueType = valueType; + Count = count; + } + + public TType KeyType { get; set; } + + public TType ValueType { get; set; } + + public int Count { get; set; } + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TMessage.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TMessage.cs new file mode 100644 index 000000000..08d741d65 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TMessage.cs @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace Thrift.Protocol.Entities +{ + // ReSharper disable once InconsistentNaming + public struct TMessage + { + public TMessage(string name, TMessageType type, int seqid) + { + Name = name; + Type = type; + SeqID = seqid; + } + + public string Name { get; set; } + + public TMessageType Type { get; set; } + + // ReSharper disable once InconsistentNaming - do not rename - it used for generation + public int SeqID { get; set; } + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TMessageType.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TMessageType.cs new file mode 100644 index 000000000..24d663e2d --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TMessageType.cs @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace Thrift.Protocol.Entities +{ + // ReSharper disable once InconsistentNaming + public enum TMessageType + { + Call = 1, + Reply = 2, + Exception = 3, + Oneway = 4 + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TSet.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TSet.cs new file mode 100644 index 000000000..692d5642c --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TSet.cs @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace Thrift.Protocol.Entities +{ + // ReSharper disable once InconsistentNaming + public struct TSet + { + public TSet(TType elementType, int count) + { + ElementType = elementType; + Count = count; + } + + public TSet(TList list) + : this(list.ElementType, list.Count) + { + } + + public TType ElementType { get; set; } + + public int Count { get; set; } + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TStruct.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TStruct.cs new file mode 100644 index 000000000..e04167e47 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TStruct.cs @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace Thrift.Protocol.Entities +{ + // ReSharper disable once InconsistentNaming + public struct TStruct + { + public TStruct(string name) + { + Name = name; + } + + public string Name { get; set; } + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TType.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TType.cs new file mode 100644 index 000000000..4e922a7e7 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Entities/TType.cs @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace Thrift.Protocol.Entities +{ + // ReSharper disable once InconsistentNaming + public enum TType : byte + { + Stop = 0, + Void = 1, + Bool = 2, + Byte = 3, + Double = 4, + I16 = 6, + I32 = 8, + I64 = 10, + String = 11, + Struct = 12, + Map = 13, + Set = 14, + List = 15 + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TBase.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TBase.cs new file mode 100644 index 000000000..b5ef2aea9 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TBase.cs @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Protocol +{ + public interface TUnionBase + { + Task WriteAsync(TProtocol tProtocol, CancellationToken cancellationToken = default(CancellationToken)); + } + + // ReSharper disable once InconsistentNaming + public interface TBase : TUnionBase + { + Task ReadAsync(TProtocol tProtocol, CancellationToken cancellationToken = default(CancellationToken)); + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TBinaryProtocol.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TBinaryProtocol.cs new file mode 100644 index 000000000..3f30d4aa1 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TBinaryProtocol.cs @@ -0,0 +1,600 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Thrift.Protocol.Entities; +using Thrift.Transport; + +namespace Thrift.Protocol +{ + // ReSharper disable once InconsistentNaming + public class TBinaryProtocol : TProtocol + { + protected const uint VersionMask = 0xffff0000; + protected const uint Version1 = 0x80010000; + + protected bool StrictRead; + protected bool StrictWrite; + + // minimize memory allocations by means of an preallocated bytes buffer + // The value of 128 is arbitrarily chosen, the required minimum size must be sizeof(long) + private byte[] PreAllocatedBuffer = new byte[128]; + + private static readonly TStruct AnonymousStruct = new TStruct(string.Empty); + private static readonly TField StopField = new TField() { Type = TType.Stop }; + + public TBinaryProtocol(TTransport trans) + : this(trans, false, true) + { + } + + public TBinaryProtocol(TTransport trans, bool strictRead, bool strictWrite) + : base(trans) + { + StrictRead = strictRead; + StrictWrite = strictWrite; + } + + public override async Task WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + if (StrictWrite) + { + var version = Version1 | (uint) message.Type; + await WriteI32Async((int) version, cancellationToken); + await WriteStringAsync(message.Name, cancellationToken); + await WriteI32Async(message.SeqID, cancellationToken); + } + else + { + await WriteStringAsync(message.Name, cancellationToken); + await WriteByteAsync((sbyte) message.Type, cancellationToken); + await WriteI32Async(message.SeqID, cancellationToken); + } + } + + public override async Task WriteMessageEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async Task WriteStructBeginAsync(TStruct @struct, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async Task WriteStructEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async Task WriteFieldBeginAsync(TField field, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + await WriteByteAsync((sbyte) field.Type, cancellationToken); + await WriteI16Async(field.ID, cancellationToken); + } + + public override async Task WriteFieldEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async Task WriteFieldStopAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + await WriteByteAsync((sbyte) TType.Stop, cancellationToken); + } + + public override async Task WriteMapBeginAsync(TMap map, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + PreAllocatedBuffer[0] = (byte)map.KeyType; + PreAllocatedBuffer[1] = (byte)map.ValueType; + await Trans.WriteAsync(PreAllocatedBuffer, 0, 2, cancellationToken); + + await WriteI32Async(map.Count, cancellationToken); + } + + public override async Task WriteMapEndAsync(CancellationToken cancellationToken) + + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async Task WriteListBeginAsync(TList list, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + await WriteByteAsync((sbyte) list.ElementType, cancellationToken); + await WriteI32Async(list.Count, cancellationToken); + } + + public override async Task WriteListEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async Task WriteSetBeginAsync(TSet set, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + await WriteByteAsync((sbyte) set.ElementType, cancellationToken); + await WriteI32Async(set.Count, cancellationToken); + } + + public override async Task WriteSetEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async Task WriteBoolAsync(bool b, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + await WriteByteAsync(b ? (sbyte) 1 : (sbyte) 0, cancellationToken); + } + + public override async Task WriteByteAsync(sbyte b, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + PreAllocatedBuffer[0] = (byte)b; + + await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken); + } + public override async Task WriteI16Async(short i16, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + PreAllocatedBuffer[0] = (byte)(0xff & (i16 >> 8)); + PreAllocatedBuffer[1] = (byte)(0xff & i16); + + await Trans.WriteAsync(PreAllocatedBuffer, 0, 2, cancellationToken); + } + + public override async Task WriteI32Async(int i32, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + PreAllocatedBuffer[0] = (byte)(0xff & (i32 >> 24)); + PreAllocatedBuffer[1] = (byte)(0xff & (i32 >> 16)); + PreAllocatedBuffer[2] = (byte)(0xff & (i32 >> 8)); + PreAllocatedBuffer[3] = (byte)(0xff & i32); + + await Trans.WriteAsync(PreAllocatedBuffer, 0, 4, cancellationToken); + } + + + public override async Task WriteI64Async(long i64, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + PreAllocatedBuffer[0] = (byte)(0xff & (i64 >> 56)); + PreAllocatedBuffer[1] = (byte)(0xff & (i64 >> 48)); + PreAllocatedBuffer[2] = (byte)(0xff & (i64 >> 40)); + PreAllocatedBuffer[3] = (byte)(0xff & (i64 >> 32)); + PreAllocatedBuffer[4] = (byte)(0xff & (i64 >> 24)); + PreAllocatedBuffer[5] = (byte)(0xff & (i64 >> 16)); + PreAllocatedBuffer[6] = (byte)(0xff & (i64 >> 8)); + PreAllocatedBuffer[7] = (byte)(0xff & i64); + + await Trans.WriteAsync(PreAllocatedBuffer, 0, 8, cancellationToken); + } + + public override async Task WriteDoubleAsync(double d, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + await WriteI64Async(BitConverter.DoubleToInt64Bits(d), cancellationToken); + } + + + public override async Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + await WriteI32Async(bytes.Length, cancellationToken); + await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken); + } + + public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<TMessage>(cancellationToken); + } + + var message = new TMessage(); + var size = await ReadI32Async(cancellationToken); + if (size < 0) + { + var version = (uint) size & VersionMask; + if (version != Version1) + { + throw new TProtocolException(TProtocolException.BAD_VERSION, + $"Bad version in ReadMessageBegin: {version}"); + } + message.Type = (TMessageType) (size & 0x000000ff); + message.Name = await ReadStringAsync(cancellationToken); + message.SeqID = await ReadI32Async(cancellationToken); + } + else + { + if (StrictRead) + { + throw new TProtocolException(TProtocolException.BAD_VERSION, + "Missing version in ReadMessageBegin, old client?"); + } + message.Name = (size > 0) ? await ReadStringBodyAsync(size, cancellationToken) : string.Empty; + message.Type = (TMessageType) await ReadByteAsync(cancellationToken); + message.SeqID = await ReadI32Async(cancellationToken); + } + return message; + } + + public override async Task ReadMessageEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + + return AnonymousStruct; + } + + public override async Task ReadStructEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async ValueTask<TField> ReadFieldBeginAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<TField>(cancellationToken); + } + + + var type = (TType)await ReadByteAsync(cancellationToken); + if (type == TType.Stop) + { + return StopField; + } + + return new TField { + Type = type, + ID = await ReadI16Async(cancellationToken) + }; + } + + public override async Task ReadFieldEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async ValueTask<TMap> ReadMapBeginAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<TMap>(cancellationToken); + } + + var map = new TMap + { + KeyType = (TType) await ReadByteAsync(cancellationToken), + ValueType = (TType) await ReadByteAsync(cancellationToken), + Count = await ReadI32Async(cancellationToken) + }; + + return map; + } + + public override async Task ReadMapEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async ValueTask<TList> ReadListBeginAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<TList>(cancellationToken); + } + + var list = new TList + { + ElementType = (TType) await ReadByteAsync(cancellationToken), + Count = await ReadI32Async(cancellationToken) + }; + + return list; + } + + public override async Task ReadListEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async ValueTask<TSet> ReadSetBeginAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<TSet>(cancellationToken); + } + + var set = new TSet + { + ElementType = (TType) await ReadByteAsync(cancellationToken), + Count = await ReadI32Async(cancellationToken) + }; + + return set; + } + + public override async Task ReadSetEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async ValueTask<bool> ReadBoolAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<bool>(cancellationToken); + } + + return await ReadByteAsync(cancellationToken) == 1; + } + + public override async ValueTask<sbyte> ReadByteAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<sbyte>(cancellationToken); + } + + await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 1, cancellationToken); + return (sbyte)PreAllocatedBuffer[0]; + } + + public override async ValueTask<short> ReadI16Async(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<short>(cancellationToken); + } + + await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 2, cancellationToken); + var result = (short) (((PreAllocatedBuffer[0] & 0xff) << 8) | PreAllocatedBuffer[1] & 0xff); + return result; + } + + public override async ValueTask<int> ReadI32Async(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<int>(cancellationToken); + } + + await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 4, cancellationToken); + + var result = + ((PreAllocatedBuffer[0] & 0xff) << 24) | + ((PreAllocatedBuffer[1] & 0xff) << 16) | + ((PreAllocatedBuffer[2] & 0xff) << 8) | + PreAllocatedBuffer[3] & 0xff; + + return result; + } + +#pragma warning disable 675 + + protected internal long ReadI64FromPreAllocatedBuffer() + { + var result = + ((long) (PreAllocatedBuffer[0] & 0xff) << 56) | + ((long) (PreAllocatedBuffer[1] & 0xff) << 48) | + ((long) (PreAllocatedBuffer[2] & 0xff) << 40) | + ((long) (PreAllocatedBuffer[3] & 0xff) << 32) | + ((long) (PreAllocatedBuffer[4] & 0xff) << 24) | + ((long) (PreAllocatedBuffer[5] & 0xff) << 16) | + ((long) (PreAllocatedBuffer[6] & 0xff) << 8) | + PreAllocatedBuffer[7] & 0xff; + + return result; + } + +#pragma warning restore 675 + + public override async ValueTask<long> ReadI64Async(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<long>(cancellationToken); + } + + await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 8, cancellationToken); + return ReadI64FromPreAllocatedBuffer(); + } + + public override async ValueTask<double> ReadDoubleAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<double>(cancellationToken); + } + + var d = await ReadI64Async(cancellationToken); + return BitConverter.Int64BitsToDouble(d); + } + + public override async ValueTask<byte[]> ReadBinaryAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<byte[]>(cancellationToken); + } + + var size = await ReadI32Async(cancellationToken); + var buf = new byte[size]; + await Trans.ReadAllAsync(buf, 0, size, cancellationToken); + return buf; + } + + public override async ValueTask<string> ReadStringAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<string>(cancellationToken); + } + + var size = await ReadI32Async(cancellationToken); + return size > 0 ? await ReadStringBodyAsync(size, cancellationToken) : string.Empty; + } + + private async ValueTask<string> ReadStringBodyAsync(int size, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled<string>(cancellationToken); + } + + if (size <= PreAllocatedBuffer.Length) + { + await Trans.ReadAllAsync(PreAllocatedBuffer, 0, size, cancellationToken); + return Encoding.UTF8.GetString(PreAllocatedBuffer, 0, size); + } + + var buf = new byte[size]; + await Trans.ReadAllAsync(buf, 0, size, cancellationToken); + return Encoding.UTF8.GetString(buf, 0, buf.Length); + } + + public class Factory : TProtocolFactory + { + protected bool StrictRead; + protected bool StrictWrite; + + public Factory() + : this(false, true) + { + } + + public Factory(bool strictRead, bool strictWrite) + { + StrictRead = strictRead; + StrictWrite = strictWrite; + } + + public override TProtocol GetProtocol(TTransport trans) + { + return new TBinaryProtocol(trans, StrictRead, StrictWrite); + } + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TCompactProtocol.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TCompactProtocol.cs new file mode 100644 index 000000000..c26633a14 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TCompactProtocol.cs @@ -0,0 +1,919 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Thrift.Protocol.Entities; +using Thrift.Transport; + +namespace Thrift.Protocol +{ + //TODO: implementation of TProtocol + + // ReSharper disable once InconsistentNaming + public class TCompactProtocol : TProtocol + { + private const byte ProtocolId = 0x82; + private const byte Version = 1; + private const byte VersionMask = 0x1f; // 0001 1111 + private const byte TypeMask = 0xE0; // 1110 0000 + private const byte TypeBits = 0x07; // 0000 0111 + private const int TypeShiftAmount = 5; + private static readonly TStruct AnonymousStruct = new TStruct(string.Empty); + private static readonly TField StopField = new TField(string.Empty, TType.Stop, 0); + + private const byte NoTypeOverride = 0xFF; + + // ReSharper disable once InconsistentNaming + private static readonly byte[] TTypeToCompactType = new byte[16]; + private static readonly TType[] CompactTypeToTType = new TType[13]; + + /// <summary> + /// Used to keep track of the last field for the current and previous structs, so we can do the delta stuff. + /// </summary> + private readonly Stack<short> _lastField = new Stack<short>(15); + + /// <summary> + /// If we encounter a boolean field begin, save the TField here so it can have the value incorporated. + /// </summary> + private TField? _booleanField; + + /// <summary> + /// If we Read a field header, and it's a boolean field, save the boolean value here so that ReadBool can use it. + /// </summary> + private bool? _boolValue; + + private short _lastFieldId; + + // minimize memory allocations by means of an preallocated bytes buffer + // The value of 128 is arbitrarily chosen, the required minimum size must be sizeof(long) + private byte[] PreAllocatedBuffer = new byte[128]; + + private struct VarInt + { + public byte[] bytes; + public int count; + } + + // minimize memory allocations by means of an preallocated VarInt buffer + private VarInt PreAllocatedVarInt = new VarInt() + { + bytes = new byte[10], // see Int64ToVarInt() + count = 0 + }; + + + + + public TCompactProtocol(TTransport trans) + : base(trans) + { + TTypeToCompactType[(int) TType.Stop] = Types.Stop; + TTypeToCompactType[(int) TType.Bool] = Types.BooleanTrue; + TTypeToCompactType[(int) TType.Byte] = Types.Byte; + TTypeToCompactType[(int) TType.I16] = Types.I16; + TTypeToCompactType[(int) TType.I32] = Types.I32; + TTypeToCompactType[(int) TType.I64] = Types.I64; + TTypeToCompactType[(int) TType.Double] = Types.Double; + TTypeToCompactType[(int) TType.String] = Types.Binary; + TTypeToCompactType[(int) TType.List] = Types.List; + TTypeToCompactType[(int) TType.Set] = Types.Set; + TTypeToCompactType[(int) TType.Map] = Types.Map; + TTypeToCompactType[(int) TType.Struct] = Types.Struct; + + CompactTypeToTType[Types.Stop] = TType.Stop; + CompactTypeToTType[Types.BooleanTrue] = TType.Bool; + CompactTypeToTType[Types.BooleanFalse] = TType.Bool; + CompactTypeToTType[Types.Byte] = TType.Byte; + CompactTypeToTType[Types.I16] = TType.I16; + CompactTypeToTType[Types.I32] = TType.I32; + CompactTypeToTType[Types.I64] = TType.I64; + CompactTypeToTType[Types.Double] = TType.Double; + CompactTypeToTType[Types.Binary] = TType.String; + CompactTypeToTType[Types.List] = TType.List; + CompactTypeToTType[Types.Set] = TType.Set; + CompactTypeToTType[Types.Map] = TType.Map; + CompactTypeToTType[Types.Struct] = TType.Struct; + } + + public void Reset() + { + _lastField.Clear(); + _lastFieldId = 0; + } + + public override async Task WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken) + { + PreAllocatedBuffer[0] = ProtocolId; + PreAllocatedBuffer[1] = (byte)((Version & VersionMask) | (((uint)message.Type << TypeShiftAmount) & TypeMask)); + await Trans.WriteAsync(PreAllocatedBuffer, 0, 2, cancellationToken); + + Int32ToVarInt((uint) message.SeqID, ref PreAllocatedVarInt); + await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken); + + await WriteStringAsync(message.Name, cancellationToken); + } + + public override async Task WriteMessageEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + /// <summary> + /// Write a struct begin. This doesn't actually put anything on the wire. We + /// use it as an opportunity to put special placeholder markers on the field + /// stack so we can get the field id deltas correct. + /// </summary> + public override async Task WriteStructBeginAsync(TStruct @struct, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + + _lastField.Push(_lastFieldId); + _lastFieldId = 0; + } + + public override async Task WriteStructEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + + _lastFieldId = _lastField.Pop(); + } + + private async Task WriteFieldBeginInternalAsync(TField field, byte fieldType, CancellationToken cancellationToken) + { + // if there's a exType override passed in, use that. Otherwise ask GetCompactType(). + if (fieldType == NoTypeOverride) + fieldType = GetCompactType(field.Type); + + + // check if we can use delta encoding for the field id + if (field.ID > _lastFieldId) + { + var delta = field.ID - _lastFieldId; + if (delta <= 15) + { + // Write them together + PreAllocatedBuffer[0] = (byte)((delta << 4) | fieldType); + await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken); + _lastFieldId = field.ID; + return; + } + } + + // Write them separate + PreAllocatedBuffer[0] = fieldType; + await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken); + await WriteI16Async(field.ID, cancellationToken); + _lastFieldId = field.ID; + } + + public override async Task WriteFieldBeginAsync(TField field, CancellationToken cancellationToken) + { + if (field.Type == TType.Bool) + { + _booleanField = field; + } + else + { + await WriteFieldBeginInternalAsync(field, NoTypeOverride, cancellationToken); + } + } + + public override async Task WriteFieldEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async Task WriteFieldStopAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + PreAllocatedBuffer[0] = Types.Stop; + await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken); + } + + protected async Task WriteCollectionBeginAsync(TType elemType, int size, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + /* + Abstract method for writing the start of lists and sets. List and sets on + the wire differ only by the exType indicator. + */ + + if (size <= 14) + { + PreAllocatedBuffer[0] = (byte)((size << 4) | GetCompactType(elemType)); + await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken); + } + else + { + PreAllocatedBuffer[0] = (byte)(0xf0 | GetCompactType(elemType)); + await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken); + + Int32ToVarInt((uint) size, ref PreAllocatedVarInt); + await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken); + } + } + + public override async Task WriteListBeginAsync(TList list, CancellationToken cancellationToken) + { + await WriteCollectionBeginAsync(list.ElementType, list.Count, cancellationToken); + } + + public override async Task WriteListEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async Task WriteSetBeginAsync(TSet set, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + await WriteCollectionBeginAsync(set.ElementType, set.Count, cancellationToken); + } + + public override async Task WriteSetEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async Task WriteBoolAsync(bool b, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + /* + Write a boolean value. Potentially, this could be a boolean field, in + which case the field header info isn't written yet. If so, decide what the + right exType header is for the value and then Write the field header. + Otherwise, Write a single byte. + */ + + if (_booleanField != null) + { + // we haven't written the field header yet + var type = b ? Types.BooleanTrue : Types.BooleanFalse; + await WriteFieldBeginInternalAsync(_booleanField.Value, type, cancellationToken); + _booleanField = null; + } + else + { + // we're not part of a field, so just write the value. + PreAllocatedBuffer[0] = b ? Types.BooleanTrue : Types.BooleanFalse; + await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken); + } + } + + public override async Task WriteByteAsync(sbyte b, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + PreAllocatedBuffer[0] = (byte)b; + await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken); + } + + public override async Task WriteI16Async(short i16, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + Int32ToVarInt(IntToZigzag(i16), ref PreAllocatedVarInt); + await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken); + } + + private static void Int32ToVarInt(uint n, ref VarInt varint) + { + // Write an i32 as a varint. Results in 1 - 5 bytes on the wire. + varint.count = 0; + Debug.Assert(varint.bytes.Length >= 5); + + while (true) + { + if ((n & ~0x7F) == 0) + { + varint.bytes[varint.count++] = (byte)n; + break; + } + + varint.bytes[varint.count++] = (byte)((n & 0x7F) | 0x80); + n >>= 7; + } + } + + public override async Task WriteI32Async(int i32, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + Int32ToVarInt(IntToZigzag(i32), ref PreAllocatedVarInt); + await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken); + } + + static private void Int64ToVarInt(ulong n, ref VarInt varint) + { + // Write an i64 as a varint. Results in 1-10 bytes on the wire. + varint.count = 0; + Debug.Assert(varint.bytes.Length >= 10); + + while (true) + { + if ((n & ~(ulong)0x7FL) == 0) + { + varint.bytes[varint.count++] = (byte)n; + break; + } + varint.bytes[varint.count++] = (byte)((n & 0x7F) | 0x80); + n >>= 7; + } + } + + public override async Task WriteI64Async(long i64, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + Int64ToVarInt(LongToZigzag(i64), ref PreAllocatedVarInt); + await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken); + } + + public override async Task WriteDoubleAsync(double d, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + FixedLongToBytes(BitConverter.DoubleToInt64Bits(d), PreAllocatedBuffer, 0); + await Trans.WriteAsync(PreAllocatedBuffer, 0, 8, cancellationToken); + } + + public override async Task WriteStringAsync(string str, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + var bytes = Encoding.UTF8.GetBytes(str); + + Int32ToVarInt((uint) bytes.Length, ref PreAllocatedVarInt); + await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken); + await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken); + } + + public override async Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + Int32ToVarInt((uint) bytes.Length, ref PreAllocatedVarInt); + await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken); + await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken); + } + + public override async Task WriteMapBeginAsync(TMap map, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + if (map.Count == 0) + { + PreAllocatedBuffer[0] = 0; + await Trans.WriteAsync( PreAllocatedBuffer, 0, 1, cancellationToken); + } + else + { + Int32ToVarInt((uint) map.Count, ref PreAllocatedVarInt); + await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken); + + PreAllocatedBuffer[0] = (byte)((GetCompactType(map.KeyType) << 4) | GetCompactType(map.ValueType)); + await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken); + } + } + + public override async Task WriteMapEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<TMessage>(cancellationToken); + } + + var protocolId = (byte) await ReadByteAsync(cancellationToken); + if (protocolId != ProtocolId) + { + throw new TProtocolException($"Expected protocol id {ProtocolId:X} but got {protocolId:X}"); + } + + var versionAndType = (byte) await ReadByteAsync(cancellationToken); + var version = (byte) (versionAndType & VersionMask); + + if (version != Version) + { + throw new TProtocolException($"Expected version {Version} but got {version}"); + } + + var type = (byte) ((versionAndType >> TypeShiftAmount) & TypeBits); + var seqid = (int) await ReadVarInt32Async(cancellationToken); + var messageName = await ReadStringAsync(cancellationToken); + + return new TMessage(messageName, (TMessageType) type, seqid); + } + + public override async Task ReadMessageEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<TStruct>(cancellationToken); + } + + // some magic is here ) + + _lastField.Push(_lastFieldId); + _lastFieldId = 0; + + return AnonymousStruct; + } + + public override async Task ReadStructEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + + /* + Doesn't actually consume any wire data, just removes the last field for + this struct from the field stack. + */ + + // consume the last field we Read off the wire. + _lastFieldId = _lastField.Pop(); + } + + public override async ValueTask<TField> ReadFieldBeginAsync(CancellationToken cancellationToken) + { + // Read a field header off the wire. + var type = (byte) await ReadByteAsync(cancellationToken); + + // if it's a stop, then we can return immediately, as the struct is over. + if (type == Types.Stop) + { + return StopField; + } + + + // mask off the 4 MSB of the exType header. it could contain a field id delta. + var modifier = (short) ((type & 0xf0) >> 4); + var compactType = (byte)(type & 0x0f); + + short fieldId; + if (modifier == 0) + { + fieldId = await ReadI16Async(cancellationToken); + } + else + { + fieldId = (short) (_lastFieldId + modifier); + } + + var ttype = GetTType(compactType); + var field = new TField(string.Empty, ttype, fieldId); + + // if this happens to be a boolean field, the value is encoded in the exType + if( ttype == TType.Bool) + { + _boolValue = (compactType == Types.BooleanTrue); + } + + // push the new field onto the field stack so we can keep the deltas going. + _lastFieldId = field.ID; + return field; + } + + public override async Task ReadFieldEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async ValueTask<TMap> ReadMapBeginAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled<TMap>(cancellationToken); + } + + /* + Read a map header off the wire. If the size is zero, skip Reading the key + and value exType. This means that 0-length maps will yield TMaps without the + "correct" types. + */ + + var size = (int) await ReadVarInt32Async(cancellationToken); + var keyAndValueType = size == 0 ? (byte) 0 : (byte) await ReadByteAsync(cancellationToken); + return new TMap(GetTType((byte) (keyAndValueType >> 4)), GetTType((byte) (keyAndValueType & 0xf)), size); + } + + public override async Task ReadMapEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async ValueTask<TSet> ReadSetBeginAsync(CancellationToken cancellationToken) + { + /* + Read a set header off the wire. If the set size is 0-14, the size will + be packed into the element exType header. If it's a longer set, the 4 MSB + of the element exType header will be 0xF, and a varint will follow with the + true size. + */ + + return new TSet(await ReadListBeginAsync(cancellationToken)); + } + + public override ValueTask<bool> ReadBoolAsync(CancellationToken cancellationToken) + { + /* + Read a boolean off the wire. If this is a boolean field, the value should + already have been Read during ReadFieldBegin, so we'll just consume the + pre-stored value. Otherwise, Read a byte. + */ + + if (_boolValue != null) + { + var result = _boolValue.Value; + _boolValue = null; + return new ValueTask<bool>(result); + } + + return InternalCall(); + + async ValueTask<bool> InternalCall() + { + var data = await ReadByteAsync(cancellationToken); + return (data == Types.BooleanTrue); + } + } + + + public override async ValueTask<sbyte> ReadByteAsync(CancellationToken cancellationToken) + { + // Read a single byte off the wire. Nothing interesting here. + await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 1, cancellationToken); + return (sbyte)PreAllocatedBuffer[0]; + } + + public override async ValueTask<short> ReadI16Async(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<short>(cancellationToken); + } + + return (short) ZigzagToInt(await ReadVarInt32Async(cancellationToken)); + } + + public override async ValueTask<int> ReadI32Async(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<int>(cancellationToken); + } + + return ZigzagToInt(await ReadVarInt32Async(cancellationToken)); + } + + public override async ValueTask<long> ReadI64Async(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<long>(cancellationToken); + } + + return ZigzagToLong(await ReadVarInt64Async(cancellationToken)); + } + + public override async ValueTask<double> ReadDoubleAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<double>(cancellationToken); + } + + await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 8, cancellationToken); + + return BitConverter.Int64BitsToDouble(BytesToLong(PreAllocatedBuffer)); + } + + public override async ValueTask<string> ReadStringAsync(CancellationToken cancellationToken) + { + // read length + var length = (int) await ReadVarInt32Async(cancellationToken); + if (length == 0) + { + return string.Empty; + } + + // read and decode data + if (length < PreAllocatedBuffer.Length) + { + await Trans.ReadAllAsync(PreAllocatedBuffer, 0, length, cancellationToken); + return Encoding.UTF8.GetString(PreAllocatedBuffer, 0, length); + } + + var buf = new byte[length]; + await Trans.ReadAllAsync(buf, 0, length, cancellationToken); + return Encoding.UTF8.GetString(buf, 0, length); + } + + public override async ValueTask<byte[]> ReadBinaryAsync(CancellationToken cancellationToken) + { + // read length + var length = (int) await ReadVarInt32Async(cancellationToken); + if (length == 0) + { + return new byte[0]; + } + + // read data + var buf = new byte[length]; + await Trans.ReadAllAsync(buf, 0, length, cancellationToken); + return buf; + } + + public override async ValueTask<TList> ReadListBeginAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled<TList>(cancellationToken); + } + + /* + Read a list header off the wire. If the list size is 0-14, the size will + be packed into the element exType header. If it's a longer list, the 4 MSB + of the element exType header will be 0xF, and a varint will follow with the + true size. + */ + + var sizeAndType = (byte) await ReadByteAsync(cancellationToken); + var size = (sizeAndType >> 4) & 0x0f; + if (size == 15) + { + size = (int) await ReadVarInt32Async(cancellationToken); + } + + var type = GetTType(sizeAndType); + return new TList(type, size); + } + + public override async Task ReadListEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async Task ReadSetEndAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + private static byte GetCompactType(TType ttype) + { + // Given a TType value, find the appropriate TCompactProtocol.Types constant. + return TTypeToCompactType[(int) ttype]; + } + + + private async ValueTask<uint> ReadVarInt32Async(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<uint>(cancellationToken); + } + + /* + Read an i32 from the wire as a varint. The MSB of each byte is set + if there is another byte to follow. This can Read up to 5 bytes. + */ + + uint result = 0; + var shift = 0; + + while (true) + { + var b = (byte) await ReadByteAsync(cancellationToken); + result |= (uint) (b & 0x7f) << shift; + if ((b & 0x80) != 0x80) + { + break; + } + shift += 7; + } + + return result; + } + + private async ValueTask<ulong> ReadVarInt64Async(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<uint>(cancellationToken); + } + + /* + Read an i64 from the wire as a proper varint. The MSB of each byte is set + if there is another byte to follow. This can Read up to 10 bytes. + */ + + var shift = 0; + ulong result = 0; + while (true) + { + var b = (byte) await ReadByteAsync(cancellationToken); + result |= (ulong) (b & 0x7f) << shift; + if ((b & 0x80) != 0x80) + { + break; + } + shift += 7; + } + + return result; + } + + private static int ZigzagToInt(uint n) + { + return (int) (n >> 1) ^ -(int) (n & 1); + } + + private static long ZigzagToLong(ulong n) + { + return (long) (n >> 1) ^ -(long) (n & 1); + } + + private static long BytesToLong(byte[] bytes) + { + /* + Note that it's important that the mask bytes are long literals, + otherwise they'll default to ints, and when you shift an int left 56 bits, + you just get a messed up int. + */ + + return + ((bytes[7] & 0xffL) << 56) | + ((bytes[6] & 0xffL) << 48) | + ((bytes[5] & 0xffL) << 40) | + ((bytes[4] & 0xffL) << 32) | + ((bytes[3] & 0xffL) << 24) | + ((bytes[2] & 0xffL) << 16) | + ((bytes[1] & 0xffL) << 8) | + (bytes[0] & 0xffL); + } + + private static TType GetTType(byte type) + { + // Given a TCompactProtocol.Types constant, convert it to its corresponding TType value. + return CompactTypeToTType[type & 0x0f]; + } + + private static ulong LongToZigzag(long n) + { + // Convert l into a zigzag long. This allows negative numbers to be represented compactly as a varint + return (ulong) (n << 1) ^ (ulong) (n >> 63); + } + + private static uint IntToZigzag(int n) + { + // Convert n into a zigzag int. This allows negative numbers to be represented compactly as a varint + return (uint) (n << 1) ^ (uint) (n >> 31); + } + + private static void FixedLongToBytes(long n, byte[] buf, int off) + { + // Convert a long into little-endian bytes in buf starting at off and going until off+7. + buf[off + 0] = (byte) (n & 0xff); + buf[off + 1] = (byte) ((n >> 8) & 0xff); + buf[off + 2] = (byte) ((n >> 16) & 0xff); + buf[off + 3] = (byte) ((n >> 24) & 0xff); + buf[off + 4] = (byte) ((n >> 32) & 0xff); + buf[off + 5] = (byte) ((n >> 40) & 0xff); + buf[off + 6] = (byte) ((n >> 48) & 0xff); + buf[off + 7] = (byte) ((n >> 56) & 0xff); + } + + public class Factory : TProtocolFactory + { + public override TProtocol GetProtocol(TTransport trans) + { + return new TCompactProtocol(trans); + } + } + + /// <summary> + /// All of the on-wire exType codes. + /// </summary> + private static class Types + { + public const byte Stop = 0x00; + public const byte BooleanTrue = 0x01; + public const byte BooleanFalse = 0x02; + public const byte Byte = 0x03; + public const byte I16 = 0x04; + public const byte I32 = 0x05; + public const byte I64 = 0x06; + public const byte Double = 0x07; + public const byte Binary = 0x08; + public const byte List = 0x09; + public const byte Set = 0x0A; + public const byte Map = 0x0B; + public const byte Struct = 0x0C; + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TJSONProtocol.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TJSONProtocol.cs new file mode 100644 index 000000000..464bd62ff --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TJSONProtocol.cs @@ -0,0 +1,981 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Collections.Generic; +using System.Globalization; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Thrift.Protocol.Entities; +using Thrift.Protocol.Utilities; +using Thrift.Transport; + +namespace Thrift.Protocol +{ + /// <summary> + /// JSON protocol implementation for thrift. + /// This is a full-featured protocol supporting Write and Read. + /// Please see the C++ class header for a detailed description of the + /// protocol's wire format. + /// Adapted from the Java version. + /// </summary> + // ReSharper disable once InconsistentNaming + public class TJsonProtocol : TProtocol + { + private const long Version = 1; + + // Temporary buffer used by several methods + private readonly byte[] _tempBuffer = new byte[4]; + + // Current context that we are in + protected JSONBaseContext Context; + + // Stack of nested contexts that we may be in + protected Stack<JSONBaseContext> ContextStack = new Stack<JSONBaseContext>(); + + // Reader that manages a 1-byte buffer + protected LookaheadReader Reader; + + // Default encoding + protected Encoding Utf8Encoding = Encoding.UTF8; + + /// <summary> + /// TJsonProtocol Constructor + /// </summary> + public TJsonProtocol(TTransport trans) + : base(trans) + { + Context = new JSONBaseContext(this); + Reader = new LookaheadReader(this); + } + + /// <summary> + /// Push a new JSON context onto the stack. + /// </summary> + protected void PushContext(JSONBaseContext c) + { + ContextStack.Push(Context); + Context = c; + } + + /// <summary> + /// Pop the last JSON context off the stack + /// </summary> + protected void PopContext() + { + Context = ContextStack.Pop(); + } + + /// <summary> + /// Read a byte that must match b[0]; otherwise an exception is thrown. + /// Marked protected to avoid synthetic accessor in JSONListContext.Read + /// and JSONPairContext.Read + /// </summary> + protected async Task ReadJsonSyntaxCharAsync(byte[] bytes, CancellationToken cancellationToken) + { + var ch = await Reader.ReadAsync(cancellationToken); + if (ch != bytes[0]) + { + throw new TProtocolException(TProtocolException.INVALID_DATA, $"Unexpected character: {(char) ch}"); + } + } + + /// <summary> + /// Write the bytes in array buf as a JSON characters, escaping as needed + /// </summary> + private async Task WriteJsonStringAsync(byte[] bytes, CancellationToken cancellationToken) + { + await Context.WriteConditionalDelimiterAsync(cancellationToken); + await Trans.WriteAsync(TJSONProtocolConstants.Quote, cancellationToken); + + var len = bytes.Length; + for (var i = 0; i < len; i++) + { + if ((bytes[i] & 0x00FF) >= 0x30) + { + if (bytes[i] == TJSONProtocolConstants.Backslash[0]) + { + await Trans.WriteAsync(TJSONProtocolConstants.Backslash, cancellationToken); + await Trans.WriteAsync(TJSONProtocolConstants.Backslash, cancellationToken); + } + else + { + await Trans.WriteAsync(bytes.ToArray(), i, 1, cancellationToken); + } + } + else + { + _tempBuffer[0] = TJSONProtocolConstants.JsonCharTable[bytes[i]]; + if (_tempBuffer[0] == 1) + { + await Trans.WriteAsync(bytes, i, 1, cancellationToken); + } + else if (_tempBuffer[0] > 1) + { + await Trans.WriteAsync(TJSONProtocolConstants.Backslash, cancellationToken); + await Trans.WriteAsync(_tempBuffer, 0, 1, cancellationToken); + } + else + { + await Trans.WriteAsync(TJSONProtocolConstants.EscSequences, cancellationToken); + _tempBuffer[0] = TJSONProtocolHelper.ToHexChar((byte) (bytes[i] >> 4)); + _tempBuffer[1] = TJSONProtocolHelper.ToHexChar(bytes[i]); + await Trans.WriteAsync(_tempBuffer, 0, 2, cancellationToken); + } + } + } + await Trans.WriteAsync(TJSONProtocolConstants.Quote, cancellationToken); + } + + /// <summary> + /// Write out number as a JSON value. If the context dictates so, it will be + /// wrapped in quotes to output as a JSON string. + /// </summary> + private async Task WriteJsonIntegerAsync(long num, CancellationToken cancellationToken) + { + await Context.WriteConditionalDelimiterAsync(cancellationToken); + var str = num.ToString(); + + var escapeNum = Context.EscapeNumbers(); + if (escapeNum) + { + await Trans.WriteAsync(TJSONProtocolConstants.Quote, cancellationToken); + } + + var bytes = Utf8Encoding.GetBytes(str); + await Trans.WriteAsync(bytes, cancellationToken); + + if (escapeNum) + { + await Trans.WriteAsync(TJSONProtocolConstants.Quote, cancellationToken); + } + } + + /// <summary> + /// Write out a double as a JSON value. If it is NaN or infinity or if the + /// context dictates escaping, Write out as JSON string. + /// </summary> + private async Task WriteJsonDoubleAsync(double num, CancellationToken cancellationToken) + { + await Context.WriteConditionalDelimiterAsync(cancellationToken); + var str = num.ToString("G17", CultureInfo.InvariantCulture); + var special = false; + + switch (str[0]) + { + case 'N': // NaN + case 'I': // Infinity + special = true; + break; + case '-': + if (str[1] == 'I') + { + // -Infinity + special = true; + } + break; + } + + var escapeNum = special || Context.EscapeNumbers(); + + if (escapeNum) + { + await Trans.WriteAsync(TJSONProtocolConstants.Quote, cancellationToken); + } + + await Trans.WriteAsync(Utf8Encoding.GetBytes(str), cancellationToken); + + if (escapeNum) + { + await Trans.WriteAsync(TJSONProtocolConstants.Quote, cancellationToken); + } + } + + /// <summary> + /// Write out contents of byte array b as a JSON string with base-64 encoded + /// data + /// </summary> + private async Task WriteJsonBase64Async(byte[] bytes, CancellationToken cancellationToken) + { + await Context.WriteConditionalDelimiterAsync(cancellationToken); + await Trans.WriteAsync(TJSONProtocolConstants.Quote, cancellationToken); + + var len = bytes.Length; + var off = 0; + + while (len >= 3) + { + // Encode 3 bytes at a time + TBase64Utils.Encode(bytes, off, 3, _tempBuffer, 0); + await Trans.WriteAsync(_tempBuffer, 0, 4, cancellationToken); + off += 3; + len -= 3; + } + + if (len > 0) + { + // Encode remainder + TBase64Utils.Encode(bytes, off, len, _tempBuffer, 0); + await Trans.WriteAsync(_tempBuffer, 0, len + 1, cancellationToken); + } + + await Trans.WriteAsync(TJSONProtocolConstants.Quote, cancellationToken); + } + + private async Task WriteJsonObjectStartAsync(CancellationToken cancellationToken) + { + await Context.WriteConditionalDelimiterAsync(cancellationToken); + await Trans.WriteAsync(TJSONProtocolConstants.LeftBrace, cancellationToken); + PushContext(new JSONPairContext(this)); + } + + private async Task WriteJsonObjectEndAsync(CancellationToken cancellationToken) + { + PopContext(); + await Trans.WriteAsync(TJSONProtocolConstants.RightBrace, cancellationToken); + } + + private async Task WriteJsonArrayStartAsync(CancellationToken cancellationToken) + { + await Context.WriteConditionalDelimiterAsync(cancellationToken); + await Trans.WriteAsync(TJSONProtocolConstants.LeftBracket, cancellationToken); + PushContext(new JSONListContext(this)); + } + + private async Task WriteJsonArrayEndAsync(CancellationToken cancellationToken) + { + PopContext(); + await Trans.WriteAsync(TJSONProtocolConstants.RightBracket, cancellationToken); + } + + public override async Task WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken) + { + await WriteJsonArrayStartAsync(cancellationToken); + await WriteJsonIntegerAsync(Version, cancellationToken); + + var b = Utf8Encoding.GetBytes(message.Name); + await WriteJsonStringAsync(b, cancellationToken); + + await WriteJsonIntegerAsync((long) message.Type, cancellationToken); + await WriteJsonIntegerAsync(message.SeqID, cancellationToken); + } + + public override async Task WriteMessageEndAsync(CancellationToken cancellationToken) + { + await WriteJsonArrayEndAsync(cancellationToken); + } + + public override async Task WriteStructBeginAsync(TStruct @struct, CancellationToken cancellationToken) + { + await WriteJsonObjectStartAsync(cancellationToken); + } + + public override async Task WriteStructEndAsync(CancellationToken cancellationToken) + { + await WriteJsonObjectEndAsync(cancellationToken); + } + + public override async Task WriteFieldBeginAsync(TField field, CancellationToken cancellationToken) + { + await WriteJsonIntegerAsync(field.ID, cancellationToken); + await WriteJsonObjectStartAsync(cancellationToken); + await WriteJsonStringAsync(TJSONProtocolHelper.GetTypeNameForTypeId(field.Type), cancellationToken); + } + + public override async Task WriteFieldEndAsync(CancellationToken cancellationToken) + { + await WriteJsonObjectEndAsync(cancellationToken); + } + + public override async Task WriteFieldStopAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override async Task WriteMapBeginAsync(TMap map, CancellationToken cancellationToken) + { + await WriteJsonArrayStartAsync(cancellationToken); + await WriteJsonStringAsync(TJSONProtocolHelper.GetTypeNameForTypeId(map.KeyType), cancellationToken); + await WriteJsonStringAsync(TJSONProtocolHelper.GetTypeNameForTypeId(map.ValueType), cancellationToken); + await WriteJsonIntegerAsync(map.Count, cancellationToken); + await WriteJsonObjectStartAsync(cancellationToken); + } + + public override async Task WriteMapEndAsync(CancellationToken cancellationToken) + { + await WriteJsonObjectEndAsync(cancellationToken); + await WriteJsonArrayEndAsync(cancellationToken); + } + + public override async Task WriteListBeginAsync(TList list, CancellationToken cancellationToken) + { + await WriteJsonArrayStartAsync(cancellationToken); + await WriteJsonStringAsync(TJSONProtocolHelper.GetTypeNameForTypeId(list.ElementType), cancellationToken); + await WriteJsonIntegerAsync(list.Count, cancellationToken); + } + + public override async Task WriteListEndAsync(CancellationToken cancellationToken) + { + await WriteJsonArrayEndAsync(cancellationToken); + } + + public override async Task WriteSetBeginAsync(TSet set, CancellationToken cancellationToken) + { + await WriteJsonArrayStartAsync(cancellationToken); + await WriteJsonStringAsync(TJSONProtocolHelper.GetTypeNameForTypeId(set.ElementType), cancellationToken); + await WriteJsonIntegerAsync(set.Count, cancellationToken); + } + + public override async Task WriteSetEndAsync(CancellationToken cancellationToken) + { + await WriteJsonArrayEndAsync(cancellationToken); + } + + public override async Task WriteBoolAsync(bool b, CancellationToken cancellationToken) + { + await WriteJsonIntegerAsync(b ? 1 : 0, cancellationToken); + } + + public override async Task WriteByteAsync(sbyte b, CancellationToken cancellationToken) + { + await WriteJsonIntegerAsync(b, cancellationToken); + } + + public override async Task WriteI16Async(short i16, CancellationToken cancellationToken) + { + await WriteJsonIntegerAsync(i16, cancellationToken); + } + + public override async Task WriteI32Async(int i32, CancellationToken cancellationToken) + { + await WriteJsonIntegerAsync(i32, cancellationToken); + } + + public override async Task WriteI64Async(long i64, CancellationToken cancellationToken) + { + await WriteJsonIntegerAsync(i64, cancellationToken); + } + + public override async Task WriteDoubleAsync(double d, CancellationToken cancellationToken) + { + await WriteJsonDoubleAsync(d, cancellationToken); + } + + public override async Task WriteStringAsync(string s, CancellationToken cancellationToken) + { + var b = Utf8Encoding.GetBytes(s); + await WriteJsonStringAsync(b, cancellationToken); + } + + public override async Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken) + { + await WriteJsonBase64Async(bytes, cancellationToken); + } + + /// <summary> + /// Read in a JSON string, unescaping as appropriate.. Skip Reading from the + /// context if skipContext is true. + /// </summary> + private async ValueTask<byte[]> ReadJsonStringAsync(bool skipContext, CancellationToken cancellationToken) + { + using (var buffer = new MemoryStream()) + { + var codeunits = new List<char>(); + + + if (!skipContext) + { + await Context.ReadConditionalDelimiterAsync(cancellationToken); + } + + await ReadJsonSyntaxCharAsync(TJSONProtocolConstants.Quote, cancellationToken); + + while (true) + { + var ch = await Reader.ReadAsync(cancellationToken); + if (ch == TJSONProtocolConstants.Quote[0]) + { + break; + } + + // escaped? + if (ch != TJSONProtocolConstants.EscSequences[0]) + { + await buffer.WriteAsync(new[] {ch}, 0, 1, cancellationToken); + continue; + } + + // distinguish between \uXXXX and \? + ch = await Reader.ReadAsync(cancellationToken); + if (ch != TJSONProtocolConstants.EscSequences[1]) // control chars like \n + { + var off = Array.IndexOf(TJSONProtocolConstants.EscapeChars, (char) ch); + if (off == -1) + { + throw new TProtocolException(TProtocolException.INVALID_DATA, "Expected control char"); + } + ch = TJSONProtocolConstants.EscapeCharValues[off]; + await buffer.WriteAsync(new[] {ch}, 0, 1, cancellationToken); + continue; + } + + // it's \uXXXX + await Trans.ReadAllAsync(_tempBuffer, 0, 4, cancellationToken); + + var wch = (short) ((TJSONProtocolHelper.ToHexVal(_tempBuffer[0]) << 12) + + (TJSONProtocolHelper.ToHexVal(_tempBuffer[1]) << 8) + + (TJSONProtocolHelper.ToHexVal(_tempBuffer[2]) << 4) + + TJSONProtocolHelper.ToHexVal(_tempBuffer[3])); + + if (char.IsHighSurrogate((char) wch)) + { + if (codeunits.Count > 0) + { + throw new TProtocolException(TProtocolException.INVALID_DATA, "Expected low surrogate char"); + } + codeunits.Add((char) wch); + } + else if (char.IsLowSurrogate((char) wch)) + { + if (codeunits.Count == 0) + { + throw new TProtocolException(TProtocolException.INVALID_DATA, "Expected high surrogate char"); + } + + codeunits.Add((char) wch); + var tmp = Utf8Encoding.GetBytes(codeunits.ToArray()); + await buffer.WriteAsync(tmp, 0, tmp.Length, cancellationToken); + codeunits.Clear(); + } + else + { + var tmp = Utf8Encoding.GetBytes(new[] {(char) wch}); + await buffer.WriteAsync(tmp, 0, tmp.Length, cancellationToken); + } + } + + if (codeunits.Count > 0) + { + throw new TProtocolException(TProtocolException.INVALID_DATA, "Expected low surrogate char"); + } + + return buffer.ToArray(); + } + } + + /// <summary> + /// Read in a sequence of characters that are all valid in JSON numbers. Does + /// not do a complete regex check to validate that this is actually a number. + /// </summary> + private async ValueTask<string> ReadJsonNumericCharsAsync(CancellationToken cancellationToken) + { + var strbld = new StringBuilder(); + while (true) + { + //TODO: workaround for primitive types with TJsonProtocol, think - how to rewrite into more easy form without exceptions + try + { + var ch = await Reader.PeekAsync(cancellationToken); + if (!TJSONProtocolHelper.IsJsonNumeric(ch)) + { + break; + } + var c = (char)await Reader.ReadAsync(cancellationToken); + strbld.Append(c); + } + catch (TTransportException) + { + break; + } + } + return strbld.ToString(); + } + + /// <summary> + /// Read in a JSON number. If the context dictates, Read in enclosing quotes. + /// </summary> + private async ValueTask<long> ReadJsonIntegerAsync(CancellationToken cancellationToken) + { + await Context.ReadConditionalDelimiterAsync(cancellationToken); + if (Context.EscapeNumbers()) + { + await ReadJsonSyntaxCharAsync(TJSONProtocolConstants.Quote, cancellationToken); + } + + var str = await ReadJsonNumericCharsAsync(cancellationToken); + if (Context.EscapeNumbers()) + { + await ReadJsonSyntaxCharAsync(TJSONProtocolConstants.Quote, cancellationToken); + } + + try + { + return long.Parse(str); + } + catch (FormatException) + { + throw new TProtocolException(TProtocolException.INVALID_DATA, "Bad data encounted in numeric data"); + } + } + + /// <summary> + /// Read in a JSON double value. Throw if the value is not wrapped in quotes + /// when expected or if wrapped in quotes when not expected. + /// </summary> + private async ValueTask<double> ReadJsonDoubleAsync(CancellationToken cancellationToken) + { + await Context.ReadConditionalDelimiterAsync(cancellationToken); + if (await Reader.PeekAsync(cancellationToken) == TJSONProtocolConstants.Quote[0]) + { + var arr = await ReadJsonStringAsync(true, cancellationToken); + var dub = double.Parse(Utf8Encoding.GetString(arr, 0, arr.Length), CultureInfo.InvariantCulture); + + if (!Context.EscapeNumbers() && !double.IsNaN(dub) && !double.IsInfinity(dub)) + { + // Throw exception -- we should not be in a string in this case + throw new TProtocolException(TProtocolException.INVALID_DATA, "Numeric data unexpectedly quoted"); + } + + return dub; + } + + if (Context.EscapeNumbers()) + { + // This will throw - we should have had a quote if escapeNum == true + await ReadJsonSyntaxCharAsync(TJSONProtocolConstants.Quote, cancellationToken); + } + + try + { + return double.Parse(await ReadJsonNumericCharsAsync(cancellationToken), CultureInfo.InvariantCulture); + } + catch (FormatException) + { + throw new TProtocolException(TProtocolException.INVALID_DATA, "Bad data encounted in numeric data"); + } + } + + /// <summary> + /// Read in a JSON string containing base-64 encoded data and decode it. + /// </summary> + private async ValueTask<byte[]> ReadJsonBase64Async(CancellationToken cancellationToken) + { + var b = await ReadJsonStringAsync(false, cancellationToken); + var len = b.Length; + var off = 0; + var size = 0; + + // reduce len to ignore fill bytes + while ((len > 0) && (b[len - 1] == '=')) + { + --len; + } + + // read & decode full byte triplets = 4 source bytes + while (len > 4) + { + // Decode 4 bytes at a time + TBase64Utils.Decode(b, off, 4, b, size); // NB: decoded in place + off += 4; + len -= 4; + size += 3; + } + + // Don't decode if we hit the end or got a single leftover byte (invalid + // base64 but legal for skip of regular string exType) + if (len > 1) + { + // Decode remainder + TBase64Utils.Decode(b, off, len, b, size); // NB: decoded in place + size += len - 1; + } + + // Sadly we must copy the byte[] (any way around this?) + var result = new byte[size]; + Array.Copy(b, 0, result, 0, size); + return result; + } + + private async Task ReadJsonObjectStartAsync(CancellationToken cancellationToken) + { + await Context.ReadConditionalDelimiterAsync(cancellationToken); + await ReadJsonSyntaxCharAsync(TJSONProtocolConstants.LeftBrace, cancellationToken); + PushContext(new JSONPairContext(this)); + } + + private async Task ReadJsonObjectEndAsync(CancellationToken cancellationToken) + { + await ReadJsonSyntaxCharAsync(TJSONProtocolConstants.RightBrace, cancellationToken); + PopContext(); + } + + private async Task ReadJsonArrayStartAsync(CancellationToken cancellationToken) + { + await Context.ReadConditionalDelimiterAsync(cancellationToken); + await ReadJsonSyntaxCharAsync(TJSONProtocolConstants.LeftBracket, cancellationToken); + PushContext(new JSONListContext(this)); + } + + private async Task ReadJsonArrayEndAsync(CancellationToken cancellationToken) + { + await ReadJsonSyntaxCharAsync(TJSONProtocolConstants.RightBracket, cancellationToken); + PopContext(); + } + + public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken) + { + var message = new TMessage(); + await ReadJsonArrayStartAsync(cancellationToken); + if (await ReadJsonIntegerAsync(cancellationToken) != Version) + { + throw new TProtocolException(TProtocolException.BAD_VERSION, "Message contained bad version."); + } + + var buf = await ReadJsonStringAsync(false, cancellationToken); + message.Name = Utf8Encoding.GetString(buf, 0, buf.Length); + message.Type = (TMessageType) await ReadJsonIntegerAsync(cancellationToken); + message.SeqID = (int) await ReadJsonIntegerAsync(cancellationToken); + return message; + } + + public override async Task ReadMessageEndAsync(CancellationToken cancellationToken) + { + await ReadJsonArrayEndAsync(cancellationToken); + } + + public override async ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken) + { + await ReadJsonObjectStartAsync(cancellationToken); + return new TStruct(); + } + + public override async Task ReadStructEndAsync(CancellationToken cancellationToken) + { + await ReadJsonObjectEndAsync(cancellationToken); + } + + public override async ValueTask<TField> ReadFieldBeginAsync(CancellationToken cancellationToken) + { + var field = new TField(); + var ch = await Reader.PeekAsync(cancellationToken); + if (ch == TJSONProtocolConstants.RightBrace[0]) + { + field.Type = TType.Stop; + } + else + { + field.ID = (short) await ReadJsonIntegerAsync(cancellationToken); + await ReadJsonObjectStartAsync(cancellationToken); + field.Type = TJSONProtocolHelper.GetTypeIdForTypeName(await ReadJsonStringAsync(false, cancellationToken)); + } + return field; + } + + public override async Task ReadFieldEndAsync(CancellationToken cancellationToken) + { + await ReadJsonObjectEndAsync(cancellationToken); + } + + public override async ValueTask<TMap> ReadMapBeginAsync(CancellationToken cancellationToken) + { + var map = new TMap(); + await ReadJsonArrayStartAsync(cancellationToken); + map.KeyType = TJSONProtocolHelper.GetTypeIdForTypeName(await ReadJsonStringAsync(false, cancellationToken)); + map.ValueType = TJSONProtocolHelper.GetTypeIdForTypeName(await ReadJsonStringAsync(false, cancellationToken)); + map.Count = (int) await ReadJsonIntegerAsync(cancellationToken); + await ReadJsonObjectStartAsync(cancellationToken); + return map; + } + + public override async Task ReadMapEndAsync(CancellationToken cancellationToken) + { + await ReadJsonObjectEndAsync(cancellationToken); + await ReadJsonArrayEndAsync(cancellationToken); + } + + public override async ValueTask<TList> ReadListBeginAsync(CancellationToken cancellationToken) + { + var list = new TList(); + await ReadJsonArrayStartAsync(cancellationToken); + list.ElementType = TJSONProtocolHelper.GetTypeIdForTypeName(await ReadJsonStringAsync(false, cancellationToken)); + list.Count = (int) await ReadJsonIntegerAsync(cancellationToken); + return list; + } + + public override async Task ReadListEndAsync(CancellationToken cancellationToken) + { + await ReadJsonArrayEndAsync(cancellationToken); + } + + public override async ValueTask<TSet> ReadSetBeginAsync(CancellationToken cancellationToken) + { + var set = new TSet(); + await ReadJsonArrayStartAsync(cancellationToken); + set.ElementType = TJSONProtocolHelper.GetTypeIdForTypeName(await ReadJsonStringAsync(false, cancellationToken)); + set.Count = (int) await ReadJsonIntegerAsync(cancellationToken); + return set; + } + + public override async Task ReadSetEndAsync(CancellationToken cancellationToken) + { + await ReadJsonArrayEndAsync(cancellationToken); + } + + public override async ValueTask<bool> ReadBoolAsync(CancellationToken cancellationToken) + { + return await ReadJsonIntegerAsync(cancellationToken) != 0; + } + + public override async ValueTask<sbyte> ReadByteAsync(CancellationToken cancellationToken) + { + return (sbyte) await ReadJsonIntegerAsync(cancellationToken); + } + + public override async ValueTask<short> ReadI16Async(CancellationToken cancellationToken) + { + return (short) await ReadJsonIntegerAsync(cancellationToken); + } + + public override async ValueTask<int> ReadI32Async(CancellationToken cancellationToken) + { + return (int) await ReadJsonIntegerAsync(cancellationToken); + } + + public override async ValueTask<long> ReadI64Async(CancellationToken cancellationToken) + { + return await ReadJsonIntegerAsync(cancellationToken); + } + + public override async ValueTask<double> ReadDoubleAsync(CancellationToken cancellationToken) + { + return await ReadJsonDoubleAsync(cancellationToken); + } + + public override async ValueTask<string> ReadStringAsync(CancellationToken cancellationToken) + { + var buf = await ReadJsonStringAsync(false, cancellationToken); + return Utf8Encoding.GetString(buf, 0, buf.Length); + } + + public override async ValueTask<byte[]> ReadBinaryAsync(CancellationToken cancellationToken) + { + return await ReadJsonBase64Async(cancellationToken); + } + + /// <summary> + /// Factory for JSON protocol objects + /// </summary> + public class Factory : TProtocolFactory + { + public override TProtocol GetProtocol(TTransport trans) + { + return new TJsonProtocol(trans); + } + } + + /// <summary> + /// Base class for tracking JSON contexts that may require + /// inserting/Reading additional JSON syntax characters + /// This base context does nothing. + /// </summary> + protected class JSONBaseContext + { + protected TJsonProtocol Proto; + + public JSONBaseContext(TJsonProtocol proto) + { + Proto = proto; + } + + public virtual async Task WriteConditionalDelimiterAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public virtual async Task ReadConditionalDelimiterAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public virtual bool EscapeNumbers() + { + return false; + } + } + + /// <summary> + /// Context for JSON lists. Will insert/Read commas before each item except + /// for the first one + /// </summary> + protected class JSONListContext : JSONBaseContext + { + private bool _first = true; + + public JSONListContext(TJsonProtocol protocol) + : base(protocol) + { + } + + public override async Task WriteConditionalDelimiterAsync(CancellationToken cancellationToken) + { + if (_first) + { + _first = false; + } + else + { + await Proto.Trans.WriteAsync(TJSONProtocolConstants.Comma, cancellationToken); + } + } + + public override async Task ReadConditionalDelimiterAsync(CancellationToken cancellationToken) + { + if (_first) + { + _first = false; + } + else + { + await Proto.ReadJsonSyntaxCharAsync(TJSONProtocolConstants.Comma, cancellationToken); + } + } + } + + /// <summary> + /// Context for JSON records. Will insert/Read colons before the value portion + /// of each record pair, and commas before each key except the first. In + /// addition, will indicate that numbers in the key position need to be + /// escaped in quotes (since JSON keys must be strings). + /// </summary> + // ReSharper disable once InconsistentNaming + protected class JSONPairContext : JSONBaseContext + { + private bool _colon = true; + + private bool _first = true; + + public JSONPairContext(TJsonProtocol proto) + : base(proto) + { + } + + public override async Task WriteConditionalDelimiterAsync(CancellationToken cancellationToken) + { + if (_first) + { + _first = false; + _colon = true; + } + else + { + await Proto.Trans.WriteAsync(_colon ? TJSONProtocolConstants.Colon : TJSONProtocolConstants.Comma, cancellationToken); + _colon = !_colon; + } + } + + public override async Task ReadConditionalDelimiterAsync(CancellationToken cancellationToken) + { + if (_first) + { + _first = false; + _colon = true; + } + else + { + await Proto.ReadJsonSyntaxCharAsync(_colon ? TJSONProtocolConstants.Colon : TJSONProtocolConstants.Comma, cancellationToken); + _colon = !_colon; + } + } + + public override bool EscapeNumbers() + { + return _colon; + } + } + + /// <summary> + /// Holds up to one byte from the transport + /// </summary> + protected class LookaheadReader + { + private readonly byte[] _data = new byte[1]; + + private bool _hasData; + protected TJsonProtocol Proto; + + public LookaheadReader(TJsonProtocol proto) + { + Proto = proto; + } + + /// <summary> + /// Return and consume the next byte to be Read, either taking it from the + /// data buffer if present or getting it from the transport otherwise. + /// </summary> + public async ValueTask<byte> ReadAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<byte>(cancellationToken); + } + + if (_hasData) + { + _hasData = false; + } + else + { + // find more easy way to avoid exception on reading primitive types + await Proto.Trans.ReadAllAsync(_data, 0, 1, cancellationToken); + } + return _data[0]; + } + + /// <summary> + /// Return the next byte to be Read without consuming, filling the data + /// buffer if it has not been filled alReady. + /// </summary> + public async ValueTask<byte> PeekAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<byte>(cancellationToken); + } + + if (!_hasData) + { + // find more easy way to avoid exception on reading primitive types + await Proto.Trans.ReadAllAsync(_data, 0, 1, cancellationToken); + _hasData = true; + } + return _data[0]; + } + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TMultiplexedProtocol.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TMultiplexedProtocol.cs new file mode 100644 index 000000000..fbc8c05cc --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TMultiplexedProtocol.cs @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Threading; +using System.Threading.Tasks; +using Thrift.Protocol.Entities; + +namespace Thrift.Protocol +{ + /** + * TMultiplexedProtocol is a protocol-independent concrete decorator that allows a Thrift + * client to communicate with a multiplexing Thrift server, by prepending the service name + * to the function name during function calls. + * + * NOTE: THIS IS NOT TO BE USED BY SERVERS. + * On the server, use TMultiplexedProcessor to handle requests from a multiplexing client. + * + * This example uses a single socket transport to invoke two services: + * + * TSocketTransport transport = new TSocketTransport("localhost", 9090); + * transport.open(); + * + * TBinaryProtocol protocol = new TBinaryProtocol(transport); + * + * TMultiplexedProtocol mp = new TMultiplexedProtocol(protocol, "Calculator"); + * Calculator.Client service = new Calculator.Client(mp); + * + * TMultiplexedProtocol mp2 = new TMultiplexedProtocol(protocol, "WeatherReport"); + * WeatherReport.Client service2 = new WeatherReport.Client(mp2); + * + * System.out.println(service.add(2,2)); + * System.out.println(service2.getTemperature()); + * + */ + + //TODO: implementation of TProtocol + + // ReSharper disable once InconsistentNaming + public class TMultiplexedProtocol : TProtocolDecorator + { + /** Used to delimit the service name from the function name */ + public const string Separator = ":"; + + private readonly string _serviceName; + + /** + * Wrap the specified protocol, allowing it to be used to communicate with a + * multiplexing server. The <code>serviceName</code> is required as it is + * prepended to the message header so that the multiplexing server can broker + * the function call to the proper service. + * + * Args: + * protocol Your communication protocol of choice, e.g. TBinaryProtocol + * serviceName The service name of the service communicating via this protocol. + */ + + public TMultiplexedProtocol(TProtocol protocol, string serviceName) + : base(protocol) + { + _serviceName = serviceName; + } + + public override async Task WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken) + { + switch (message.Type) + { + case TMessageType.Call: + case TMessageType.Oneway: + await base.WriteMessageBeginAsync(new TMessage($"{_serviceName}{Separator}{message.Name}", message.Type, message.SeqID), cancellationToken); + break; + default: + await base.WriteMessageBeginAsync(message, cancellationToken); + break; + } + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TProtocol.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TProtocol.cs new file mode 100644 index 000000000..75edb11d1 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TProtocol.cs @@ -0,0 +1,376 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Thrift.Protocol.Entities; +using Thrift.Transport; + +namespace Thrift.Protocol +{ + // ReSharper disable once InconsistentNaming + public abstract class TProtocol : IDisposable + { + public const int DefaultRecursionDepth = 64; + private bool _isDisposed; + protected int RecursionDepth; + + protected TTransport Trans; + + protected TProtocol(TTransport trans) + { + Trans = trans; + RecursionLimit = DefaultRecursionDepth; + RecursionDepth = 0; + } + + public TTransport Transport => Trans; + + protected int RecursionLimit { get; set; } + + public void Dispose() + { + Dispose(true); + } + + public void IncrementRecursionDepth() + { + if (RecursionDepth < RecursionLimit) + { + ++RecursionDepth; + } + else + { + throw new TProtocolException(TProtocolException.DEPTH_LIMIT, "Depth limit exceeded"); + } + } + + public void DecrementRecursionDepth() + { + --RecursionDepth; + } + + protected virtual void Dispose(bool disposing) + { + if (!_isDisposed) + { + if (disposing) + { + (Trans as IDisposable)?.Dispose(); + } + } + _isDisposed = true; + } + + public virtual async Task WriteMessageBeginAsync(TMessage message) + { + await WriteMessageBeginAsync(message, CancellationToken.None); + } + + public abstract Task WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken); + + public virtual async Task WriteMessageEndAsync() + { + await WriteMessageEndAsync(CancellationToken.None); + } + + public abstract Task WriteMessageEndAsync(CancellationToken cancellationToken); + + public virtual async Task WriteStructBeginAsync(TStruct @struct) + { + await WriteStructBeginAsync(@struct, CancellationToken.None); + } + + public abstract Task WriteStructBeginAsync(TStruct @struct, CancellationToken cancellationToken); + + public virtual async Task WriteStructEndAsync() + { + await WriteStructEndAsync(CancellationToken.None); + } + + public abstract Task WriteStructEndAsync(CancellationToken cancellationToken); + + public virtual async Task WriteFieldBeginAsync(TField field) + { + await WriteFieldBeginAsync(field, CancellationToken.None); + } + + public abstract Task WriteFieldBeginAsync(TField field, CancellationToken cancellationToken); + + public virtual async Task WriteFieldEndAsync() + { + await WriteFieldEndAsync(CancellationToken.None); + } + + public abstract Task WriteFieldEndAsync(CancellationToken cancellationToken); + + public virtual async Task WriteFieldStopAsync() + { + await WriteFieldStopAsync(CancellationToken.None); + } + + public abstract Task WriteFieldStopAsync(CancellationToken cancellationToken); + + public virtual async Task WriteMapBeginAsync(TMap map) + { + await WriteMapBeginAsync(map, CancellationToken.None); + } + + public abstract Task WriteMapBeginAsync(TMap map, CancellationToken cancellationToken); + + public virtual async Task WriteMapEndAsync() + { + await WriteMapEndAsync(CancellationToken.None); + } + + public abstract Task WriteMapEndAsync(CancellationToken cancellationToken); + + public virtual async Task WriteListBeginAsync(TList list) + { + await WriteListBeginAsync(list, CancellationToken.None); + } + + public abstract Task WriteListBeginAsync(TList list, CancellationToken cancellationToken); + + public virtual async Task WriteListEndAsync() + { + await WriteListEndAsync(CancellationToken.None); + } + + public abstract Task WriteListEndAsync(CancellationToken cancellationToken); + + public virtual async Task WriteSetBeginAsync(TSet set) + { + await WriteSetBeginAsync(set, CancellationToken.None); + } + + public abstract Task WriteSetBeginAsync(TSet set, CancellationToken cancellationToken); + + public virtual async Task WriteSetEndAsync() + { + await WriteSetEndAsync(CancellationToken.None); + } + + public abstract Task WriteSetEndAsync(CancellationToken cancellationToken); + + public virtual async Task WriteBoolAsync(bool b) + { + await WriteBoolAsync(b, CancellationToken.None); + } + + public abstract Task WriteBoolAsync(bool b, CancellationToken cancellationToken); + + public virtual async Task WriteByteAsync(sbyte b) + { + await WriteByteAsync(b, CancellationToken.None); + } + + public abstract Task WriteByteAsync(sbyte b, CancellationToken cancellationToken); + + public virtual async Task WriteI16Async(short i16) + { + await WriteI16Async(i16, CancellationToken.None); + } + + public abstract Task WriteI16Async(short i16, CancellationToken cancellationToken); + + public virtual async Task WriteI32Async(int i32) + { + await WriteI32Async(i32, CancellationToken.None); + } + + public abstract Task WriteI32Async(int i32, CancellationToken cancellationToken); + + public virtual async Task WriteI64Async(long i64) + { + await WriteI64Async(i64, CancellationToken.None); + } + + public abstract Task WriteI64Async(long i64, CancellationToken cancellationToken); + + public virtual async Task WriteDoubleAsync(double d) + { + await WriteDoubleAsync(d, CancellationToken.None); + } + + public abstract Task WriteDoubleAsync(double d, CancellationToken cancellationToken); + + public virtual async Task WriteStringAsync(string s) + { + await WriteStringAsync(s, CancellationToken.None); + } + + public virtual async Task WriteStringAsync(string s, CancellationToken cancellationToken) + { + var bytes = Encoding.UTF8.GetBytes(s); + await WriteBinaryAsync(bytes, cancellationToken); + } + + public virtual async Task WriteBinaryAsync(byte[] bytes) + { + await WriteBinaryAsync(bytes, CancellationToken.None); + } + + public abstract Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken); + + public virtual async ValueTask<TMessage> ReadMessageBeginAsync() + { + return await ReadMessageBeginAsync(CancellationToken.None); + } + + public abstract ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken); + + public virtual async Task ReadMessageEndAsync() + { + await ReadMessageEndAsync(CancellationToken.None); + } + + public abstract Task ReadMessageEndAsync(CancellationToken cancellationToken); + + public virtual async ValueTask<TStruct> ReadStructBeginAsync() + { + return await ReadStructBeginAsync(CancellationToken.None); + } + + public abstract ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken); + + public virtual async Task ReadStructEndAsync() + { + await ReadStructEndAsync(CancellationToken.None); + } + + public abstract Task ReadStructEndAsync(CancellationToken cancellationToken); + + public virtual async ValueTask<TField> ReadFieldBeginAsync() + { + return await ReadFieldBeginAsync(CancellationToken.None); + } + + public abstract ValueTask<TField> ReadFieldBeginAsync(CancellationToken cancellationToken); + + public virtual async Task ReadFieldEndAsync() + { + await ReadFieldEndAsync(CancellationToken.None); + } + + public abstract Task ReadFieldEndAsync(CancellationToken cancellationToken); + + public virtual async ValueTask<TMap> ReadMapBeginAsync() + { + return await ReadMapBeginAsync(CancellationToken.None); + } + + public abstract ValueTask<TMap> ReadMapBeginAsync(CancellationToken cancellationToken); + + public virtual async Task ReadMapEndAsync() + { + await ReadMapEndAsync(CancellationToken.None); + } + + public abstract Task ReadMapEndAsync(CancellationToken cancellationToken); + + public virtual async ValueTask<TList> ReadListBeginAsync() + { + return await ReadListBeginAsync(CancellationToken.None); + } + + public abstract ValueTask<TList> ReadListBeginAsync(CancellationToken cancellationToken); + + public virtual async Task ReadListEndAsync() + { + await ReadListEndAsync(CancellationToken.None); + } + + public abstract Task ReadListEndAsync(CancellationToken cancellationToken); + + public virtual async ValueTask<TSet> ReadSetBeginAsync() + { + return await ReadSetBeginAsync(CancellationToken.None); + } + + public abstract ValueTask<TSet> ReadSetBeginAsync(CancellationToken cancellationToken); + + public virtual async Task ReadSetEndAsync() + { + await ReadSetEndAsync(CancellationToken.None); + } + + public abstract Task ReadSetEndAsync(CancellationToken cancellationToken); + + public virtual async ValueTask<bool> ReadBoolAsync() + { + return await ReadBoolAsync(CancellationToken.None); + } + + public abstract ValueTask<bool> ReadBoolAsync(CancellationToken cancellationToken); + + public virtual async ValueTask<sbyte> ReadByteAsync() + { + return await ReadByteAsync(CancellationToken.None); + } + + public abstract ValueTask<sbyte> ReadByteAsync(CancellationToken cancellationToken); + + public virtual async ValueTask<short> ReadI16Async() + { + return await ReadI16Async(CancellationToken.None); + } + + public abstract ValueTask<short> ReadI16Async(CancellationToken cancellationToken); + + public virtual async ValueTask<int> ReadI32Async() + { + return await ReadI32Async(CancellationToken.None); + } + + public abstract ValueTask<int> ReadI32Async(CancellationToken cancellationToken); + + public virtual async ValueTask<long> ReadI64Async() + { + return await ReadI64Async(CancellationToken.None); + } + + public abstract ValueTask<long> ReadI64Async(CancellationToken cancellationToken); + + public virtual async ValueTask<double> ReadDoubleAsync() + { + return await ReadDoubleAsync(CancellationToken.None); + } + + public abstract ValueTask<double> ReadDoubleAsync(CancellationToken cancellationToken); + + public virtual async ValueTask<string> ReadStringAsync() + { + return await ReadStringAsync(CancellationToken.None); + } + + public virtual async ValueTask<string> ReadStringAsync(CancellationToken cancellationToken) + { + var buf = await ReadBinaryAsync(cancellationToken); + return Encoding.UTF8.GetString(buf, 0, buf.Length); + } + + public virtual async ValueTask<byte[]> ReadBinaryAsync() + { + return await ReadBinaryAsync(CancellationToken.None); + } + + public abstract ValueTask<byte[]> ReadBinaryAsync(CancellationToken cancellationToken); + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TProtocolDecorator.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TProtocolDecorator.cs new file mode 100644 index 000000000..845c82749 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TProtocolDecorator.cs @@ -0,0 +1,247 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Threading; +using System.Threading.Tasks; +using Thrift.Protocol.Entities; + +namespace Thrift.Protocol +{ + // ReSharper disable once InconsistentNaming + /// <summary> + /// TProtocolDecorator forwards all requests to an enclosed TProtocol instance, + /// providing a way to author concise concrete decorator subclasses.While it has + /// no abstract methods, it is marked abstract as a reminder that by itself, + /// it does not modify the behaviour of the enclosed TProtocol. + /// </summary> + public abstract class TProtocolDecorator : TProtocol + { + private readonly TProtocol _wrappedProtocol; + + protected TProtocolDecorator(TProtocol protocol) + : base(protocol.Transport) + { + _wrappedProtocol = protocol ?? throw new ArgumentNullException(nameof(protocol)); + } + + public override async Task WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteMessageBeginAsync(message, cancellationToken); + } + + public override async Task WriteMessageEndAsync(CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteMessageEndAsync(cancellationToken); + } + + public override async Task WriteStructBeginAsync(TStruct @struct, CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteStructBeginAsync(@struct, cancellationToken); + } + + public override async Task WriteStructEndAsync(CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteStructEndAsync(cancellationToken); + } + + public override async Task WriteFieldBeginAsync(TField field, CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteFieldBeginAsync(field, cancellationToken); + } + + public override async Task WriteFieldEndAsync(CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteFieldEndAsync(cancellationToken); + } + + public override async Task WriteFieldStopAsync(CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteFieldStopAsync(cancellationToken); + } + + public override async Task WriteMapBeginAsync(TMap map, CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteMapBeginAsync(map, cancellationToken); + } + + public override async Task WriteMapEndAsync(CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteMapEndAsync(cancellationToken); + } + + public override async Task WriteListBeginAsync(TList list, CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteListBeginAsync(list, cancellationToken); + } + + public override async Task WriteListEndAsync(CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteListEndAsync(cancellationToken); + } + + public override async Task WriteSetBeginAsync(TSet set, CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteSetBeginAsync(set, cancellationToken); + } + + public override async Task WriteSetEndAsync(CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteSetEndAsync(cancellationToken); + } + + public override async Task WriteBoolAsync(bool b, CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteBoolAsync(b, cancellationToken); + } + + public override async Task WriteByteAsync(sbyte b, CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteByteAsync(b, cancellationToken); + } + + public override async Task WriteI16Async(short i16, CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteI16Async(i16, cancellationToken); + } + + public override async Task WriteI32Async(int i32, CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteI32Async(i32, cancellationToken); + } + + public override async Task WriteI64Async(long i64, CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteI64Async(i64, cancellationToken); + } + + public override async Task WriteDoubleAsync(double d, CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteDoubleAsync(d, cancellationToken); + } + + public override async Task WriteStringAsync(string s, CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteStringAsync(s, cancellationToken); + } + + public override async Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken) + { + await _wrappedProtocol.WriteBinaryAsync(bytes, cancellationToken); + } + + public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken) + { + return await _wrappedProtocol.ReadMessageBeginAsync(cancellationToken); + } + + public override async Task ReadMessageEndAsync(CancellationToken cancellationToken) + { + await _wrappedProtocol.ReadMessageEndAsync(cancellationToken); + } + + public override async ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken) + { + return await _wrappedProtocol.ReadStructBeginAsync(cancellationToken); + } + + public override async Task ReadStructEndAsync(CancellationToken cancellationToken) + { + await _wrappedProtocol.ReadStructEndAsync(cancellationToken); + } + + public override async ValueTask<TField> ReadFieldBeginAsync(CancellationToken cancellationToken) + { + return await _wrappedProtocol.ReadFieldBeginAsync(cancellationToken); + } + + public override async Task ReadFieldEndAsync(CancellationToken cancellationToken) + { + await _wrappedProtocol.ReadFieldEndAsync(cancellationToken); + } + + public override async ValueTask<TMap> ReadMapBeginAsync(CancellationToken cancellationToken) + { + return await _wrappedProtocol.ReadMapBeginAsync(cancellationToken); + } + + public override async Task ReadMapEndAsync(CancellationToken cancellationToken) + { + await _wrappedProtocol.ReadMapEndAsync(cancellationToken); + } + + public override async ValueTask<TList> ReadListBeginAsync(CancellationToken cancellationToken) + { + return await _wrappedProtocol.ReadListBeginAsync(cancellationToken); + } + + public override async Task ReadListEndAsync(CancellationToken cancellationToken) + { + await _wrappedProtocol.ReadListEndAsync(cancellationToken); + } + + public override async ValueTask<TSet> ReadSetBeginAsync(CancellationToken cancellationToken) + { + return await _wrappedProtocol.ReadSetBeginAsync(cancellationToken); + } + + public override async Task ReadSetEndAsync(CancellationToken cancellationToken) + { + await _wrappedProtocol.ReadSetEndAsync(cancellationToken); + } + + public override async ValueTask<bool> ReadBoolAsync(CancellationToken cancellationToken) + { + return await _wrappedProtocol.ReadBoolAsync(cancellationToken); + } + + public override async ValueTask<sbyte> ReadByteAsync(CancellationToken cancellationToken) + { + return await _wrappedProtocol.ReadByteAsync(cancellationToken); + } + + public override async ValueTask<short> ReadI16Async(CancellationToken cancellationToken) + { + return await _wrappedProtocol.ReadI16Async(cancellationToken); + } + + public override async ValueTask<int> ReadI32Async(CancellationToken cancellationToken) + { + return await _wrappedProtocol.ReadI32Async(cancellationToken); + } + + public override async ValueTask<long> ReadI64Async(CancellationToken cancellationToken) + { + return await _wrappedProtocol.ReadI64Async(cancellationToken); + } + + public override async ValueTask<double> ReadDoubleAsync(CancellationToken cancellationToken) + { + return await _wrappedProtocol.ReadDoubleAsync(cancellationToken); + } + + public override async ValueTask<string> ReadStringAsync(CancellationToken cancellationToken) + { + return await _wrappedProtocol.ReadStringAsync(cancellationToken); + } + + public override async ValueTask<byte[]> ReadBinaryAsync(CancellationToken cancellationToken) + { + return await _wrappedProtocol.ReadBinaryAsync(cancellationToken); + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TProtocolException.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TProtocolException.cs new file mode 100644 index 000000000..328babd05 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TProtocolException.cs @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// ReSharper disable InconsistentNaming +using System; + +namespace Thrift.Protocol +{ + public class TProtocolException : TException + { + // do not rename public constants - they used in generated files + public const int UNKNOWN = 0; + public const int INVALID_DATA = 1; + public const int NEGATIVE_SIZE = 2; + public const int SIZE_LIMIT = 3; + public const int BAD_VERSION = 4; + public const int NOT_IMPLEMENTED = 5; + public const int DEPTH_LIMIT = 6; + + protected int Type = UNKNOWN; + + public TProtocolException() + { + } + + public TProtocolException(int type, Exception inner = null) + : base(string.Empty, inner) + { + Type = type; + } + + public TProtocolException(int type, string message, Exception inner = null) + : base(message, inner) + { + Type = type; + } + + public TProtocolException(string message, Exception inner = null) + : base(message, inner) + { + } + + public int GetExceptionType() + { + return Type; + } + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TProtocolFactory.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TProtocolFactory.cs new file mode 100644 index 000000000..31b05148b --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/TProtocolFactory.cs @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Thrift.Transport; + +namespace Thrift.Protocol +{ + // ReSharper disable once InconsistentNaming + public abstract class TProtocolFactory + { + public abstract TProtocol GetProtocol(TTransport trans); + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Utilities/TBase64Utils.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Utilities/TBase64Utils.cs new file mode 100644 index 000000000..90b8f8867 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Utilities/TBase64Utils.cs @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; + +namespace Thrift.Protocol.Utilities +{ + // ReSharper disable once InconsistentNaming + internal static class TBase64Utils + { + //TODO: Constants + //TODO: Check for args + //TODO: Unitests + + internal const string EncodeTable = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + + private static readonly int[] DecodeTable = + { + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 62, -1, -1, -1, 63, + 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, -1, -1, -1, -1, -1, -1, + -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, + 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, -1, -1, -1, -1, -1, + -1, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, + 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1 + }; + + internal static void Encode(byte[] src, int srcOff, int len, byte[] dst, int dstOff) + { + if (src == null) + { + throw new ArgumentNullException(nameof(src)); + } + + dst[dstOff] = (byte) EncodeTable[(src[srcOff] >> 2) & 0x3F]; + + if (len == 3) + { + dst[dstOff + 1] = (byte) EncodeTable[((src[srcOff] << 4) & 0x30) | ((src[srcOff + 1] >> 4) & 0x0F)]; + dst[dstOff + 2] = (byte) EncodeTable[((src[srcOff + 1] << 2) & 0x3C) | ((src[srcOff + 2] >> 6) & 0x03)]; + dst[dstOff + 3] = (byte) EncodeTable[src[srcOff + 2] & 0x3F]; + } + else if (len == 2) + { + dst[dstOff + 1] = (byte) EncodeTable[((src[srcOff] << 4) & 0x30) | ((src[srcOff + 1] >> 4) & 0x0F)]; + dst[dstOff + 2] = (byte) EncodeTable[(src[srcOff + 1] << 2) & 0x3C]; + } + else + { + // len == 1 + dst[dstOff + 1] = (byte) EncodeTable[(src[srcOff] << 4) & 0x30]; + } + } + + internal static void Decode(byte[] src, int srcOff, int len, byte[] dst, int dstOff) + { + if (src == null) + { + throw new ArgumentNullException(nameof(src)); + } + + dst[dstOff] = (byte) ((DecodeTable[src[srcOff] & 0x0FF] << 2) | (DecodeTable[src[srcOff + 1] & 0x0FF] >> 4)); + + if (len > 2) + { + dst[dstOff + 1] = + (byte) + (((DecodeTable[src[srcOff + 1] & 0x0FF] << 4) & 0xF0) | (DecodeTable[src[srcOff + 2] & 0x0FF] >> 2)); + if (len > 3) + { + dst[dstOff + 2] = + (byte) + (((DecodeTable[src[srcOff + 2] & 0x0FF] << 6) & 0xC0) | DecodeTable[src[srcOff + 3] & 0x0FF]); + } + } + } + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Utilities/TJsonProtocolConstants.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Utilities/TJsonProtocolConstants.cs new file mode 100644 index 000000000..6cc1302e9 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Utilities/TJsonProtocolConstants.cs @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace Thrift.Protocol.Utilities +{ + // ReSharper disable once InconsistentNaming + public static class TJSONProtocolConstants + { + //TODO Check for performance for reusing ImmutableArray from System.Collections.Immutable (https://blogs.msdn.microsoft.com/dotnet/2013/06/24/please-welcome-immutablearrayt/) + // can be possible to get better performance and also better GC + + public static readonly byte[] Comma = {(byte) ','}; + public static readonly byte[] Colon = {(byte) ':'}; + public static readonly byte[] LeftBrace = {(byte) '{'}; + public static readonly byte[] RightBrace = {(byte) '}'}; + public static readonly byte[] LeftBracket = {(byte) '['}; + public static readonly byte[] RightBracket = {(byte) ']'}; + public static readonly byte[] Quote = {(byte) '"'}; + public static readonly byte[] Backslash = {(byte) '\\'}; + + public static readonly byte[] JsonCharTable = + { + 0, 0, 0, 0, 0, 0, 0, 0, (byte) 'b', (byte) 't', (byte) 'n', 0, (byte) 'f', (byte) 'r', 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 1, 1, (byte) '"', 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 + }; + + public static readonly char[] EscapeChars = "\"\\/bfnrt".ToCharArray(); + public static readonly byte[] EscapeCharValues = {(byte) '"', (byte) '\\', (byte) '/', (byte) '\b', (byte) '\f', (byte) '\n', (byte) '\r', (byte) '\t'}; + public static readonly byte[] EscSequences = {(byte) '\\', (byte) 'u', (byte) '0', (byte) '0'}; + + public static class TypeNames + { + public static readonly byte[] NameBool = { (byte)'t', (byte)'f' }; + public static readonly byte[] NameByte = { (byte)'i', (byte)'8' }; + public static readonly byte[] NameI16 = { (byte)'i', (byte)'1', (byte)'6' }; + public static readonly byte[] NameI32 = { (byte)'i', (byte)'3', (byte)'2' }; + public static readonly byte[] NameI64 = { (byte)'i', (byte)'6', (byte)'4' }; + public static readonly byte[] NameDouble = { (byte)'d', (byte)'b', (byte)'l' }; + public static readonly byte[] NameStruct = { (byte)'r', (byte)'e', (byte)'c' }; + public static readonly byte[] NameString = { (byte)'s', (byte)'t', (byte)'r' }; + public static readonly byte[] NameMap = { (byte)'m', (byte)'a', (byte)'p' }; + public static readonly byte[] NameList = { (byte)'l', (byte)'s', (byte)'t' }; + public static readonly byte[] NameSet = { (byte)'s', (byte)'e', (byte)'t' }; + } + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Utilities/TJsonProtocolHelper.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Utilities/TJsonProtocolHelper.cs new file mode 100644 index 000000000..ff49ebe24 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Utilities/TJsonProtocolHelper.cs @@ -0,0 +1,176 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Thrift.Protocol.Entities; + +namespace Thrift.Protocol.Utilities +{ + // ReSharper disable once InconsistentNaming + public static class TJSONProtocolHelper + { + public static byte[] GetTypeNameForTypeId(TType typeId) + { + switch (typeId) + { + case TType.Bool: + return TJSONProtocolConstants.TypeNames.NameBool; + case TType.Byte: + return TJSONProtocolConstants.TypeNames.NameByte; + case TType.I16: + return TJSONProtocolConstants.TypeNames.NameI16; + case TType.I32: + return TJSONProtocolConstants.TypeNames.NameI32; + case TType.I64: + return TJSONProtocolConstants.TypeNames.NameI64; + case TType.Double: + return TJSONProtocolConstants.TypeNames.NameDouble; + case TType.String: + return TJSONProtocolConstants.TypeNames.NameString; + case TType.Struct: + return TJSONProtocolConstants.TypeNames.NameStruct; + case TType.Map: + return TJSONProtocolConstants.TypeNames.NameMap; + case TType.Set: + return TJSONProtocolConstants.TypeNames.NameSet; + case TType.List: + return TJSONProtocolConstants.TypeNames.NameList; + default: + throw new TProtocolException(TProtocolException.NOT_IMPLEMENTED, "Unrecognized exType"); + } + } + + public static TType GetTypeIdForTypeName(byte[] name) + { + var result = TType.Stop; + if (name.Length > 1) + { + switch (name[0]) + { + case (byte) 'd': + result = TType.Double; + break; + case (byte) 'i': + switch (name[1]) + { + case (byte) '8': + result = TType.Byte; + break; + case (byte) '1': + result = TType.I16; + break; + case (byte) '3': + result = TType.I32; + break; + case (byte) '6': + result = TType.I64; + break; + } + break; + case (byte) 'l': + result = TType.List; + break; + case (byte) 'm': + result = TType.Map; + break; + case (byte) 'r': + result = TType.Struct; + break; + case (byte) 's': + if (name[1] == (byte) 't') + { + result = TType.String; + } + else if (name[1] == (byte) 'e') + { + result = TType.Set; + } + break; + case (byte) 't': + result = TType.Bool; + break; + } + } + if (result == TType.Stop) + { + throw new TProtocolException(TProtocolException.NOT_IMPLEMENTED, "Unrecognized exType"); + } + return result; + } + + /// <summary> + /// Return true if the given byte could be a valid part of a JSON number. + /// </summary> + public static bool IsJsonNumeric(byte b) + { + switch (b) + { + case (byte)'+': + case (byte)'-': + case (byte)'.': + case (byte)'0': + case (byte)'1': + case (byte)'2': + case (byte)'3': + case (byte)'4': + case (byte)'5': + case (byte)'6': + case (byte)'7': + case (byte)'8': + case (byte)'9': + case (byte)'E': + case (byte)'e': + return true; + default: + return false; + } + } + + /// <summary> + /// Convert a byte containing a hex char ('0'-'9' or 'a'-'f') into its + /// corresponding hex value + /// </summary> + public static byte ToHexVal(byte ch) + { + if (ch >= '0' && ch <= '9') + { + return (byte)((char)ch - '0'); + } + + if (ch >= 'a' && ch <= 'f') + { + ch += 10; + return (byte)((char)ch - 'a'); + } + + throw new TProtocolException(TProtocolException.INVALID_DATA, "Expected hex character"); + } + + /// <summary> + /// Convert a byte containing a hex value to its corresponding hex character + /// </summary> + public static byte ToHexChar(byte val) + { + val &= 0x0F; + if (val < 10) + { + return (byte)((char)val + '0'); + } + val -= 10; + return (byte)((char)val + 'a'); + } + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Utilities/TProtocolUtil.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Utilities/TProtocolUtil.cs new file mode 100644 index 000000000..18f92d816 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Protocol/Utilities/TProtocolUtil.cs @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Threading; +using System.Threading.Tasks; +using Thrift.Protocol.Entities; + +namespace Thrift.Protocol.Utilities +{ + // ReSharper disable once InconsistentNaming + public static class TProtocolUtil + { + public static async Task SkipAsync(TProtocol protocol, TType type, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + + protocol.IncrementRecursionDepth(); + try + { + switch (type) + { + case TType.Bool: + await protocol.ReadBoolAsync(cancellationToken); + break; + case TType.Byte: + await protocol.ReadByteAsync(cancellationToken); + break; + case TType.I16: + await protocol.ReadI16Async(cancellationToken); + break; + case TType.I32: + await protocol.ReadI32Async(cancellationToken); + break; + case TType.I64: + await protocol.ReadI64Async(cancellationToken); + break; + case TType.Double: + await protocol.ReadDoubleAsync(cancellationToken); + break; + case TType.String: + // Don't try to decode the string, just skip it. + await protocol.ReadBinaryAsync(cancellationToken); + break; + case TType.Struct: + await protocol.ReadStructBeginAsync(cancellationToken); + while (true) + { + var field = await protocol.ReadFieldBeginAsync(cancellationToken); + if (field.Type == TType.Stop) + { + break; + } + await SkipAsync(protocol, field.Type, cancellationToken); + await protocol.ReadFieldEndAsync(cancellationToken); + } + await protocol.ReadStructEndAsync(cancellationToken); + break; + case TType.Map: + var map = await protocol.ReadMapBeginAsync(cancellationToken); + for (var i = 0; i < map.Count; i++) + { + await SkipAsync(protocol, map.KeyType, cancellationToken); + await SkipAsync(protocol, map.ValueType, cancellationToken); + } + await protocol.ReadMapEndAsync(cancellationToken); + break; + case TType.Set: + var set = await protocol.ReadSetBeginAsync(cancellationToken); + for (var i = 0; i < set.Count; i++) + { + await SkipAsync(protocol, set.ElementType, cancellationToken); + } + await protocol.ReadSetEndAsync(cancellationToken); + break; + case TType.List: + var list = await protocol.ReadListBeginAsync(cancellationToken); + for (var i = 0; i < list.Count; i++) + { + await SkipAsync(protocol, list.ElementType, cancellationToken); + } + await protocol.ReadListEndAsync(cancellationToken); + break; + default: + throw new TProtocolException(TProtocolException.INVALID_DATA, "Unknown data type " + type.ToString("d")); + } + } + finally + { + protocol.DecrementRecursionDepth(); + } + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TServer.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TServer.cs new file mode 100644 index 000000000..f40f2b7e7 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TServer.cs @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Thrift.Protocol; +using Thrift.Transport; +using Thrift.Processor; + +namespace Thrift.Server +{ + // ReSharper disable once InconsistentNaming + public abstract class TServer + { + protected readonly ILogger Logger; + protected TProtocolFactory InputProtocolFactory; + protected TTransportFactory InputTransportFactory; + protected ITProcessorFactory ProcessorFactory; + protected TProtocolFactory OutputProtocolFactory; + protected TTransportFactory OutputTransportFactory; + + protected TServerEventHandler ServerEventHandler; + protected TServerTransport ServerTransport; + + protected TServer(ITProcessorFactory processorFactory, TServerTransport serverTransport, + TTransportFactory inputTransportFactory, TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, TProtocolFactory outputProtocolFactory, + ILogger logger = null) + { + ProcessorFactory = processorFactory ?? throw new ArgumentNullException(nameof(processorFactory)); + ServerTransport = serverTransport; + InputTransportFactory = inputTransportFactory ?? new TTransportFactory(); + OutputTransportFactory = outputTransportFactory ?? new TTransportFactory(); + InputProtocolFactory = inputProtocolFactory ?? throw new ArgumentNullException(nameof(inputProtocolFactory)); + OutputProtocolFactory = outputProtocolFactory ?? throw new ArgumentNullException(nameof(outputProtocolFactory)); + Logger = logger; // null is absolutely legal + } + + public void SetEventHandler(TServerEventHandler seh) + { + ServerEventHandler = seh; + } + + public TServerEventHandler GetEventHandler() + { + return ServerEventHandler; + } + + // Log delegation? deprecated, use ILogger + protected void LogError( string msg) + { + if (Logger != null) + Logger.LogError(msg); + } + + public abstract void Stop(); + + public virtual void Start() + { + // do nothing + } + + public virtual async Task ServeAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TServerEventHandler.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TServerEventHandler.cs new file mode 100644 index 000000000..69314efd6 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TServerEventHandler.cs @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Threading; +using System.Threading.Tasks; +using Thrift.Protocol; +using Thrift.Transport; + +namespace Thrift.Server +{ + //TODO: replacement by event? + + /// <summary> + /// Interface implemented by server users to handle events from the server + /// </summary> + // ReSharper disable once InconsistentNaming + public interface TServerEventHandler + { + /// <summary> + /// Called before the server begins */ + /// </summary> + Task PreServeAsync(CancellationToken cancellationToken); + + /// <summary> + /// Called when a new client has connected and is about to being processing */ + /// </summary> + Task<object> CreateContextAsync(TProtocol input, TProtocol output, CancellationToken cancellationToken); + + /// <summary> + /// Called when a client has finished request-handling to delete server context */ + /// </summary> + Task DeleteContextAsync(object serverContext, TProtocol input, TProtocol output, + CancellationToken cancellationToken); + + /// <summary> + /// Called when a client is about to call the processor */ + /// </summary> + Task ProcessContextAsync(object serverContext, TTransport transport, CancellationToken cancellationToken); + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs new file mode 100644 index 000000000..bdaa3489c --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs @@ -0,0 +1,230 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Thrift.Protocol; +using Thrift.Processor; +using Thrift.Transport; + +namespace Thrift.Server +{ + //TODO: unhandled exceptions, etc. + + // ReSharper disable once InconsistentNaming + public class TSimpleAsyncServer : TServer + { + private readonly int _clientWaitingDelay; + private volatile Task _serverTask; + + public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory, + TServerTransport serverTransport, + TTransportFactory inputTransportFactory, + TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + ILogger logger, + int clientWaitingDelay = 10) + : base(itProcessorFactory, + serverTransport, + inputTransportFactory, + outputTransportFactory, + inputProtocolFactory, + outputProtocolFactory, + logger) + { + _clientWaitingDelay = clientWaitingDelay; + } + + public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory, + TServerTransport serverTransport, + TTransportFactory inputTransportFactory, + TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + ILoggerFactory loggerFactory, + int clientWaitingDelay = 10) + : this(itProcessorFactory, + serverTransport, + inputTransportFactory, + outputTransportFactory, + inputProtocolFactory, + outputProtocolFactory, + loggerFactory.CreateLogger<TSimpleAsyncServer>()) + { + } + + public TSimpleAsyncServer(ITAsyncProcessor processor, + TServerTransport serverTransport, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + ILoggerFactory loggerFactory, + int clientWaitingDelay = 10) + : this(new TSingletonProcessorFactory(processor), + serverTransport, + null, // defaults to TTransportFactory() + null, // defaults to TTransportFactory() + inputProtocolFactory, + outputProtocolFactory, + loggerFactory.CreateLogger(nameof(TSimpleAsyncServer)), + clientWaitingDelay) + { + } + + public override async Task ServeAsync(CancellationToken cancellationToken) + { + try + { + // cancelation token + _serverTask = Task.Factory.StartNew(() => StartListening(cancellationToken), TaskCreationOptions.LongRunning); + await _serverTask; + } + catch (Exception ex) + { + Logger.LogError(ex.ToString()); + } + } + + private async Task StartListening(CancellationToken cancellationToken) + { + ServerTransport.Listen(); + + Logger.LogTrace("Started listening at server"); + + if (ServerEventHandler != null) + { + await ServerEventHandler.PreServeAsync(cancellationToken); + } + + while (!cancellationToken.IsCancellationRequested) + { + if (ServerTransport.IsClientPending()) + { + Logger.LogTrace("Waiting for client connection"); + + try + { + var client = await ServerTransport.AcceptAsync(cancellationToken); + await Task.Factory.StartNew(() => Execute(client, cancellationToken), cancellationToken); + } + catch (TTransportException ttx) + { + Logger.LogTrace($"Transport exception: {ttx}"); + + if (ttx.Type != TTransportException.ExceptionType.Interrupted) + { + Logger.LogError(ttx.ToString()); + } + } + } + else + { + try + { + await Task.Delay(TimeSpan.FromMilliseconds(_clientWaitingDelay), cancellationToken); + } + catch (TaskCanceledException) { } + } + } + + ServerTransport.Close(); + + Logger.LogTrace("Completed listening at server"); + } + + public override void Stop() + { + } + + private async Task Execute(TTransport client, CancellationToken cancellationToken) + { + Logger.LogTrace("Started client request processing"); + + var processor = ProcessorFactory.GetAsyncProcessor(client, this); + + TTransport inputTransport = null; + TTransport outputTransport = null; + TProtocol inputProtocol = null; + TProtocol outputProtocol = null; + object connectionContext = null; + + try + { + try + { + inputTransport = InputTransportFactory.GetTransport(client); + outputTransport = OutputTransportFactory.GetTransport(client); + inputProtocol = InputProtocolFactory.GetProtocol(inputTransport); + outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport); + + if (ServerEventHandler != null) + { + connectionContext = await ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken); + } + + while (!cancellationToken.IsCancellationRequested) + { + if (!await inputTransport.PeekAsync(cancellationToken)) + { + break; + } + + if (ServerEventHandler != null) + { + await ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken); + } + + if (!await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken)) + { + break; + } + } + } + catch (TTransportException ttx) + { + Logger.LogTrace($"Transport exception: {ttx}"); + } + catch (Exception x) + { + Logger.LogError($"Error: {x}"); + } + + if (ServerEventHandler != null) + { + await ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken); + } + + } + finally + { + //Close transports + inputTransport?.Close(); + outputTransport?.Close(); + + // disposable stuff should be disposed + inputProtocol?.Dispose(); + outputProtocol?.Dispose(); + inputTransport?.Dispose(); + outputTransport?.Dispose(); + } + + Logger.LogTrace("Completed client request processing"); + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs new file mode 100644 index 000000000..20e659d3a --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs @@ -0,0 +1,297 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * Contains some contributions under the Thrift Software License. + * Please see doc/old-thrift-license.txt in the Thrift distribution for + * details. + */ + +using System; +using System.Threading; +using Thrift.Protocol; +using Thrift.Transport; +using Thrift.Processor; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; + +namespace Thrift.Server +{ + /// <summary> + /// Server that uses C# built-in ThreadPool to spawn threads when handling requests. + /// </summary> + public class TThreadPoolAsyncServer : TServer + { + private const int DEFAULT_MIN_THREADS = -1; // use .NET ThreadPool defaults + private const int DEFAULT_MAX_THREADS = -1; // use .NET ThreadPool defaults + private volatile bool stop = false; + + private CancellationToken ServerCancellationToken; + + public struct Configuration + { + public int MinWorkerThreads; + public int MaxWorkerThreads; + public int MinIOThreads; + public int MaxIOThreads; + + public Configuration(int min = DEFAULT_MIN_THREADS, int max = DEFAULT_MAX_THREADS) + { + MinWorkerThreads = min; + MaxWorkerThreads = max; + MinIOThreads = min; + MaxIOThreads = max; + } + + public Configuration(int minWork, int maxWork, int minIO, int maxIO) + { + MinWorkerThreads = minWork; + MaxWorkerThreads = maxWork; + MinIOThreads = minIO; + MaxIOThreads = maxIO; + } + } + + public TThreadPoolAsyncServer(ITAsyncProcessor processor, TServerTransport serverTransport, ILogger logger = null) + : this(new TSingletonProcessorFactory(processor), serverTransport, + null, null, // defaults to TTransportFactory() + new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), + new Configuration(), logger) + { + } + + public TThreadPoolAsyncServer(ITAsyncProcessor processor, + TServerTransport serverTransport, + TTransportFactory transportFactory, + TProtocolFactory protocolFactory) + : this(new TSingletonProcessorFactory(processor), serverTransport, + transportFactory, transportFactory, + protocolFactory, protocolFactory, + new Configuration()) + { + } + + public TThreadPoolAsyncServer(ITProcessorFactory processorFactory, + TServerTransport serverTransport, + TTransportFactory transportFactory, + TProtocolFactory protocolFactory) + : this(processorFactory, serverTransport, + transportFactory, transportFactory, + protocolFactory, protocolFactory, + new Configuration()) + { + } + + public TThreadPoolAsyncServer(ITProcessorFactory processorFactory, + TServerTransport serverTransport, + TTransportFactory inputTransportFactory, + TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + int minThreadPoolThreads, int maxThreadPoolThreads, ILogger logger= null) + : this(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory, + new Configuration(minThreadPoolThreads, maxThreadPoolThreads), + logger) + { + } + + public TThreadPoolAsyncServer(ITProcessorFactory processorFactory, + TServerTransport serverTransport, + TTransportFactory inputTransportFactory, + TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + Configuration threadConfig, + ILogger logger = null) + : base(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory, logger) + { + lock (typeof(TThreadPoolAsyncServer)) + { + if ((threadConfig.MaxWorkerThreads > 0) || (threadConfig.MaxIOThreads > 0)) + { + int work, comm; + ThreadPool.GetMaxThreads(out work, out comm); + if (threadConfig.MaxWorkerThreads > 0) + work = threadConfig.MaxWorkerThreads; + if (threadConfig.MaxIOThreads > 0) + comm = threadConfig.MaxIOThreads; + if (!ThreadPool.SetMaxThreads(work, comm)) + throw new Exception("Error: could not SetMaxThreads in ThreadPool"); + } + + if ((threadConfig.MinWorkerThreads > 0) || (threadConfig.MinIOThreads > 0)) + { + int work, comm; + ThreadPool.GetMinThreads(out work, out comm); + if (threadConfig.MinWorkerThreads > 0) + work = threadConfig.MinWorkerThreads; + if (threadConfig.MinIOThreads > 0) + comm = threadConfig.MinIOThreads; + if (!ThreadPool.SetMinThreads(work, comm)) + throw new Exception("Error: could not SetMinThreads in ThreadPool"); + } + } + } + + + /// <summary> + /// Use new ThreadPool thread for each new client connection. + /// </summary> + public override async Task ServeAsync(CancellationToken cancellationToken) + { + ServerCancellationToken = cancellationToken; + try + { + try + { + ServerTransport.Listen(); + } + catch (TTransportException ttx) + { + LogError("Error, could not listen on ServerTransport: " + ttx); + return; + } + + //Fire the preServe server event when server is up but before any client connections + if (ServerEventHandler != null) + await ServerEventHandler.PreServeAsync(cancellationToken); + + while (!stop) + { + int failureCount = 0; + try + { + TTransport client = await ServerTransport.AcceptAsync(cancellationToken); + ThreadPool.QueueUserWorkItem(this.Execute, client); + } + catch (TTransportException ttx) + { + if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted) + { + ++failureCount; + LogError(ttx.ToString()); + } + + } + } + + if (stop) + { + try + { + ServerTransport.Close(); + } + catch (TTransportException ttx) + { + LogError("TServerTransport failed on close: " + ttx.Message); + } + stop = false; + } + + } + finally + { + ServerCancellationToken = default(CancellationToken); + } + } + + /// <summary> + /// Loops on processing a client forever + /// threadContext will be a TTransport instance + /// </summary> + /// <param name="threadContext"></param> + private void Execute(object threadContext) + { + var cancellationToken = ServerCancellationToken; + + using (TTransport client = (TTransport)threadContext) + { + ITAsyncProcessor processor = ProcessorFactory.GetAsyncProcessor(client, this); + TTransport inputTransport = null; + TTransport outputTransport = null; + TProtocol inputProtocol = null; + TProtocol outputProtocol = null; + object connectionContext = null; + try + { + try + { + inputTransport = InputTransportFactory.GetTransport(client); + outputTransport = OutputTransportFactory.GetTransport(client); + inputProtocol = InputProtocolFactory.GetProtocol(inputTransport); + outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport); + + //Recover event handler (if any) and fire createContext server event when a client connects + if (ServerEventHandler != null) + connectionContext = ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken).Result; + + //Process client requests until client disconnects + while (!stop) + { + if (! inputTransport.PeekAsync(cancellationToken).Result) + break; + + //Fire processContext server event + //N.B. This is the pattern implemented in C++ and the event fires provisionally. + //That is to say it may be many minutes between the event firing and the client request + //actually arriving or the client may hang up without ever makeing a request. + if (ServerEventHandler != null) + ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken).Wait(); + //Process client request (blocks until transport is readable) + if (!processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken).Result) + break; + } + } + catch (TTransportException) + { + //Usually a client disconnect, expected + } + catch (Exception x) + { + //Unexpected + LogError("Error: " + x); + } + + //Fire deleteContext server event after client disconnects + if (ServerEventHandler != null) + ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken).Wait(); + + } + finally + { + //Close transports + inputTransport?.Close(); + outputTransport?.Close(); + + // disposable stuff should be disposed + inputProtocol?.Dispose(); + outputProtocol?.Dispose(); + inputTransport?.Dispose(); + outputTransport?.Dispose(); + } + } + } + + public override void Stop() + { + stop = true; + ServerTransport?.Close(); + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/TApplicationException.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/TApplicationException.cs new file mode 100644 index 000000000..67ac2f8c7 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/TApplicationException.cs @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Threading; +using System.Threading.Tasks; +using Thrift.Protocol; +using Thrift.Protocol.Entities; +using Thrift.Protocol.Utilities; + +namespace Thrift +{ + // ReSharper disable once InconsistentNaming + public class TApplicationException : TException + { + public enum ExceptionType + { + Unknown, + UnknownMethod, + InvalidMessageType, + WrongMethodName, + BadSequenceId, + MissingResult, + InternalError, + ProtocolError, + InvalidTransform, + InvalidProtocol, + UnsupportedClientType + } + + private const int MessageTypeFieldId = 1; + private const int ExTypeFieldId = 2; + + public ExceptionType Type { get; private set; } + + public TApplicationException() + { + } + + public TApplicationException(ExceptionType type) + { + Type = type; + } + + public TApplicationException(ExceptionType type, string message) + : base(message, null) // TApplicationException is serializable, but we never serialize InnerException + { + Type = type; + } + + public static async ValueTask<TApplicationException> ReadAsync(TProtocol inputProtocol, CancellationToken cancellationToken) + { + string message = null; + var type = ExceptionType.Unknown; + + await inputProtocol.ReadStructBeginAsync(cancellationToken); + while (true) + { + var field = await inputProtocol.ReadFieldBeginAsync(cancellationToken); + if (field.Type == TType.Stop) + { + break; + } + + switch (field.ID) + { + case MessageTypeFieldId: + if (field.Type == TType.String) + { + message = await inputProtocol.ReadStringAsync(cancellationToken); + } + else + { + await TProtocolUtil.SkipAsync(inputProtocol, field.Type, cancellationToken); + } + break; + case ExTypeFieldId: + if (field.Type == TType.I32) + { + type = (ExceptionType) await inputProtocol.ReadI32Async(cancellationToken); + } + else + { + await TProtocolUtil.SkipAsync(inputProtocol, field.Type, cancellationToken); + } + break; + default: + await TProtocolUtil.SkipAsync(inputProtocol, field.Type, cancellationToken); + break; + } + + await inputProtocol.ReadFieldEndAsync(cancellationToken); + } + + await inputProtocol.ReadStructEndAsync(cancellationToken); + + return new TApplicationException(type, message); + } + + public async Task WriteAsync(TProtocol outputProtocol, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + + const string messageTypeFieldName = "message"; + const string exTypeFieldName = "exType"; + const string structApplicationExceptionName = "TApplicationException"; + + var struc = new TStruct(structApplicationExceptionName); + var field = new TField(); + + await outputProtocol.WriteStructBeginAsync(struc, cancellationToken); + + if (!string.IsNullOrEmpty(Message)) + { + field.Name = messageTypeFieldName; + field.Type = TType.String; + field.ID = MessageTypeFieldId; + await outputProtocol.WriteFieldBeginAsync(field, cancellationToken); + await outputProtocol.WriteStringAsync(Message, cancellationToken); + await outputProtocol.WriteFieldEndAsync(cancellationToken); + } + + field.Name = exTypeFieldName; + field.Type = TType.I32; + field.ID = ExTypeFieldId; + + await outputProtocol.WriteFieldBeginAsync(field, cancellationToken); + await outputProtocol.WriteI32Async((int) Type, cancellationToken); + await outputProtocol.WriteFieldEndAsync(cancellationToken); + await outputProtocol.WriteFieldStopAsync(cancellationToken); + await outputProtocol.WriteStructEndAsync(cancellationToken); + } + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/TBaseClient.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/TBaseClient.cs new file mode 100644 index 000000000..0edac0f08 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/TBaseClient.cs @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Threading; +using System.Threading.Tasks; +using Thrift.Protocol; + +namespace Thrift +{ + // ReSharper disable once InconsistentNaming + /// <summary> + /// TBaseClient. + /// Base client for generated clients. + /// Do not change this class without checking generated code (namings, etc.) + /// </summary> + public abstract class TBaseClient + { + private readonly TProtocol _inputProtocol; + private readonly TProtocol _outputProtocol; + private bool _isDisposed; + private int _seqId; + public readonly Guid ClientId = Guid.NewGuid(); + + protected TBaseClient(TProtocol inputProtocol, TProtocol outputProtocol) + { + _inputProtocol = inputProtocol ?? throw new ArgumentNullException(nameof(inputProtocol)); + _outputProtocol = outputProtocol ?? throw new ArgumentNullException(nameof(outputProtocol)); + } + + public TProtocol InputProtocol => _inputProtocol; + + public TProtocol OutputProtocol => _outputProtocol; + + public int SeqId + { + get { return ++_seqId; } + } + + public virtual async Task OpenTransportAsync() + { + await OpenTransportAsync(CancellationToken.None); + } + + public virtual async Task OpenTransportAsync(CancellationToken cancellationToken) + { + if (!_inputProtocol.Transport.IsOpen) + { + await _inputProtocol.Transport.OpenAsync(cancellationToken); + } + + if (!_inputProtocol.Transport.IsOpen) + { + await _outputProtocol.Transport.OpenAsync(cancellationToken); + } + } + + public void Dispose() + { + Dispose(true); + } + + protected virtual void Dispose(bool disposing) + { + if (!_isDisposed) + { + if (disposing) + { + _inputProtocol?.Dispose(); + _outputProtocol?.Dispose(); + } + } + + _isDisposed = true; + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/TException.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/TException.cs new file mode 100644 index 000000000..43e70549b --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/TException.cs @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; + +namespace Thrift +{ + // ReSharper disable once InconsistentNaming + public class TException : Exception + { + public TException() + { + } + + public TException(string message, Exception inner) + : base(message, inner) + { + } + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Thrift.csproj b/src/jaegertracing/thrift/lib/netstd/Thrift/Thrift.csproj new file mode 100644 index 000000000..ceb4409c0 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Thrift.csproj @@ -0,0 +1,59 @@ +<Project Sdk="Microsoft.NET.Sdk"> + <!-- + Licensed to the Apache Software Foundation(ASF) under one + or more contributor license agreements.See the NOTICE file + distributed with this work for additional information + regarding copyright ownership.The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + --> + + <PropertyGroup> + <TargetFramework>netstandard2.0</TargetFramework> + <AssemblyName>Thrift</AssemblyName> + <PackageId>Thrift</PackageId> + <AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects> + <GenerateBindingRedirectsOutputType>true</GenerateBindingRedirectsOutputType> + <GenerateAssemblyTitleAttribute>false</GenerateAssemblyTitleAttribute> + <GenerateAssemblyDescriptionAttribute>false</GenerateAssemblyDescriptionAttribute> + <GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute> + <GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute> + <GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute> + <GenerateAssemblyCopyrightAttribute>false</GenerateAssemblyCopyrightAttribute> + <GenerateAssemblyVersionAttribute>false</GenerateAssemblyVersionAttribute> + <GenerateAssemblyFileVersionAttribute>false</GenerateAssemblyFileVersionAttribute> + </PropertyGroup> + + <PropertyGroup> + <AllowUnsafeBlocks>true</AllowUnsafeBlocks> + <SignAssembly>true</SignAssembly> + <AssemblyOriginatorKeyFile>thrift.snk</AssemblyOriginatorKeyFile> + <DelaySign>false</DelaySign> + <Version>0.13.0.0</Version> + </PropertyGroup> + + <ItemGroup> + <PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.2.0" /> + <PackageReference Include="Microsoft.Extensions.Logging" Version="2.2.0" /> + <PackageReference Include="Microsoft.Extensions.Logging.Console" Version="2.2.0" /> + <PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="2.2.0" /> + <PackageReference Include="System.IO.Pipes" Version="[4.3,)" /> + <PackageReference Include="System.IO.Pipes.AccessControl" Version="4.5.1" /> + <PackageReference Include="System.Net.Http.WinHttpHandler" Version="4.5.2" /> + <PackageReference Include="System.Net.NameResolution" Version="[4.3,)" /> + <PackageReference Include="System.Net.Requests" Version="[4.3,)" /> + <PackageReference Include="System.Net.Security" Version="4.3.2" /> + <PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.2" /> + </ItemGroup> + +</Project> diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Client/THttpTransport.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Client/THttpTransport.cs new file mode 100644 index 000000000..c84df83ae --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Client/THttpTransport.cs @@ -0,0 +1,222 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Security.Cryptography.X509Certificates; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transport.Client +{ + // ReSharper disable once InconsistentNaming + public class THttpTransport : TTransport + { + private readonly X509Certificate[] _certificates; + private readonly Uri _uri; + + private int _connectTimeout = 30000; // Timeouts in milliseconds + private HttpClient _httpClient; + private Stream _inputStream; + private MemoryStream _outputStream = new MemoryStream(); + private bool _isDisposed; + + public THttpTransport(Uri uri, IDictionary<string, string> customRequestHeaders = null, string userAgent = null) + : this(uri, Enumerable.Empty<X509Certificate>(), customRequestHeaders, userAgent) + { + } + + public THttpTransport(Uri uri, IEnumerable<X509Certificate> certificates, + IDictionary<string, string> customRequestHeaders, string userAgent = null) + { + _uri = uri; + _certificates = (certificates ?? Enumerable.Empty<X509Certificate>()).ToArray(); + + if (!string.IsNullOrEmpty(userAgent)) + UserAgent = userAgent; + + // due to current bug with performance of Dispose in netcore https://github.com/dotnet/corefx/issues/8809 + // this can be switched to default way (create client->use->dispose per flush) later + _httpClient = CreateClient(customRequestHeaders); + } + + // According to RFC 2616 section 3.8, the "User-Agent" header may not carry a version number + public readonly string UserAgent = "Thrift netstd THttpClient"; + + public override bool IsOpen => true; + + public HttpRequestHeaders RequestHeaders => _httpClient.DefaultRequestHeaders; + + public MediaTypeHeaderValue ContentType { get; set; } + + public override async Task OpenAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override void Close() + { + if (_inputStream != null) + { + _inputStream.Dispose(); + _inputStream = null; + } + + if (_outputStream != null) + { + _outputStream.Dispose(); + _outputStream = null; + } + + if (_httpClient != null) + { + _httpClient.Dispose(); + _httpClient = null; + } + } + + public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<int>(cancellationToken); + } + + if (_inputStream == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No request has been sent"); + } + + try + { + var ret = await _inputStream.ReadAsync(buffer, offset, length, cancellationToken); + + if (ret == -1) + { + throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "No more data available"); + } + + return ret; + } + catch (IOException iox) + { + throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString()); + } + } + + public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + + await _outputStream.WriteAsync(buffer, offset, length, cancellationToken); + } + + private HttpClient CreateClient(IDictionary<string, string> customRequestHeaders) + { + var handler = new HttpClientHandler(); + handler.ClientCertificates.AddRange(_certificates); + handler.AutomaticDecompression = System.Net.DecompressionMethods.Deflate | System.Net.DecompressionMethods.GZip; + + var httpClient = new HttpClient(handler); + + if (_connectTimeout > 0) + { + httpClient.Timeout = TimeSpan.FromMilliseconds(_connectTimeout); + } + + httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/x-thrift")); + httpClient.DefaultRequestHeaders.UserAgent.TryParseAdd(UserAgent); + + httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("deflate")); + httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("gzip")); + + if (customRequestHeaders != null) + { + foreach (var item in customRequestHeaders) + { + httpClient.DefaultRequestHeaders.Add(item.Key, item.Value); + } + } + + return httpClient; + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + try + { + _outputStream.Seek(0, SeekOrigin.Begin); + + using (var contentStream = new StreamContent(_outputStream)) + { + contentStream.Headers.ContentType = ContentType ?? new MediaTypeHeaderValue(@"application/x-thrift"); + + var response = (await _httpClient.PostAsync(_uri, contentStream, cancellationToken)).EnsureSuccessStatusCode(); + + _inputStream?.Dispose(); + _inputStream = await response.Content.ReadAsStreamAsync(); + if (_inputStream.CanSeek) + { + _inputStream.Seek(0, SeekOrigin.Begin); + } + } + } + catch (IOException iox) + { + throw new TTransportException(TTransportException.ExceptionType.Unknown, iox.ToString()); + } + catch (HttpRequestException wx) + { + throw new TTransportException(TTransportException.ExceptionType.Unknown, + "Couldn't connect to server: " + wx); + } + catch (Exception ex) + { + throw new TTransportException(TTransportException.ExceptionType.Unknown, ex.Message); + } + finally + { + _outputStream = new MemoryStream(); + } + } + + // IDisposable + protected override void Dispose(bool disposing) + { + if (!_isDisposed) + { + if (disposing) + { + _inputStream?.Dispose(); + _outputStream?.Dispose(); + _httpClient?.Dispose(); + } + } + _isDisposed = true; + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Client/TMemoryBufferTransport.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Client/TMemoryBufferTransport.cs new file mode 100644 index 000000000..25895c2b7 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Client/TMemoryBufferTransport.cs @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Diagnostics; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transport.Client +{ + // ReSharper disable once InconsistentNaming + public class TMemoryBufferTransport : TTransport + { + private bool IsDisposed; + private byte[] Bytes; + private int _bytesUsed; + + public TMemoryBufferTransport() + { + Bytes = new byte[2048]; // default size + } + + public TMemoryBufferTransport(int initialCapacity) + { + Bytes = new byte[initialCapacity]; // default size + } + + public TMemoryBufferTransport(byte[] buf) + { + Bytes = (byte[])buf.Clone(); + _bytesUsed = Bytes.Length; + } + + public int Position { get; set; } + + public int Capacity + { + get + { + Debug.Assert(_bytesUsed <= Bytes.Length); + return Bytes.Length; + } + set + { + Array.Resize(ref Bytes, value); + _bytesUsed = value; + } + } + + public int Length + { + get { + Debug.Assert(_bytesUsed <= Bytes.Length); + return _bytesUsed; + } + set { + if ((Bytes.Length < value) || (Bytes.Length > (10 * value))) + Array.Resize(ref Bytes, Math.Max(2048, (int)(value * 1.25))); + _bytesUsed = value; + } + } + + public void SetLength(int value) + { + Length = value; + Position = Math.Min(Position, value); + } + + public override bool IsOpen => true; + + public override async Task OpenAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override void Close() + { + /** do nothing **/ + } + + public void Seek(int delta, SeekOrigin origin) + { + int newPos; + switch (origin) + { + case SeekOrigin.Begin: + newPos = delta; + break; + case SeekOrigin.Current: + newPos = Position + delta; + break; + case SeekOrigin.End: + newPos = _bytesUsed + delta; + break; + default: + throw new ArgumentException(nameof(origin)); + } + + if ((0 > newPos) || (newPos > _bytesUsed)) + throw new ArgumentException(nameof(origin)); + Position = newPos; + } + + public override ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + var count = Math.Min(Length - Position, length); + Buffer.BlockCopy(Bytes, Position, buffer, offset, count); + Position += count; + return new ValueTask<int>(count); + } + + public override Task WriteAsync(byte[] buffer, CancellationToken cancellationToken) + { + return WriteAsync(buffer, 0, buffer.Length, cancellationToken); + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + var free = Length - Position; + Length = Length + count - free; + Buffer.BlockCopy(buffer, offset, Bytes, Position, count); + Position += count; + return Task.CompletedTask; + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public byte[] GetBuffer() + { + var retval = new byte[Length]; + Buffer.BlockCopy(Bytes, 0, retval, 0, Length); + return retval; + } + + internal bool TryGetBuffer(out ArraySegment<byte> bufSegment) + { + bufSegment = new ArraySegment<byte>(Bytes, 0, _bytesUsed); + return true; + } + + + // IDisposable + protected override void Dispose(bool disposing) + { + if (!IsDisposed) + { + if (disposing) + { + // nothing to do + } + } + IsDisposed = true; + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs new file mode 100644 index 000000000..7dfe0131e --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.IO.Pipes; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transport.Client +{ + // ReSharper disable once InconsistentNaming + public class TNamedPipeTransport : TTransport + { + private NamedPipeClientStream PipeStream; + private int ConnectTimeout; + + public TNamedPipeTransport(string pipe, int timeout = Timeout.Infinite) + : this(".", pipe, timeout) + { + } + + public TNamedPipeTransport(string server, string pipe, int timeout = Timeout.Infinite) + { + var serverName = string.IsNullOrWhiteSpace(server) ? server : "."; + ConnectTimeout = (timeout > 0) ? timeout : Timeout.Infinite; + + PipeStream = new NamedPipeClientStream(serverName, pipe, PipeDirection.InOut, PipeOptions.None); + } + + public override bool IsOpen => PipeStream != null && PipeStream.IsConnected; + + public override async Task OpenAsync(CancellationToken cancellationToken) + { + if (IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.AlreadyOpen); + } + + await PipeStream.ConnectAsync( ConnectTimeout, cancellationToken); + } + + public override void Close() + { + if (PipeStream != null) + { + PipeStream.Dispose(); + PipeStream = null; + } + } + + public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + if (PipeStream == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + return await PipeStream.ReadAsync(buffer, offset, length, cancellationToken); + } + + public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + if (PipeStream == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + // if necessary, send the data in chunks + // there's a system limit around 0x10000 bytes that we hit otherwise + // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section." + var nBytes = Math.Min(15 * 4096, length); // 16 would exceed the limit + while (nBytes > 0) + { + await PipeStream.WriteAsync(buffer, offset, nBytes, cancellationToken); + offset += nBytes; + length -= nBytes; + nBytes = Math.Min(nBytes, length); + } + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + protected override void Dispose(bool disposing) + { + PipeStream.Dispose(); + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Client/TSocketTransport.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Client/TSocketTransport.cs new file mode 100644 index 000000000..00da04581 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Client/TSocketTransport.cs @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transport.Client +{ + // ReSharper disable once InconsistentNaming + public class TSocketTransport : TStreamTransport + { + private bool _isDisposed; + + + public TSocketTransport(TcpClient client) + { + TcpClient = client ?? throw new ArgumentNullException(nameof(client)); + SetInputOutputStream(); + } + + public TSocketTransport(IPAddress host, int port) + : this(host, port, 0) + { + } + + public TSocketTransport(IPAddress host, int port, int timeout) + { + Host = host; + Port = port; + + TcpClient = new TcpClient(); + TcpClient.ReceiveTimeout = TcpClient.SendTimeout = timeout; + TcpClient.Client.NoDelay = true; + SetInputOutputStream(); + } + + public TSocketTransport(string host, int port, int timeout = 0) + { + try + { + var entry = Dns.GetHostEntry(host); + if (entry.AddressList.Length == 0) + throw new TTransportException(TTransportException.ExceptionType.Unknown, "unable to resolve host name"); + + var addr = entry.AddressList[0]; + Host = new IPAddress(addr.GetAddressBytes(), addr.ScopeId); + Port = port; + + TcpClient = new TcpClient(host, port); + TcpClient.ReceiveTimeout = TcpClient.SendTimeout = timeout; + TcpClient.Client.NoDelay = true; + SetInputOutputStream(); + } + catch (SocketException e) + { + throw new TTransportException(TTransportException.ExceptionType.Unknown, e.Message, e); + } + } + + private void SetInputOutputStream() + { + if (IsOpen) + { + InputStream = TcpClient.GetStream(); + OutputStream = TcpClient.GetStream(); + } + } + + public TcpClient TcpClient { get; private set; } + public IPAddress Host { get; } + public int Port { get; } + + public int Timeout + { + set + { + if (TcpClient != null) + { + TcpClient.ReceiveTimeout = TcpClient.SendTimeout = value; + } + } + } + + public override bool IsOpen + { + get + { + return (TcpClient != null) && TcpClient.Connected; + } + } + + public override async Task OpenAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + + if (IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.AlreadyOpen, "Socket already connected"); + } + + if (Port <= 0) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen, "Cannot open without port"); + } + + if (TcpClient == null) + { + throw new InvalidOperationException("Invalid or not initialized tcp client"); + } + + await TcpClient.ConnectAsync(Host, Port); + SetInputOutputStream(); + } + + public override void Close() + { + base.Close(); + + if (TcpClient != null) + { + TcpClient.Dispose(); + TcpClient = null; + } + } + + // IDisposable + protected override void Dispose(bool disposing) + { + if (!_isDisposed) + { + if (disposing) + { + TcpClient?.Dispose(); + + base.Dispose(disposing); + } + } + _isDisposed = true; + } + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Client/TStreamTransport.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Client/TStreamTransport.cs new file mode 100644 index 000000000..d8574d610 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Client/TStreamTransport.cs @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transport.Client +{ + // ReSharper disable once InconsistentNaming + public class TStreamTransport : TTransport + { + private bool _isDisposed; + + protected TStreamTransport() + { + } + + public TStreamTransport(Stream inputStream, Stream outputStream) + { + InputStream = inputStream; + OutputStream = outputStream; + } + + protected Stream OutputStream { get; set; } + + protected Stream InputStream { get; set; } + + public override bool IsOpen => true; + + public override async Task OpenAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override void Close() + { + if (InputStream != null) + { + InputStream.Dispose(); + InputStream = null; + } + + if (OutputStream != null) + { + OutputStream.Dispose(); + OutputStream = null; + } + } + + public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + if (InputStream == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen, + "Cannot read from null inputstream"); + } + + return await InputStream.ReadAsync(buffer, offset, length, cancellationToken); + } + + public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + if (OutputStream == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen, + "Cannot write to null outputstream"); + } + + await OutputStream.WriteAsync(buffer, offset, length, cancellationToken); + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + await OutputStream.FlushAsync(cancellationToken); + } + + // IDisposable + protected override void Dispose(bool disposing) + { + if (!_isDisposed) + { + if (disposing) + { + InputStream?.Dispose(); + OutputStream?.Dispose(); + } + } + _isDisposed = true; + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Client/TTlsSocketTransport.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Client/TTlsSocketTransport.cs new file mode 100644 index 000000000..9295bb01b --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Client/TTlsSocketTransport.cs @@ -0,0 +1,267 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Net; +using System.Net.Security; +using System.Net.Sockets; +using System.Security.Authentication; +using System.Security.Cryptography.X509Certificates; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transport.Client +{ + //TODO: check for correct work + + // ReSharper disable once InconsistentNaming + public class TTlsSocketTransport : TStreamTransport + { + private readonly X509Certificate2 _certificate; + private readonly RemoteCertificateValidationCallback _certValidator; + private readonly IPAddress _host; + private readonly bool _isServer; + private readonly LocalCertificateSelectionCallback _localCertificateSelectionCallback; + private readonly int _port; + private readonly SslProtocols _sslProtocols; + private TcpClient _client; + private SslStream _secureStream; + private int _timeout; + + public TTlsSocketTransport(TcpClient client, X509Certificate2 certificate, bool isServer = false, + RemoteCertificateValidationCallback certValidator = null, + LocalCertificateSelectionCallback localCertificateSelectionCallback = null, + SslProtocols sslProtocols = SslProtocols.Tls12) + { + _client = client; + _certificate = certificate; + _certValidator = certValidator; + _localCertificateSelectionCallback = localCertificateSelectionCallback; + _sslProtocols = sslProtocols; + _isServer = isServer; + + if (isServer && certificate == null) + { + throw new ArgumentException("TTlsSocketTransport needs certificate to be used for server", + nameof(certificate)); + } + + if (IsOpen) + { + InputStream = client.GetStream(); + OutputStream = client.GetStream(); + } + } + + public TTlsSocketTransport(IPAddress host, int port, string certificatePath, + RemoteCertificateValidationCallback certValidator = null, + LocalCertificateSelectionCallback localCertificateSelectionCallback = null, + SslProtocols sslProtocols = SslProtocols.Tls12) + : this(host, port, 0, + new X509Certificate2(certificatePath), + certValidator, + localCertificateSelectionCallback, + sslProtocols) + { + } + + public TTlsSocketTransport(IPAddress host, int port, + X509Certificate2 certificate = null, + RemoteCertificateValidationCallback certValidator = null, + LocalCertificateSelectionCallback localCertificateSelectionCallback = null, + SslProtocols sslProtocols = SslProtocols.Tls12) + : this(host, port, 0, + certificate, + certValidator, + localCertificateSelectionCallback, + sslProtocols) + { + } + + public TTlsSocketTransport(IPAddress host, int port, int timeout, + X509Certificate2 certificate, + RemoteCertificateValidationCallback certValidator = null, + LocalCertificateSelectionCallback localCertificateSelectionCallback = null, + SslProtocols sslProtocols = SslProtocols.Tls12) + { + _host = host; + _port = port; + _timeout = timeout; + _certificate = certificate; + _certValidator = certValidator; + _localCertificateSelectionCallback = localCertificateSelectionCallback; + _sslProtocols = sslProtocols; + + InitSocket(); + } + + public TTlsSocketTransport(string host, int port, int timeout, + X509Certificate2 certificate, + RemoteCertificateValidationCallback certValidator = null, + LocalCertificateSelectionCallback localCertificateSelectionCallback = null, + SslProtocols sslProtocols = SslProtocols.Tls12) + { + try + { + var entry = Dns.GetHostEntry(host); + if (entry.AddressList.Length == 0) + throw new TTransportException(TTransportException.ExceptionType.Unknown, "unable to resolve host name"); + + var addr = entry.AddressList[0]; + + _host = new IPAddress(addr.GetAddressBytes(), addr.ScopeId); + _port = port; + _timeout = timeout; + _certificate = certificate; + _certValidator = certValidator; + _localCertificateSelectionCallback = localCertificateSelectionCallback; + _sslProtocols = sslProtocols; + + InitSocket(); + } + catch (SocketException e) + { + throw new TTransportException(TTransportException.ExceptionType.Unknown, e.Message, e); + } + } + + public int Timeout + { + set { _client.ReceiveTimeout = _client.SendTimeout = _timeout = value; } + } + + public TcpClient TcpClient => _client; + + public IPAddress Host => _host; + + public int Port => _port; + + public override bool IsOpen + { + get + { + if (_client == null) + { + return false; + } + + return _client.Connected; + } + } + + private void InitSocket() + { + _client = new TcpClient(); + _client.ReceiveTimeout = _client.SendTimeout = _timeout; + _client.Client.NoDelay = true; + } + + private bool DefaultCertificateValidator(object sender, X509Certificate certificate, X509Chain chain, + SslPolicyErrors sslValidationErrors) + { + return sslValidationErrors == SslPolicyErrors.None; + } + + public override async Task OpenAsync(CancellationToken cancellationToken) + { + if (IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.AlreadyOpen, "Socket already connected"); + } + + if (_host == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen, "Cannot open null host"); + } + + if (_port <= 0) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen, "Cannot open without port"); + } + + if (_client == null) + { + InitSocket(); + } + + if (_client != null) + { + await _client.ConnectAsync(_host, _port); + await SetupTlsAsync(); + } + } + + public async Task SetupTlsAsync() + { + var validator = _certValidator ?? DefaultCertificateValidator; + + if (_localCertificateSelectionCallback != null) + { + _secureStream = new SslStream(_client.GetStream(), false, validator, _localCertificateSelectionCallback); + } + else + { + _secureStream = new SslStream(_client.GetStream(), false, validator); + } + + try + { + if (_isServer) + { + // Server authentication + await + _secureStream.AuthenticateAsServerAsync(_certificate, _certValidator != null, _sslProtocols, + true); + } + else + { + // Client authentication + var certs = _certificate != null + ? new X509CertificateCollection {_certificate} + : new X509CertificateCollection(); + + var targetHost = _host.ToString(); + await _secureStream.AuthenticateAsClientAsync(targetHost, certs, _sslProtocols, true); + } + } + catch (Exception) + { + Close(); + throw; + } + + InputStream = _secureStream; + OutputStream = _secureStream; + } + + public override void Close() + { + base.Close(); + if (_client != null) + { + _client.Dispose(); + _client = null; + } + + if (_secureStream != null) + { + _secureStream.Dispose(); + _secureStream = null; + } + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Server/NullLogger.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Server/NullLogger.cs new file mode 100644 index 000000000..1f1f542d5 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Server/NullLogger.cs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Microsoft.Extensions.Logging; +using System; + + +namespace Thrift.Transport.Server +{ + // sometimes we just don't want to log anything + internal class NullLogger<T> : IDisposable, ILogger, ILogger<T> + { + internal class NullScope : IDisposable + { + public void Dispose() + { + // nothing to do + } + } + + public IDisposable BeginScope<TState>(TState state) + { + return new NullScope(); + } + + public void Dispose() + { + // nothing to do + } + + public bool IsEnabled(LogLevel logLevel) + { + return false; // no + } + + public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter) + { + // do nothing + } + } +} + diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Server/THttpServerTransport.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Server/THttpServerTransport.cs new file mode 100644 index 000000000..056300cfe --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Server/THttpServerTransport.cs @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Http; +using Microsoft.Extensions.Logging; +using Thrift.Processor; +using Thrift.Protocol; +using Thrift.Transport.Client; + +namespace Thrift.Transport.Server +{ + // ReSharper disable once InconsistentNaming + public class THttpServerTransport + { + protected const string ContentType = "application/x-thrift"; + private readonly ILogger _logger; + private readonly RequestDelegate _next; + protected Encoding Encoding = Encoding.UTF8; + + protected TProtocolFactory InputProtocolFactory; + protected TProtocolFactory OutputProtocolFactory; + + protected TTransportFactory InputTransportFactory; + protected TTransportFactory OutputTransportFactory; + + protected ITAsyncProcessor Processor; + + public THttpServerTransport(ITAsyncProcessor processor, RequestDelegate next = null, ILoggerFactory loggerFactory = null) + : this(processor, new TBinaryProtocol.Factory(), null, next, loggerFactory) + { + } + + public THttpServerTransport( + ITAsyncProcessor processor, + TProtocolFactory protocolFactory, + TTransportFactory transFactory = null, + RequestDelegate next = null, + ILoggerFactory loggerFactory = null) + : this(processor, protocolFactory, protocolFactory, transFactory, transFactory, next, loggerFactory) + { + } + + public THttpServerTransport( + ITAsyncProcessor processor, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + TTransportFactory inputTransFactory = null, + TTransportFactory outputTransFactory = null, + RequestDelegate next = null, + ILoggerFactory loggerFactory = null) + { + // loggerFactory == null is not illegal anymore + + Processor = processor ?? throw new ArgumentNullException(nameof(processor)); + InputProtocolFactory = inputProtocolFactory ?? throw new ArgumentNullException(nameof(inputProtocolFactory)); + OutputProtocolFactory = outputProtocolFactory ?? throw new ArgumentNullException(nameof(outputProtocolFactory)); + + InputTransportFactory = inputTransFactory; + OutputTransportFactory = outputTransFactory; + + _next = next; + _logger = (loggerFactory != null) ? loggerFactory.CreateLogger<THttpServerTransport>() : new NullLogger<THttpServerTransport>(); + } + + public async Task Invoke(HttpContext context) + { + context.Response.ContentType = ContentType; + await ProcessRequestAsync(context, context.RequestAborted); //TODO: check for correct logic + } + + public async Task ProcessRequestAsync(HttpContext context, CancellationToken cancellationToken) + { + var transport = new TStreamTransport(context.Request.Body, context.Response.Body); + + try + { + var intrans = (InputTransportFactory != null) ? InputTransportFactory.GetTransport(transport) : transport; + var outtrans = (OutputTransportFactory != null) ? OutputTransportFactory.GetTransport(transport) : transport; + + var input = InputProtocolFactory.GetProtocol(intrans); + var output = OutputProtocolFactory.GetProtocol(outtrans); + + while (await Processor.ProcessAsync(input, output, cancellationToken)) + { + if (!context.Response.HasStarted) // oneway method called + await context.Response.Body.FlushAsync(cancellationToken); + } + } + catch (TTransportException) + { + if (!context.Response.HasStarted) // if something goes bust, let the client know + context.Response.StatusCode = 500; + } + finally + { + transport.Close(); + } + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs new file mode 100644 index 000000000..77b825143 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs @@ -0,0 +1,308 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Microsoft.Win32.SafeHandles; +using System; +using System.IO.Pipes; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using System.ComponentModel; +using System.Security.AccessControl; +using System.Security.Principal; + +namespace Thrift.Transport.Server +{ + // ReSharper disable once InconsistentNaming + public class TNamedPipeServerTransport : TServerTransport + { + /// <summary> + /// This is the address of the Pipe on the localhost. + /// </summary> + private readonly string _pipeAddress; + private bool _asyncMode = true; + private volatile bool _isPending = true; + private NamedPipeServerStream _stream = null; + + public TNamedPipeServerTransport(string pipeAddress) + { + _pipeAddress = pipeAddress; + } + + public override void Listen() + { + // nothing to do here + } + + public override void Close() + { + if (_stream != null) + { + try + { + if (_stream.IsConnected) + _stream.Disconnect(); + _stream.Dispose(); + } + finally + { + _stream = null; + _isPending = false; + } + } + } + + public override bool IsClientPending() + { + return _isPending; + } + + private void EnsurePipeInstance() + { + if (_stream == null) + { + const PipeDirection direction = PipeDirection.InOut; + const int maxconn = NamedPipeServerStream.MaxAllowedServerInstances; + const PipeTransmissionMode mode = PipeTransmissionMode.Byte; + const int inbuf = 4096; + const int outbuf = 4096; + var options = _asyncMode ? PipeOptions.Asynchronous : PipeOptions.None; + + + // TODO: "CreatePipeNative" ist only a workaround, and there are have basically two possible outcomes: + // - once NamedPipeServerStream() gets a CTOR that supports pipesec, remove CreatePipeNative() + // - if 31190 gets resolved before, use _stream.SetAccessControl(pipesec) instead of CreatePipeNative() + // EITHER WAY, + // - if CreatePipeNative() finally gets removed, also remove "allow unsafe code" from the project settings + + try + { + var handle = CreatePipeNative(_pipeAddress, inbuf, outbuf); + if( (handle != null) && (!handle.IsInvalid)) + _stream = new NamedPipeServerStream(PipeDirection.InOut, _asyncMode, false, handle); + else + _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf/*, pipesec*/); + } + catch (NotImplementedException) // Mono still does not support async, fallback to sync + { + if (_asyncMode) + { + options &= (~PipeOptions.Asynchronous); + _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf); + _asyncMode = false; + } + else + { + throw; + } + } + } + } + + + #region CreatePipeNative workaround + + + [StructLayout(LayoutKind.Sequential)] + internal class SECURITY_ATTRIBUTES + { + internal int nLength = 0; + internal IntPtr lpSecurityDescriptor = IntPtr.Zero; + internal int bInheritHandle = 0; + } + + + private const string Kernel32 = "kernel32.dll"; + + [DllImport(Kernel32, SetLastError = true)] + internal static extern IntPtr CreateNamedPipe( + string lpName, uint dwOpenMode, uint dwPipeMode, + uint nMaxInstances, uint nOutBufferSize, uint nInBufferSize, uint nDefaultTimeOut, + SECURITY_ATTRIBUTES pipeSecurityDescriptor + ); + + + + // Workaround: create the pipe via API call + // we have to do it this way, since NamedPipeServerStream() for netstd still lacks a few CTORs + // and _stream.SetAccessControl(pipesec); only keeps throwing ACCESS_DENIED errors at us + // References: + // - https://github.com/dotnet/corefx/issues/30170 (closed, continued in 31190) + // - https://github.com/dotnet/corefx/issues/31190 System.IO.Pipes.AccessControl package does not work + // - https://github.com/dotnet/corefx/issues/24040 NamedPipeServerStream: Provide support for WRITE_DAC + // - https://github.com/dotnet/corefx/issues/34400 Have a mechanism for lower privileged user to connect to a privileged user's pipe + private SafePipeHandle CreatePipeNative(string name, int inbuf, int outbuf) + { + if (Environment.OSVersion.Platform != PlatformID.Win32NT) + return null; // Windows only + + var pinningHandle = new GCHandle(); + try + { + // owner gets full access, everyone else read/write + var pipesec = new PipeSecurity(); + using (var currentIdentity = WindowsIdentity.GetCurrent()) + { + var sidOwner = currentIdentity.Owner; + var sidWorld = new SecurityIdentifier(WellKnownSidType.WorldSid, null); + + pipesec.SetOwner(sidOwner); + pipesec.AddAccessRule(new PipeAccessRule(sidOwner, PipeAccessRights.FullControl, AccessControlType.Allow)); + pipesec.AddAccessRule(new PipeAccessRule(sidWorld, PipeAccessRights.ReadWrite, AccessControlType.Allow)); + } + + // create a security descriptor and assign it to the security attribs + var secAttrs = new SECURITY_ATTRIBUTES(); + byte[] sdBytes = pipesec.GetSecurityDescriptorBinaryForm(); + pinningHandle = GCHandle.Alloc(sdBytes, GCHandleType.Pinned); + unsafe { + fixed (byte* pSD = sdBytes) { + secAttrs.lpSecurityDescriptor = (IntPtr)pSD; + } + } + + // a bunch of constants we will need shortly + const int PIPE_ACCESS_DUPLEX = 0x00000003; + const int FILE_FLAG_OVERLAPPED = 0x40000000; + const int WRITE_DAC = 0x00040000; + const int PIPE_TYPE_BYTE = 0x00000000; + const int PIPE_READMODE_BYTE = 0x00000000; + const int PIPE_UNLIMITED_INSTANCES = 255; + + // create the pipe via API call + var rawHandle = CreateNamedPipe( + @"\\.\pipe\" + name, + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, + PIPE_UNLIMITED_INSTANCES, (uint)inbuf, (uint)outbuf, + 5 * 1000, + secAttrs + ); + + // make a SafePipeHandle() from it + var handle = new SafePipeHandle(rawHandle, true); + if (handle.IsInvalid) + throw new Win32Exception(Marshal.GetLastWin32Error()); + + // return it (to be packaged) + return handle; + } + finally + { + if (pinningHandle.IsAllocated) + pinningHandle.Free(); + } + } + + #endregion + + protected override async ValueTask<TTransport> AcceptImplementationAsync(CancellationToken cancellationToken) + { + try + { + EnsurePipeInstance(); + + await _stream.WaitForConnectionAsync(cancellationToken); + + var trans = new ServerTransport(_stream); + _stream = null; // pass ownership to ServerTransport + + //_isPending = false; + + return trans; + } + catch (TTransportException) + { + Close(); + throw; + } + catch (Exception e) + { + Close(); + throw new TTransportException(TTransportException.ExceptionType.NotOpen, e.Message); + } + } + + private class ServerTransport : TTransport + { + private readonly NamedPipeServerStream PipeStream; + + public ServerTransport(NamedPipeServerStream stream) + { + PipeStream = stream; + } + + public override bool IsOpen => PipeStream != null && PipeStream.IsConnected; + + public override async Task OpenAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + public override void Close() + { + PipeStream?.Dispose(); + } + + public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + if (PipeStream == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + return await PipeStream.ReadAsync(buffer, offset, length, cancellationToken); + } + + public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + if (PipeStream == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + // if necessary, send the data in chunks + // there's a system limit around 0x10000 bytes that we hit otherwise + // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section." + var nBytes = Math.Min(15 * 4096, length); // 16 would exceed the limit + while (nBytes > 0) + { + await PipeStream.WriteAsync(buffer, offset, nBytes, cancellationToken); + offset += nBytes; + length -= nBytes; + nBytes = Math.Min(nBytes, length); + } + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + await Task.FromCanceled(cancellationToken); + } + } + + protected override void Dispose(bool disposing) + { + PipeStream?.Dispose(); + } + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Server/TServerSocketTransport.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Server/TServerSocketTransport.cs new file mode 100644 index 000000000..86d82e3fc --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Server/TServerSocketTransport.cs @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using Thrift.Transport.Client; + +namespace Thrift.Transport.Server +{ + + // ReSharper disable once InconsistentNaming + public class TServerSocketTransport : TServerTransport + { + private readonly int _clientTimeout; + private TcpListener _server; + + public TServerSocketTransport(TcpListener listener, int clientTimeout = 0) + { + _server = listener; + _clientTimeout = clientTimeout; + } + + public TServerSocketTransport(int port, int clientTimeout = 0) + : this(null, clientTimeout) + { + try + { + // Make server socket + _server = new TcpListener(IPAddress.Any, port); + _server.Server.NoDelay = true; + } + catch (Exception) + { + _server = null; + throw new TTransportException("Could not create ServerSocket on port " + port + "."); + } + } + + public override void Listen() + { + // Make sure not to block on accept + if (_server != null) + { + try + { + _server.Start(); + } + catch (SocketException sx) + { + throw new TTransportException("Could not accept on listening socket: " + sx.Message); + } + } + } + + public override bool IsClientPending() + { + return _server.Pending(); + } + + protected override async ValueTask<TTransport> AcceptImplementationAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<TTransport>(cancellationToken); + } + + if (_server == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No underlying server socket."); + } + + try + { + TTransport tSocketTransport = null; + var tcpClient = await _server.AcceptTcpClientAsync(); + + try + { + tSocketTransport = new TSocketTransport(tcpClient) + { + Timeout = _clientTimeout + }; + + return tSocketTransport; + } + catch (Exception) + { + if (tSocketTransport != null) + { + tSocketTransport.Dispose(); + } + else // Otherwise, clean it up ourselves. + { + ((IDisposable) tcpClient).Dispose(); + } + + throw; + } + } + catch (Exception ex) + { + throw new TTransportException(ex.ToString()); + } + } + + public override void Close() + { + if (_server != null) + { + try + { + _server.Stop(); + } + catch (Exception ex) + { + throw new TTransportException("WARNING: Could not close server socket: " + ex); + } + _server = null; + } + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Server/TTlsServerSocketTransport.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Server/TTlsServerSocketTransport.cs new file mode 100644 index 000000000..128680599 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/Server/TTlsServerSocketTransport.cs @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Net; +using System.Net.Security; +using System.Net.Sockets; +using System.Security.Authentication; +using System.Security.Cryptography.X509Certificates; +using System.Threading; +using System.Threading.Tasks; +using Thrift.Transport.Client; + +namespace Thrift.Transport.Server +{ + // ReSharper disable once InconsistentNaming + public class TTlsServerSocketTransport : TServerTransport + { + private readonly RemoteCertificateValidationCallback _clientCertValidator; + private readonly int _clientTimeout = 0; + private readonly LocalCertificateSelectionCallback _localCertificateSelectionCallback; + private readonly X509Certificate2 _serverCertificate; + private readonly SslProtocols _sslProtocols; + private TcpListener _server; + + public TTlsServerSocketTransport( + TcpListener listener, + X509Certificate2 certificate, + RemoteCertificateValidationCallback clientCertValidator = null, + LocalCertificateSelectionCallback localCertificateSelectionCallback = null, + SslProtocols sslProtocols = SslProtocols.Tls12) + { + if (!certificate.HasPrivateKey) + { + throw new TTransportException(TTransportException.ExceptionType.Unknown, + "Your server-certificate needs to have a private key"); + } + + _serverCertificate = certificate; + _clientCertValidator = clientCertValidator; + _localCertificateSelectionCallback = localCertificateSelectionCallback; + _sslProtocols = sslProtocols; + _server = listener; + } + + public TTlsServerSocketTransport( + int port, + X509Certificate2 certificate, + RemoteCertificateValidationCallback clientCertValidator = null, + LocalCertificateSelectionCallback localCertificateSelectionCallback = null, + SslProtocols sslProtocols = SslProtocols.Tls12) + : this(null, certificate, clientCertValidator, localCertificateSelectionCallback) + { + try + { + // Create server socket + _server = new TcpListener(IPAddress.Any, port); + _server.Server.NoDelay = true; + } + catch (Exception) + { + _server = null; + throw new TTransportException($"Could not create ServerSocket on port {port}."); + } + } + + public override void Listen() + { + // Make sure accept is not blocking + if (_server != null) + { + try + { + _server.Start(); + } + catch (SocketException sx) + { + throw new TTransportException($"Could not accept on listening socket: {sx.Message}"); + } + } + } + + public override bool IsClientPending() + { + return _server.Pending(); + } + + protected override async ValueTask<TTransport> AcceptImplementationAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return await Task.FromCanceled<TTransport>(cancellationToken); + } + + if (_server == null) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No underlying server socket."); + } + + try + { + var client = await _server.AcceptTcpClientAsync(); + client.SendTimeout = client.ReceiveTimeout = _clientTimeout; + + //wrap the client in an SSL Socket passing in the SSL cert + var tTlsSocket = new TTlsSocketTransport(client, _serverCertificate, true, _clientCertValidator, + _localCertificateSelectionCallback, _sslProtocols); + + await tTlsSocket.SetupTlsAsync(); + + return tTlsSocket; + } + catch (Exception ex) + { + throw new TTransportException(ex.ToString()); + } + } + + public override void Close() + { + if (_server != null) + { + try + { + _server.Stop(); + } + catch (Exception ex) + { + throw new TTransportException($"WARNING: Could not close server socket: {ex}"); + } + + _server = null; + } + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TBufferedTransport.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TBufferedTransport.cs new file mode 100644 index 000000000..e4fdd3a8d --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TBufferedTransport.cs @@ -0,0 +1,198 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Diagnostics; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transport +{ + // ReSharper disable once InconsistentNaming + public class TBufferedTransport : TTransport + { + private readonly int DesiredBufferSize; + private readonly Client.TMemoryBufferTransport ReadBuffer = new Client.TMemoryBufferTransport(1024); + private readonly Client.TMemoryBufferTransport WriteBuffer = new Client.TMemoryBufferTransport(1024); + private readonly TTransport InnerTransport; + private bool IsDisposed; + + public class Factory : TTransportFactory + { + public override TTransport GetTransport(TTransport trans) + { + return new TBufferedTransport(trans); + } + } + + //TODO: should support only specified input transport? + public TBufferedTransport(TTransport transport, int bufSize = 1024) + { + if (bufSize <= 0) + { + throw new ArgumentOutOfRangeException(nameof(bufSize), "Buffer size must be a positive number."); + } + + InnerTransport = transport ?? throw new ArgumentNullException(nameof(transport)); + DesiredBufferSize = bufSize; + + if (DesiredBufferSize != ReadBuffer.Capacity) + ReadBuffer.Capacity = DesiredBufferSize; + if (DesiredBufferSize != WriteBuffer.Capacity) + WriteBuffer.Capacity = DesiredBufferSize; + } + + public TTransport UnderlyingTransport + { + get + { + CheckNotDisposed(); + + return InnerTransport; + } + } + + public override bool IsOpen => !IsDisposed && InnerTransport.IsOpen; + + public override async Task OpenAsync(CancellationToken cancellationToken) + { + CheckNotDisposed(); + + await InnerTransport.OpenAsync(cancellationToken); + } + + public override void Close() + { + CheckNotDisposed(); + + InnerTransport.Close(); + } + + public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + CheckNotDisposed(); + ValidateBufferArgs(buffer, offset, length); + + if (!IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + + // do we have something buffered? + var count = ReadBuffer.Length - ReadBuffer.Position; + if (count > 0) + { + return await ReadBuffer.ReadAsync(buffer, offset, length, cancellationToken); + } + + // does the request even fit into the buffer? + // Note we test for >= instead of > to avoid nonsense buffering + if (length >= ReadBuffer.Capacity) + { + return await InnerTransport.ReadAsync(buffer, offset, length, cancellationToken); + } + + // buffer a new chunk of bytes from the underlying transport + ReadBuffer.Length = ReadBuffer.Capacity; + ArraySegment<byte> bufSegment; + ReadBuffer.TryGetBuffer(out bufSegment); + ReadBuffer.Length = await InnerTransport.ReadAsync(bufSegment.Array, 0, bufSegment.Count, cancellationToken); + ReadBuffer.Position = 0; + + // deliver the bytes + return await ReadBuffer.ReadAsync(buffer, offset, length, cancellationToken); + } + + + public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + CheckNotDisposed(); + ValidateBufferArgs(buffer, offset, length); + + if (!IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + // enough space left in buffer? + var free = WriteBuffer.Capacity - WriteBuffer.Length; + if (length > free) + { + ArraySegment<byte> bufSegment; + WriteBuffer.TryGetBuffer(out bufSegment); + await InnerTransport.WriteAsync(bufSegment.Array, 0, bufSegment.Count, cancellationToken); + WriteBuffer.SetLength(0); + } + + // do the data even fit into the buffer? + // Note we test for < instead of <= to avoid nonsense buffering + if (length < WriteBuffer.Capacity) + { + await WriteBuffer.WriteAsync(buffer, offset, length, cancellationToken); + return; + } + + // write thru + await InnerTransport.WriteAsync(buffer, offset, length, cancellationToken); + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + CheckNotDisposed(); + + if (!IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + if (WriteBuffer.Length > 0) + { + ArraySegment<byte> bufSegment; + WriteBuffer.TryGetBuffer(out bufSegment); + await InnerTransport.WriteAsync(bufSegment.Array, 0, bufSegment.Count, cancellationToken); + WriteBuffer.SetLength(0); + } + + await InnerTransport.FlushAsync(cancellationToken); + } + + private void CheckNotDisposed() + { + if (IsDisposed) + { + throw new ObjectDisposedException(nameof(InnerTransport)); + } + } + + // IDisposable + protected override void Dispose(bool disposing) + { + if (!IsDisposed) + { + if (disposing) + { + ReadBuffer?.Dispose(); + WriteBuffer?.Dispose(); + InnerTransport?.Dispose(); + } + } + IsDisposed = true; + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TFramedTransport.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TFramedTransport.cs new file mode 100644 index 000000000..de6df7238 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TFramedTransport.cs @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transport +{ + // ReSharper disable once InconsistentNaming + public class TFramedTransport : TTransport + { + private const int HeaderSize = 4; + private readonly byte[] HeaderBuf = new byte[HeaderSize]; + private readonly Client.TMemoryBufferTransport ReadBuffer = new Client.TMemoryBufferTransport(); + private readonly Client.TMemoryBufferTransport WriteBuffer = new Client.TMemoryBufferTransport(); + private readonly TTransport InnerTransport; + + private bool IsDisposed; + + public class Factory : TTransportFactory + { + public override TTransport GetTransport(TTransport trans) + { + return new TFramedTransport(trans); + } + } + + public TFramedTransport(TTransport transport) + { + InnerTransport = transport ?? throw new ArgumentNullException(nameof(transport)); + + InitWriteBuffer(); + } + + public override bool IsOpen => !IsDisposed && InnerTransport.IsOpen; + + public override async Task OpenAsync(CancellationToken cancellationToken) + { + CheckNotDisposed(); + + await InnerTransport.OpenAsync(cancellationToken); + } + + public override void Close() + { + CheckNotDisposed(); + + InnerTransport.Close(); + } + + public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + CheckNotDisposed(); + ValidateBufferArgs(buffer, offset, length); + + if (!IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + // Read another frame of data if we run out of bytes + if (ReadBuffer.Position >= ReadBuffer.Length) + { + await ReadFrameAsync(cancellationToken); + } + + return await ReadBuffer.ReadAsync(buffer, offset, length, cancellationToken); + } + + private async ValueTask ReadFrameAsync(CancellationToken cancellationToken) + { + await InnerTransport.ReadAllAsync(HeaderBuf, 0, HeaderSize, cancellationToken); + var size = DecodeFrameSize(HeaderBuf); + + ReadBuffer.SetLength(size); + ReadBuffer.Seek(0, SeekOrigin.Begin); + + ArraySegment<byte> bufSegment; + ReadBuffer.TryGetBuffer(out bufSegment); + await InnerTransport.ReadAllAsync(bufSegment.Array, 0, size, cancellationToken); + } + + public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + CheckNotDisposed(); + ValidateBufferArgs(buffer, offset, length); + + if (!IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + if (WriteBuffer.Length > (int.MaxValue - length)) + { + await FlushAsync(cancellationToken); + } + + await WriteBuffer.WriteAsync(buffer, offset, length, cancellationToken); + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + CheckNotDisposed(); + + if (!IsOpen) + { + throw new TTransportException(TTransportException.ExceptionType.NotOpen); + } + + ArraySegment<byte> bufSegment; + WriteBuffer.TryGetBuffer(out bufSegment); + + int dataLen = bufSegment.Count - HeaderSize; + if (dataLen < 0) + { + throw new InvalidOperationException(); // logic error actually + } + + // Inject message header into the reserved buffer space + EncodeFrameSize(dataLen, bufSegment.Array); + + // Send the entire message at once + await InnerTransport.WriteAsync(bufSegment.Array, 0, bufSegment.Count, cancellationToken); + + InitWriteBuffer(); + + await InnerTransport.FlushAsync(cancellationToken); + } + + private void InitWriteBuffer() + { + // Reserve space for message header to be put right before sending it out + WriteBuffer.SetLength(HeaderSize); + WriteBuffer.Seek(0, SeekOrigin.End); + } + + private static void EncodeFrameSize(int frameSize, byte[] buf) + { + buf[0] = (byte) (0xff & (frameSize >> 24)); + buf[1] = (byte) (0xff & (frameSize >> 16)); + buf[2] = (byte) (0xff & (frameSize >> 8)); + buf[3] = (byte) (0xff & (frameSize)); + } + + private static int DecodeFrameSize(byte[] buf) + { + return + ((buf[0] & 0xff) << 24) | + ((buf[1] & 0xff) << 16) | + ((buf[2] & 0xff) << 8) | + (buf[3] & 0xff); + } + + + private void CheckNotDisposed() + { + if (IsDisposed) + { + throw new ObjectDisposedException(this.GetType().Name); + } + } + + // IDisposable + protected override void Dispose(bool disposing) + { + if (!IsDisposed) + { + if (disposing) + { + ReadBuffer?.Dispose(); + WriteBuffer?.Dispose(); + InnerTransport?.Dispose(); + } + } + IsDisposed = true; + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TServerTransport.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TServerTransport.cs new file mode 100644 index 000000000..74c54cd0e --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TServerTransport.cs @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transport +{ + // ReSharper disable once InconsistentNaming + public abstract class TServerTransport + { + public abstract void Listen(); + public abstract void Close(); + public abstract bool IsClientPending(); + + protected virtual async ValueTask<TTransport> AcceptImplementationAsync() + { + return await AcceptImplementationAsync(CancellationToken.None); + } + + protected abstract ValueTask<TTransport> AcceptImplementationAsync(CancellationToken cancellationToken); + + public async ValueTask<TTransport> AcceptAsync() + { + return await AcceptAsync(CancellationToken.None); + } + + public async ValueTask<TTransport> AcceptAsync(CancellationToken cancellationToken) + { + var transport = await AcceptImplementationAsync(cancellationToken); + + if (transport == null) + { + throw new TTransportException($"{nameof(AcceptImplementationAsync)} should not return null"); + } + + return transport; + } + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TTransport.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TTransport.cs new file mode 100644 index 000000000..799801202 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TTransport.cs @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Diagnostics; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Thrift.Transport +{ + //TODO: think about client info + // ReSharper disable once InconsistentNaming + public abstract class TTransport : IDisposable + { + //TODO: think how to avoid peek byte + private readonly byte[] _peekBuffer = new byte[1]; + private bool _hasPeekByte; + public abstract bool IsOpen { get; } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + public async ValueTask<bool> PeekAsync(CancellationToken cancellationToken) + { + //If we already have a byte read but not consumed, do nothing. + if (_hasPeekByte) + { + return true; + } + + //If transport closed we can't peek. + if (!IsOpen) + { + return false; + } + + //Try to read one byte. If succeeds we will need to store it for the next read. + try + { + var bytes = await ReadAsync(_peekBuffer, 0, 1, cancellationToken); + if (bytes == 0) + { + return false; + } + } + catch (IOException) + { + return false; + } + + _hasPeekByte = true; + return true; + } + + public virtual async Task OpenAsync() + { + await OpenAsync(CancellationToken.None); + } + + public abstract Task OpenAsync(CancellationToken cancellationToken); + + public abstract void Close(); + + protected static void ValidateBufferArgs(byte[] buffer, int offset, int length) + { + if (buffer == null) + { + throw new ArgumentNullException(nameof(buffer)); + } + +#if DEBUG // let it fail with OutOfRange in RELEASE mode + if (offset < 0) + { + throw new ArgumentOutOfRangeException(nameof(offset), "Buffer offset must be >= 0"); + } + + if (length < 0) + { + throw new ArgumentOutOfRangeException(nameof(length), "Buffer length must be >= 0"); + } + + if (offset + length > buffer.Length) + { + throw new ArgumentOutOfRangeException(nameof(buffer), "Not enough data"); + } +#endif + } + + public virtual async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length) + { + return await ReadAsync(buffer, offset, length, CancellationToken.None); + } + + public abstract ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken); + + public virtual async ValueTask<int> ReadAllAsync(byte[] buffer, int offset, int length) + { + return await ReadAllAsync(buffer, offset, length, CancellationToken.None); + } + + public virtual async ValueTask<int> ReadAllAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken) + { + ValidateBufferArgs(buffer, offset, length); + + if (cancellationToken.IsCancellationRequested) + return await Task.FromCanceled<int>(cancellationToken); + + if (length <= 0) + return 0; + + // If we previously peeked a byte, we need to use that first. + var totalBytes = 0; + if (_hasPeekByte) + { + buffer[offset++] = _peekBuffer[0]; + _hasPeekByte = false; + if (1 == length) + { + return 1; // we're done + } + ++totalBytes; + } + + var remaining = length - totalBytes; + Debug.Assert(remaining > 0); // any other possible cases should have been handled already + while (true) + { + var numBytes = await ReadAsync(buffer, offset, remaining, cancellationToken); + totalBytes += numBytes; + if (totalBytes >= length) + { + return totalBytes; // we're done + } + + if (numBytes <= 0) + { + throw new TTransportException(TTransportException.ExceptionType.EndOfFile, + "Cannot read, Remote side has closed"); + } + + remaining -= numBytes; + offset += numBytes; + } + } + + public virtual async Task WriteAsync(byte[] buffer) + { + await WriteAsync(buffer, CancellationToken.None); + } + + public virtual async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken) + { + await WriteAsync(buffer, 0, buffer.Length, CancellationToken.None); + } + + public virtual async Task WriteAsync(byte[] buffer, int offset, int length) + { + await WriteAsync(buffer, offset, length, CancellationToken.None); + } + + public abstract Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken); + + public virtual async Task FlushAsync() + { + await FlushAsync(CancellationToken.None); + } + + public abstract Task FlushAsync(CancellationToken cancellationToken); + + protected abstract void Dispose(bool disposing); + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TTransportException.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TTransportException.cs new file mode 100644 index 000000000..760a178e6 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TTransportException.cs @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; + +namespace Thrift.Transport +{ + // ReSharper disable once InconsistentNaming + public class TTransportException : TException + { + public enum ExceptionType + { + Unknown, + NotOpen, + AlreadyOpen, + TimedOut, + EndOfFile, + Interrupted + } + + public ExceptionType ExType { get; private set; } + + public TTransportException() + { + } + + public TTransportException(ExceptionType exType, Exception inner = null) + : base(string.Empty, inner) + { + ExType = exType; + } + + public TTransportException(ExceptionType exType, string message, Exception inner = null) + : base(message, inner) + { + ExType = exType; + } + + public TTransportException(string message, Exception inner = null) + : base(message, inner) + { + } + + public ExceptionType Type => ExType; + } +}
\ No newline at end of file diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TTransportFactory.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TTransportFactory.cs new file mode 100644 index 000000000..16e27ac82 --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TTransportFactory.cs @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation(ASF) under one +// or more contributor license agreements.See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace Thrift.Transport +{ + /// <summary> + /// From Mark Slee & Aditya Agarwal of Facebook: + /// Factory class used to create wrapped instance of Transports. + /// This is used primarily in servers, which get Transports from + /// a ServerTransport and then may want to mutate them (i.e. create + /// a BufferedTransport from the underlying base transport) + /// </summary> + // ReSharper disable once InconsistentNaming + public class TTransportFactory + { + public virtual TTransport GetTransport(TTransport trans) + { + return trans; + } + } +} diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/thrift.snk b/src/jaegertracing/thrift/lib/netstd/Thrift/thrift.snk Binary files differnew file mode 100644 index 000000000..97bc5812b --- /dev/null +++ b/src/jaegertracing/thrift/lib/netstd/Thrift/thrift.snk |