summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/tests/broker_version_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/broker_version_tests.py')
-rwxr-xr-xfluent-bit/lib/librdkafka-2.1.0/tests/broker_version_tests.py297
1 files changed, 297 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/broker_version_tests.py b/fluent-bit/lib/librdkafka-2.1.0/tests/broker_version_tests.py
new file mode 100755
index 000000000..717da28d5
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/tests/broker_version_tests.py
@@ -0,0 +1,297 @@
+#!/usr/bin/env python3
+#
+#
+# Run librdkafka regression tests on with different SASL parameters
+# and broker verisons.
+#
+# Requires:
+# trivup python module
+# gradle in your PATH
+
+from cluster_testing import (
+ LibrdkafkaTestCluster,
+ print_report_summary,
+ read_scenario_conf)
+from LibrdkafkaTestApp import LibrdkafkaTestApp
+
+import subprocess
+import tempfile
+import os
+import sys
+import argparse
+import json
+
+
+def test_it(version, deploy=True, conf={}, rdkconf={}, tests=None,
+ interact=False, debug=False, scenario="default"):
+ """
+ @brief Create, deploy and start a Kafka cluster using Kafka \\p version
+ Then run librdkafka's regression tests.
+ """
+
+ cluster = LibrdkafkaTestCluster(version, conf,
+ num_brokers=int(conf.get('broker_cnt', 3)),
+ debug=debug, scenario=scenario)
+
+ # librdkafka's regression tests, as an App.
+ _rdkconf = conf.copy() # Base rdkconf on cluster conf + rdkconf
+ _rdkconf.update(rdkconf)
+ rdkafka = LibrdkafkaTestApp(cluster, version, _rdkconf, tests=tests,
+ scenario=scenario)
+ rdkafka.do_cleanup = False
+
+ if deploy:
+ cluster.deploy()
+
+ cluster.start(timeout=30)
+
+ if conf.get('test_mode', '') == 'bash':
+ cmd = 'bash --rcfile <(cat ~/.bashrc; echo \'PS1="[TRIVUP:%s@%s] \\u@\\h:\\w$ "\')' % ( # noqa: E501
+ cluster.name, version)
+ subprocess.call(
+ cmd,
+ env=rdkafka.env,
+ shell=True,
+ executable='/bin/bash')
+ report = None
+
+ else:
+ rdkafka.start()
+ print(
+ '# librdkafka regression tests started, logs in %s' %
+ rdkafka.root_path())
+ rdkafka.wait_stopped(timeout=60 * 30)
+
+ report = rdkafka.report()
+ report['root_path'] = rdkafka.root_path()
+
+ if report.get('tests_failed', 0) > 0 and interact:
+ print(
+ '# Connect to cluster with bootstrap.servers %s' %
+ cluster.bootstrap_servers())
+ print('# Exiting the shell will bring down the cluster. '
+ 'Good luck.')
+ subprocess.call(
+ 'bash --rcfile <(cat ~/.bashrc; echo \'PS1="[TRIVUP:%s@%s] \\u@\\h:\\w$ "\')' % # noqa: E501
+ (cluster.name, version), env=rdkafka.env, shell=True,
+ executable='/bin/bash')
+
+ cluster.stop(force=True)
+
+ cluster.cleanup()
+ return report
+
+
+def handle_report(report, version, suite):
+ """ Parse test report and return tuple (Passed(bool), Reason(str)) """
+ test_cnt = report.get('tests_run', 0)
+
+ if test_cnt == 0:
+ return (False, 'No tests run')
+
+ passed = report.get('tests_passed', 0)
+ failed = report.get('tests_failed', 0)
+ if 'all' in suite.get('expect_fail', []) or version in suite.get(
+ 'expect_fail', []):
+ expect_fail = True
+ else:
+ expect_fail = False
+
+ if expect_fail:
+ if failed == test_cnt:
+ return (True, 'All %d/%d tests failed as expected' %
+ (failed, test_cnt))
+ else:
+ return (False, '%d/%d tests failed: expected all to fail' %
+ (failed, test_cnt))
+ else:
+ if failed > 0:
+ return (False, '%d/%d tests passed: expected all to pass' %
+ (passed, test_cnt))
+ else:
+ return (True, 'All %d/%d tests passed as expected' %
+ (passed, test_cnt))
+
+
+if __name__ == '__main__':
+
+ parser = argparse.ArgumentParser(
+ description='Run librdkafka tests on a range of broker versions')
+
+ parser.add_argument('--debug', action='store_true', default=False,
+ help='Enable trivup debugging')
+ parser.add_argument('--conf', type=str, dest='conf', default=None,
+ help='trivup JSON config object (not file)')
+ parser.add_argument('--rdkconf', type=str, dest='rdkconf', default=None,
+ help='trivup JSON config object (not file) '
+ 'for LibrdkafkaTestApp')
+ parser.add_argument('--scenario', type=str, dest='scenario',
+ default='default',
+ help='Test scenario (see scenarios/ directory)')
+ parser.add_argument('--tests', type=str, dest='tests', default=None,
+ help='Test to run (e.g., "0002")')
+ parser.add_argument('--report', type=str, dest='report', default=None,
+ help='Write test suites report to this filename')
+ parser.add_argument('--interact', action='store_true', dest='interact',
+ default=False,
+ help='On test failure start a shell before bringing '
+ 'the cluster down.')
+ parser.add_argument('versions', type=str, nargs='*',
+ default=['0.8.1.1', '0.8.2.2', '0.9.0.1', '2.3.0'],
+ help='Broker versions to test')
+ parser.add_argument('--interactive', action='store_true',
+ dest='interactive',
+ default=False,
+ help='Start a shell instead of running tests')
+ parser.add_argument(
+ '--root',
+ type=str,
+ default=os.environ.get(
+ 'TRIVUP_ROOT',
+ 'tmp'),
+ help='Root working directory')
+ parser.add_argument(
+ '--port',
+ default=None,
+ help='Base TCP port to start allocating from')
+ parser.add_argument(
+ '--kafka-src',
+ dest='kafka_path',
+ type=str,
+ default=None,
+ help='Path to Kafka git repo checkout (used for version=trunk)')
+ parser.add_argument(
+ '--brokers',
+ dest='broker_cnt',
+ type=int,
+ default=3,
+ help='Number of Kafka brokers')
+ parser.add_argument('--ssl', dest='ssl', action='store_true',
+ default=False,
+ help='Enable SSL endpoints')
+ parser.add_argument(
+ '--sasl',
+ dest='sasl',
+ type=str,
+ default=None,
+ help='SASL mechanism (PLAIN, GSSAPI)')
+
+ args = parser.parse_args()
+
+ conf = dict()
+ rdkconf = dict()
+
+ if args.conf is not None:
+ args.conf = json.loads(args.conf)
+ else:
+ args.conf = {}
+
+ if args.port is not None:
+ args.conf['port_base'] = int(args.port)
+ if args.kafka_path is not None:
+ args.conf['kafka_path'] = args.kafka_path
+ if args.ssl:
+ args.conf['security.protocol'] = 'SSL'
+ if args.sasl:
+ if args.sasl == 'PLAIN' and 'sasl_users' not in args.conf:
+ args.conf['sasl_users'] = 'testuser=testpass'
+ args.conf['sasl_mechanisms'] = args.sasl
+ args.conf['sasl_servicename'] = 'kafka'
+ if args.interactive:
+ args.conf['test_mode'] = 'bash'
+ args.conf['broker_cnt'] = args.broker_cnt
+
+ conf.update(args.conf)
+ if args.rdkconf is not None:
+ rdkconf.update(json.loads(args.rdkconf))
+
+ conf.update(read_scenario_conf(args.scenario))
+
+ if args.tests is not None:
+ tests = args.tests.split(',')
+ elif 'tests' in conf:
+ tests = conf.get('tests', '').split(',')
+ else:
+ tests = None
+
+ # Test version + suite matrix
+ if 'versions' in conf:
+ versions = conf.get('versions')
+ else:
+ versions = args.versions
+ suites = [{'name': 'standard'}]
+
+ pass_cnt = 0
+ fail_cnt = 0
+ for version in versions:
+ for suite in suites:
+ _conf = conf.copy()
+ _conf.update(suite.get('conf', {}))
+ _rdkconf = rdkconf.copy()
+ _rdkconf.update(suite.get('rdkconf', {}))
+
+ if 'version' not in suite:
+ suite['version'] = dict()
+
+ # Run tests
+ print('#### Version %s, suite %s, scenario %s: STARTING' %
+ (version, suite['name'], args.scenario))
+ report = test_it(version, tests=tests, conf=_conf,
+ rdkconf=_rdkconf,
+ interact=args.interact, debug=args.debug,
+ scenario=args.scenario)
+
+ if not report:
+ continue
+
+ # Handle test report
+ report['version'] = version
+ passed, reason = handle_report(report, version, suite)
+ report['PASSED'] = passed
+ report['REASON'] = reason
+
+ if passed:
+ print('\033[42m#### Version %s, suite %s: PASSED: %s\033[0m' %
+ (version, suite['name'], reason))
+ pass_cnt += 1
+ else:
+ print('\033[41m#### Version %s, suite %s: FAILED: %s\033[0m' %
+ (version, suite['name'], reason))
+ fail_cnt += 1
+
+ # Emit hopefully relevant parts of the log on failure
+ subprocess.call(
+ "grep --color=always -B100 -A10 FAIL %s" %
+ (os.path.join(
+ report['root_path'],
+ 'stderr.log')),
+ shell=True)
+
+ print('#### Test output: %s/stderr.log' % (report['root_path']))
+
+ suite['version'][version] = report
+
+ # Write test suite report JSON file
+ if args.report is not None:
+ test_suite_report_file = args.report
+ f = open(test_suite_report_file, 'w')
+ else:
+ fd, test_suite_report_file = tempfile.mkstemp(prefix='test_suite_',
+ suffix='.json',
+ dir='.')
+ f = os.fdopen(fd, 'w')
+
+ full_report = {'suites': suites, 'pass_cnt': pass_cnt,
+ 'fail_cnt': fail_cnt, 'total_cnt': pass_cnt + fail_cnt}
+
+ f.write(json.dumps(full_report))
+ f.close()
+
+ print('\n\n\n')
+ print_report_summary(full_report)
+ print('#### Full test suites report in: %s' % test_suite_report_file)
+
+ if pass_cnt == 0 or fail_cnt > 0:
+ sys.exit(1)
+ else:
+ sys.exit(0)