summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TTransport.cs
diff options
context:
space:
mode:
Diffstat (limited to 'src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TTransport.cs')
-rw-r--r--src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TTransport.cs190
1 files changed, 190 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TTransport.cs b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TTransport.cs
new file mode 100644
index 000000000..799801202
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/netstd/Thrift/Transport/TTransport.cs
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation(ASF) under one
+// or more contributor license agreements.See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Diagnostics;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Thrift.Transport
+{
+ //TODO: think about client info
+ // ReSharper disable once InconsistentNaming
+ public abstract class TTransport : IDisposable
+ {
+ //TODO: think how to avoid peek byte
+ private readonly byte[] _peekBuffer = new byte[1];
+ private bool _hasPeekByte;
+ public abstract bool IsOpen { get; }
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ public async ValueTask<bool> PeekAsync(CancellationToken cancellationToken)
+ {
+ //If we already have a byte read but not consumed, do nothing.
+ if (_hasPeekByte)
+ {
+ return true;
+ }
+
+ //If transport closed we can't peek.
+ if (!IsOpen)
+ {
+ return false;
+ }
+
+ //Try to read one byte. If succeeds we will need to store it for the next read.
+ try
+ {
+ var bytes = await ReadAsync(_peekBuffer, 0, 1, cancellationToken);
+ if (bytes == 0)
+ {
+ return false;
+ }
+ }
+ catch (IOException)
+ {
+ return false;
+ }
+
+ _hasPeekByte = true;
+ return true;
+ }
+
+ public virtual async Task OpenAsync()
+ {
+ await OpenAsync(CancellationToken.None);
+ }
+
+ public abstract Task OpenAsync(CancellationToken cancellationToken);
+
+ public abstract void Close();
+
+ protected static void ValidateBufferArgs(byte[] buffer, int offset, int length)
+ {
+ if (buffer == null)
+ {
+ throw new ArgumentNullException(nameof(buffer));
+ }
+
+#if DEBUG // let it fail with OutOfRange in RELEASE mode
+ if (offset < 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(offset), "Buffer offset must be >= 0");
+ }
+
+ if (length < 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(length), "Buffer length must be >= 0");
+ }
+
+ if (offset + length > buffer.Length)
+ {
+ throw new ArgumentOutOfRangeException(nameof(buffer), "Not enough data");
+ }
+#endif
+ }
+
+ public virtual async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length)
+ {
+ return await ReadAsync(buffer, offset, length, CancellationToken.None);
+ }
+
+ public abstract ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken);
+
+ public virtual async ValueTask<int> ReadAllAsync(byte[] buffer, int offset, int length)
+ {
+ return await ReadAllAsync(buffer, offset, length, CancellationToken.None);
+ }
+
+ public virtual async ValueTask<int> ReadAllAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
+ {
+ ValidateBufferArgs(buffer, offset, length);
+
+ if (cancellationToken.IsCancellationRequested)
+ return await Task.FromCanceled<int>(cancellationToken);
+
+ if (length <= 0)
+ return 0;
+
+ // If we previously peeked a byte, we need to use that first.
+ var totalBytes = 0;
+ if (_hasPeekByte)
+ {
+ buffer[offset++] = _peekBuffer[0];
+ _hasPeekByte = false;
+ if (1 == length)
+ {
+ return 1; // we're done
+ }
+ ++totalBytes;
+ }
+
+ var remaining = length - totalBytes;
+ Debug.Assert(remaining > 0); // any other possible cases should have been handled already
+ while (true)
+ {
+ var numBytes = await ReadAsync(buffer, offset, remaining, cancellationToken);
+ totalBytes += numBytes;
+ if (totalBytes >= length)
+ {
+ return totalBytes; // we're done
+ }
+
+ if (numBytes <= 0)
+ {
+ throw new TTransportException(TTransportException.ExceptionType.EndOfFile,
+ "Cannot read, Remote side has closed");
+ }
+
+ remaining -= numBytes;
+ offset += numBytes;
+ }
+ }
+
+ public virtual async Task WriteAsync(byte[] buffer)
+ {
+ await WriteAsync(buffer, CancellationToken.None);
+ }
+
+ public virtual async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken)
+ {
+ await WriteAsync(buffer, 0, buffer.Length, CancellationToken.None);
+ }
+
+ public virtual async Task WriteAsync(byte[] buffer, int offset, int length)
+ {
+ await WriteAsync(buffer, offset, length, CancellationToken.None);
+ }
+
+ public abstract Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken);
+
+ public virtual async Task FlushAsync()
+ {
+ await FlushAsync(CancellationToken.None);
+ }
+
+ public abstract Task FlushAsync(CancellationToken cancellationToken);
+
+ protected abstract void Dispose(bool disposing);
+ }
+}