summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/contrib/zeromq/csharp/TZmqClient.cs
diff options
context:
space:
mode:
Diffstat (limited to 'src/jaegertracing/thrift/contrib/zeromq/csharp/TZmqClient.cs')
-rw-r--r--src/jaegertracing/thrift/contrib/zeromq/csharp/TZmqClient.cs78
1 files changed, 78 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/contrib/zeromq/csharp/TZmqClient.cs b/src/jaegertracing/thrift/contrib/zeromq/csharp/TZmqClient.cs
new file mode 100644
index 000000000..e9ab5166a
--- /dev/null
+++ b/src/jaegertracing/thrift/contrib/zeromq/csharp/TZmqClient.cs
@@ -0,0 +1,78 @@
+using System;
+using ZMQ;
+using System.IO;
+using Thrift.Transport;
+
+namespace ZmqClient
+{
+ public class TZmqClient : TTransport
+ {
+ Socket _sock;
+ String _endpoint;
+ MemoryStream _wbuf = new MemoryStream ();
+ MemoryStream _rbuf = new MemoryStream ();
+
+ void debug (string msg)
+ {
+ //Uncomment to enable debug
+// Console.WriteLine (msg);
+ }
+
+ public TZmqClient (Context ctx, String endpoint, SocketType sockType)
+ {
+ _sock = ctx.Socket (sockType);
+ _endpoint = endpoint;
+ }
+
+ public override void Open ()
+ {
+ _sock.Connect (_endpoint);
+ }
+
+ public override void Close ()
+ {
+ throw new NotImplementedException ();
+ }
+
+ public override bool IsOpen {
+ get {
+ throw new NotImplementedException ();
+ }
+ }
+
+ public override int Read (byte[] buf, int off, int len)
+ {
+ debug ("Client_Read");
+ if (off != 0 || len != buf.Length)
+ throw new NotImplementedException ();
+
+ if (_rbuf.Length == 0) {
+ //Fill the Buffer with the complete ZMQ Message which needs to be(?!) the complete Thrift response
+ debug ("Client_Read Filling buffer..");
+ byte[] tmpBuf = _sock.Recv ();
+ debug (string.Format("Client_Read filled with {0}b",tmpBuf.Length));
+ _rbuf.Write (tmpBuf, 0, tmpBuf.Length);
+ _rbuf.Position = 0; //For reading
+ }
+ int ret = _rbuf.Read (buf, 0, len);
+ if (_rbuf.Length == _rbuf.Position) //Finished reading
+ _rbuf.SetLength (0);
+ debug (string.Format ("Client_Read return {0}b, remaining {1}b", ret, _rbuf.Length - _rbuf.Position));
+ return ret;
+ }
+
+ public override void Write (byte[] buf, int off, int len)
+ {
+ debug ("Client_Write");
+ _wbuf.Write (buf, off, len);
+ }
+
+ public override void Flush ()
+ {
+ debug ("Client_Flush");
+ _sock.Send (_wbuf.GetBuffer ());
+ _wbuf = new MemoryStream ();
+ }
+ }
+}
+