summaryrefslogtreecommitdiffstats
path: root/src/blkin/babeltrace-plugins/zipkin
diff options
context:
space:
mode:
Diffstat (limited to 'src/blkin/babeltrace-plugins/zipkin')
-rw-r--r--src/blkin/babeltrace-plugins/zipkin/README.md6
-rwxr-xr-xsrc/blkin/babeltrace-plugins/zipkin/src/babeltrace_zipkin.py69
-rw-r--r--src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/__init__.py0
-rw-r--r--src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/formatters.py131
-rw-r--r--src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/trace.py137
-rw-r--r--src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/ttypes.py453
-rw-r--r--src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/zipkin_client.py70
-rw-r--r--src/blkin/babeltrace-plugins/zipkin/zipkin_trace.h67
8 files changed, 933 insertions, 0 deletions
diff --git a/src/blkin/babeltrace-plugins/zipkin/README.md b/src/blkin/babeltrace-plugins/zipkin/README.md
new file mode 100644
index 000000000..95cebe87b
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/zipkin/README.md
@@ -0,0 +1,6 @@
+babeltrace-zipkin plugin
+========================
+
+In order to use this plugin, the traces created by LTTng should follow a
+specific format. This format is provided in zipkin_trace.h file. If this
+format is not followed the traces will be dropped.
diff --git a/src/blkin/babeltrace-plugins/zipkin/src/babeltrace_zipkin.py b/src/blkin/babeltrace-plugins/zipkin/src/babeltrace_zipkin.py
new file mode 100755
index 000000000..5677338c9
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/zipkin/src/babeltrace_zipkin.py
@@ -0,0 +1,69 @@
+#!/usr/bin/python
+# babeltrace_zipkin.py
+
+import sys
+sys.path.append("../../babeltrace-plugins")
+import sys
+import getopt
+from babeltrace import *
+from zipkin_logic.zipkin_client import ZipkinClient
+HELP = "Usage: python babeltrace_zipkin.py path/to/file -s <server> -p <port>"
+
+
+def main(argv):
+ try:
+ path = argv[0]
+ except:
+ raise TypeError(HELP)
+
+ try:
+ opts, args = getopt.getopt(argv[1:], "hs:p:")
+ except getopt.GetoptError:
+ raise TypeError(HELP)
+
+ server = None
+ port = None
+ for opt, arg in opts:
+ if opt == '-h':
+ raise TypeError(HELP)
+ elif opt == '-s':
+ server = arg
+ elif opt == '-p':
+ port = arg
+
+ if not server:
+ server = "83.212.113.88"
+ if not port:
+ port = 1463
+
+ # Open connection with scribe
+ zipkin = ZipkinClient(port, server)
+
+ # Create TraceCollection and add trace:
+ traces = TraceCollection()
+ trace_handle = traces.add_trace(path, "ctf")
+ if trace_handle is None:
+ raise IOError("Error adding trace")
+
+ for event in traces.events:
+ name = event.name
+ try:
+ provider, kind = name.split(":")
+ if provider != "zipkin":
+ raise
+ except:
+ continue
+
+ #create a zipkin trace from event info
+ trace = zipkin.create_trace(event)
+
+ #create a zipkin annotation from event info
+ annotation = zipkin.create_annotation(event, kind)
+
+ #record the trace
+ zipkin.record(trace, annotation)
+
+ zipkin.close()
+
+if __name__ == "__main__":
+ main(sys.argv[1:])
diff --git a/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/__init__.py b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/__init__.py
diff --git a/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/formatters.py b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/formatters.py
new file mode 100644
index 000000000..70874c0b6
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/formatters.py
@@ -0,0 +1,131 @@
+# Copyright 2012 Rackspace Hosting, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import struct
+import socket
+
+from thrift.protocol import TBinaryProtocol
+from thrift.transport import TTransport
+
+import ttypes
+
+
+def hex_str(n):
+ return '%0.16x' % (n,)
+
+
+def json_formatter(traces, *json_args, **json_kwargs):
+ json_traces = []
+
+ for (trace, annotations) in traces:
+ json_trace = {
+ 'trace_id': hex_str(trace.trace_id),
+ 'span_id': hex_str(trace.span_id),
+ 'name': trace.name,
+ 'annotations': []
+ }
+
+ if trace.parent_span_id:
+ json_trace['parent_span_id'] = hex_str(trace.parent_span_id)
+
+ for annotation in annotations:
+ json_annotation = {
+ 'key': annotation.name,
+ 'value': annotation.value,
+ 'type': annotation.annotation_type
+ }
+
+ if annotation.endpoint:
+ json_annotation['host'] = {
+ 'ipv4': annotation.endpoint.ipv4,
+ 'port': annotation.endpoint.port,
+ 'service_name': annotation.endpoint.service_name
+ }
+
+ json_trace['annotations'].append(json_annotation)
+
+ json_traces.append(json_trace)
+
+ return json.dumps(json_traces, *json_args, **json_kwargs)
+
+
+def ipv4_to_int(ipv4):
+ return struct.unpack('!i', socket.inet_aton(ipv4))[0]
+
+
+def base64_thrift(thrift_obj):
+ trans = TTransport.TMemoryBuffer()
+ tbp = TBinaryProtocol.TBinaryProtocol(trans)
+
+ thrift_obj.write(tbp)
+ res = trans.getvalue().encode('base64').strip()
+ res = res.replace("\n","")
+ #print res
+ #print len(res)
+ return res
+ #return trans.getvalue().encode('base64').strip()
+
+
+def binary_annotation_formatter(annotation, host=None):
+ annotation_types = {
+ 'string': ttypes.AnnotationType.STRING,
+ 'bytes': ttypes.AnnotationType.BYTES,
+ }
+
+ annotation_type = annotation_types[annotation.annotation_type]
+
+ value = annotation.value
+
+ if isinstance(value, unicode):
+ value = value.encode('utf-8')
+
+ return ttypes.BinaryAnnotation(
+ annotation.name,
+ value,
+ annotation_type,
+ host)
+
+
+def base64_thrift_formatter(trace, annotations):
+ thrift_annotations = []
+ binary_annotations = []
+
+ for annotation in annotations:
+ host = None
+ if annotation.endpoint:
+ host = ttypes.Endpoint(
+ ipv4=ipv4_to_int(annotation.endpoint.ipv4),
+ port=annotation.endpoint.port,
+ service_name=annotation.endpoint.service_name)
+
+ if annotation.annotation_type == 'timestamp':
+ thrift_annotations.append(ttypes.Annotation(
+ timestamp=annotation.value,
+ value=annotation.name,
+ host=host))
+ else:
+ binary_annotations.append(
+ binary_annotation_formatter(annotation, host))
+
+ thrift_trace = ttypes.Span(
+ trace_id=trace.trace_id,
+ name=trace.name,
+ id=trace.span_id,
+ parent_id=trace.parent_span_id,
+ annotations=thrift_annotations,
+ binary_annotations=binary_annotations
+ )
+
+ return base64_thrift(thrift_trace)
diff --git a/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/trace.py b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/trace.py
new file mode 100644
index 000000000..e0a3ed2c3
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/trace.py
@@ -0,0 +1,137 @@
+import math
+import time
+import random
+
+
+class Trace(object):
+ """
+ An L{ITrace} provider which delegates to zero or more L{ITracers} and
+ allows setting a default L{IEndpoint} to associate with L{IAnnotation}s
+
+ @ivar _tracers: C{list} of one or more L{ITracer} providers.
+ @ivar _endpoint: An L{IEndpoint} provider.
+ """
+ def __init__(self, name, trace_id=None, span_id=None,
+ parent_span_id=None, tracers=None):
+ """
+ @param name: C{str} describing the current span.
+ @param trace_id: C{int} or C{None}
+ @param span_id: C{int} or C{None}
+ @param parent_span_id: C{int} or C{None}
+
+ @param tracers: C{list} of L{ITracer} providers, primarily useful
+ for unit testing.
+ """
+ self.name = name
+ # If no trace_id and span_id are given we want to generate new
+ # 64-bit integer ids.
+ self.trace_id = trace_id
+ self.span_id = span_id
+
+ # If no parent_span_id is given then we assume there is no parent span
+ # and leave it as None.
+ self.parent_span_id = parent_span_id
+
+ # If no tracers are given we get the global list of tracers.
+ self._tracers = tracers
+
+ # By default no endpoint will be associated with annotations recorded
+ # to this trace.
+ self._endpoint = None
+
+ def __ne__(self, other):
+ return not self == other
+
+ def __repr__(self):
+ return (
+ '{0.__class__.__name__}({0.name!r}, trace_id={0.trace_id!r}, '
+ 'span_id={0.span_id!r}, parent_span_id={0.parent_span_id!r})'
+ ).format(self)
+
+ def set_endpoint(self, endpoint):
+ """
+ Set a default L{IEndpoint} provider for the current L{Trace}.
+ All annotations recorded after this endpoint is set will use it,
+ unless they provide their own endpoint.
+ """
+ self._endpoint = endpoint
+
+
+class Endpoint(object):
+
+ def __init__(self, ipv4, port, service_name):
+ """
+ @param ipv4: C{str} ipv4 address.
+ @param port: C{int} port number.
+ @param service_name: C{str} service name.
+ """
+ self.ipv4 = ipv4
+ self.port = port
+ self.service_name = service_name
+
+ def __ne__(self, other):
+ return not self == other
+
+ def __repr__(self):
+ return ('{0.__class__.__name__}({0.ipv4!r}, {0.port!r}, '
+ '{0.service_name!r})').format(self)
+
+
+class Annotation(object):
+
+ def __init__(self, name, value, annotation_type, endpoint=None):
+ """
+ @param name: C{str} name of this annotation.
+
+ @param value: A value of the appropriate type based on
+ C{annotation_type}.
+
+ @param annotation_type: C{str} the expected type of our C{value}.
+
+ @param endpoint: An optional L{IEndpoint} provider to associate with
+ this annotation or C{None}
+ """
+ self.name = name
+ self.value = value
+ self.annotation_type = annotation_type
+ self.endpoint = endpoint
+
+ def __ne__(self, other):
+ return not self == other
+
+ def __repr__(self):
+ return (
+ '{0.__class__.__name__}({0.name!r}, {0.value!r}, '
+ '{0.annotation_type!r}, {0.endpoint})'
+ ).format(self)
+
+ @classmethod
+ def timestamp(cls, name, timestamp=None):
+ if timestamp is None:
+ timestamp = math.trunc(time.time() * 1000 * 1000)
+
+ return cls(name, timestamp, 'timestamp')
+
+ @classmethod
+ def client_send(cls, timestamp=None):
+ return cls.timestamp(constants.CLIENT_SEND, timestamp)
+
+ @classmethod
+ def client_recv(cls, timestamp=None):
+ return cls.timestamp(constants.CLIENT_RECV, timestamp)
+
+ @classmethod
+ def server_send(cls, timestamp=None):
+ return cls.timestamp(constants.SERVER_SEND, timestamp)
+
+ @classmethod
+ def server_recv(cls, timestamp=None):
+ return cls.timestamp(constants.SERVER_RECV, timestamp)
+
+ @classmethod
+ def string(cls, name, value):
+ return cls(name, value, 'string')
+
+ @classmethod
+ def bytes(cls, name, value):
+ return cls(name, value, 'bytes')
diff --git a/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/ttypes.py b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/ttypes.py
new file mode 100644
index 000000000..8605559b7
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/ttypes.py
@@ -0,0 +1,453 @@
+#
+# Autogenerated by Thrift Compiler (0.8.0)
+#
+# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+#
+# options string: py:twisted
+#
+
+from thrift.Thrift import TType, TMessageType, TException
+
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol, TProtocol
+try:
+ from thrift.protocol import fastbinary
+except:
+ fastbinary = None
+
+
+class AnnotationType:
+ BOOL = 0
+ BYTES = 1
+ I16 = 2
+ I32 = 3
+ I64 = 4
+ DOUBLE = 5
+ STRING = 6
+
+ _VALUES_TO_NAMES = {
+ 0: "BOOL",
+ 1: "BYTES",
+ 2: "I16",
+ 3: "I32",
+ 4: "I64",
+ 5: "DOUBLE",
+ 6: "STRING",
+ }
+
+ _NAMES_TO_VALUES = {
+ "BOOL": 0,
+ "BYTES": 1,
+ "I16": 2,
+ "I32": 3,
+ "I64": 4,
+ "DOUBLE": 5,
+ "STRING": 6,
+ }
+
+
+class Endpoint:
+ """
+ Attributes:
+ - ipv4
+ - port
+ - service_name
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.I32, 'ipv4', None, None, ), # 1
+ (2, TType.I16, 'port', None, None, ), # 2
+ (3, TType.STRING, 'service_name', None, None, ), # 3
+ )
+
+ def __init__(self, ipv4=None, port=None, service_name=None,):
+ self.ipv4 = ipv4
+ self.port = port
+ self.service_name = service_name
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.I32:
+ self.ipv4 = iprot.readI32();
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.I16:
+ self.port = iprot.readI16();
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.STRING:
+ self.service_name = iprot.readString();
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('Endpoint')
+ if self.ipv4 is not None:
+ oprot.writeFieldBegin('ipv4', TType.I32, 1)
+ oprot.writeI32(self.ipv4)
+ oprot.writeFieldEnd()
+ if self.port is not None:
+ oprot.writeFieldBegin('port', TType.I16, 2)
+ oprot.writeI16(self.port)
+ oprot.writeFieldEnd()
+ if self.service_name is not None:
+ oprot.writeFieldBegin('service_name', TType.STRING, 3)
+ oprot.writeString(self.service_name)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class Annotation:
+ """
+ Attributes:
+ - timestamp
+ - value
+ - host
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.I64, 'timestamp', None, None, ), # 1
+ (2, TType.STRING, 'value', None, None, ), # 2
+ (3, TType.STRUCT, 'host', (Endpoint, Endpoint.thrift_spec), None, ), # 3
+ )
+
+ def __init__(self, timestamp=None, value=None, host=None,):
+ self.timestamp = timestamp
+ self.value = value
+ self.host = host
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.I64:
+ self.timestamp = iprot.readI64();
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRING:
+ self.value = iprot.readString();
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.STRUCT:
+ self.host = Endpoint()
+ self.host.read(iprot)
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('Annotation')
+ if self.timestamp is not None:
+ oprot.writeFieldBegin('timestamp', TType.I64, 1)
+ oprot.writeI64(self.timestamp)
+ oprot.writeFieldEnd()
+ if self.value is not None:
+ oprot.writeFieldBegin('value', TType.STRING, 2)
+ oprot.writeString(self.value)
+ oprot.writeFieldEnd()
+ if self.host is not None:
+ oprot.writeFieldBegin('host', TType.STRUCT, 3)
+ self.host.write(oprot)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class BinaryAnnotation:
+ """
+ Attributes:
+ - key
+ - value
+ - annotation_type
+ - host
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'key', None, None, ), # 1
+ (2, TType.STRING, 'value', None, None, ), # 2
+ (3, TType.I32, 'annotation_type', None, None, ), # 3
+ (4, TType.STRUCT, 'host', (Endpoint, Endpoint.thrift_spec), None, ), # 4
+ )
+
+ def __init__(self, key=None, value=None, annotation_type=None, host=None,):
+ self.key = key
+ self.value = value
+ self.annotation_type = annotation_type
+ self.host = host
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.key = iprot.readString();
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRING:
+ self.value = iprot.readString();
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.I32:
+ self.annotation_type = iprot.readI32();
+ else:
+ iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.STRUCT:
+ self.host = Endpoint()
+ self.host.read(iprot)
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('BinaryAnnotation')
+ if self.key is not None:
+ oprot.writeFieldBegin('key', TType.STRING, 1)
+ oprot.writeString(self.key)
+ oprot.writeFieldEnd()
+ if self.value is not None:
+ oprot.writeFieldBegin('value', TType.STRING, 2)
+ oprot.writeString(self.value)
+ oprot.writeFieldEnd()
+ if self.annotation_type is not None:
+ oprot.writeFieldBegin('annotation_type', TType.I32, 3)
+ oprot.writeI32(self.annotation_type)
+ oprot.writeFieldEnd()
+ if self.host is not None:
+ oprot.writeFieldBegin('host', TType.STRUCT, 4)
+ self.host.write(oprot)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class Span:
+ """
+ Attributes:
+ - trace_id
+ - name
+ - id
+ - parent_id
+ - annotations
+ - binary_annotations
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.I64, 'trace_id', None, None, ), # 1
+ None, # 2
+ (3, TType.STRING, 'name', None, None, ), # 3
+ (4, TType.I64, 'id', None, None, ), # 4
+ (5, TType.I64, 'parent_id', None, None, ), # 5
+ (6, TType.LIST, 'annotations', (TType.STRUCT,(Annotation, Annotation.thrift_spec)), None, ), # 6
+ None, # 7
+ (8, TType.LIST, 'binary_annotations', (TType.STRUCT,(BinaryAnnotation, BinaryAnnotation.thrift_spec)), None, ), # 8
+ )
+
+ def __init__(self, trace_id=None, name=None, id=None, parent_id=None, annotations=None, binary_annotations=None,):
+ self.trace_id = trace_id
+ self.name = name
+ self.id = id
+ self.parent_id = parent_id
+ self.annotations = annotations
+ self.binary_annotations = binary_annotations
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.I64:
+ self.trace_id = iprot.readI64();
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.STRING:
+ self.name = iprot.readString();
+ else:
+ iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.I64:
+ self.id = iprot.readI64();
+ else:
+ iprot.skip(ftype)
+ elif fid == 5:
+ if ftype == TType.I64:
+ self.parent_id = iprot.readI64();
+ else:
+ iprot.skip(ftype)
+ elif fid == 6:
+ if ftype == TType.LIST:
+ self.annotations = []
+ (_etype3, _size0) = iprot.readListBegin()
+ for _i4 in xrange(_size0):
+ _elem5 = Annotation()
+ _elem5.read(iprot)
+ self.annotations.append(_elem5)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
+ elif fid == 8:
+ if ftype == TType.LIST:
+ self.binary_annotations = []
+ (_etype9, _size6) = iprot.readListBegin()
+ for _i10 in xrange(_size6):
+ _elem11 = BinaryAnnotation()
+ _elem11.read(iprot)
+ self.binary_annotations.append(_elem11)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('Span')
+ if self.trace_id is not None:
+ oprot.writeFieldBegin('trace_id', TType.I64, 1)
+ oprot.writeI64(self.trace_id)
+ oprot.writeFieldEnd()
+ if self.name is not None:
+ oprot.writeFieldBegin('name', TType.STRING, 3)
+ oprot.writeString(self.name)
+ oprot.writeFieldEnd()
+ if self.id is not None:
+ oprot.writeFieldBegin('id', TType.I64, 4)
+ oprot.writeI64(self.id)
+ oprot.writeFieldEnd()
+ if self.parent_id is not None:
+ oprot.writeFieldBegin('parent_id', TType.I64, 5)
+ oprot.writeI64(self.parent_id)
+ oprot.writeFieldEnd()
+ if self.annotations is not None:
+ oprot.writeFieldBegin('annotations', TType.LIST, 6)
+ oprot.writeListBegin(TType.STRUCT, len(self.annotations))
+ for iter12 in self.annotations:
+ iter12.write(oprot)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ if self.binary_annotations is not None:
+ oprot.writeFieldBegin('binary_annotations', TType.LIST, 8)
+ oprot.writeListBegin(TType.STRUCT, len(self.binary_annotations))
+ for iter13 in self.binary_annotations:
+ iter13.write(oprot)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
diff --git a/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/zipkin_client.py b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/zipkin_client.py
new file mode 100644
index 000000000..28118facb
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/zipkin_client.py
@@ -0,0 +1,70 @@
+#!/usr/bin/python
+
+from scribe_client import ScribeClient
+from trace import Annotation, Trace, Endpoint
+from collections import defaultdict
+from formatters import base64_thrift_formatter
+
+
+class ZipkinClient(ScribeClient):
+
+ DEFAULT_END_ANNOTATIONS = ("ss", "cr", "end")
+
+ def __init__(self, port, host):
+ super(ZipkinClient, self).__init__(port, host)
+ self._annotations_for_trace = defaultdict(list)
+
+ def create_trace(self, event):
+ service = event["trace_name"]
+ trace_id = event["trace_id"]
+ span_id = event["span_id"]
+ parent_span = event["parent_span_id"]
+ if parent_span == 0:
+ parent_span = None
+ trace = Trace(service, trace_id, span_id, parent_span)
+ return trace
+
+ def create_annotation(self, event, kind):
+ if kind == "keyval_string":
+ key = event["key"]
+ val = event["val"]
+ annotation = Annotation.string(key, val)
+ elif kind == "keyval_integer":
+ key = event["key"]
+ val = str(event["val"])
+ annotation = Annotation.string(key, val)
+ elif kind == "timestamp":
+ timestamp = event.timestamp
+ #timestamp has different digit length
+ timestamp = str(timestamp)
+ timestamp = timestamp[:-3]
+ event_name = event["event"]
+ annotation = Annotation.timestamp(event_name, int(timestamp))
+
+ # create and set endpoint
+ port = event["port_no"]
+ service = event["service_name"]
+ ip = event["ip"]
+ endpoint = Endpoint(ip, int(port), service)
+ annotation.endpoint = endpoint
+
+ print annotation
+ return annotation
+
+ def record(self, trace, annotation):
+ self.scribe_log(trace, [annotation])
+ '''
+ trace_key = (trace.trace_id, trace.span_id)
+ self._annotations_for_trace[trace_key].append(annotation)
+ if (annotation.name in self.DEFAULT_END_ANNOTATIONS):
+ saved_annotations = self._annotations_for_trace[trace_key]
+ del self._annotations_for_trace[trace_key]
+ self.scribe_log(trace, saved_annotations)
+ print "Record event"
+ '''
+
+ def scribe_log(self, trace, annotations):
+ trace._endpoint = None
+ message = base64_thrift_formatter(trace, annotations)
+ category = 'zipkin'
+ return self.log(category, message)
diff --git a/src/blkin/babeltrace-plugins/zipkin/zipkin_trace.h b/src/blkin/babeltrace-plugins/zipkin/zipkin_trace.h
new file mode 100644
index 000000000..4abc87b87
--- /dev/null
+++ b/src/blkin/babeltrace-plugins/zipkin/zipkin_trace.h
@@ -0,0 +1,67 @@
+/*
+ * Zipkin lttng-ust tracepoint provider.
+ */
+
+#undef TRACEPOINT_PROVIDER
+#define TRACEPOINT_PROVIDER zipkin
+
+#undef TRACEPOINT_INCLUDE
+#define TRACEPOINT_INCLUDE "./zipkin_trace.h"
+
+#if !defined(_ZIPKIN_H) || defined(TRACEPOINT_HEADER_MULTI_READ)
+#define _ZIPKIN_H
+
+#include <lttng/tracepoint.h>
+
+TRACEPOINT_EVENT(
+ zipkin,
+ keyval,
+ TP_ARGS(char *, service, char *, trace_name,
+ int, port, char *, ip, long, trace,
+ long, span, long, parent_span,
+ char *, key, char *, val ),
+
+ TP_FIELDS(
+ ctf_string(trace_name, trace_name)
+ ctf_string(service_name, service)
+ ctf_integer(int, port_no, port)
+ ctf_string(ip, ip)
+ ctf_integer(long, trace_id, trace)
+ ctf_integer(long, span_id, span)
+ ctf_integer(long, parent_span_id, parent_span)
+ ctf_string(key, key)
+ ctf_string(val, val)
+ )
+)
+TRACEPOINT_LOGLEVEL(
+ zipkin,
+ keyval,
+ TRACE_WARNING)
+
+
+TRACEPOINT_EVENT(
+ zipkin,
+ timestamp,
+ TP_ARGS(char *, service, char *, trace_name,
+ int, port, char *, ip, long, trace,
+ long, span, long, parent_span,
+ char *, event),
+
+ TP_FIELDS(
+ ctf_string(trace_name, trace_name)
+ ctf_string(service_name, service)
+ ctf_integer(int, port_no, port)
+ ctf_string(ip, ip)
+ ctf_integer(long, trace_id, trace)
+ ctf_integer(long, span_id, span)
+ ctf_integer(long, parent_span_id, parent_span)
+ ctf_string(event, event)
+ )
+)
+TRACEPOINT_LOGLEVEL(
+ zipkin,
+ timestamp,
+ TRACE_WARNING)
+#endif /* _ZIPKIN_H */
+
+#include <lttng/tracepoint-event.h>