#!/usr/bin/env python3 # # # Run librdkafka regression tests on with different SASL parameters # and broker verisons. # # Requires: # trivup python module # gradle in your PATH from cluster_testing import ( LibrdkafkaTestCluster, print_report_summary, print_test_report_summary, read_scenario_conf) from LibrdkafkaTestApp import LibrdkafkaTestApp import os import sys import argparse import json import tempfile def test_it(version, deploy=True, conf={}, rdkconf={}, tests=None, debug=False, scenario="default"): """ @brief Create, deploy and start a Kafka cluster using Kafka \\p version Then run librdkafka's regression tests. """ cluster = LibrdkafkaTestCluster( version, conf, debug=debug, scenario=scenario) # librdkafka's regression tests, as an App. rdkafka = LibrdkafkaTestApp(cluster, version, _rdkconf, tests=tests, scenario=scenario) rdkafka.do_cleanup = False rdkafka.local_tests = False if deploy: cluster.deploy() cluster.start(timeout=30) print( '# Connect to cluster with bootstrap.servers %s' % cluster.bootstrap_servers()) rdkafka.start() print( '# librdkafka regression tests started, logs in %s' % rdkafka.root_path()) try: rdkafka.wait_stopped(timeout=60 * 30) rdkafka.dbg( 'wait stopped: %s, runtime %ds' % (rdkafka.state, rdkafka.runtime())) except KeyboardInterrupt: print('# Aborted by user') report = rdkafka.report() if report is not None: report['root_path'] = rdkafka.root_path() cluster.stop(force=True) cluster.cleanup() return report def handle_report(report, version, suite): """ Parse test report and return tuple (Passed(bool), Reason(str)) """ test_cnt = report.get('tests_run', 0) if test_cnt == 0: return (False, 'No tests run') passed = report.get('tests_passed', 0) failed = report.get('tests_failed', 0) if 'all' in suite.get('expect_fail', []) or version in suite.get( 'expect_fail', []): expect_fail = True else: expect_fail = False if expect_fail: if failed == test_cnt: return (True, 'All %d/%d tests failed as expected' % (failed, test_cnt)) else: return (False, '%d/%d tests failed: expected all to fail' % (failed, test_cnt)) else: if failed > 0: return (False, '%d/%d tests passed: expected all to pass' % (passed, test_cnt)) else: return (True, 'All %d/%d tests passed as expected' % (passed, test_cnt)) if __name__ == '__main__': parser = argparse.ArgumentParser( description='Run librdkafka test suit using SASL on a ' 'trivupped cluster') parser.add_argument('--conf', type=str, dest='conf', default=None, help='trivup JSON config object (not file)') parser.add_argument('--rdkconf', type=str, dest='rdkconf', default=None, help='trivup JSON config object (not file) ' 'for LibrdkafkaTestApp') parser.add_argument('--scenario', type=str, dest='scenario', default='default', help='Test scenario (see scenarios/ directory)') parser.add_argument('--tests', type=str, dest='tests', default=None, help='Test to run (e.g., "0002")') parser.add_argument('--no-ssl', action='store_false', dest='ssl', default=True, help='Don\'t run SSL tests') parser.add_argument('--no-sasl', action='store_false', dest='sasl', default=True, help='Don\'t run SASL tests') parser.add_argument('--no-oidc', action='store_false', dest='oidc', default=True, help='Don\'t run OAuth/OIDC tests') parser.add_argument('--no-plaintext', action='store_false', dest='plaintext', default=True, help='Don\'t run PLAINTEXT tests') parser.add_argument('--report', type=str, dest='report', default=None, help='Write test suites report to this filename') parser.add_argument('--debug', action='store_true', dest='debug', default=False, help='Enable trivup debugging') parser.add_argument('--suite', type=str, default=None, help='Only run matching suite(s) (substring match)') parser.add_argument('versions', type=str, default=None, nargs='*', help='Limit broker versions to these') args = parser.parse_args() conf = dict() rdkconf = dict() if args.conf is not None: conf.update(json.loads(args.conf)) if args.rdkconf is not None: rdkconf.update(json.loads(args.rdkconf)) if args.tests is not None: tests = args.tests.split(',') else: tests = None conf.update(read_scenario_conf(args.scenario)) # Test version,supported mechs + suite matrix versions = list() if len(args.versions): for v in args.versions: versions.append( (v, ['SCRAM-SHA-512', 'PLAIN', 'GSSAPI', 'OAUTHBEARER'])) else: versions = [('3.1.0', ['SCRAM-SHA-512', 'PLAIN', 'GSSAPI', 'OAUTHBEARER']), ('2.1.0', ['SCRAM-SHA-512', 'PLAIN', 'GSSAPI', 'OAUTHBEARER']), ('0.10.2.0', ['SCRAM-SHA-512', 'PLAIN', 'GSSAPI']), ('0.9.0.1', ['GSSAPI']), ('0.8.2.2', [])] sasl_plain_conf = {'sasl_mechanisms': 'PLAIN', 'sasl_users': 'myuser=mypassword'} sasl_scram_conf = {'sasl_mechanisms': 'SCRAM-SHA-512', 'sasl_users': 'myuser=mypassword'} ssl_sasl_plain_conf = {'sasl_mechanisms': 'PLAIN', 'sasl_users': 'myuser=mypassword', 'security.protocol': 'SSL'} sasl_oauthbearer_conf = {'sasl_mechanisms': 'OAUTHBEARER', 'sasl_oauthbearer_config': 'scope=requiredScope principal=admin'} sasl_oauth_oidc_conf = {'sasl_mechanisms': 'OAUTHBEARER', 'sasl_oauthbearer_method': 'OIDC'} sasl_kerberos_conf = {'sasl_mechanisms': 'GSSAPI', 'sasl_servicename': 'kafka'} suites = [{'name': 'SASL PLAIN', 'run': (args.sasl and args.plaintext), 'conf': sasl_plain_conf, 'tests': ['0001'], 'expect_fail': ['0.9.0.1', '0.8.2.2']}, {'name': 'SASL SCRAM', 'run': (args.sasl and args.plaintext), 'conf': sasl_scram_conf, 'expect_fail': ['0.9.0.1', '0.8.2.2']}, {'name': 'PLAINTEXT (no SASL)', 'run': args.plaintext, 'tests': ['0001']}, {'name': 'SSL (no SASL)', 'run': args.ssl, 'conf': {'security.protocol': 'SSL'}, 'expect_fail': ['0.8.2.2']}, {'name': 'SASL_SSL PLAIN', 'run': (args.sasl and args.ssl and args.plaintext), 'conf': ssl_sasl_plain_conf, 'expect_fail': ['0.9.0.1', '0.8.2.2']}, {'name': 'SASL PLAIN with wrong username', 'run': (args.sasl and args.plaintext), 'conf': sasl_plain_conf, 'rdkconf': {'sasl_users': 'wrongjoe=mypassword'}, 'tests': ['0001'], 'expect_fail': ['all']}, {'name': 'SASL OAUTHBEARER', 'run': args.sasl, 'conf': sasl_oauthbearer_conf, 'tests': ['0001'], 'expect_fail': ['0.10.2.0', '0.9.0.1', '0.8.2.2']}, {'name': 'SASL OAUTHBEARER with wrong scope', 'run': args.sasl, 'conf': sasl_oauthbearer_conf, 'rdkconf': {'sasl_oauthbearer_config': 'scope=wrongScope'}, 'tests': ['0001'], 'expect_fail': ['all']}, {'name': 'OAuth/OIDC', 'run': args.oidc, 'tests': ['0001', '0126'], 'conf': sasl_oauth_oidc_conf, 'minver': '3.1.0', 'expect_fail': ['2.8.1', '2.1.0', '0.10.2.0', '0.9.0.1', '0.8.2.2']}, {'name': 'SASL Kerberos', 'run': args.sasl, 'conf': sasl_kerberos_conf, 'expect_fail': ['0.8.2.2']}] pass_cnt = 0 fail_cnt = 0 for version, supported in versions: if len(args.versions) > 0 and version not in args.versions: print('### Skipping version %s' % version) continue for suite in suites: if not suite.get('run', True): continue if args.suite is not None and suite['name'].find(args.suite) == -1: print( f'# Skipping {suite["name"]} due to --suite {args.suite}') continue if 'minver' in suite: minver = [int(x) for x in suite['minver'].split('.')][:3] this_version = [int(x) for x in version.split('.')][:3] if this_version < minver: print( f'# Skipping {suite["name"]} due to version {version} < minimum required version {suite["minver"]}') # noqa: E501 continue _conf = conf.copy() _conf.update(suite.get('conf', {})) _rdkconf = _conf.copy() _rdkconf.update(rdkconf) _rdkconf.update(suite.get('rdkconf', {})) if 'version' not in suite: suite['version'] = dict() # Disable SASL broker config if broker version does # not support the selected mechanism mech = suite.get('conf', dict()).get('sasl_mechanisms', None) if mech is not None and mech not in supported: print('# Disabled SASL for broker version %s' % version) _conf.pop('sasl_mechanisms', None) # Run tests print( '#### Version %s, suite %s: STARTING' % (version, suite['name'])) if tests is None: tests_to_run = suite.get('tests', None) else: tests_to_run = tests report = test_it(version, tests=tests_to_run, conf=_conf, rdkconf=_rdkconf, debug=args.debug, scenario=args.scenario) # Handle test report report['version'] = version passed, reason = handle_report(report, version, suite) report['PASSED'] = passed report['REASON'] = reason if passed: print('\033[42m#### Version %s, suite %s: PASSED: %s\033[0m' % (version, suite['name'], reason)) pass_cnt += 1 else: print('\033[41m#### Version %s, suite %s: FAILED: %s\033[0m' % (version, suite['name'], reason)) print_test_report_summary('%s @ %s' % (suite['name'], version), report) fail_cnt += 1 print('#### Test output: %s/stderr.log' % (report['root_path'])) suite['version'][version] = report # Write test suite report JSON file if args.report is not None: test_suite_report_file = args.report f = open(test_suite_report_file, 'w') else: fd, test_suite_report_file = tempfile.mkstemp(prefix='test_suite_', suffix='.json', dir='.') f = os.fdopen(fd, 'w') full_report = {'suites': suites, 'pass_cnt': pass_cnt, 'fail_cnt': fail_cnt, 'total_cnt': pass_cnt + fail_cnt} f.write(json.dumps(full_report)) f.close() print('\n\n\n') print_report_summary(full_report) print('#### Full test suites report in: %s' % test_suite_report_file) if pass_cnt == 0 or fail_cnt > 0: sys.exit(1) else: sys.exit(0)