diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:48 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:20:02 +0000 |
commit | 58daab21cd043e1dc37024a7f99b396788372918 (patch) | |
tree | 96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /fluent-bit/lib/librdkafka-2.1.0/tests/tools | |
parent | Releasing debian version 1.43.2-1. (diff) | |
download | netdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz netdata-58daab21cd043e1dc37024a7f99b396788372918.zip |
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/tools')
6 files changed, 344 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/tools/README.md b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/README.md new file mode 100644 index 000000000..f1ec5681b --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/README.md @@ -0,0 +1,4 @@ +# Tools + +Asorted librdkafka tools. + diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/README.md b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/README.md new file mode 100644 index 000000000..a4ce80bd9 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/README.md @@ -0,0 +1,21 @@ +# Stats tools + +These tools are suitable for parsing librdkafka's statistics +as emitted by the `stats_cb` when `statistics.interval.ms` is set. + + * [to_csv.py](to_csv.py) - selectively convert stats JSON to CSV. + * [graph.py](graph.py) - graph CSV files. + * [filter.jq](filter.jq) - basic `jq` filter. + +Install dependencies: + + $ python3 -m pip install -r requirements.txt + + +Examples: + + # Extract stats json from log line (test*.csv files are created) + $ grep -F STATS: file.log | sed -e 's/^.*STATS: //' | ./to_csv.py test1 + + # Graph toppar graphs (group by partition), but skip some columns. + $ ./graph.py --skip '*bytes,*msg_cnt,stateage,*msgs,leader' --group-by 1partition test1_toppars.csv diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/filter.jq b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/filter.jq new file mode 100644 index 000000000..414a20697 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/filter.jq @@ -0,0 +1,42 @@ +# Usage: +# cat stats.json | jq -R -f filter.jq + +fromjson? | +{ + time: .time | (. - (3600*5) | strftime("%Y-%m-%d %H:%M:%S")), + brokers: + [ .brokers[] | select(.req.Produce > 0) | { + (.nodeid | tostring): { + "nodeid": .nodeid, + "state": .state, + "stateage": (.stateage/1000000.0), + "connects": .connects, + "rtt_p99": .rtt.p99, + "throttle": .throttle.cnt, + "outbuf_cnt": .outbuf_cnt, + "outbuf_msg_cnt": .outbuf_msg_cnt, + "waitresp_cnt": .waitresp_cnt, + "Produce": .req.Produce, + "Metadata": .req.Metadata, + "toppar_cnt": (.toppars | length) + } + } + ], + + topics: + [ .topics[] | select(.batchcnt.cnt > 0) | { + (.topic): { + "batchsize_p99": .batchsize.p99, + "batchcnt_p99": .batchcnt.p99, + "toppars": (.partitions[] | { + (.partition | tostring): { + leader: .leader, + msgq_cnt: .msgq_cnt, + xmit_msgq_cnt: .xmit_msgq_cnt, + txmsgs: .txmsgs, + msgs_inflight: .msgs_inflight + } + }), + } + } ] +}
\ No newline at end of file diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/graph.py b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/graph.py new file mode 100755 index 000000000..3eeaa1541 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/graph.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python3 +# +# Use pandas + bokeh to create graphs/charts/plots for stats CSV (to_csv.py). +# + +import os +import pandas as pd +from bokeh.io import curdoc +from bokeh.models import ColumnDataSource, HoverTool +from bokeh.plotting import figure +from bokeh.palettes import Dark2_5 as palette +from bokeh.models.formatters import DatetimeTickFormatter + +import pandas_bokeh +import argparse +import itertools +from fnmatch import fnmatch + +datecolumn = '0time' + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Graph CSV files') + parser.add_argument('infiles', nargs='+', type=str, + help='CSV files to plot.') + parser.add_argument('--cols', type=str, + help='Columns to plot (CSV list)') + parser.add_argument('--skip', type=str, + help='Columns to skip (CSV list)') + parser.add_argument('--group-by', type=str, + help='Group data series by field') + parser.add_argument('--chart-cols', type=int, default=3, + help='Number of chart columns') + parser.add_argument('--plot-width', type=int, default=400, + help='Per-plot width') + parser.add_argument('--plot-height', type=int, default=300, + help='Per-plot height') + parser.add_argument('--out', type=str, default='out.html', + help='Output file (HTML)') + args = parser.parse_args() + + outpath = args.out + if args.cols is None: + cols = None + else: + cols = args.cols.split(',') + cols.append(datecolumn) + + if args.skip is None: + assert cols is None, "--cols and --skip are mutually exclusive" + skip = None + else: + skip = args.skip.split(',') + + group_by = args.group_by + + pandas_bokeh.output_file(outpath) + curdoc().theme = 'dark_minimal' + + figs = {} + plots = [] + for infile in args.infiles: + + colors = itertools.cycle(palette) + + cols_to_use = cols + + if skip is not None: + # First read available fields + avail_cols = list(pd.read_csv(infile, nrows=1)) + + cols_to_use = [c for c in avail_cols + if len([x for x in skip if fnmatch(c, x)]) == 0] + + df = pd.read_csv(infile, + parse_dates=[datecolumn], + index_col=datecolumn, + usecols=cols_to_use) + title = os.path.basename(infile) + print(f"{infile}:") + + if group_by is not None: + + grp = df.groupby([group_by]) + + # Make one plot per column, skipping the index and group_by cols. + for col in df.keys(): + if col in (datecolumn, group_by): + continue + + print("col: ", col) + + for _, dg in grp: + print(col, " dg:\n", dg.head()) + figtitle = f"{title}: {col}" + p = figs.get(figtitle, None) + if p is None: + p = figure(title=f"{title}: {col}", + plot_width=args.plot_width, + plot_height=args.plot_height, + x_axis_type='datetime', + tools="hover,box_zoom,wheel_zoom," + + "reset,pan,poly_select,tap,save") + figs[figtitle] = p + plots.append(p) + + p.add_tools(HoverTool( + tooltips=[ + ("index", "$index"), + ("time", "@0time{%F}"), + ("y", "$y"), + ("desc", "$name"), + ], + formatters={ + "@0time": "datetime", + }, + mode='vline')) + + p.xaxis.formatter = DatetimeTickFormatter( + minutes=['%H:%M'], + seconds=['%H:%M:%S']) + + source = ColumnDataSource(dg) + + val = dg[group_by][0] + for k in dg: + if k != col: + continue + + p.line(x=datecolumn, y=k, source=source, + legend_label=f"{k}[{val}]", + name=f"{k}[{val}]", + color=next(colors)) + + continue + + else: + p = df.plot_bokeh(title=title, + kind='line', show_figure=False) + + plots.append(p) + + for p in plots: + p.legend.click_policy = "hide" + + grid = [] + for i in range(0, len(plots), args.chart_cols): + grid.append(plots[i:i + args.chart_cols]) + + pandas_bokeh.plot_grid(grid) diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/requirements.txt b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/requirements.txt new file mode 100644 index 000000000..1ea1d84d2 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/requirements.txt @@ -0,0 +1,3 @@ +pandas +pandas-bokeh +numpy diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/to_csv.py b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/to_csv.py new file mode 100755 index 000000000..d5fc9b6e7 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/to_csv.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python3 +# +# Parse librdkafka stats JSON from stdin, one stats object per line, pick out +# the relevant fields and emit CSV files suitable for plotting with graph.py +# + +import sys +import json +from datetime import datetime +from collections import OrderedDict + + +def parse(linenr, string): + try: + js = json.loads(string) + except Exception: + return [], [], [], [] + + dt = datetime.utcfromtimestamp(js['time']).strftime('%Y-%m-%d %H:%M:%S') + + top = {'0time': dt} + topcollect = ['msg_cnt', 'msg_size'] + for c in topcollect: + top[c] = js[c] + + top['msg_cnt_fill'] = (float(js['msg_cnt']) / js['msg_max']) * 100.0 + top['msg_size_fill'] = (float(js['msg_size']) / js['msg_size_max']) * 100.0 + + collect = ['outbuf_cnt', 'outbuf_msg_cnt', 'tx', + 'waitresp_cnt', 'waitresp_msg_cnt', 'wakeups'] + + brokers = [] + for b, d in js['brokers'].items(): + if d['req']['Produce'] == 0: + continue + + out = {'0time': dt, '1nodeid': d['nodeid']} + out['stateage'] = int(d['stateage'] / 1000) + + for c in collect: + out[c] = d[c] + + out['rtt_p99'] = int(d['rtt']['p99'] / 1000) + out['int_latency_p99'] = int(d['int_latency']['p99'] / 1000) + out['outbuf_latency_p99'] = int(d['outbuf_latency']['p99'] / 1000) + out['throttle_p99'] = d['throttle']['p99'] + out['throttle_cnt'] = d['throttle']['cnt'] + out['latency_p99'] = (out['int_latency_p99'] + + out['outbuf_latency_p99'] + + out['rtt_p99']) + out['toppars_cnt'] = len(d['toppars']) + out['produce_req'] = d['req']['Produce'] + + brokers.append(out) + + tcollect = [] + tpcollect = ['leader', 'msgq_cnt', 'msgq_bytes', + 'xmit_msgq_cnt', 'xmit_msgq_bytes', + 'txmsgs', 'txbytes', 'msgs_inflight'] + + topics = [] + toppars = [] + for t, d in js['topics'].items(): + + tout = {'0time': dt, '1topic': t} + for c in tcollect: + tout[c] = d[c] + tout['batchsize_p99'] = d['batchsize']['p99'] + tout['batchcnt_p99'] = d['batchcnt']['p99'] + + for tp, d2 in d['partitions'].items(): + if d2['txmsgs'] == 0: + continue + + tpout = {'0time': dt, '1partition': d2['partition']} + + for c in tpcollect: + tpout[c] = d2[c] + + toppars.append(tpout) + + topics.append(tout) + + return [top], brokers, topics, toppars + + +class CsvWriter(object): + def __init__(self, outpfx, name): + self.f = open(f"{outpfx}_{name}.csv", "w") + self.cnt = 0 + + def write(self, d): + od = OrderedDict(sorted(d.items())) + if self.cnt == 0: + # Write heading + self.f.write(','.join(od.keys()) + '\n') + + self.f.write(','.join(map(str, od.values())) + '\n') + self.cnt += 1 + + def write_list(self, a_list_of_dicts): + for d in a_list_of_dicts: + self.write(d) + + +out = sys.argv[1] + +w_top = CsvWriter(out, 'top') +w_brokers = CsvWriter(out, 'brokers') +w_topics = CsvWriter(out, 'topics') +w_toppars = CsvWriter(out, 'toppars') + + +for linenr, string in enumerate(sys.stdin): + try: + top, brokers, topics, toppars = parse(linenr, string) + except Exception as e: + print(f"SKIP {linenr+1}: {e}") + continue + + w_top.write_list(top) + w_brokers.write_list(brokers) + w_topics.write_list(topics) + w_toppars.write_list(toppars) |