diff options
Diffstat (limited to 'src/blkin/babeltrace-plugins/zipkin')
8 files changed, 933 insertions, 0 deletions
diff --git a/src/blkin/babeltrace-plugins/zipkin/README.md b/src/blkin/babeltrace-plugins/zipkin/README.md new file mode 100644 index 000000000..95cebe87b --- /dev/null +++ b/src/blkin/babeltrace-plugins/zipkin/README.md @@ -0,0 +1,6 @@ +babeltrace-zipkin plugin +======================== + +In order to use this plugin, the traces created by LTTng should follow a +specific format. This format is provided in zipkin_trace.h file. If this +format is not followed the traces will be dropped. diff --git a/src/blkin/babeltrace-plugins/zipkin/src/babeltrace_zipkin.py b/src/blkin/babeltrace-plugins/zipkin/src/babeltrace_zipkin.py new file mode 100755 index 000000000..5677338c9 --- /dev/null +++ b/src/blkin/babeltrace-plugins/zipkin/src/babeltrace_zipkin.py @@ -0,0 +1,69 @@ +#!/usr/bin/python +# babeltrace_zipkin.py + +import sys +sys.path.append("../../babeltrace-plugins") +import sys +import getopt +from babeltrace import * +from zipkin_logic.zipkin_client import ZipkinClient +HELP = "Usage: python babeltrace_zipkin.py path/to/file -s <server> -p <port>" + + +def main(argv): + try: + path = argv[0] + except: + raise TypeError(HELP) + + try: + opts, args = getopt.getopt(argv[1:], "hs:p:") + except getopt.GetoptError: + raise TypeError(HELP) + + server = None + port = None + for opt, arg in opts: + if opt == '-h': + raise TypeError(HELP) + elif opt == '-s': + server = arg + elif opt == '-p': + port = arg + + if not server: + server = "83.212.113.88" + if not port: + port = 1463 + + # Open connection with scribe + zipkin = ZipkinClient(port, server) + + # Create TraceCollection and add trace: + traces = TraceCollection() + trace_handle = traces.add_trace(path, "ctf") + if trace_handle is None: + raise IOError("Error adding trace") + + for event in traces.events: + name = event.name + try: + provider, kind = name.split(":") + if provider != "zipkin": + raise + except: + continue + + #create a zipkin trace from event info + trace = zipkin.create_trace(event) + + #create a zipkin annotation from event info + annotation = zipkin.create_annotation(event, kind) + + #record the trace + zipkin.record(trace, annotation) + + zipkin.close() + +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/__init__.py b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/__init__.py diff --git a/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/formatters.py b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/formatters.py new file mode 100644 index 000000000..70874c0b6 --- /dev/null +++ b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/formatters.py @@ -0,0 +1,131 @@ +# Copyright 2012 Rackspace Hosting, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import struct +import socket + +from thrift.protocol import TBinaryProtocol +from thrift.transport import TTransport + +import ttypes + + +def hex_str(n): + return '%0.16x' % (n,) + + +def json_formatter(traces, *json_args, **json_kwargs): + json_traces = [] + + for (trace, annotations) in traces: + json_trace = { + 'trace_id': hex_str(trace.trace_id), + 'span_id': hex_str(trace.span_id), + 'name': trace.name, + 'annotations': [] + } + + if trace.parent_span_id: + json_trace['parent_span_id'] = hex_str(trace.parent_span_id) + + for annotation in annotations: + json_annotation = { + 'key': annotation.name, + 'value': annotation.value, + 'type': annotation.annotation_type + } + + if annotation.endpoint: + json_annotation['host'] = { + 'ipv4': annotation.endpoint.ipv4, + 'port': annotation.endpoint.port, + 'service_name': annotation.endpoint.service_name + } + + json_trace['annotations'].append(json_annotation) + + json_traces.append(json_trace) + + return json.dumps(json_traces, *json_args, **json_kwargs) + + +def ipv4_to_int(ipv4): + return struct.unpack('!i', socket.inet_aton(ipv4))[0] + + +def base64_thrift(thrift_obj): + trans = TTransport.TMemoryBuffer() + tbp = TBinaryProtocol.TBinaryProtocol(trans) + + thrift_obj.write(tbp) + res = trans.getvalue().encode('base64').strip() + res = res.replace("\n","") + #print res + #print len(res) + return res + #return trans.getvalue().encode('base64').strip() + + +def binary_annotation_formatter(annotation, host=None): + annotation_types = { + 'string': ttypes.AnnotationType.STRING, + 'bytes': ttypes.AnnotationType.BYTES, + } + + annotation_type = annotation_types[annotation.annotation_type] + + value = annotation.value + + if isinstance(value, unicode): + value = value.encode('utf-8') + + return ttypes.BinaryAnnotation( + annotation.name, + value, + annotation_type, + host) + + +def base64_thrift_formatter(trace, annotations): + thrift_annotations = [] + binary_annotations = [] + + for annotation in annotations: + host = None + if annotation.endpoint: + host = ttypes.Endpoint( + ipv4=ipv4_to_int(annotation.endpoint.ipv4), + port=annotation.endpoint.port, + service_name=annotation.endpoint.service_name) + + if annotation.annotation_type == 'timestamp': + thrift_annotations.append(ttypes.Annotation( + timestamp=annotation.value, + value=annotation.name, + host=host)) + else: + binary_annotations.append( + binary_annotation_formatter(annotation, host)) + + thrift_trace = ttypes.Span( + trace_id=trace.trace_id, + name=trace.name, + id=trace.span_id, + parent_id=trace.parent_span_id, + annotations=thrift_annotations, + binary_annotations=binary_annotations + ) + + return base64_thrift(thrift_trace) diff --git a/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/trace.py b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/trace.py new file mode 100644 index 000000000..e0a3ed2c3 --- /dev/null +++ b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/trace.py @@ -0,0 +1,137 @@ +import math +import time +import random + + +class Trace(object): + """ + An L{ITrace} provider which delegates to zero or more L{ITracers} and + allows setting a default L{IEndpoint} to associate with L{IAnnotation}s + + @ivar _tracers: C{list} of one or more L{ITracer} providers. + @ivar _endpoint: An L{IEndpoint} provider. + """ + def __init__(self, name, trace_id=None, span_id=None, + parent_span_id=None, tracers=None): + """ + @param name: C{str} describing the current span. + @param trace_id: C{int} or C{None} + @param span_id: C{int} or C{None} + @param parent_span_id: C{int} or C{None} + + @param tracers: C{list} of L{ITracer} providers, primarily useful + for unit testing. + """ + self.name = name + # If no trace_id and span_id are given we want to generate new + # 64-bit integer ids. + self.trace_id = trace_id + self.span_id = span_id + + # If no parent_span_id is given then we assume there is no parent span + # and leave it as None. + self.parent_span_id = parent_span_id + + # If no tracers are given we get the global list of tracers. + self._tracers = tracers + + # By default no endpoint will be associated with annotations recorded + # to this trace. + self._endpoint = None + + def __ne__(self, other): + return not self == other + + def __repr__(self): + return ( + '{0.__class__.__name__}({0.name!r}, trace_id={0.trace_id!r}, ' + 'span_id={0.span_id!r}, parent_span_id={0.parent_span_id!r})' + ).format(self) + + def set_endpoint(self, endpoint): + """ + Set a default L{IEndpoint} provider for the current L{Trace}. + All annotations recorded after this endpoint is set will use it, + unless they provide their own endpoint. + """ + self._endpoint = endpoint + + +class Endpoint(object): + + def __init__(self, ipv4, port, service_name): + """ + @param ipv4: C{str} ipv4 address. + @param port: C{int} port number. + @param service_name: C{str} service name. + """ + self.ipv4 = ipv4 + self.port = port + self.service_name = service_name + + def __ne__(self, other): + return not self == other + + def __repr__(self): + return ('{0.__class__.__name__}({0.ipv4!r}, {0.port!r}, ' + '{0.service_name!r})').format(self) + + +class Annotation(object): + + def __init__(self, name, value, annotation_type, endpoint=None): + """ + @param name: C{str} name of this annotation. + + @param value: A value of the appropriate type based on + C{annotation_type}. + + @param annotation_type: C{str} the expected type of our C{value}. + + @param endpoint: An optional L{IEndpoint} provider to associate with + this annotation or C{None} + """ + self.name = name + self.value = value + self.annotation_type = annotation_type + self.endpoint = endpoint + + def __ne__(self, other): + return not self == other + + def __repr__(self): + return ( + '{0.__class__.__name__}({0.name!r}, {0.value!r}, ' + '{0.annotation_type!r}, {0.endpoint})' + ).format(self) + + @classmethod + def timestamp(cls, name, timestamp=None): + if timestamp is None: + timestamp = math.trunc(time.time() * 1000 * 1000) + + return cls(name, timestamp, 'timestamp') + + @classmethod + def client_send(cls, timestamp=None): + return cls.timestamp(constants.CLIENT_SEND, timestamp) + + @classmethod + def client_recv(cls, timestamp=None): + return cls.timestamp(constants.CLIENT_RECV, timestamp) + + @classmethod + def server_send(cls, timestamp=None): + return cls.timestamp(constants.SERVER_SEND, timestamp) + + @classmethod + def server_recv(cls, timestamp=None): + return cls.timestamp(constants.SERVER_RECV, timestamp) + + @classmethod + def string(cls, name, value): + return cls(name, value, 'string') + + @classmethod + def bytes(cls, name, value): + return cls(name, value, 'bytes') diff --git a/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/ttypes.py b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/ttypes.py new file mode 100644 index 000000000..8605559b7 --- /dev/null +++ b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/ttypes.py @@ -0,0 +1,453 @@ +# +# Autogenerated by Thrift Compiler (0.8.0) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# options string: py:twisted +# + +from thrift.Thrift import TType, TMessageType, TException + +from thrift.transport import TTransport +from thrift.protocol import TBinaryProtocol, TProtocol +try: + from thrift.protocol import fastbinary +except: + fastbinary = None + + +class AnnotationType: + BOOL = 0 + BYTES = 1 + I16 = 2 + I32 = 3 + I64 = 4 + DOUBLE = 5 + STRING = 6 + + _VALUES_TO_NAMES = { + 0: "BOOL", + 1: "BYTES", + 2: "I16", + 3: "I32", + 4: "I64", + 5: "DOUBLE", + 6: "STRING", + } + + _NAMES_TO_VALUES = { + "BOOL": 0, + "BYTES": 1, + "I16": 2, + "I32": 3, + "I64": 4, + "DOUBLE": 5, + "STRING": 6, + } + + +class Endpoint: + """ + Attributes: + - ipv4 + - port + - service_name + """ + + thrift_spec = ( + None, # 0 + (1, TType.I32, 'ipv4', None, None, ), # 1 + (2, TType.I16, 'port', None, None, ), # 2 + (3, TType.STRING, 'service_name', None, None, ), # 3 + ) + + def __init__(self, ipv4=None, port=None, service_name=None,): + self.ipv4 = ipv4 + self.port = port + self.service_name = service_name + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I32: + self.ipv4 = iprot.readI32(); + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I16: + self.port = iprot.readI16(); + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRING: + self.service_name = iprot.readString(); + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('Endpoint') + if self.ipv4 is not None: + oprot.writeFieldBegin('ipv4', TType.I32, 1) + oprot.writeI32(self.ipv4) + oprot.writeFieldEnd() + if self.port is not None: + oprot.writeFieldBegin('port', TType.I16, 2) + oprot.writeI16(self.port) + oprot.writeFieldEnd() + if self.service_name is not None: + oprot.writeFieldBegin('service_name', TType.STRING, 3) + oprot.writeString(self.service_name) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class Annotation: + """ + Attributes: + - timestamp + - value + - host + """ + + thrift_spec = ( + None, # 0 + (1, TType.I64, 'timestamp', None, None, ), # 1 + (2, TType.STRING, 'value', None, None, ), # 2 + (3, TType.STRUCT, 'host', (Endpoint, Endpoint.thrift_spec), None, ), # 3 + ) + + def __init__(self, timestamp=None, value=None, host=None,): + self.timestamp = timestamp + self.value = value + self.host = host + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I64: + self.timestamp = iprot.readI64(); + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.value = iprot.readString(); + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRUCT: + self.host = Endpoint() + self.host.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('Annotation') + if self.timestamp is not None: + oprot.writeFieldBegin('timestamp', TType.I64, 1) + oprot.writeI64(self.timestamp) + oprot.writeFieldEnd() + if self.value is not None: + oprot.writeFieldBegin('value', TType.STRING, 2) + oprot.writeString(self.value) + oprot.writeFieldEnd() + if self.host is not None: + oprot.writeFieldBegin('host', TType.STRUCT, 3) + self.host.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class BinaryAnnotation: + """ + Attributes: + - key + - value + - annotation_type + - host + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'key', None, None, ), # 1 + (2, TType.STRING, 'value', None, None, ), # 2 + (3, TType.I32, 'annotation_type', None, None, ), # 3 + (4, TType.STRUCT, 'host', (Endpoint, Endpoint.thrift_spec), None, ), # 4 + ) + + def __init__(self, key=None, value=None, annotation_type=None, host=None,): + self.key = key + self.value = value + self.annotation_type = annotation_type + self.host = host + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.key = iprot.readString(); + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.value = iprot.readString(); + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I32: + self.annotation_type = iprot.readI32(); + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.STRUCT: + self.host = Endpoint() + self.host.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('BinaryAnnotation') + if self.key is not None: + oprot.writeFieldBegin('key', TType.STRING, 1) + oprot.writeString(self.key) + oprot.writeFieldEnd() + if self.value is not None: + oprot.writeFieldBegin('value', TType.STRING, 2) + oprot.writeString(self.value) + oprot.writeFieldEnd() + if self.annotation_type is not None: + oprot.writeFieldBegin('annotation_type', TType.I32, 3) + oprot.writeI32(self.annotation_type) + oprot.writeFieldEnd() + if self.host is not None: + oprot.writeFieldBegin('host', TType.STRUCT, 4) + self.host.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class Span: + """ + Attributes: + - trace_id + - name + - id + - parent_id + - annotations + - binary_annotations + """ + + thrift_spec = ( + None, # 0 + (1, TType.I64, 'trace_id', None, None, ), # 1 + None, # 2 + (3, TType.STRING, 'name', None, None, ), # 3 + (4, TType.I64, 'id', None, None, ), # 4 + (5, TType.I64, 'parent_id', None, None, ), # 5 + (6, TType.LIST, 'annotations', (TType.STRUCT,(Annotation, Annotation.thrift_spec)), None, ), # 6 + None, # 7 + (8, TType.LIST, 'binary_annotations', (TType.STRUCT,(BinaryAnnotation, BinaryAnnotation.thrift_spec)), None, ), # 8 + ) + + def __init__(self, trace_id=None, name=None, id=None, parent_id=None, annotations=None, binary_annotations=None,): + self.trace_id = trace_id + self.name = name + self.id = id + self.parent_id = parent_id + self.annotations = annotations + self.binary_annotations = binary_annotations + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.I64: + self.trace_id = iprot.readI64(); + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRING: + self.name = iprot.readString(); + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.I64: + self.id = iprot.readI64(); + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.I64: + self.parent_id = iprot.readI64(); + else: + iprot.skip(ftype) + elif fid == 6: + if ftype == TType.LIST: + self.annotations = [] + (_etype3, _size0) = iprot.readListBegin() + for _i4 in xrange(_size0): + _elem5 = Annotation() + _elem5.read(iprot) + self.annotations.append(_elem5) + iprot.readListEnd() + else: + iprot.skip(ftype) + elif fid == 8: + if ftype == TType.LIST: + self.binary_annotations = [] + (_etype9, _size6) = iprot.readListBegin() + for _i10 in xrange(_size6): + _elem11 = BinaryAnnotation() + _elem11.read(iprot) + self.binary_annotations.append(_elem11) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('Span') + if self.trace_id is not None: + oprot.writeFieldBegin('trace_id', TType.I64, 1) + oprot.writeI64(self.trace_id) + oprot.writeFieldEnd() + if self.name is not None: + oprot.writeFieldBegin('name', TType.STRING, 3) + oprot.writeString(self.name) + oprot.writeFieldEnd() + if self.id is not None: + oprot.writeFieldBegin('id', TType.I64, 4) + oprot.writeI64(self.id) + oprot.writeFieldEnd() + if self.parent_id is not None: + oprot.writeFieldBegin('parent_id', TType.I64, 5) + oprot.writeI64(self.parent_id) + oprot.writeFieldEnd() + if self.annotations is not None: + oprot.writeFieldBegin('annotations', TType.LIST, 6) + oprot.writeListBegin(TType.STRUCT, len(self.annotations)) + for iter12 in self.annotations: + iter12.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + if self.binary_annotations is not None: + oprot.writeFieldBegin('binary_annotations', TType.LIST, 8) + oprot.writeListBegin(TType.STRUCT, len(self.binary_annotations)) + for iter13 in self.binary_annotations: + iter13.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) diff --git a/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/zipkin_client.py b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/zipkin_client.py new file mode 100644 index 000000000..28118facb --- /dev/null +++ b/src/blkin/babeltrace-plugins/zipkin/src/zipkin_logic/zipkin_client.py @@ -0,0 +1,70 @@ +#!/usr/bin/python + +from scribe_client import ScribeClient +from trace import Annotation, Trace, Endpoint +from collections import defaultdict +from formatters import base64_thrift_formatter + + +class ZipkinClient(ScribeClient): + + DEFAULT_END_ANNOTATIONS = ("ss", "cr", "end") + + def __init__(self, port, host): + super(ZipkinClient, self).__init__(port, host) + self._annotations_for_trace = defaultdict(list) + + def create_trace(self, event): + service = event["trace_name"] + trace_id = event["trace_id"] + span_id = event["span_id"] + parent_span = event["parent_span_id"] + if parent_span == 0: + parent_span = None + trace = Trace(service, trace_id, span_id, parent_span) + return trace + + def create_annotation(self, event, kind): + if kind == "keyval_string": + key = event["key"] + val = event["val"] + annotation = Annotation.string(key, val) + elif kind == "keyval_integer": + key = event["key"] + val = str(event["val"]) + annotation = Annotation.string(key, val) + elif kind == "timestamp": + timestamp = event.timestamp + #timestamp has different digit length + timestamp = str(timestamp) + timestamp = timestamp[:-3] + event_name = event["event"] + annotation = Annotation.timestamp(event_name, int(timestamp)) + + # create and set endpoint + port = event["port_no"] + service = event["service_name"] + ip = event["ip"] + endpoint = Endpoint(ip, int(port), service) + annotation.endpoint = endpoint + + print annotation + return annotation + + def record(self, trace, annotation): + self.scribe_log(trace, [annotation]) + ''' + trace_key = (trace.trace_id, trace.span_id) + self._annotations_for_trace[trace_key].append(annotation) + if (annotation.name in self.DEFAULT_END_ANNOTATIONS): + saved_annotations = self._annotations_for_trace[trace_key] + del self._annotations_for_trace[trace_key] + self.scribe_log(trace, saved_annotations) + print "Record event" + ''' + + def scribe_log(self, trace, annotations): + trace._endpoint = None + message = base64_thrift_formatter(trace, annotations) + category = 'zipkin' + return self.log(category, message) diff --git a/src/blkin/babeltrace-plugins/zipkin/zipkin_trace.h b/src/blkin/babeltrace-plugins/zipkin/zipkin_trace.h new file mode 100644 index 000000000..4abc87b87 --- /dev/null +++ b/src/blkin/babeltrace-plugins/zipkin/zipkin_trace.h @@ -0,0 +1,67 @@ +/* + * Zipkin lttng-ust tracepoint provider. + */ + +#undef TRACEPOINT_PROVIDER +#define TRACEPOINT_PROVIDER zipkin + +#undef TRACEPOINT_INCLUDE +#define TRACEPOINT_INCLUDE "./zipkin_trace.h" + +#if !defined(_ZIPKIN_H) || defined(TRACEPOINT_HEADER_MULTI_READ) +#define _ZIPKIN_H + +#include <lttng/tracepoint.h> + +TRACEPOINT_EVENT( + zipkin, + keyval, + TP_ARGS(char *, service, char *, trace_name, + int, port, char *, ip, long, trace, + long, span, long, parent_span, + char *, key, char *, val ), + + TP_FIELDS( + ctf_string(trace_name, trace_name) + ctf_string(service_name, service) + ctf_integer(int, port_no, port) + ctf_string(ip, ip) + ctf_integer(long, trace_id, trace) + ctf_integer(long, span_id, span) + ctf_integer(long, parent_span_id, parent_span) + ctf_string(key, key) + ctf_string(val, val) + ) +) +TRACEPOINT_LOGLEVEL( + zipkin, + keyval, + TRACE_WARNING) + + +TRACEPOINT_EVENT( + zipkin, + timestamp, + TP_ARGS(char *, service, char *, trace_name, + int, port, char *, ip, long, trace, + long, span, long, parent_span, + char *, event), + + TP_FIELDS( + ctf_string(trace_name, trace_name) + ctf_string(service_name, service) + ctf_integer(int, port_no, port) + ctf_string(ip, ip) + ctf_integer(long, trace_id, trace) + ctf_integer(long, span_id, span) + ctf_integer(long, parent_span_id, parent_span) + ctf_string(event, event) + ) +) +TRACEPOINT_LOGLEVEL( + zipkin, + timestamp, + TRACE_WARNING) +#endif /* _ZIPKIN_H */ + +#include <lttng/tracepoint-event.h> |