commit    58daab21cd043e1dc37024a7f99b396788372918 (patch)
tree      96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats
parent    Releasing debian version 1.43.2-1. (diff)
author    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-03-09 13:19:48 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>  2024-03-09 13:20:02 +0000
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats')
-rw-r--r--  fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/README.md        |  21
-rw-r--r--  fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/filter.jq        |  42
-rwxr-xr-x  fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/graph.py         | 150
-rw-r--r--  fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/requirements.txt |   3
-rwxr-xr-x  fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/to_csv.py        | 124
5 files changed, 340 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/README.md b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/README.md
new file mode 100644
index 00000000..a4ce80bd
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/README.md
@@ -0,0 +1,21 @@
+# Stats tools
+
+These tools parse librdkafka's statistics JSON as emitted by the
+`stats_cb` callback when `statistics.interval.ms` is set.
+
+ * [to_csv.py](to_csv.py) - selectively convert stats JSON to CSV.
+ * [graph.py](graph.py) - graph CSV files.
+ * [filter.jq](filter.jq) - basic `jq` filter.
+
+Install dependencies:
+
+    $ python3 -m pip install -r requirements.txt
+
+
+Examples:
+
+    # Extract stats JSON from log lines (test1_*.csv files are created)
+    $ grep -F STATS: file.log | sed -e 's/^.*STATS: //' | ./to_csv.py test1
+
+    # Plot per-toppar graphs, one series per partition, skipping some columns
+    $ ./graph.py --skip '*bytes,*msg_cnt,stateage,*msgs,leader' --group-by 1partition test1_toppars.csv
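
For reference, [filter.jq](filter.jq) can be applied to the same extracted
stats stream (a minimal sketch, assuming the `file.log` format used above and
the usage documented in the filter's header):

    # Pretty-print selected broker/topic fields, one object per stats line
    $ grep -F STATS: file.log | sed -e 's/^.*STATS: //' | jq -R -f filter.jq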
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/filter.jq b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/filter.jq
new file mode 100644
index 00000000..414a2069
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/filter.jq
@@ -0,0 +1,42 @@
+# Usage:
+#   cat stats.json | jq -R -f filter.jq
+#
+# -R feeds each input line to the filter as a raw string; `fromjson?` parses
+# it and silently skips lines that are not valid JSON. Note that the time
+# field is shifted by a hardcoded 5 hours (UTC-5) before formatting.
+
+fromjson? |
+{
+  time: .time | (. - (3600*5) | strftime("%Y-%m-%d %H:%M:%S")),
+  brokers:
+  [ .brokers[] | select(.req.Produce > 0) | {
+    (.nodeid | tostring): {
+      "nodeid": .nodeid,
+      "state": .state,
+      "stateage": (.stateage/1000000.0),
+      "connects": .connects,
+      "rtt_p99": .rtt.p99,
+      "throttle": .throttle.cnt,
+      "outbuf_cnt": .outbuf_cnt,
+      "outbuf_msg_cnt": .outbuf_msg_cnt,
+      "waitresp_cnt": .waitresp_cnt,
+      "Produce": .req.Produce,
+      "Metadata": .req.Metadata,
+      "toppar_cnt": (.toppars | length)
+    }
+  }
+  ],
+
+  topics:
+  [ .topics[] | select(.batchcnt.cnt > 0) | {
+    (.topic): {
+      "batchsize_p99": .batchsize.p99,
+      "batchcnt_p99": .batchcnt.p99,
+      "toppars": (.partitions[] | {
+        (.partition | tostring): {
+          leader: .leader,
+          msgq_cnt: .msgq_cnt,
+          xmit_msgq_cnt: .xmit_msgq_cnt,
+          txmsgs: .txmsgs,
+          msgs_inflight: .msgs_inflight
+        }
+      }),
+    }
+  } ]
+}
\ No newline at end of file
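
For each stats line the filter emits one object of roughly this shape
(a sketch; the values and the `mytopic` name are illustrative only):

    {
      "time": "2024-03-09 08:19:48",
      "brokers": [ { "1": { "nodeid": 1, "state": "UP", "rtt_p99": 2345, ... } } ],
      "topics": [ { "mytopic": { "batchsize_p99": 65536, "batchcnt_p99": 20, "toppars": { ... } } } ]
    }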
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/graph.py b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/graph.py
new file mode 100755
index 00000000..3eeaa154
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/graph.py
@@ -0,0 +1,150 @@
+#!/usr/bin/env python3
+#
+# Use pandas + bokeh to create graphs/charts/plots for stats CSV (to_csv.py).
+#
+
+import os
+import pandas as pd
+from bokeh.io import curdoc
+from bokeh.models import ColumnDataSource, HoverTool
+from bokeh.plotting import figure
+from bokeh.palettes import Dark2_5 as palette
+from bokeh.models.formatters import DatetimeTickFormatter
+
+import pandas_bokeh
+import argparse
+import itertools
+from fnmatch import fnmatch
+
+datecolumn = '0time'
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description='Graph CSV files')
+    parser.add_argument('infiles', nargs='+', type=str,
+                        help='CSV files to plot.')
+    parser.add_argument('--cols', type=str,
+                        help='Columns to plot (CSV list)')
+    parser.add_argument('--skip', type=str,
+                        help='Columns to skip (CSV list)')
+    parser.add_argument('--group-by', type=str,
+                        help='Group data series by field')
+    parser.add_argument('--chart-cols', type=int, default=3,
+                        help='Number of chart columns')
+    parser.add_argument('--plot-width', type=int, default=400,
+                        help='Per-plot width')
+    parser.add_argument('--plot-height', type=int, default=300,
+                        help='Per-plot height')
+    parser.add_argument('--out', type=str, default='out.html',
+                        help='Output file (HTML)')
+    args = parser.parse_args()
+
+    outpath = args.out
+    if args.cols is None:
+        cols = None
+    else:
+        cols = args.cols.split(',')
+        cols.append(datecolumn)
+
+    if args.skip is None:
+        skip = None
+    else:
+        # --skip reads all columns first, so it cannot be combined with --cols.
+        assert cols is None, "--cols and --skip are mutually exclusive"
+        skip = args.skip.split(',')
+
+    group_by = args.group_by
+
+    pandas_bokeh.output_file(outpath)
+    curdoc().theme = 'dark_minimal'
+
+    figs = {}
+    plots = []
+    for infile in args.infiles:
+
+        colors = itertools.cycle(palette)
+
+        cols_to_use = cols
+
+        if skip is not None:
+            # First read available fields
+            avail_cols = list(pd.read_csv(infile, nrows=1))
+
+            cols_to_use = [c for c in avail_cols
+                           if len([x for x in skip if fnmatch(c, x)]) == 0]
+
+        df = pd.read_csv(infile,
+                         parse_dates=[datecolumn],
+                         index_col=datecolumn,
+                         usecols=cols_to_use)
+        title = os.path.basename(infile)
+        print(f"{infile}:")
+
+        if group_by is not None:
+
+            grp = df.groupby([group_by])
+
+            # Make one plot per column, skipping the index and group_by cols.
+            for col in df.keys():
+                if col in (datecolumn, group_by):
+                    continue
+
+                print("col: ", col)
+
+                for _, dg in grp:
+                    print(col, " dg:\n", dg.head())
+                    figtitle = f"{title}: {col}"
+                    p = figs.get(figtitle, None)
+                    if p is None:
+                        p = figure(title=f"{title}: {col}",
+                                   plot_width=args.plot_width,
+                                   plot_height=args.plot_height,
+                                   x_axis_type='datetime',
+                                   tools="hover,box_zoom,wheel_zoom," +
+                                   "reset,pan,poly_select,tap,save")
+                        figs[figtitle] = p
+                        plots.append(p)
+
+                        p.add_tools(HoverTool(
+                            tooltips=[
+                                ("index", "$index"),
+                                ("time", "@0time{%F}"),
+                                ("y", "$y"),
+                                ("desc", "$name"),
+                            ],
+                            formatters={
+                                "@0time": "datetime",
+                            },
+                            mode='vline'))
+
+                        p.xaxis.formatter = DatetimeTickFormatter(
+                            minutes=['%H:%M'],
+                            seconds=['%H:%M:%S'])
+
+                    source = ColumnDataSource(dg)
+
+                    # .iloc avoids label-based lookup on the datetime index.
+                    val = dg[group_by].iloc[0]
+                    for k in dg:
+                        if k != col:
+                            continue
+
+                        p.line(x=datecolumn, y=k, source=source,
+                               legend_label=f"{k}[{val}]",
+                               name=f"{k}[{val}]",
+                               color=next(colors))
+
+                continue
+
+        else:
+            p = df.plot_bokeh(title=title,
+                              kind='line', show_figure=False)
+
+            plots.append(p)
+
+    for p in plots:
+        p.legend.click_policy = "hide"
+
+    grid = []
+    for i in range(0, len(plots), args.chart_cols):
+        grid.append(plots[i:i + args.chart_cols])
+
+    pandas_bokeh.plot_grid(grid)
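
A usage sketch for graph.py against the broker CSV that to_csv.py emits (the
`test1` prefix follows the README example; `brokers.html` is an arbitrary
output name):

    # One figure per CSV column, one line per broker, grouped on the 1nodeid column
    $ ./graph.py --group-by 1nodeid --out brokers.html test1_brokers.csv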
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/requirements.txt b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/requirements.txt
new file mode 100644
index 00000000..1ea1d84d
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/requirements.txt
@@ -0,0 +1,3 @@
+pandas
+pandas-bokeh
+numpy
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/to_csv.py b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/to_csv.py
new file mode 100755
index 00000000..d5fc9b6e
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/tests/tools/stats/to_csv.py
@@ -0,0 +1,124 @@
+#!/usr/bin/env python3
+#
+# Parse librdkafka stats JSON from stdin, one stats object per line, pick out
+# the relevant fields and emit CSV files suitable for plotting with graph.py
+#
+
+import sys
+import json
+from datetime import datetime
+from collections import OrderedDict
+
+
+def parse(linenr, string):
+    try:
+        js = json.loads(string)
+    except Exception:
+        return [], [], [], []
+
+    dt = datetime.utcfromtimestamp(js['time']).strftime('%Y-%m-%d %H:%M:%S')
+
+    top = {'0time': dt}
+    topcollect = ['msg_cnt', 'msg_size']
+    for c in topcollect:
+        top[c] = js[c]
+
+    top['msg_cnt_fill'] = (float(js['msg_cnt']) / js['msg_max']) * 100.0
+    top['msg_size_fill'] = (float(js['msg_size']) / js['msg_size_max']) * 100.0
+
+    collect = ['outbuf_cnt', 'outbuf_msg_cnt', 'tx',
+               'waitresp_cnt', 'waitresp_msg_cnt', 'wakeups']
+
+    brokers = []
+    for b, d in js['brokers'].items():
+        if d['req']['Produce'] == 0:
+            continue
+
+        out = {'0time': dt, '1nodeid': d['nodeid']}
+        out['stateage'] = int(d['stateage'] / 1000)
+
+        for c in collect:
+            out[c] = d[c]
+
+        out['rtt_p99'] = int(d['rtt']['p99'] / 1000)
+        out['int_latency_p99'] = int(d['int_latency']['p99'] / 1000)
+        out['outbuf_latency_p99'] = int(d['outbuf_latency']['p99'] / 1000)
+        out['throttle_p99'] = d['throttle']['p99']
+        out['throttle_cnt'] = d['throttle']['cnt']
+        out['latency_p99'] = (out['int_latency_p99'] +
+                              out['outbuf_latency_p99'] +
+                              out['rtt_p99'])
+        out['toppars_cnt'] = len(d['toppars'])
+        out['produce_req'] = d['req']['Produce']
+
+        brokers.append(out)
+
+    tcollect = []
+    tpcollect = ['leader', 'msgq_cnt', 'msgq_bytes',
+                 'xmit_msgq_cnt', 'xmit_msgq_bytes',
+                 'txmsgs', 'txbytes', 'msgs_inflight']
+
+    topics = []
+    toppars = []
+    for t, d in js['topics'].items():
+
+        tout = {'0time': dt, '1topic': t}
+        for c in tcollect:
+            tout[c] = d[c]
+        tout['batchsize_p99'] = d['batchsize']['p99']
+        tout['batchcnt_p99'] = d['batchcnt']['p99']
+
+        for tp, d2 in d['partitions'].items():
+            if d2['txmsgs'] == 0:
+                continue
+
+            tpout = {'0time': dt, '1partition': d2['partition']}
+
+            for c in tpcollect:
+                tpout[c] = d2[c]
+
+            toppars.append(tpout)
+
+        topics.append(tout)
+
+    return [top], brokers, topics, toppars
+
+
+class CsvWriter(object):
+    def __init__(self, outpfx, name):
+        self.f = open(f"{outpfx}_{name}.csv", "w")
+        self.cnt = 0
+
+    def write(self, d):
+        od = OrderedDict(sorted(d.items()))
+        if self.cnt == 0:
+            # Write heading
+            self.f.write(','.join(od.keys()) + '\n')
+
+        self.f.write(','.join(map(str, od.values())) + '\n')
+        self.cnt += 1
+
+    def write_list(self, a_list_of_dicts):
+        for d in a_list_of_dicts:
+            self.write(d)
+
+
+# Output filename prefix; e.g. "test1" yields test1_top.csv,
+# test1_brokers.csv, test1_topics.csv and test1_toppars.csv.
+out = sys.argv[1]
+
+w_top = CsvWriter(out, 'top')
+w_brokers = CsvWriter(out, 'brokers')
+w_topics = CsvWriter(out, 'topics')
+w_toppars = CsvWriter(out, 'toppars')
+
+
+for linenr, string in enumerate(sys.stdin):
+    try:
+        top, brokers, topics, toppars = parse(linenr, string)
+    except Exception as e:
+        print(f"SKIP {linenr+1}: {e}")
+        continue
+
+    w_top.write_list(top)
+    w_brokers.write_list(brokers)
+    w_topics.write_list(topics)
+    w_toppars.write_list(toppars)
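
The emitted CSVs can also be inspected directly with pandas, mirroring how
graph.py loads them (a minimal sketch; `test1_brokers.csv` assumes the
README's `test1` prefix):

    import pandas as pd

    # '0time' is the timestamp column written by to_csv.py; use it as the index.
    df = pd.read_csv('test1_brokers.csv',
                     parse_dates=['0time'], index_col='0time')

    # Summarize p99 latency and Produce request counts per broker node.
    print(df.groupby('1nodeid')[['rtt_p99', 'produce_req']].describe())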