summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/to_csv.py
blob: d5fc9b6e7cfd3ebba807878975ed2bbd01152be3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/usr/bin/env python3
#
# Parse librdkafka stats JSON from stdin, one stats object per line, pick out
# the relevant fields and emit CSV files suitable for plotting with graph.py
#

import sys
import json
from datetime import datetime
from collections import OrderedDict


def parse(linenr, string):
    try:
        js = json.loads(string)
    except Exception:
        return [], [], [], []

    dt = datetime.utcfromtimestamp(js['time']).strftime('%Y-%m-%d %H:%M:%S')

    top = {'0time': dt}
    topcollect = ['msg_cnt', 'msg_size']
    for c in topcollect:
        top[c] = js[c]

    top['msg_cnt_fill'] = (float(js['msg_cnt']) / js['msg_max']) * 100.0
    top['msg_size_fill'] = (float(js['msg_size']) / js['msg_size_max']) * 100.0

    collect = ['outbuf_cnt', 'outbuf_msg_cnt', 'tx',
               'waitresp_cnt', 'waitresp_msg_cnt', 'wakeups']

    brokers = []
    for b, d in js['brokers'].items():
        if d['req']['Produce'] == 0:
            continue

        out = {'0time': dt, '1nodeid': d['nodeid']}
        out['stateage'] = int(d['stateage'] / 1000)

        for c in collect:
            out[c] = d[c]

        out['rtt_p99'] = int(d['rtt']['p99'] / 1000)
        out['int_latency_p99'] = int(d['int_latency']['p99'] / 1000)
        out['outbuf_latency_p99'] = int(d['outbuf_latency']['p99'] / 1000)
        out['throttle_p99'] = d['throttle']['p99']
        out['throttle_cnt'] = d['throttle']['cnt']
        out['latency_p99'] = (out['int_latency_p99'] +
                              out['outbuf_latency_p99'] +
                              out['rtt_p99'])
        out['toppars_cnt'] = len(d['toppars'])
        out['produce_req'] = d['req']['Produce']

        brokers.append(out)

    tcollect = []
    tpcollect = ['leader', 'msgq_cnt', 'msgq_bytes',
                 'xmit_msgq_cnt', 'xmit_msgq_bytes',
                 'txmsgs', 'txbytes', 'msgs_inflight']

    topics = []
    toppars = []
    for t, d in js['topics'].items():

        tout = {'0time': dt, '1topic': t}
        for c in tcollect:
            tout[c] = d[c]
        tout['batchsize_p99'] = d['batchsize']['p99']
        tout['batchcnt_p99'] = d['batchcnt']['p99']

        for tp, d2 in d['partitions'].items():
            if d2['txmsgs'] == 0:
                continue

            tpout = {'0time': dt, '1partition': d2['partition']}

            for c in tpcollect:
                tpout[c] = d2[c]

            toppars.append(tpout)

        topics.append(tout)

    return [top], brokers, topics, toppars


class CsvWriter(object):
    def __init__(self, outpfx, name):
        self.f = open(f"{outpfx}_{name}.csv", "w")
        self.cnt = 0

    def write(self, d):
        od = OrderedDict(sorted(d.items()))
        if self.cnt == 0:
            # Write heading
            self.f.write(','.join(od.keys()) + '\n')

        self.f.write(','.join(map(str, od.values())) + '\n')
        self.cnt += 1

    def write_list(self, a_list_of_dicts):
        for d in a_list_of_dicts:
            self.write(d)


out = sys.argv[1]

w_top = CsvWriter(out, 'top')
w_brokers = CsvWriter(out, 'brokers')
w_topics = CsvWriter(out, 'topics')
w_toppars = CsvWriter(out, 'toppars')


for linenr, string in enumerate(sys.stdin):
    try:
        top, brokers, topics, toppars = parse(linenr, string)
    except Exception as e:
        print(f"SKIP {linenr+1}: {e}")
        continue

    w_top.write_list(top)
    w_brokers.write_list(brokers)
    w_topics.write_list(topics)
    w_toppars.write_list(toppars)