summaryrefslogtreecommitdiffstats
path: root/qa/tasks/kafka.py
diff options
context:
space:
mode:
Diffstat (limited to 'qa/tasks/kafka.py')
-rw-r--r--qa/tasks/kafka.py204
1 files changed, 204 insertions, 0 deletions
diff --git a/qa/tasks/kafka.py b/qa/tasks/kafka.py
new file mode 100644
index 000000000..48bf3611f
--- /dev/null
+++ b/qa/tasks/kafka.py
@@ -0,0 +1,204 @@
+"""
+Deploy and configure Kafka for Teuthology
+"""
+import contextlib
+import logging
+import time
+
+from teuthology import misc as teuthology
+from teuthology import contextutil
+from teuthology.orchestra import run
+
+log = logging.getLogger(__name__)
+
+def get_kafka_version(config):
+ for client, client_config in config.items():
+ if 'kafka_version' in client_config:
+ kafka_version = client_config.get('kafka_version')
+ return kafka_version
+
+def get_kafka_dir(ctx, config):
+ kafka_version = get_kafka_version(config)
+ current_version = 'kafka-' + kafka_version + '-src'
+ return '{tdir}/{ver}'.format(tdir=teuthology.get_testdir(ctx),ver=current_version)
+
+
+@contextlib.contextmanager
+def install_kafka(ctx, config):
+ """
+ Downloading the kafka tar file.
+ """
+ assert isinstance(config, dict)
+ log.info('Installing Kafka...')
+
+ for (client, _) in config.items():
+ (remote,) = ctx.cluster.only(client).remotes.keys()
+ test_dir=teuthology.get_testdir(ctx)
+ current_version = get_kafka_version(config)
+
+ link1 = 'https://archive.apache.org/dist/kafka/' + current_version + '/kafka-' + current_version + '-src.tgz'
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', link1],
+ )
+
+ file1 = 'kafka-' + current_version + '-src.tgz'
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', file1],
+ )
+
+ try:
+ yield
+ finally:
+ log.info('Removing packaged dependencies of Kafka...')
+ test_dir=get_kafka_dir(ctx, config)
+ current_version = get_kafka_version(config)
+ for (client,_) in config.items():
+ ctx.cluster.only(client).run(
+ args=['rm', '-rf', '{tdir}/logs'.format(tdir=test_dir)],
+ )
+
+ ctx.cluster.only(client).run(
+ args=['rm', '-rf', test_dir],
+ )
+
+ rmfile1 = 'kafka-' + current_version + '-src.tgz'
+ ctx.cluster.only(client).run(
+ args=['rm', '-rf', '{tdir}/{doc}'.format(tdir=teuthology.get_testdir(ctx),doc=rmfile1)],
+ )
+
+
+@contextlib.contextmanager
+def run_kafka(ctx,config):
+ """
+ This includes two parts:
+ 1. Starting Zookeeper service
+ 2. Starting Kafka service
+ """
+ assert isinstance(config, dict)
+ log.info('Bringing up Zookeeper and Kafka services...')
+ for (client,_) in config.items():
+ (remote,) = ctx.cluster.only(client).remotes.keys()
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './gradlew', 'jar',
+ '-PscalaVersion=2.13.2'
+ ],
+ )
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './zookeeper-server-start.sh',
+ '{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
+ run.Raw('&'), 'exit'
+ ],
+ )
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './kafka-server-start.sh',
+ '{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)),
+ run.Raw('&'), 'exit'
+ ],
+ )
+
+ try:
+ yield
+ finally:
+ log.info('Stopping Zookeeper and Kafka Services...')
+
+ for (client, _) in config.items():
+ (remote,) = ctx.cluster.only(client).remotes.keys()
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './kafka-server-stop.sh',
+ '{tir}/config/kafka.properties'.format(tir=get_kafka_dir(ctx, config)),
+ ],
+ )
+
+ time.sleep(5)
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './zookeeper-server-stop.sh',
+ '{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
+ ],
+ )
+
+ time.sleep(5)
+
+ ctx.cluster.only(client).run(args=['killall', '-9', 'java'])
+
+
+@contextlib.contextmanager
+def run_admin_cmds(ctx,config):
+ """
+ Running Kafka Admin commands in order to check the working of producer anf consumer and creation of topic.
+ """
+ assert isinstance(config, dict)
+ log.info('Checking kafka server through producer/consumer commands...')
+ for (client,_) in config.items():
+ (remote,) = ctx.cluster.only(client).remotes.keys()
+
+ ctx.cluster.only(client).run(
+ args=[
+ 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './kafka-topics.sh', '--create', '--topic', 'quickstart-events',
+ '--bootstrap-server', 'localhost:9092'
+ ],
+ )
+
+ ctx.cluster.only(client).run(
+ args=[
+ 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ 'echo', "First", run.Raw('|'),
+ './kafka-console-producer.sh', '--topic', 'quickstart-events',
+ '--bootstrap-server', 'localhost:9092'
+ ],
+ )
+
+ ctx.cluster.only(client).run(
+ args=[
+ 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './kafka-console-consumer.sh', '--topic', 'quickstart-events',
+ '--from-beginning',
+ '--bootstrap-server', 'localhost:9092',
+ run.Raw('&'), 'exit'
+ ],
+ )
+
+ try:
+ yield
+ finally:
+ pass
+
+
+@contextlib.contextmanager
+def task(ctx,config):
+ """
+ Following is the way how to run kafka::
+ tasks:
+ - kafka:
+ client.0:
+ kafka_version: 2.6.0
+ """
+ assert config is None or isinstance(config, list) \
+ or isinstance(config, dict), \
+ "task kafka only supports a list or dictionary for configuration"
+
+ all_clients = ['client.{id}'.format(id=id_)
+ for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')]
+ if config is None:
+ config = all_clients
+ if isinstance(config, list):
+ config = dict.fromkeys(config)
+
+ log.debug('Kafka config is %s', config)
+
+ with contextutil.nested(
+ lambda: install_kafka(ctx=ctx, config=config),
+ lambda: run_kafka(ctx=ctx, config=config),
+ lambda: run_admin_cmds(ctx=ctx, config=config),
+ ):
+ yield