import logging import json import tempfile import random import threading import subprocess import socket import time import os from http import server as http_server from random import randint from .tests import get_realm, \ ZonegroupConns, \ zonegroup_meta_checkpoint, \ zone_meta_checkpoint, \ zone_bucket_checkpoint, \ zone_data_checkpoint, \ zonegroup_bucket_checkpoint, \ check_bucket_eq, \ gen_bucket_name, \ get_user, \ get_tenant from .zone_ps import PSTopic, \ PSTopicS3, \ PSNotification, \ PSSubscription, \ PSNotificationS3, \ print_connection_info, \ delete_all_s3_topics, \ put_object_tagging, \ get_object_tagging, \ delete_all_objects from .multisite import User from nose import SkipTest from nose.tools import assert_not_equal, assert_equal import boto.s3.tagging # configure logging for the tests module log = logging.getLogger(__name__) skip_push_tests = True #################################### # utility functions for pubsub tests #################################### def set_contents_from_string(key, content): try: key.set_contents_from_string(content) except Exception as e: print('Error: ' + str(e)) # HTTP endpoint functions # multithreaded streaming server, based on: https://stackoverflow.com/questions/46210672/ class HTTPPostHandler(http_server.BaseHTTPRequestHandler): """HTTP POST hanler class storing the received events in its http server""" def do_POST(self): """implementation of POST handler""" try: content_length = int(self.headers['Content-Length']) body = self.rfile.read(content_length) log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body)) self.server.append(json.loads(body)) except: log.error('HTTP Server received empty event') self.send_response(400) else: if self.headers.get('Expect') == '100-continue': self.send_response(100) else: self.send_response(200) finally: if self.server.delay > 0: time.sleep(self.server.delay) self.end_headers() class HTTPServerWithEvents(http_server.HTTPServer): """HTTP server used by the handler to store events""" def __init__(self, addr, handler, worker_id, delay=0): http_server.HTTPServer.__init__(self, addr, handler, False) self.worker_id = worker_id self.events = [] self.delay = delay def append(self, event): self.events.append(event) class HTTPServerThread(threading.Thread): """thread for running the HTTP server. reusing the same socket for all threads""" def __init__(self, i, sock, addr, delay=0): threading.Thread.__init__(self) self.i = i self.daemon = True self.httpd = HTTPServerWithEvents(addr, HTTPPostHandler, i, delay) self.httpd.socket = sock # prevent the HTTP server from re-binding every handler self.httpd.server_bind = self.server_close = lambda self: None self.start() def run(self): try: log.info('HTTP Server (%d) started on: %s', self.i, self.httpd.server_address) self.httpd.serve_forever() log.info('HTTP Server (%d) ended', self.i) except Exception as error: # could happen if the server r/w to a closing socket during shutdown log.info('HTTP Server (%d) ended unexpectedly: %s', self.i, str(error)) def close(self): self.httpd.shutdown() def get_events(self): return self.httpd.events def reset_events(self): self.httpd.events = [] class StreamingHTTPServer: """multi-threaded http server class also holding list of events received into the handler each thread has its own server, and all servers share the same socket""" def __init__(self, host, port, num_workers=100, delay=0): addr = (host, port) self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.sock.bind(addr) self.sock.listen(num_workers) self.workers = [HTTPServerThread(i, self.sock, addr, delay) for i in range(num_workers)] def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}): """verify stored s3 records agains a list of keys""" events = [] for worker in self.workers: events += worker.get_events() worker.reset_events() verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes) def verify_events(self, keys, exact_match=False, deletions=False): """verify stored events agains a list of keys""" events = [] for worker in self.workers: events += worker.get_events() worker.reset_events() verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions) def get_and_reset_events(self): events = [] for worker in self.workers: events += worker.get_events() worker.reset_events() return events def close(self): """close all workers in the http server and wait for it to finish""" # make sure that the shared socket is closed # this is needed in case that one of the threads is blocked on the socket self.sock.shutdown(socket.SHUT_RDWR) self.sock.close() # wait for server threads to finish for worker in self.workers: worker.close() worker.join() # AMQP endpoint functions class AMQPReceiver(object): """class for receiving and storing messages on a topic from the AMQP broker""" def __init__(self, exchange, topic, external_endpoint_address=None, ca_location=None): import pika import ssl if ca_location: ssl_context = ssl.create_default_context() ssl_context.load_verify_locations(cafile=ca_location) ssl_options = pika.SSLOptions(ssl_context) rabbitmq_port = 5671 else: rabbitmq_port = 5672 ssl_options = None if external_endpoint_address: params = pika.URLParameters(external_endpoint_address, ssl_options=ssl_options) else: hostname = get_ip() params = pika.ConnectionParameters(host=hostname, port=rabbitmq_port, ssl_options=ssl_options) remaining_retries = 10 while remaining_retries > 0: try: connection = pika.BlockingConnection(params) break except Exception as error: remaining_retries -= 1 print('failed to connect to rabbitmq (remaining retries ' + str(remaining_retries) + '): ' + str(error)) time.sleep(1) if remaining_retries == 0: raise Exception('failed to connect to rabbitmq - no retries left') self.channel = connection.channel() self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True) result = self.channel.queue_declare('', exclusive=True) queue_name = result.method.queue self.channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=topic) self.channel.basic_consume(queue=queue_name, on_message_callback=self.on_message, auto_ack=True) self.events = [] self.topic = topic def on_message(self, ch, method, properties, body): """callback invoked when a new message arrive on the topic""" log.info('AMQP received event for topic %s:\n %s', self.topic, body) self.events.append(json.loads(body)) # TODO create a base class for the AMQP and HTTP cases def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}): """verify stored s3 records agains a list of keys""" verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes) self.events = [] def verify_events(self, keys, exact_match=False, deletions=False): """verify stored events agains a list of keys""" verify_events_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions) self.events = [] def get_and_reset_events(self): tmp = self.events self.events = [] return tmp def amqp_receiver_thread_runner(receiver): """main thread function for the amqp receiver""" try: log.info('AMQP receiver started') receiver.channel.start_consuming() log.info('AMQP receiver ended') except Exception as error: log.info('AMQP receiver ended unexpectedly: %s', str(error)) def create_amqp_receiver_thread(exchange, topic, external_endpoint_address=None, ca_location=None): """create amqp receiver and thread""" receiver = AMQPReceiver(exchange, topic, external_endpoint_address, ca_location) task = threading.Thread(target=amqp_receiver_thread_runner, args=(receiver,)) task.daemon = True return task, receiver def stop_amqp_receiver(receiver, task): """stop the receiver thread and wait for it to finis""" try: receiver.channel.stop_consuming() log.info('stopping AMQP receiver') except Exception as error: log.info('failed to gracefuly stop AMQP receiver: %s', str(error)) task.join(5) def check_ps_configured(): """check if at least one pubsub zone exist""" realm = get_realm() zonegroup = realm.master_zonegroup() ps_zones = zonegroup.zones_by_type.get("pubsub") if not ps_zones: raise SkipTest("Requires at least one PS zone") def is_ps_zone(zone_conn): """check if a specific zone is pubsub zone""" if not zone_conn: return False return zone_conn.zone.tier_type() == "pubsub" def verify_events_by_elements(events, keys, exact_match=False, deletions=False): """ verify there is at least one event per element """ err = '' for key in keys: key_found = False if type(events) is list: for event_list in events: if key_found: break for event in event_list['events']: if event['info']['bucket']['name'] == key.bucket.name and \ event['info']['key']['name'] == key.name: if deletions and event['event'] == 'OBJECT_DELETE': key_found = True break elif not deletions and event['event'] == 'OBJECT_CREATE': key_found = True break else: for event in events['events']: if event['info']['bucket']['name'] == key.bucket.name and \ event['info']['key']['name'] == key.name: if deletions and event['event'] == 'OBJECT_DELETE': key_found = True break elif not deletions and event['event'] == 'OBJECT_CREATE': key_found = True break if not key_found: err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key) log.error(events) assert False, err if not len(events) == len(keys): err = 'superfluous events are found' log.debug(err) if exact_match: log.error(events) assert False, err def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False, expected_sizes={}): """ verify there is at least one record per element """ err = '' for key in keys: key_found = False object_size = 0 if type(records) is list: for record_list in records: if key_found: break for record in record_list['Records']: if record['s3']['bucket']['name'] == key.bucket.name and \ record['s3']['object']['key'] == key.name: if deletions and record['eventName'].startswith('ObjectRemoved'): key_found = True object_size = record['s3']['object']['size'] break elif not deletions and record['eventName'].startswith('ObjectCreated'): key_found = True object_size = record['s3']['object']['size'] break else: for record in records['Records']: if record['s3']['bucket']['name'] == key.bucket.name and \ record['s3']['object']['key'] == key.name: if deletions and record['eventName'].startswith('ObjectRemoved'): key_found = True object_size = record['s3']['object']['size'] break elif not deletions and record['eventName'].startswith('ObjectCreated'): key_found = True object_size = record['s3']['object']['size'] break if not key_found: err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key) assert False, err elif expected_sizes: assert_equal(object_size, expected_sizes.get(key.name)) if not len(records) == len(keys): err = 'superfluous records are found' log.warning(err) if exact_match: for record_list in records: for record in record_list['Records']: log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key'])) assert False, err def init_rabbitmq(): """ start a rabbitmq broker """ hostname = get_ip() #port = str(random.randint(20000, 30000)) #data_dir = './' + port + '_data' #log_dir = './' + port + '_log' #print('') #try: # os.mkdir(data_dir) # os.mkdir(log_dir) #except: # print('rabbitmq directories already exists') #env = {'RABBITMQ_NODE_PORT': port, # 'RABBITMQ_NODENAME': 'rabbit'+ port + '@' + hostname, # 'RABBITMQ_USE_LONGNAME': 'true', # 'RABBITMQ_MNESIA_BASE': data_dir, # 'RABBITMQ_LOG_BASE': log_dir} # TODO: support multiple brokers per host using env # make sure we don't collide with the default try: proc = subprocess.Popen(['sudo', '--preserve-env=RABBITMQ_CONFIG_FILE', 'rabbitmq-server']) except Exception as error: log.info('failed to execute rabbitmq-server: %s', str(error)) print('failed to execute rabbitmq-server: %s' % str(error)) return None # TODO add rabbitmq checkpoint instead of sleep time.sleep(5) return proc #, port, data_dir, log_dir def clean_rabbitmq(proc): #, data_dir, log_dir) """ stop the rabbitmq broker """ try: subprocess.call(['sudo', 'rabbitmqctl', 'stop']) time.sleep(5) proc.terminate() except: log.info('rabbitmq server already terminated') # TODO: add directory cleanup once multiple brokers are supported #try: # os.rmdir(data_dir) # os.rmdir(log_dir) #except: # log.info('rabbitmq directories already removed') # Kafka endpoint functions kafka_server = 'localhost' class KafkaReceiver(object): """class for receiving and storing messages on a topic from the kafka broker""" def __init__(self, topic, security_type): from kafka import KafkaConsumer remaining_retries = 10 port = 9092 if security_type != 'PLAINTEXT': security_type = 'SSL' port = 9093 while remaining_retries > 0: try: self.consumer = KafkaConsumer(topic, bootstrap_servers = kafka_server+':'+str(port), security_protocol=security_type) print('Kafka consumer created on topic: '+topic) break except Exception as error: remaining_retries -= 1 print('failed to connect to kafka (remaining retries ' + str(remaining_retries) + '): ' + str(error)) time.sleep(1) if remaining_retries == 0: raise Exception('failed to connect to kafka - no retries left') self.events = [] self.topic = topic self.stop = False def verify_s3_events(self, keys, exact_match=False, deletions=False): """verify stored s3 records agains a list of keys""" verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions) self.events = [] def kafka_receiver_thread_runner(receiver): """main thread function for the kafka receiver""" try: log.info('Kafka receiver started') print('Kafka receiver started') while not receiver.stop: for msg in receiver.consumer: receiver.events.append(json.loads(msg.value)) timer.sleep(0.1) log.info('Kafka receiver ended') print('Kafka receiver ended') except Exception as error: log.info('Kafka receiver ended unexpectedly: %s', str(error)) print('Kafka receiver ended unexpectedly: ' + str(error)) def create_kafka_receiver_thread(topic, security_type='PLAINTEXT'): """create kafka receiver and thread""" receiver = KafkaReceiver(topic, security_type) task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,)) task.daemon = True return task, receiver def stop_kafka_receiver(receiver, task): """stop the receiver thread and wait for it to finis""" receiver.stop = True task.join(1) try: receiver.consumer.close() except Exception as error: log.info('failed to gracefuly stop Kafka receiver: %s', str(error)) # follow the instruction here to create and sign a broker certificate: # https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka # the generated broker certificate should be stored in the java keystore for the use of the server # assuming the jks files were copied to $KAFKA_DIR and broker name is "localhost" # following lines must be added to $KAFKA_DIR/config/server.properties # listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094 # sasl.enabled.mechanisms=PLAIN # ssl.keystore.location = $KAFKA_DIR/server.keystore.jks # ssl.keystore.password = abcdefgh # ssl.key.password = abcdefgh # ssl.truststore.location = $KAFKA_DIR/server.truststore.jks # ssl.truststore.password = abcdefgh # notes: # (1) we dont test client authentication, hence, no need to generate client keys # (2) our client is not using the keystore, and the "rootCA.crt" file generated in the process above # should be copied to: $KAFKA_DIR def init_kafka(): """ start kafka/zookeeper """ try: KAFKA_DIR = os.environ['KAFKA_DIR'] except: KAFKA_DIR = '' if KAFKA_DIR == '': log.info('KAFKA_DIR must be set to where kafka is installed') print('KAFKA_DIR must be set to where kafka is installed') return None, None, None DEVNULL = open(os.devnull, 'wb') print('\nStarting zookeeper...') try: zk_proc = subprocess.Popen([KAFKA_DIR+'bin/zookeeper-server-start.sh', KAFKA_DIR+'config/zookeeper.properties'], stdout=DEVNULL) except Exception as error: log.info('failed to execute zookeeper: %s', str(error)) print('failed to execute zookeeper: %s' % str(error)) return None, None, None time.sleep(5) if zk_proc.poll() is not None: print('zookeeper failed to start') return None, None, None print('Zookeeper started') print('Starting kafka...') kafka_log = open('./kafka.log', 'w') try: kafka_env = os.environ.copy() kafka_env['KAFKA_OPTS']='-Djava.security.auth.login.config='+KAFKA_DIR+'config/kafka_server_jaas.conf' kafka_proc = subprocess.Popen([ KAFKA_DIR+'bin/kafka-server-start.sh', KAFKA_DIR+'config/server.properties'], stdout=kafka_log, env=kafka_env) except Exception as error: log.info('failed to execute kafka: %s', str(error)) print('failed to execute kafka: %s' % str(error)) zk_proc.terminate() kafka_log.close() return None, None, None # TODO add kafka checkpoint instead of sleep time.sleep(15) if kafka_proc.poll() is not None: zk_proc.terminate() print('kafka failed to start. details in: ./kafka.log') kafka_log.close() return None, None, None print('Kafka started') return kafka_proc, zk_proc, kafka_log def clean_kafka(kafka_proc, zk_proc, kafka_log): """ stop kafka/zookeeper """ try: kafka_log.close() print('Shutdown Kafka...') kafka_proc.terminate() time.sleep(5) if kafka_proc.poll() is None: print('Failed to shutdown Kafka... killing') kafka_proc.kill() print('Shutdown zookeeper...') zk_proc.terminate() time.sleep(5) if zk_proc.poll() is None: print('Failed to shutdown zookeeper... killing') zk_proc.kill() except: log.info('kafka/zookeeper already terminated') def init_env(require_ps=True): """initialize the environment""" if require_ps: check_ps_configured() realm = get_realm() zonegroup = realm.master_zonegroup() zonegroup_conns = ZonegroupConns(zonegroup) zonegroup_meta_checkpoint(zonegroup) ps_zone = None master_zone = None for conn in zonegroup_conns.zones: if conn.zone == zonegroup.master_zone: master_zone = conn if is_ps_zone(conn): zone_meta_checkpoint(conn.zone) ps_zone = conn assert_not_equal(master_zone, None) if require_ps: assert_not_equal(ps_zone, None) return master_zone, ps_zone def get_ip(): """ This method returns the "primary" IP on the local box (the one with a default route) source: https://stackoverflow.com/a/28950776/711085 this is needed because on the teuthology machines: socket.getfqdn()/socket.gethostname() return 127.0.0.1 """ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: # address should not be reachable s.connect(('10.255.255.255', 1)) ip = s.getsockname()[0] finally: s.close() return ip TOPIC_SUFFIX = "_topic" SUB_SUFFIX = "_sub" NOTIFICATION_SUFFIX = "_notif" ############## # pubsub tests ############## def test_ps_info(): """ log information for manual testing """ return SkipTest("only used in manual testing") master_zone, ps_zone = init_env() realm = get_realm() zonegroup = realm.master_zonegroup() bucket_name = gen_bucket_name() # create bucket on the first of the rados zones bucket = master_zone.create_bucket(bucket_name) # create objects in the bucket number_of_objects = 10 for i in range(number_of_objects): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') print('Zonegroup: ' + zonegroup.name) print('user: ' + get_user()) print('tenant: ' + get_tenant()) print('Master Zone') print_connection_info(master_zone.conn) print('PubSub Zone') print_connection_info(ps_zone.conn) print('Bucket: ' + bucket_name) def test_ps_s3_notification_low_level(): """ test low level implementation of s3 notifications """ master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() # create bucket on the first of the rados zones master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) # create topic topic_name = bucket_name + TOPIC_SUFFIX topic_conf = PSTopic(ps_zone.conn, topic_name) result, status = topic_conf.set_config() assert_equal(status/100, 2) parsed_result = json.loads(result) topic_arn = parsed_result['arn'] # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX generated_topic_name = notification_name+'_'+topic_name topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*'] }] s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) zone_meta_checkpoint(ps_zone.zone) # get auto-generated topic generated_topic_conf = PSTopic(ps_zone.conn, generated_topic_name) result, status = generated_topic_conf.get_config() parsed_result = json.loads(result) assert_equal(status/100, 2) assert_equal(parsed_result['topic']['name'], generated_topic_name) # get auto-generated notification notification_conf = PSNotification(ps_zone.conn, bucket_name, generated_topic_name) result, status = notification_conf.get_config() parsed_result = json.loads(result) assert_equal(status/100, 2) assert_equal(len(parsed_result['topics']), 1) # get auto-generated subscription sub_conf = PSSubscription(ps_zone.conn, notification_name, generated_topic_name) result, status = sub_conf.get_config() parsed_result = json.loads(result) assert_equal(status/100, 2) assert_equal(parsed_result['topic'], generated_topic_name) # delete s3 notification _, status = s3_notification_conf.del_config(notification=notification_name) assert_equal(status/100, 2) # delete topic _, status = topic_conf.del_config() assert_equal(status/100, 2) # verify low-level cleanup _, status = generated_topic_conf.get_config() assert_equal(status, 404) result, status = notification_conf.get_config() parsed_result = json.loads(result) assert_equal(len(parsed_result['topics']), 0) # TODO should return 404 # assert_equal(status, 404) result, status = sub_conf.get_config() parsed_result = json.loads(result) assert_equal(parsed_result['topic'], '') # TODO should return 404 # assert_equal(status, 404) # cleanup topic_conf.del_config() # delete the bucket master_zone.delete_bucket(bucket_name) def test_ps_s3_notification_records(): """ test s3 records fetching """ master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() # create bucket on the first of the rados zones bucket = master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) # create topic topic_name = bucket_name + TOPIC_SUFFIX topic_conf = PSTopic(ps_zone.conn, topic_name) result, status = topic_conf.set_config() assert_equal(status/100, 2) parsed_result = json.loads(result) topic_arn = parsed_result['arn'] # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*'] }] s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) zone_meta_checkpoint(ps_zone.zone) # get auto-generated subscription sub_conf = PSSubscription(ps_zone.conn, notification_name, topic_name) _, status = sub_conf.get_config() assert_equal(status/100, 2) # create objects in the bucket number_of_objects = 10 for i in range(number_of_objects): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # get the events from the subscription result, _ = sub_conf.get_events() records = json.loads(result) for record in records['Records']: log.debug(record) keys = list(bucket.list()) # TODO: use exact match verify_s3_records_by_elements(records, keys, exact_match=False) # cleanup _, status = s3_notification_conf.del_config() topic_conf.del_config() # delete the keys for key in bucket.list(): key.delete() master_zone.delete_bucket(bucket_name) def test_ps_s3_notification(): """ test s3 notification set/get/delete """ master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() # create bucket on the first of the rados zones master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) topic_name = bucket_name + TOPIC_SUFFIX # create topic topic_name = bucket_name + TOPIC_SUFFIX topic_conf = PSTopic(ps_zone.conn, topic_name) response, status = topic_conf.set_config() assert_equal(status/100, 2) parsed_result = json.loads(response) topic_arn = parsed_result['arn'] # create one s3 notification notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1' topic_conf_list = [{'Id': notification_name1, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*'] }] s3_notification_conf1 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf1.set_config() assert_equal(status/100, 2) # create another s3 notification with the same topic notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2' topic_conf_list = [{'Id': notification_name2, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'] }] s3_notification_conf2 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf2.set_config() assert_equal(status/100, 2) zone_meta_checkpoint(ps_zone.zone) # get all notification on a bucket response, status = s3_notification_conf1.get_config() assert_equal(status/100, 2) assert_equal(len(response['TopicConfigurations']), 2) assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn) assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn) # get specific notification on a bucket response, status = s3_notification_conf1.get_config(notification=notification_name1) assert_equal(status/100, 2) assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn) assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Id'], notification_name1) response, status = s3_notification_conf2.get_config(notification=notification_name2) assert_equal(status/100, 2) assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn) assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Id'], notification_name2) # delete specific notifications _, status = s3_notification_conf1.del_config(notification=notification_name1) assert_equal(status/100, 2) _, status = s3_notification_conf2.del_config(notification=notification_name2) assert_equal(status/100, 2) # cleanup topic_conf.del_config() # delete the bucket master_zone.delete_bucket(bucket_name) def test_ps_s3_topic_on_master(): """ test s3 topics set/get/delete on master """ master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() bucket_name = gen_bucket_name() topic_name = bucket_name + TOPIC_SUFFIX # clean all topics delete_all_s3_topics(master_zone, zonegroup.name) # create s3 topics endpoint_address = 'amqp://127.0.0.1:7001/vhost_1' endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf1.set_config() assert_equal(topic_arn, 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_1') endpoint_address = 'http://127.0.0.1:9001' endpoint_args = 'push-endpoint='+endpoint_address topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf2.set_config() assert_equal(topic_arn, 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_2') endpoint_address = 'http://127.0.0.1:9002' endpoint_args = 'push-endpoint='+endpoint_address topic_conf3 = PSTopicS3(master_zone.conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf3.set_config() assert_equal(topic_arn, 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_3') # get topic 3 result, status = topic_conf3.get_config() assert_equal(status, 200) assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn']) assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress']) # Note that endpoint args may be ordered differently in the result result = topic_conf3.get_attributes() assert_equal(topic_arn, result['Attributes']['TopicArn']) json_endpoint = json.loads(result['Attributes']['EndPoint']) assert_equal(endpoint_address, json_endpoint['EndpointAddress']) # delete topic 1 result = topic_conf1.del_config() assert_equal(status, 200) # try to get a deleted topic _, status = topic_conf1.get_config() assert_equal(status, 404) try: topic_conf1.get_attributes() except: print('topic already deleted - this is expected') else: assert False, 'topic 1 should be deleted at this point' # get the remaining 2 topics result, status = topic_conf1.get_list() assert_equal(status, 200) assert_equal(len(result['ListTopicsResponse']['ListTopicsResult']['Topics']['member']), 2) # delete topics result = topic_conf2.del_config() # TODO: should be 200OK # assert_equal(status, 200) result = topic_conf3.del_config() # TODO: should be 200OK # assert_equal(status, 200) # get topic list, make sure it is empty result, status = topic_conf1.get_list() assert_equal(result['ListTopicsResponse']['ListTopicsResult']['Topics'], None) def test_ps_s3_topic_with_secret_on_master(): """ test s3 topics with secret set/get/delete on master """ master_zone, _ = init_env(require_ps=False) if master_zone.secure_conn is None: return SkipTest('secure connection is needed to test topic with secrets') realm = get_realm() zonegroup = realm.master_zonegroup() bucket_name = gen_bucket_name() topic_name = bucket_name + TOPIC_SUFFIX # clean all topics delete_all_s3_topics(master_zone, zonegroup.name) # create s3 topics endpoint_address = 'amqp://user:password@127.0.0.1:7001' endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' bad_topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) try: result = bad_topic_conf.set_config() except Exception as err: print('Error is expected: ' + str(err)) else: assert False, 'user password configuration set allowed only over HTTPS' topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() assert_equal(topic_arn, 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name) _, status = bad_topic_conf.get_config() assert_equal(status/100, 4) # get topic result, status = topic_conf.get_config() assert_equal(status, 200) assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn']) assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress']) _, status = bad_topic_conf.get_config() assert_equal(status/100, 4) _, status = topic_conf.get_list() assert_equal(status/100, 2) # delete topics result = topic_conf.del_config() def test_ps_s3_notification_on_master(): """ test s3 notification set/get/delete on master """ master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() bucket_name = gen_bucket_name() # create bucket bucket = master_zone.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX # create s3 topic endpoint_address = 'amqp://127.0.0.1:7001' endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args) topic_arn1 = topic_conf1.set_config() topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args) topic_arn2 = topic_conf2.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1, 'Events': ['s3:ObjectCreated:*'] }, {'Id': notification_name+'_2', 'TopicArn': topic_arn1, 'Events': ['s3:ObjectRemoved:*'] }, {'Id': notification_name+'_3', 'TopicArn': topic_arn1, 'Events': [] }] s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # get notifications on a bucket response, status = s3_notification_conf.get_config(notification=notification_name+'_1') assert_equal(status/100, 2) assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn1) # delete specific notifications _, status = s3_notification_conf.del_config(notification=notification_name+'_1') assert_equal(status/100, 2) # get the remaining 2 notifications on a bucket response, status = s3_notification_conf.get_config() assert_equal(status/100, 2) assert_equal(len(response['TopicConfigurations']), 2) assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn1) assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn1) # delete remaining notifications _, status = s3_notification_conf.del_config() assert_equal(status/100, 2) # make sure that the notifications are now deleted response, status = s3_notification_conf.get_config() try: dummy = response['TopicConfigurations'] except: print('"TopicConfigurations" is not in response') else: assert False, '"TopicConfigurations" should not be in response' # create another s3 notification topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1, 'Events': ['s3:ObjectCreated:*'] }] _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # make sure the notification and auto-genrated topic are deleted response, status = topic_conf1.get_list() topics = response['ListTopicsResponse']['ListTopicsResult']['Topics']['member'] before_delete = len(topics) # delete the bucket master_zone.delete_bucket(bucket_name) response, status = topic_conf2.get_list() topics = response['ListTopicsResponse']['ListTopicsResult']['Topics']['member'] after_delete = len(topics) assert_equal(before_delete - after_delete, 3) # cleanup topic_conf1.del_config() topic_conf2.del_config() def ps_s3_notification_filter(on_master): """ test s3 notification filter on master """ if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") hostname = get_ip() proc = init_rabbitmq() if proc is None: return SkipTest('end2end amqp tests require rabbitmq-server installed') if on_master: master_zone, _ = init_env(require_ps=False) ps_zone = master_zone else: master_zone, ps_zone = init_env(require_ps=True) ps_zone = ps_zone realm = get_realm() zonegroup = realm.master_zonegroup() # create bucket bucket_name = gen_bucket_name() bucket = master_zone.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX # start amqp receivers exchange = 'ex1' task, receiver = create_amqp_receiver_thread(exchange, topic_name) task.start() # create s3 topic endpoint_address = 'amqp://' + hostname endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' if on_master: topic_conf = PSTopicS3(ps_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() else: topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args) result, _ = topic_conf.set_config() parsed_result = json.loads(result) topic_arn = parsed_result['arn'] zone_meta_checkpoint(ps_zone.zone) # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*'], 'Filter': { 'Key': { 'FilterRules': [{'Name': 'prefix', 'Value': 'hello'}] } } }, {'Id': notification_name+'_2', 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*'], 'Filter': { 'Key': { 'FilterRules': [{'Name': 'prefix', 'Value': 'world'}, {'Name': 'suffix', 'Value': 'log'}] } } }, {'Id': notification_name+'_3', 'TopicArn': topic_arn, 'Events': [], 'Filter': { 'Key': { 'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)\\.txt'}] } } }] s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) result, status = s3_notification_conf.set_config() assert_equal(status/100, 2) if on_master: topic_conf_list = [{'Id': notification_name+'_4', 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'], 'Filter': { 'Metadata': { 'FilterRules': [{'Name': 'x-amz-meta-foo', 'Value': 'bar'}, {'Name': 'x-amz-meta-hello', 'Value': 'world'}] }, 'Key': { 'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)'}] } } }] try: s3_notification_conf4 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) _, status = s3_notification_conf4.set_config() assert_equal(status/100, 2) skip_notif4 = False except Exception as error: print('note: metadata filter is not supported by boto3 - skipping test') skip_notif4 = True else: print('filtering by attributes only supported on master zone') skip_notif4 = True # get all notifications result, status = s3_notification_conf.get_config() assert_equal(status/100, 2) for conf in result['TopicConfigurations']: filter_name = conf['Filter']['Key']['FilterRules'][0]['Name'] assert filter_name == 'prefix' or filter_name == 'suffix' or filter_name == 'regex', filter_name if not skip_notif4: result, status = s3_notification_conf4.get_config(notification=notification_name+'_4') assert_equal(status/100, 2) filter_name = result['NotificationConfiguration']['TopicConfiguration']['Filter']['S3Metadata']['FilterRule'][0]['Name'] assert filter_name == 'x-amz-meta-foo' or filter_name == 'x-amz-meta-hello' expected_in1 = ['hello.kaboom', 'hello.txt', 'hello123.txt', 'hello'] expected_in2 = ['world1.log', 'world2log', 'world3.log'] expected_in3 = ['hello.txt', 'hell.txt', 'worldlog.txt'] expected_in4 = ['foo', 'bar', 'hello', 'world'] filtered = ['hell.kaboom', 'world.og', 'world.logg', 'he123ll.txt', 'wo', 'log', 'h', 'txt', 'world.log.txt'] filtered_with_attr = ['nofoo', 'nobar', 'nohello', 'noworld'] # create objects in bucket for key_name in expected_in1: key = bucket.new_key(key_name) key.set_contents_from_string('bar') for key_name in expected_in2: key = bucket.new_key(key_name) key.set_contents_from_string('bar') for key_name in expected_in3: key = bucket.new_key(key_name) key.set_contents_from_string('bar') if not skip_notif4: for key_name in expected_in4: key = bucket.new_key(key_name) key.set_metadata('foo', 'bar') key.set_metadata('hello', 'world') key.set_metadata('goodbye', 'cruel world') key.set_contents_from_string('bar') for key_name in filtered: key = bucket.new_key(key_name) key.set_contents_from_string('bar') for key_name in filtered_with_attr: key.set_metadata('foo', 'nobar') key.set_metadata('hello', 'noworld') key.set_metadata('goodbye', 'cruel world') key = bucket.new_key(key_name) key.set_contents_from_string('bar') if on_master: print('wait for 5sec for the messages...') time.sleep(5) else: zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) found_in1 = [] found_in2 = [] found_in3 = [] found_in4 = [] for event in receiver.get_and_reset_events(): notif_id = event['Records'][0]['s3']['configurationId'] key_name = event['Records'][0]['s3']['object']['key'] if notif_id == notification_name+'_1': found_in1.append(key_name) elif notif_id == notification_name+'_2': found_in2.append(key_name) elif notif_id == notification_name+'_3': found_in3.append(key_name) elif not skip_notif4 and notif_id == notification_name+'_4': found_in4.append(key_name) else: assert False, 'invalid notification: ' + notif_id assert_equal(set(found_in1), set(expected_in1)) assert_equal(set(found_in2), set(expected_in2)) assert_equal(set(found_in3), set(expected_in3)) if not skip_notif4: assert_equal(set(found_in4), set(expected_in4)) # cleanup s3_notification_conf.del_config() if not skip_notif4: s3_notification_conf4.del_config() topic_conf.del_config() # delete the bucket for key in bucket.list(): key.delete() master_zone.delete_bucket(bucket_name) stop_amqp_receiver(receiver, task) clean_rabbitmq(proc) def test_ps_s3_notification_filter_on_master(): ps_s3_notification_filter(on_master=True) def test_ps_s3_notification_filter(): ps_s3_notification_filter(on_master=False) def test_ps_s3_notification_errors_on_master(): """ test s3 notification set/get/delete on master """ master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() bucket_name = gen_bucket_name() # create bucket bucket = master_zone.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX # create s3 topic endpoint_address = 'amqp://127.0.0.1:7001' endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification with invalid event name notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:Kaboom'] }] s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) try: result, status = s3_notification_conf.set_config() except Exception as error: print(str(error) + ' - is expected') else: assert False, 'invalid event name is expected to fail' # create s3 notification with missing name topic_conf_list = [{'Id': '', 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:Put'] }] s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) try: _, _ = s3_notification_conf.set_config() except Exception as error: print(str(error) + ' - is expected') else: assert False, 'missing notification name is expected to fail' # create s3 notification with invalid topic ARN invalid_topic_arn = 'kaboom' topic_conf_list = [{'Id': notification_name, 'TopicArn': invalid_topic_arn, 'Events': ['s3:ObjectCreated:Put'] }] s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) try: _, _ = s3_notification_conf.set_config() except Exception as error: print(str(error) + ' - is expected') else: assert False, 'invalid ARN is expected to fail' # create s3 notification with unknown topic ARN invalid_topic_arn = 'arn:aws:sns:a::kaboom' topic_conf_list = [{'Id': notification_name, 'TopicArn': invalid_topic_arn , 'Events': ['s3:ObjectCreated:Put'] }] s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) try: _, _ = s3_notification_conf.set_config() except Exception as error: print(str(error) + ' - is expected') else: assert False, 'unknown topic is expected to fail' # create s3 notification with wrong bucket topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:Put'] }] s3_notification_conf = PSNotificationS3(master_zone.conn, 'kaboom', topic_conf_list) try: _, _ = s3_notification_conf.set_config() except Exception as error: print(str(error) + ' - is expected') else: assert False, 'unknown bucket is expected to fail' topic_conf.del_config() status = topic_conf.del_config() # deleting an unknown notification is not considered an error assert_equal(status, 200) _, status = topic_conf.get_config() assert_equal(status, 404) # cleanup # delete the bucket master_zone.delete_bucket(bucket_name) def test_objcet_timing(): return SkipTest("only used in manual testing") master_zone, _ = init_env(require_ps=False) # create bucket bucket_name = gen_bucket_name() bucket = master_zone.create_bucket(bucket_name) # create objects in the bucket (async) print('creating objects...') number_of_objects = 1000 client_threads = [] start_time = time.time() content = str(bytearray(os.urandom(1024*1024))) for i in range(number_of_objects): key = bucket.new_key(str(i)) thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] time_diff = time.time() - start_time print('average time for object creation: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') print('total number of objects: ' + str(len(list(bucket.list())))) print('deleting objects...') client_threads = [] start_time = time.time() for key in bucket.list(): thr = threading.Thread(target = key.delete, args=()) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] time_diff = time.time() - start_time print('average time for object deletion: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') # cleanup master_zone.delete_bucket(bucket_name) def test_ps_s3_notification_push_amqp_on_master(): """ test pushing amqp s3 notification on master """ if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") hostname = get_ip() proc = init_rabbitmq() if proc is None: return SkipTest('end2end amqp tests require rabbitmq-server installed') master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() # create bucket bucket_name = gen_bucket_name() bucket = master_zone.create_bucket(bucket_name) topic_name1 = bucket_name + TOPIC_SUFFIX + '_1' topic_name2 = bucket_name + TOPIC_SUFFIX + '_2' # start amqp receivers exchange = 'ex1' task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1) task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name2) task1.start() task2.start() # create two s3 topic endpoint_address = 'amqp://' + hostname # with acks from broker endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' topic_conf1 = PSTopicS3(master_zone.conn, topic_name1, zonegroup.name, endpoint_args=endpoint_args) topic_arn1 = topic_conf1.set_config() # without acks from broker endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable' topic_conf2 = PSTopicS3(master_zone.conn, topic_name2, zonegroup.name, endpoint_args=endpoint_args) topic_arn2 = topic_conf2.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1, 'Events': [] }, {'Id': notification_name+'_2', 'TopicArn': topic_arn2, 'Events': ['s3:ObjectCreated:*'] }] s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket (async) number_of_objects = 100 client_threads = [] start_time = time.time() for i in range(number_of_objects): key = bucket.new_key(str(i)) content = str(os.urandom(1024*1024)) thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] time_diff = time.time() - start_time print('average time for creation + qmqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') print('wait for 5sec for the messages...') time.sleep(5) # check amqp receiver keys = list(bucket.list()) print('total number of objects: ' + str(len(keys))) receiver1.verify_s3_events(keys, exact_match=True) receiver2.verify_s3_events(keys, exact_match=True) # delete objects from the bucket client_threads = [] start_time = time.time() for key in bucket.list(): thr = threading.Thread(target = key.delete, args=()) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] time_diff = time.time() - start_time print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') print('wait for 5sec for the messages...') time.sleep(5) # check amqp receiver 1 for deletions receiver1.verify_s3_events(keys, exact_match=True, deletions=True) # check amqp receiver 2 has no deletions try: receiver1.verify_s3_events(keys, exact_match=False, deletions=True) except: pass else: err = 'amqp receiver 2 should have no deletions' assert False, err # cleanup stop_amqp_receiver(receiver1, task1) stop_amqp_receiver(receiver2, task2) s3_notification_conf.del_config() topic_conf1.del_config() topic_conf2.del_config() # delete the bucket master_zone.delete_bucket(bucket_name) clean_rabbitmq(proc) def test_ps_s3_persistent_cleanup(): """ test reservation cleanup after gateway crash """ return SkipTest("only used in manual testing") master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() # create random port for the http server host = get_ip() port = random.randint(10000, 20000) # start an http server in a separate thread number_of_objects = 200 http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) gw = master_zone # create bucket bucket_name = gen_bucket_name() bucket = gw.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX # create s3 topic endpoint_address = 'http://'+host+':'+str(port) endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' topic_conf = PSTopicS3(gw.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:Put'] }] s3_notification_conf = PSNotificationS3(gw.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) client_threads = [] start_time = time.time() for i in range(number_of_objects): key = bucket.new_key(str(i)) content = str(os.urandom(1024*1024)) thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) # stop gateway while clients are sending os.system("killall -9 radosgw"); zonegroup.master_zone.gateways[0].stop() print('wait for 10 sec for before restarting the gateway') time.sleep(10) zonegroup.master_zone.gateways[0].start() [thr.join() for thr in client_threads] keys = list(bucket.list()) # delete objects from the bucket client_threads = [] start_time = time.time() for key in bucket.list(): thr = threading.Thread(target = key.delete, args=()) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] # check http receiver events = http_server.get_and_reset_events() print(str(len(events) ) + " events found out of " + str(number_of_objects)) # make sure that things are working now client_threads = [] start_time = time.time() for i in range(number_of_objects): key = bucket.new_key(str(i)) content = str(os.urandom(1024*1024)) thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] keys = list(bucket.list()) # delete objects from the bucket client_threads = [] start_time = time.time() for key in bucket.list(): thr = threading.Thread(target = key.delete, args=()) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] print('wait for 180 sec for reservations to be stale before queue deletion') time.sleep(180) # check http receiver events = http_server.get_and_reset_events() print(str(len(events)) + " events found out of " + str(number_of_objects)) # cleanup s3_notification_conf.del_config() topic_conf.del_config() gw.delete_bucket(bucket_name) http_server.close() def test_ps_s3_persistent_gateways_recovery(): """ test gateway recovery of persistent notifications """ if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() if len(zonegroup.master_zone.gateways) < 2: return SkipTest("this test requires two gateways") # create random port for the http server host = get_ip() port = random.randint(10000, 20000) # start an http server in a separate thread number_of_objects = 10 http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) gw1 = master_zone gw2 = zonegroup.master_zone.gateways[1] # create bucket bucket_name = gen_bucket_name() bucket = gw1.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX # create two s3 topics endpoint_address = 'http://'+host+':'+str(port) endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' topic_conf1 = PSTopicS3(gw1.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args+'&OpaqueData=fromgw1') topic_arn1 = topic_conf1.set_config() topic_conf2 = PSTopicS3(gw2.connection, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args+'&OpaqueData=fromgw2') topic_arn2 = topic_conf2.set_config() # create two s3 notifications notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1' topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1, 'Events': ['s3:ObjectCreated:Put'] }] s3_notification_conf1 = PSNotificationS3(gw1.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf1.set_config() assert_equal(status/100, 2) notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2' topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2, 'Events': ['s3:ObjectRemoved:Delete'] }] s3_notification_conf2 = PSNotificationS3(gw2.connection, bucket_name, topic_conf_list) response, status = s3_notification_conf2.set_config() assert_equal(status/100, 2) # stop gateway 2 print('stopping gateway2...') gw2.stop() client_threads = [] start_time = time.time() for i in range(number_of_objects): key = bucket.new_key(str(i)) content = str(os.urandom(1024*1024)) thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] keys = list(bucket.list()) # delete objects from the bucket client_threads = [] start_time = time.time() for key in bucket.list(): thr = threading.Thread(target = key.delete, args=()) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] print('wait for 60 sec for before restarting the gateway') time.sleep(60) gw2.start() # check http receiver events = http_server.get_and_reset_events() for key in keys: creations = 0 deletions = 0 for event in events: if event['Records'][0]['eventName'] == 's3:ObjectCreated:Put' and \ key.name == event['Records'][0]['s3']['object']['key']: creations += 1 elif event['Records'][0]['eventName'] == 's3:ObjectRemoved:Delete' and \ key.name == event['Records'][0]['s3']['object']['key']: deletions += 1 assert_equal(creations, 1) assert_equal(deletions, 1) # cleanup s3_notification_conf1.del_config() topic_conf1.del_config() gw1.delete_bucket(bucket_name) time.sleep(10) s3_notification_conf2.del_config() topic_conf2.del_config() http_server.close() def test_ps_s3_persistent_multiple_gateways(): """ test pushing persistent notification via two gateways """ if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() if len(zonegroup.master_zone.gateways) < 2: return SkipTest("this test requires two gateways") # create random port for the http server host = get_ip() port = random.randint(10000, 20000) # start an http server in a separate thread number_of_objects = 10 http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) gw1 = master_zone gw2 = zonegroup.master_zone.gateways[1] # create bucket bucket_name = gen_bucket_name() bucket1 = gw1.create_bucket(bucket_name) bucket2 = gw2.connection.get_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX # create two s3 topics endpoint_address = 'http://'+host+':'+str(port) endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' topic1_opaque = 'fromgw1' topic_conf1 = PSTopicS3(gw1.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args+'&OpaqueData='+topic1_opaque) topic_arn1 = topic_conf1.set_config() topic2_opaque = 'fromgw2' topic_conf2 = PSTopicS3(gw2.connection, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args+'&OpaqueData='+topic2_opaque) topic_arn2 = topic_conf2.set_config() # create two s3 notifications notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1' topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1, 'Events': [] }] s3_notification_conf1 = PSNotificationS3(gw1.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf1.set_config() assert_equal(status/100, 2) notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2' topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2, 'Events': [] }] s3_notification_conf2 = PSNotificationS3(gw2.connection, bucket_name, topic_conf_list) response, status = s3_notification_conf2.set_config() assert_equal(status/100, 2) client_threads = [] start_time = time.time() for i in range(number_of_objects): key = bucket1.new_key('gw1_'+str(i)) content = str(os.urandom(1024*1024)) thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) key = bucket2.new_key('gw2_'+str(i)) content = str(os.urandom(1024*1024)) thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] keys = list(bucket1.list()) delay = 30 print('wait for '+str(delay)+'sec for the messages...') time.sleep(delay) events = http_server.get_and_reset_events() for key in keys: topic1_count = 0 topic2_count = 0 for event in events: if event['Records'][0]['eventName'] == 's3:ObjectCreated:Put' and \ key.name == event['Records'][0]['s3']['object']['key'] and \ topic1_opaque == event['Records'][0]['opaqueData']: topic1_count += 1 elif event['Records'][0]['eventName'] == 's3:ObjectCreated:Put' and \ key.name == event['Records'][0]['s3']['object']['key'] and \ topic2_opaque == event['Records'][0]['opaqueData']: topic2_count += 1 assert_equal(topic1_count, 1) assert_equal(topic2_count, 1) # delete objects from the bucket client_threads = [] start_time = time.time() for key in bucket1.list(): thr = threading.Thread(target = key.delete, args=()) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] print('wait for '+str(delay)+'sec for the messages...') time.sleep(delay) events = http_server.get_and_reset_events() for key in keys: topic1_count = 0 topic2_count = 0 for event in events: if event['Records'][0]['eventName'] == 's3:ObjectRemoved:Delete' and \ key.name == event['Records'][0]['s3']['object']['key'] and \ topic1_opaque == event['Records'][0]['opaqueData']: topic1_count += 1 elif event['Records'][0]['eventName'] == 's3:ObjectRemoved:Delete' and \ key.name == event['Records'][0]['s3']['object']['key'] and \ topic2_opaque == event['Records'][0]['opaqueData']: topic2_count += 1 assert_equal(topic1_count, 1) assert_equal(topic2_count, 1) # cleanup s3_notification_conf1.del_config() topic_conf1.del_config() s3_notification_conf2.del_config() topic_conf2.del_config() gw1.delete_bucket(bucket_name) http_server.close() def test_ps_s3_persistent_multiple_endpoints(): """ test pushing persistent notification when one of the endpoints has error """ if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() # create random port for the http server host = get_ip() port = random.randint(10000, 20000) # start an http server in a separate thread number_of_objects = 10 http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) # create bucket bucket_name = gen_bucket_name() bucket = master_zone.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX # create two s3 topics endpoint_address = 'http://'+host+':'+str(port) endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args) topic_arn1 = topic_conf1.set_config() endpoint_address = 'http://kaboom:9999' endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args) topic_arn2 = topic_conf2.set_config() # create two s3 notifications notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1' topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1, 'Events': [] }] s3_notification_conf1 = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf1.set_config() assert_equal(status/100, 2) notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2' topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2, 'Events': [] }] s3_notification_conf2 = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf2.set_config() assert_equal(status/100, 2) client_threads = [] start_time = time.time() for i in range(number_of_objects): key = bucket.new_key(str(i)) content = str(os.urandom(1024*1024)) thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] keys = list(bucket.list()) delay = 30 print('wait for '+str(delay)+'sec for the messages...') time.sleep(delay) http_server.verify_s3_events(keys, exact_match=False, deletions=False) # delete objects from the bucket client_threads = [] start_time = time.time() for key in bucket.list(): thr = threading.Thread(target = key.delete, args=()) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] print('wait for '+str(delay)+'sec for the messages...') time.sleep(delay) http_server.verify_s3_events(keys, exact_match=False, deletions=True) # cleanup s3_notification_conf1.del_config() topic_conf1.del_config() s3_notification_conf2.del_config() topic_conf2.del_config() master_zone.delete_bucket(bucket_name) http_server.close() def persistent_notification(endpoint_type): """ test pushing persistent notification """ if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() # create bucket bucket_name = gen_bucket_name() bucket = master_zone.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX receiver = {} host = get_ip() if endpoint_type == 'http': # create random port for the http server port = random.randint(10000, 20000) # start an http server in a separate thread receiver = StreamingHTTPServer(host, port, num_workers=10) endpoint_address = 'http://'+host+':'+str(port) endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' # the http server does not guarantee order, so duplicates are expected exact_match = False elif endpoint_type == 'amqp': proc = init_rabbitmq() if proc is None: return SkipTest('end2end amqp tests require rabbitmq-server installed') # start amqp receiver exchange = 'ex1' task, receiver = create_amqp_receiver_thread(exchange, topic_name) task.start() endpoint_address = 'amqp://' + host endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker'+'&persistent=true' # amqp broker guarantee ordering exact_match = True else: return SkipTest('Unknown endpoint type: ' + endpoint_type) # create s3 topic topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] }] s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket (async) number_of_objects = 100 client_threads = [] start_time = time.time() for i in range(number_of_objects): key = bucket.new_key(str(i)) content = str(os.urandom(1024*1024)) thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] time_diff = time.time() - start_time print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') keys = list(bucket.list()) delay = 40 print('wait for '+str(delay)+'sec for the messages...') time.sleep(delay) receiver.verify_s3_events(keys, exact_match=exact_match, deletions=False) # delete objects from the bucket client_threads = [] start_time = time.time() for key in bucket.list(): thr = threading.Thread(target = key.delete, args=()) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] time_diff = time.time() - start_time print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') print('wait for '+str(delay)+'sec for the messages...') time.sleep(delay) receiver.verify_s3_events(keys, exact_match=exact_match, deletions=True) # cleanup s3_notification_conf.del_config() topic_conf.del_config() # delete the bucket master_zone.delete_bucket(bucket_name) if endpoint_type == 'http': receiver.close() else: stop_amqp_receiver(receiver, task) clean_rabbitmq(proc) def test_ps_s3_persistent_notification_http(): """ test pushing persistent notification http """ persistent_notification('http') def test_ps_s3_persistent_notification_amqp(): """ test pushing persistent notification amqp """ persistent_notification('amqp') def random_string(length): import string letters = string.ascii_letters return ''.join(random.choice(letters) for i in range(length)) def test_ps_s3_persistent_notification_large(): """ test pushing persistent notification of large notifications """ if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() # create bucket bucket_name = gen_bucket_name() bucket = master_zone.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX receiver = {} host = get_ip() proc = init_rabbitmq() if proc is None: return SkipTest('end2end amqp tests require rabbitmq-server installed') # start amqp receiver exchange = 'ex1' task, receiver = create_amqp_receiver_thread(exchange, topic_name) task.start() endpoint_address = 'amqp://' + host opaque_data = random_string(1024*2) endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data+'&amqp-exchange='+exchange+'&amqp-ack-level=broker'+'&persistent=true' # amqp broker guarantee ordering exact_match = True # create s3 topic topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] }] s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket (async) number_of_objects = 100 client_threads = [] start_time = time.time() for i in range(number_of_objects): key_value = random_string(63) key = bucket.new_key(key_value) content = str(os.urandom(1024*1024)) thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] time_diff = time.time() - start_time print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') keys = list(bucket.list()) delay = 40 print('wait for '+str(delay)+'sec for the messages...') time.sleep(delay) receiver.verify_s3_events(keys, exact_match=exact_match, deletions=False) # delete objects from the bucket client_threads = [] start_time = time.time() for key in bucket.list(): thr = threading.Thread(target = key.delete, args=()) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] time_diff = time.time() - start_time print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') print('wait for '+str(delay)+'sec for the messages...') time.sleep(delay) receiver.verify_s3_events(keys, exact_match=exact_match, deletions=True) # cleanup s3_notification_conf.del_config() topic_conf.del_config() # delete the bucket master_zone.delete_bucket(bucket_name) stop_amqp_receiver(receiver, task) clean_rabbitmq(proc) def test_ps_s3_persistent_notification_pushback(): """ test pushing persistent notification pushback """ return SkipTest("only used in manual testing") master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() # create random port for the http server host = get_ip() port = random.randint(10000, 20000) # start an http server in a separate thread http_server = StreamingHTTPServer(host, port, num_workers=10, delay=0.5) # create bucket bucket_name = gen_bucket_name() bucket = master_zone.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX # create s3 topic endpoint_address = 'http://'+host+':'+str(port) endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] }] s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket (async) for j in range(100): number_of_objects = randint(500, 1000) client_threads = [] start_time = time.time() for i in range(number_of_objects): key = bucket.new_key(str(j)+'-'+str(i)) content = str(os.urandom(1024*1024)) thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] time_diff = time.time() - start_time print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') keys = list(bucket.list()) delay = 30 print('wait for '+str(delay)+'sec for the messages...') time.sleep(delay) # delete objects from the bucket client_threads = [] start_time = time.time() count = 0 for key in bucket.list(): count += 1 thr = threading.Thread(target = key.delete, args=()) thr.start() client_threads.append(thr) if count%100 == 0: [thr.join() for thr in client_threads] time_diff = time.time() - start_time print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') client_threads = [] start_time = time.time() print('wait for '+str(delay)+'sec for the messages...') time.sleep(delay) # cleanup s3_notification_conf.del_config() topic_conf.del_config() # delete the bucket master_zone.delete_bucket(bucket_name) time.sleep(delay) http_server.close() def test_ps_s3_notification_push_kafka(): """ test pushing kafka s3 notification on master """ if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") kafka_proc, zk_proc, kafka_log = init_kafka() if kafka_proc is None or zk_proc is None: return SkipTest('end2end kafka tests require kafka/zookeeper installed') master_zone, ps_zone = init_env() realm = get_realm() zonegroup = realm.master_zonegroup() # create bucket bucket_name = gen_bucket_name() bucket = master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) # name is constant for manual testing topic_name = bucket_name+'_topic' # create consumer on the topic task, receiver = create_kafka_receiver_thread(topic_name) task.start() # create topic topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint='kafka://' + kafka_server, endpoint_args='kafka-ack-level=broker') result, status = topic_conf.set_config() assert_equal(status/100, 2) parsed_result = json.loads(result) topic_arn = parsed_result['arn'] # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] }] s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket (async) number_of_objects = 10 client_threads = [] for i in range(number_of_objects): key = bucket.new_key(str(i)) content = str(os.urandom(1024*1024)) thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) keys = list(bucket.list()) receiver.verify_s3_events(keys, exact_match=True) # delete objects from the bucket client_threads = [] for key in bucket.list(): thr = threading.Thread(target = key.delete, args=()) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) receiver.verify_s3_events(keys, exact_match=True, deletions=True) # cleanup s3_notification_conf.del_config() topic_conf.del_config() # delete the bucket master_zone.delete_bucket(bucket_name) stop_kafka_receiver(receiver, task) clean_kafka(kafka_proc, zk_proc, kafka_log) def test_ps_s3_notification_push_kafka_on_master(): """ test pushing kafka s3 notification on master """ if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") kafka_proc, zk_proc, kafka_log = init_kafka() if kafka_proc is None or zk_proc is None: return SkipTest('end2end kafka tests require kafka/zookeeper installed') master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() # create bucket bucket_name = gen_bucket_name() bucket = master_zone.create_bucket(bucket_name) # name is constant for manual testing topic_name = bucket_name+'_topic' # create consumer on the topic task, receiver = create_kafka_receiver_thread(topic_name+'_1') task.start() # create s3 topic endpoint_address = 'kafka://' + kafka_server # without acks from broker endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker' topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args) topic_arn1 = topic_conf1.set_config() endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none' topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args) topic_arn2 = topic_conf2.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1, 'Events': [] }, {'Id': notification_name + '_2', 'TopicArn': topic_arn2, 'Events': [] }] s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket (async) number_of_objects = 10 client_threads = [] start_time = time.time() for i in range(number_of_objects): key = bucket.new_key(str(i)) content = str(os.urandom(1024*1024)) thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] time_diff = time.time() - start_time print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') print('wait for 5sec for the messages...') time.sleep(5) keys = list(bucket.list()) receiver.verify_s3_events(keys, exact_match=True) # delete objects from the bucket client_threads = [] start_time = time.time() for key in bucket.list(): thr = threading.Thread(target = key.delete, args=()) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] time_diff = time.time() - start_time print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') print('wait for 5sec for the messages...') time.sleep(5) receiver.verify_s3_events(keys, exact_match=True, deletions=True) # cleanup s3_notification_conf.del_config() topic_conf1.del_config() topic_conf2.del_config() # delete the bucket master_zone.delete_bucket(bucket_name) stop_kafka_receiver(receiver, task) clean_kafka(kafka_proc, zk_proc, kafka_log) def kafka_security(security_type): """ test pushing kafka s3 notification on master """ if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") master_zone, _ = init_env(require_ps=False) if security_type == 'SSL_SASL' and master_zone.secure_conn is None: return SkipTest("secure connection is needed to test SASL_SSL security") kafka_proc, zk_proc, kafka_log = init_kafka() if kafka_proc is None or zk_proc is None: return SkipTest('end2end kafka tests require kafka/zookeeper installed') realm = get_realm() zonegroup = realm.master_zonegroup() # create bucket bucket_name = gen_bucket_name() bucket = master_zone.create_bucket(bucket_name) # name is constant for manual testing topic_name = bucket_name+'_topic' # create consumer on the topic task, receiver = create_kafka_receiver_thread(topic_name) task.start() # create s3 topic if security_type == 'SSL_SASL': endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094' else: # ssl only endpoint_address = 'kafka://' + kafka_server + ':9093' KAFKA_DIR = os.environ['KAFKA_DIR'] # without acks from broker, with root CA endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none&use-ssl=true&ca-location='+KAFKA_DIR+'rootCA.crt' if security_type == 'SSL_SASL': topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) else: topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] }] s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) s3_notification_conf.set_config() # create objects in the bucket (async) number_of_objects = 10 client_threads = [] start_time = time.time() for i in range(number_of_objects): key = bucket.new_key(str(i)) content = str(os.urandom(1024*1024)) thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] time_diff = time.time() - start_time print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') try: print('wait for 5sec for the messages...') time.sleep(5) keys = list(bucket.list()) receiver.verify_s3_events(keys, exact_match=True) # delete objects from the bucket client_threads = [] start_time = time.time() for key in bucket.list(): thr = threading.Thread(target = key.delete, args=()) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] time_diff = time.time() - start_time print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') print('wait for 5sec for the messages...') time.sleep(5) receiver.verify_s3_events(keys, exact_match=True, deletions=True) except Exception as err: assert False, str(err) finally: # cleanup s3_notification_conf.del_config() topic_conf.del_config() # delete the bucket for key in bucket.list(): key.delete() master_zone.delete_bucket(bucket_name) stop_kafka_receiver(receiver, task) clean_kafka(kafka_proc, zk_proc, kafka_log) def test_ps_s3_notification_push_kafka_security_ssl(): kafka_security('SSL') def test_ps_s3_notification_push_kafka_security_ssl_sasl(): kafka_security('SSL_SASL') def test_ps_s3_notification_multi_delete_on_master(): """ test deletion of multiple keys on master """ if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") hostname = get_ip() master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() # create random port for the http server host = get_ip() port = random.randint(10000, 20000) # start an http server in a separate thread number_of_objects = 10 http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) # create bucket bucket_name = gen_bucket_name() bucket = master_zone.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX # create s3 topic endpoint_address = 'http://'+host+':'+str(port) endpoint_args = 'push-endpoint='+endpoint_address topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectRemoved:*'] }] s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket client_threads = [] objects_size = {} for i in range(number_of_objects): content = str(os.urandom(randint(1, 1024))) object_size = len(content) key = bucket.new_key(str(i)) objects_size[key.name] = object_size thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] keys = list(bucket.list()) start_time = time.time() delete_all_objects(master_zone.conn, bucket_name) time_diff = time.time() - start_time print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') print('wait for 5sec for the messages...') time.sleep(5) # check http receiver http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size) # cleanup topic_conf.del_config() s3_notification_conf.del_config(notification=notification_name) # delete the bucket master_zone.delete_bucket(bucket_name) http_server.close() def test_ps_s3_notification_push_http_on_master(): """ test pushing http s3 notification on master """ if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") hostname = get_ip() master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() # create random port for the http server host = get_ip() port = random.randint(10000, 20000) # start an http server in a separate thread number_of_objects = 10 http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) # create bucket bucket_name = gen_bucket_name() bucket = master_zone.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX # create s3 topic endpoint_address = 'http://'+host+':'+str(port) endpoint_args = 'push-endpoint='+endpoint_address topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] }] s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket client_threads = [] objects_size = {} start_time = time.time() for i in range(number_of_objects): content = str(os.urandom(randint(1, 1024))) object_size = len(content) key = bucket.new_key(str(i)) objects_size[key.name] = object_size thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] time_diff = time.time() - start_time print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') print('wait for 5sec for the messages...') time.sleep(5) # check http receiver keys = list(bucket.list()) http_server.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size) # delete objects from the bucket client_threads = [] start_time = time.time() for key in bucket.list(): thr = threading.Thread(target = key.delete, args=()) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] time_diff = time.time() - start_time print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') print('wait for 5sec for the messages...') time.sleep(5) # check http receiver http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size) # cleanup topic_conf.del_config() s3_notification_conf.del_config(notification=notification_name) # delete the bucket master_zone.delete_bucket(bucket_name) http_server.close() def test_ps_s3_opaque_data(): """ test that opaque id set in topic, is sent in notification """ if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") hostname = get_ip() master_zone, ps_zone = init_env() realm = get_realm() zonegroup = realm.master_zonegroup() # create random port for the http server host = get_ip() port = random.randint(10000, 20000) # start an http server in a separate thread number_of_objects = 10 http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) # create bucket bucket_name = gen_bucket_name() bucket = master_zone.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX # wait for sync zone_meta_checkpoint(ps_zone.zone) # create s3 topic endpoint_address = 'http://'+host+':'+str(port) opaque_data = 'http://1.2.3.4:8888' endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args) result, status = topic_conf.set_config() assert_equal(status/100, 2) parsed_result = json.loads(result) topic_arn = parsed_result['arn'] # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] }] s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket client_threads = [] content = 'bar' for i in range(number_of_objects): key = bucket.new_key(str(i)) thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # check http receiver keys = list(bucket.list()) print('total number of objects: ' + str(len(keys))) events = http_server.get_and_reset_events() for event in events: assert_equal(event['Records'][0]['opaqueData'], opaque_data) # cleanup for key in keys: key.delete() [thr.join() for thr in client_threads] topic_conf.del_config() s3_notification_conf.del_config(notification=notification_name) # delete the bucket master_zone.delete_bucket(bucket_name) http_server.close() def test_ps_s3_opaque_data_on_master(): """ test that opaque id set in topic, is sent in notification on master """ if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") hostname = get_ip() master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() # create random port for the http server host = get_ip() port = random.randint(10000, 20000) # start an http server in a separate thread number_of_objects = 10 http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) # create bucket bucket_name = gen_bucket_name() bucket = master_zone.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX # create s3 topic endpoint_address = 'http://'+host+':'+str(port) endpoint_args = 'push-endpoint='+endpoint_address opaque_data = 'http://1.2.3.4:8888' topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args, opaque_data=opaque_data) topic_arn = topic_conf.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] }] s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket client_threads = [] start_time = time.time() content = 'bar' for i in range(number_of_objects): key = bucket.new_key(str(i)) thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) [thr.join() for thr in client_threads] time_diff = time.time() - start_time print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') print('wait for 5sec for the messages...') time.sleep(5) # check http receiver keys = list(bucket.list()) print('total number of objects: ' + str(len(keys))) events = http_server.get_and_reset_events() for event in events: assert_equal(event['Records'][0]['opaqueData'], opaque_data) # cleanup for key in keys: key.delete() [thr.join() for thr in client_threads] topic_conf.del_config() s3_notification_conf.del_config(notification=notification_name) # delete the bucket master_zone.delete_bucket(bucket_name) http_server.close() def test_ps_topic(): """ test set/get/delete of topic """ _, ps_zone = init_env() realm = get_realm() zonegroup = realm.master_zonegroup() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX # create topic topic_conf = PSTopic(ps_zone.conn, topic_name) _, status = topic_conf.set_config() assert_equal(status/100, 2) # get topic result, _ = topic_conf.get_config() # verify topic content parsed_result = json.loads(result) assert_equal(parsed_result['topic']['name'], topic_name) assert_equal(len(parsed_result['subs']), 0) assert_equal(parsed_result['topic']['arn'], 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name) # delete topic _, status = topic_conf.del_config() assert_equal(status/100, 2) # verift topic is deleted result, status = topic_conf.get_config() assert_equal(status, 404) parsed_result = json.loads(result) assert_equal(parsed_result['Code'], 'NoSuchKey') def test_ps_topic_with_endpoint(): """ test set topic with endpoint""" _, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX # create topic dest_endpoint = 'amqp://localhost:7001' dest_args = 'amqp-exchange=amqp.direct&amqp-ack-level=none' topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=dest_endpoint, endpoint_args=dest_args) _, status = topic_conf.set_config() assert_equal(status/100, 2) # get topic result, _ = topic_conf.get_config() # verify topic content parsed_result = json.loads(result) assert_equal(parsed_result['topic']['name'], topic_name) assert_equal(parsed_result['topic']['dest']['push_endpoint'], dest_endpoint) # cleanup topic_conf.del_config() def test_ps_notification(): """ test set/get/delete of notification """ master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX # create topic topic_conf = PSTopic(ps_zone.conn, topic_name) topic_conf.set_config() # create bucket on the first of the rados zones master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) # create notifications notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name) _, status = notification_conf.set_config() assert_equal(status/100, 2) # get notification result, _ = notification_conf.get_config() parsed_result = json.loads(result) assert_equal(len(parsed_result['topics']), 1) assert_equal(parsed_result['topics'][0]['topic']['name'], topic_name) # delete notification _, status = notification_conf.del_config() assert_equal(status/100, 2) result, status = notification_conf.get_config() parsed_result = json.loads(result) assert_equal(len(parsed_result['topics']), 0) # TODO should return 404 # assert_equal(status, 404) # cleanup topic_conf.del_config() master_zone.delete_bucket(bucket_name) def test_ps_notification_events(): """ test set/get/delete of notification on specific events""" master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX # create topic topic_conf = PSTopic(ps_zone.conn, topic_name) topic_conf.set_config() # create bucket on the first of the rados zones master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) # create notifications events = "OBJECT_CREATE,OBJECT_DELETE" notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name, events) _, status = notification_conf.set_config() assert_equal(status/100, 2) # get notification result, _ = notification_conf.get_config() parsed_result = json.loads(result) assert_equal(len(parsed_result['topics']), 1) assert_equal(parsed_result['topics'][0]['topic']['name'], topic_name) assert_not_equal(len(parsed_result['topics'][0]['events']), 0) # TODO add test for invalid event name # cleanup notification_conf.del_config() topic_conf.del_config() master_zone.delete_bucket(bucket_name) def test_ps_subscription(): """ test set/get/delete of subscription """ master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX # create topic topic_conf = PSTopic(ps_zone.conn, topic_name) topic_conf.set_config() # create bucket on the first of the rados zones bucket = master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) # create notifications notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name) _, status = notification_conf.set_config() assert_equal(status/100, 2) # create subscription sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX, topic_name) _, status = sub_conf.set_config() assert_equal(status/100, 2) # get the subscription result, _ = sub_conf.get_config() parsed_result = json.loads(result) assert_equal(parsed_result['topic'], topic_name) # create objects in the bucket number_of_objects = 10 for i in range(number_of_objects): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # get the create events from the subscription result, _ = sub_conf.get_events() events = json.loads(result) for event in events['events']: log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') keys = list(bucket.list()) # TODO: use exact match verify_events_by_elements(events, keys, exact_match=False) # delete objects from the bucket for key in bucket.list(): key.delete() # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # get the delete events from the subscriptions #result, _ = sub_conf.get_events() #for event in events['events']: # log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') # TODO: check deletions # TODO: use exact match # verify_events_by_elements(events, keys, exact_match=False, deletions=True) # we should see the creations as well as the deletions # delete subscription _, status = sub_conf.del_config() assert_equal(status/100, 2) result, status = sub_conf.get_config() parsed_result = json.loads(result) assert_equal(parsed_result['topic'], '') # TODO should return 404 # assert_equal(status, 404) # cleanup notification_conf.del_config() topic_conf.del_config() master_zone.delete_bucket(bucket_name) def test_ps_admin(): """ test radosgw-admin commands """ master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX realm = get_realm() zonegroup = realm.master_zonegroup() # create topic topic_conf = PSTopic(ps_zone.conn, topic_name) topic_conf.set_config() result, status = topic_conf.get_config() assert_equal(status, 200) parsed_result = json.loads(result) assert_equal(parsed_result['topic']['name'], topic_name) result, status = ps_zone.zone.cluster.admin(['topic', 'list', '--uid', get_user()] + ps_zone.zone.zone_arg()) assert_equal(status, 0) parsed_result = json.loads(result) assert len(parsed_result['topics']) > 0 result, status = ps_zone.zone.cluster.admin(['topic', 'get', '--uid', get_user(), '--topic', topic_name] + ps_zone.zone.zone_arg()) assert_equal(status, 0) parsed_result = json.loads(result) assert_equal(parsed_result['topic']['name'], topic_name) # create s3 topics endpoint_address = 'amqp://127.0.0.1:7001/vhost_1' endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' topic_conf_s3 = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) topic_conf_s3.set_config() result, status = topic_conf_s3.get_config() assert_equal(status, 200) assert_equal(result['GetTopicResponse']['GetTopicResult']['Topic']['Name'], topic_name) result, status = master_zone.zone.cluster.admin(['topic', 'list', '--uid', get_user()] + master_zone.zone.zone_arg()) assert_equal(status, 0) parsed_result = json.loads(result) assert len(parsed_result['topics']) > 0 result, status = master_zone.zone.cluster.admin(['topic', 'get', '--uid', get_user(), '--topic', topic_name] + master_zone.zone.zone_arg()) assert_equal(status, 0) parsed_result = json.loads(result) assert_equal(parsed_result['topic']['name'], topic_name) # create bucket on the first of the rados zones bucket = master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) # create notifications notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name) _, status = notification_conf.set_config() assert_equal(status/100, 2) # create subscription sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX, topic_name) _, status = sub_conf.set_config() assert_equal(status/100, 2) result, status = ps_zone.zone.cluster.admin(['subscription', 'get', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX] + ps_zone.zone.zone_arg()) assert_equal(status, 0) parsed_result = json.loads(result) assert_equal(parsed_result['name'], bucket_name+SUB_SUFFIX) # create objects in the bucket number_of_objects = 110 for i in range(number_of_objects): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync # get events from subscription zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) result, status = ps_zone.zone.cluster.admin(['subscription', 'pull', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX] + ps_zone.zone.zone_arg()) assert_equal(status, 0) parsed_result = json.loads(result) marker = parsed_result['next_marker'] events1 = parsed_result['events'] result, status = ps_zone.zone.cluster.admin(['subscription', 'pull', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX, '--marker', marker] + ps_zone.zone.zone_arg()) assert_equal(status, 0) parsed_result = json.loads(result) events2 = parsed_result['events'] keys = list(bucket.list()) verify_events_by_elements({"events": events1+events2}, keys, exact_match=False) # ack an event in the subscription result, status = ps_zone.zone.cluster.admin(['subscription', 'ack', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX, '--event-id', events2[0]['id']] + ps_zone.zone.zone_arg()) assert_equal(status, 0) # remove the subscription result, status = ps_zone.zone.cluster.admin(['subscription', 'rm', '--uid', get_user(), '--subscription', bucket_name+SUB_SUFFIX] + ps_zone.zone.zone_arg()) assert_equal(status, 0) # remove the topics result, status = ps_zone.zone.cluster.admin(['topic', 'rm', '--uid', get_user(), '--topic', topic_name] + ps_zone.zone.zone_arg()) assert_equal(status, 0) result, status = master_zone.zone.cluster.admin(['topic', 'rm', '--uid', get_user(), '--topic', topic_name] + master_zone.zone.zone_arg()) assert_equal(status, 0) # cleanup for key in bucket.list(): key.delete() notification_conf.del_config() master_zone.delete_bucket(bucket_name) def test_ps_incremental_sync(): """ test that events are only sent on incremental sync """ master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX # create topic topic_conf = PSTopic(ps_zone.conn, topic_name) topic_conf.set_config() # create bucket on the first of the rados zones bucket = master_zone.create_bucket(bucket_name) # create objects in the bucket number_of_objects = 10 for i in range(0, number_of_objects): key = bucket.new_key(str(i)) key.set_contents_from_string('foo') # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # create notifications notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name) _, status = notification_conf.set_config() assert_equal(status/100, 2) # create subscription sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX, topic_name) _, status = sub_conf.set_config() assert_equal(status/100, 2) # create more objects in the bucket for i in range(number_of_objects, 2*number_of_objects): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # get the create events from the subscription result, _ = sub_conf.get_events() events = json.loads(result) count = 0 for event in events['events']: log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') count += 1 # make sure we have 10 and not 20 events assert_equal(count, number_of_objects) # cleanup for key in bucket.list(): key.delete() sub_conf.del_config() notification_conf.del_config() topic_conf.del_config() master_zone.delete_bucket(bucket_name) def test_ps_event_type_subscription(): """ test subscriptions for different events """ master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() # create topic for objects creation topic_create_name = bucket_name+TOPIC_SUFFIX+'_create' topic_create_conf = PSTopic(ps_zone.conn, topic_create_name) topic_create_conf.set_config() # create topic for objects deletion topic_delete_name = bucket_name+TOPIC_SUFFIX+'_delete' topic_delete_conf = PSTopic(ps_zone.conn, topic_delete_name) topic_delete_conf.set_config() # create topic for all events topic_name = bucket_name+TOPIC_SUFFIX+'_all' topic_conf = PSTopic(ps_zone.conn, topic_name) topic_conf.set_config() # create bucket on the first of the rados zones bucket = master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # create notifications for objects creation notification_create_conf = PSNotification(ps_zone.conn, bucket_name, topic_create_name, "OBJECT_CREATE") _, status = notification_create_conf.set_config() assert_equal(status/100, 2) # create notifications for objects deletion notification_delete_conf = PSNotification(ps_zone.conn, bucket_name, topic_delete_name, "OBJECT_DELETE") _, status = notification_delete_conf.set_config() assert_equal(status/100, 2) # create notifications for all events notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name, "OBJECT_DELETE,OBJECT_CREATE") _, status = notification_conf.set_config() assert_equal(status/100, 2) # create subscription for objects creation sub_create_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_create', topic_create_name) _, status = sub_create_conf.set_config() assert_equal(status/100, 2) # create subscription for objects deletion sub_delete_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_delete', topic_delete_name) _, status = sub_delete_conf.set_config() assert_equal(status/100, 2) # create subscription for all events sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_all', topic_name) _, status = sub_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket number_of_objects = 10 for i in range(number_of_objects): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # get the events from the creation subscription result, _ = sub_create_conf.get_events() events = json.loads(result) for event in events['events']: log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') keys = list(bucket.list()) # TODO: use exact match verify_events_by_elements(events, keys, exact_match=False) # get the events from the deletions subscription result, _ = sub_delete_conf.get_events() events = json.loads(result) for event in events['events']: log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') assert_equal(len(events['events']), 0) # get the events from the all events subscription result, _ = sub_conf.get_events() events = json.loads(result) for event in events['events']: log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') # TODO: use exact match verify_events_by_elements(events, keys, exact_match=False) # delete objects from the bucket for key in bucket.list(): key.delete() # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) log.debug("Event (OBJECT_DELETE) synced") # get the events from the creations subscription result, _ = sub_create_conf.get_events() events = json.loads(result) for event in events['events']: log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') # deletions should not change the creation events # TODO: use exact match verify_events_by_elements(events, keys, exact_match=False) # get the events from the deletions subscription result, _ = sub_delete_conf.get_events() events = json.loads(result) for event in events['events']: log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') # only deletions should be listed here # TODO: use exact match verify_events_by_elements(events, keys, exact_match=False, deletions=True) # get the events from the all events subscription result, _ = sub_create_conf.get_events() events = json.loads(result) for event in events['events']: log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') # both deletions and creations should be here # TODO: use exact match verify_events_by_elements(events, keys, exact_match=False, deletions=False) # verify_events_by_elements(events, keys, exact_match=False, deletions=True) # TODO: (1) test deletions (2) test overall number of events # test subscription deletion when topic is specified _, status = sub_create_conf.del_config(topic=True) assert_equal(status/100, 2) _, status = sub_delete_conf.del_config(topic=True) assert_equal(status/100, 2) _, status = sub_conf.del_config(topic=True) assert_equal(status/100, 2) # cleanup notification_create_conf.del_config() notification_delete_conf.del_config() notification_conf.del_config() topic_create_conf.del_config() topic_delete_conf.del_config() topic_conf.del_config() master_zone.delete_bucket(bucket_name) def test_ps_event_fetching(): """ test incremental fetching of events from a subscription """ master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX # create topic topic_conf = PSTopic(ps_zone.conn, topic_name) topic_conf.set_config() # create bucket on the first of the rados zones bucket = master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) # create notifications notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name) _, status = notification_conf.set_config() assert_equal(status/100, 2) # create subscription sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX, topic_name) _, status = sub_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket number_of_objects = 100 for i in range(number_of_objects): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) max_events = 15 total_events_count = 0 next_marker = None all_events = [] while True: # get the events from the subscription result, _ = sub_conf.get_events(max_events, next_marker) events = json.loads(result) total_events_count += len(events['events']) all_events.extend(events['events']) next_marker = events['next_marker'] for event in events['events']: log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') if next_marker == '': break keys = list(bucket.list()) # TODO: use exact match verify_events_by_elements({'events': all_events}, keys, exact_match=False) # cleanup sub_conf.del_config() notification_conf.del_config() topic_conf.del_config() for key in bucket.list(): key.delete() master_zone.delete_bucket(bucket_name) def test_ps_event_acking(): """ test acking of some events in a subscription """ master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX # create topic topic_conf = PSTopic(ps_zone.conn, topic_name) topic_conf.set_config() # create bucket on the first of the rados zones bucket = master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) # create notifications notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name) _, status = notification_conf.set_config() assert_equal(status/100, 2) # create subscription sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX, topic_name) _, status = sub_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket number_of_objects = 10 for i in range(number_of_objects): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # get the create events from the subscription result, _ = sub_conf.get_events() events = json.loads(result) original_number_of_events = len(events) for event in events['events']: log.debug('Event (before ack) id: "' + str(event['id']) + '"') keys = list(bucket.list()) # TODO: use exact match verify_events_by_elements(events, keys, exact_match=False) # ack half of the events events_to_ack = number_of_objects/2 for event in events['events']: if events_to_ack == 0: break _, status = sub_conf.ack_events(event['id']) assert_equal(status/100, 2) events_to_ack -= 1 # verify that acked events are gone result, _ = sub_conf.get_events() events = json.loads(result) for event in events['events']: log.debug('Event (after ack) id: "' + str(event['id']) + '"') assert len(events) >= (original_number_of_events - number_of_objects/2) # cleanup sub_conf.del_config() notification_conf.del_config() topic_conf.del_config() for key in bucket.list(): key.delete() master_zone.delete_bucket(bucket_name) def test_ps_creation_triggers(): """ test object creation notifications in using put/copy/post """ master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX # create topic topic_conf = PSTopic(ps_zone.conn, topic_name) topic_conf.set_config() # create bucket on the first of the rados zones bucket = master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) # create notifications notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name) _, status = notification_conf.set_config() assert_equal(status/100, 2) # create subscription sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX, topic_name) _, status = sub_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket using PUT key = bucket.new_key('put') key.set_contents_from_string('bar') # create objects in the bucket using COPY bucket.copy_key('copy', bucket.name, key.name) # create objects in the bucket using multi-part upload fp = tempfile.NamedTemporaryFile(mode='w+b') object_size = 1024 content = bytearray(os.urandom(object_size)) fp.write(content) fp.flush() fp.seek(0) uploader = bucket.initiate_multipart_upload('multipart') uploader.upload_part_from_file(fp, 1) uploader.complete_upload() fp.close() # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # get the create events from the subscription result, _ = sub_conf.get_events() events = json.loads(result) for event in events['events']: log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') # TODO: verify the specific 3 keys: 'put', 'copy' and 'multipart' assert len(events['events']) >= 3 # cleanup sub_conf.del_config() notification_conf.del_config() topic_conf.del_config() for key in bucket.list(): key.delete() master_zone.delete_bucket(bucket_name) def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'): """ test object creation s3 notifications in using put/copy/post on master""" if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") if not external_endpoint_address: hostname = get_ip() proc = init_rabbitmq() if proc is None: return SkipTest('end2end amqp tests require rabbitmq-server installed') else: proc = None master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() # create bucket bucket_name = gen_bucket_name() bucket = master_zone.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX # start amqp receiver exchange = 'ex1' task, receiver = create_amqp_receiver_thread(exchange, topic_name, external_endpoint_address, ca_location) task.start() # create s3 topic if external_endpoint_address: endpoint_address = external_endpoint_address elif ca_location: endpoint_address = 'amqps://' + hostname else: endpoint_address = 'amqp://' + hostname endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker&verify-ssl='+verify_ssl if ca_location: endpoint_args += '&ca-location={}'.format(ca_location) if external_endpoint_address: topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) else: topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy'] }] s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) objects_size = {} # create objects in the bucket using PUT content = str(os.urandom(randint(1, 1024))) key_name = 'put' key = bucket.new_key(key_name) objects_size[key_name] = len(content) key.set_contents_from_string(content) # create objects in the bucket using COPY key_name = 'copy' bucket.copy_key(key_name, bucket.name, key.name) objects_size[key_name] = len(content) # create objects in the bucket using multi-part upload fp = tempfile.NamedTemporaryFile(mode='w+b') content = bytearray(os.urandom(10*1024*1024)) key_name = 'multipart' objects_size[key_name] = len(content) fp.write(content) fp.flush() fp.seek(0) uploader = bucket.initiate_multipart_upload(key_name) uploader.upload_part_from_file(fp, 1) uploader.complete_upload() fp.close() print('wait for 5sec for the messages...') time.sleep(5) # check amqp receiver keys = list(bucket.list()) receiver.verify_s3_events(keys, exact_match=True, expected_sizes=objects_size) # cleanup stop_amqp_receiver(receiver, task) s3_notification_conf.del_config() topic_conf.del_config() for key in bucket.list(): key.delete() # delete the bucket master_zone.delete_bucket(bucket_name) if proc: clean_rabbitmq(proc) def test_ps_s3_creation_triggers_on_master(): ps_s3_creation_triggers_on_master() def test_ps_s3_creation_triggers_on_master_external(): from distutils.util import strtobool if 'AMQP_EXTERNAL_ENDPOINT' in os.environ: try: if strtobool(os.environ['AMQP_VERIFY_SSL']): verify_ssl = 'true' else: verify_ssl = 'false' except Exception as e: verify_ssl = 'true' ps_s3_creation_triggers_on_master( external_endpoint_address=os.environ['AMQP_EXTERNAL_ENDPOINT'], verify_ssl=verify_ssl) else: return SkipTest("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run") def test_ps_s3_creation_triggers_on_master_ssl(): import datetime import textwrap import stat from cryptography import x509 from cryptography.x509.oid import NameOID from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa from tempfile import TemporaryDirectory with TemporaryDirectory() as tempdir: # modify permissions to ensure that the rabbitmq user can access them os.chmod(tempdir, mode=stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) CACERTFILE = os.path.join(tempdir, 'ca_certificate.pem') CERTFILE = os.path.join(tempdir, 'server_certificate.pem') KEYFILE = os.path.join(tempdir, 'server_key.pem') RABBITMQ_CONF_FILE = os.path.join(tempdir, 'rabbitmq.config') root_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) subject = issuer = x509.Name([ x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"), x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"), x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"), x509.NameAttribute(NameOID.COMMON_NAME, u"RFI CA"), ]) root_cert = x509.CertificateBuilder().subject_name( subject ).issuer_name( issuer ).public_key( root_key.public_key() ).serial_number( x509.random_serial_number() ).not_valid_before( datetime.datetime.utcnow() ).not_valid_after( datetime.datetime.utcnow() + datetime.timedelta(days=3650) ).add_extension( x509.BasicConstraints(ca=True, path_length=None), critical=True ).sign(root_key, hashes.SHA256(), default_backend()) with open(CACERTFILE, "wb") as f: f.write(root_cert.public_bytes(serialization.Encoding.PEM)) # Now we want to generate a cert from that root cert_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) with open(KEYFILE, "wb") as f: f.write(cert_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), )) new_subject = x509.Name([ x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"), x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"), x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"), ]) cert = x509.CertificateBuilder().subject_name( new_subject ).issuer_name( root_cert.issuer ).public_key( cert_key.public_key() ).serial_number( x509.random_serial_number() ).not_valid_before( datetime.datetime.utcnow() ).not_valid_after( datetime.datetime.utcnow() + datetime.timedelta(days=30) ).add_extension( x509.SubjectAlternativeName([x509.DNSName(u"localhost")]), critical=False, ).sign(root_key, hashes.SHA256(), default_backend()) # Write our certificate out to disk. with open(CERTFILE, "wb") as f: f.write(cert.public_bytes(serialization.Encoding.PEM)) with open(RABBITMQ_CONF_FILE, "w") as f: # use the old style config format to ensure it also runs on older RabbitMQ versions. f.write(textwrap.dedent(f''' [ {{rabbit, [ {{ssl_listeners, [5671]}}, {{ssl_options, [{{cacertfile, "{CACERTFILE}"}}, {{certfile, "{CERTFILE}"}}, {{keyfile, "{KEYFILE}"}}, {{verify, verify_peer}}, {{fail_if_no_peer_cert, false}}]}}]}} ]. ''')) os.environ['RABBITMQ_CONFIG_FILE'] = os.path.splitext(RABBITMQ_CONF_FILE)[0] ps_s3_creation_triggers_on_master(ca_location=CACERTFILE) del os.environ['RABBITMQ_CONFIG_FILE'] def test_ps_s3_multipart_on_master(): """ test multipart object upload on master""" if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") hostname = get_ip() proc = init_rabbitmq() if proc is None: return SkipTest('end2end amqp tests require rabbitmq-server installed') master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() # create bucket bucket_name = gen_bucket_name() bucket = master_zone.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX # start amqp receivers exchange = 'ex1' task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1') task1.start() task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name+'_2') task2.start() task3, receiver3 = create_amqp_receiver_thread(exchange, topic_name+'_3') task3.start() # create s3 topics endpoint_address = 'amqp://' + hostname endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker' topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args) topic_arn1 = topic_conf1.set_config() topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args) topic_arn2 = topic_conf2.set_config() topic_conf3 = PSTopicS3(master_zone.conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args) topic_arn3 = topic_conf3.set_config() # create s3 notifications notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1, 'Events': ['s3:ObjectCreated:*'] }, {'Id': notification_name+'_2', 'TopicArn': topic_arn2, 'Events': ['s3:ObjectCreated:Post'] }, {'Id': notification_name+'_3', 'TopicArn': topic_arn3, 'Events': ['s3:ObjectCreated:CompleteMultipartUpload'] }] s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket using multi-part upload fp = tempfile.NamedTemporaryFile(mode='w+b') object_size = 1024 content = bytearray(os.urandom(object_size)) fp.write(content) fp.flush() fp.seek(0) uploader = bucket.initiate_multipart_upload('multipart') uploader.upload_part_from_file(fp, 1) uploader.complete_upload() fp.close() print('wait for 5sec for the messages...') time.sleep(5) # check amqp receiver events = receiver1.get_and_reset_events() assert_equal(len(events), 3) events = receiver2.get_and_reset_events() assert_equal(len(events), 1) assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:Post') assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_2') events = receiver3.get_and_reset_events() assert_equal(len(events), 1) assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload') assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_3') print(events[0]['Records'][0]['s3']['object']['size']) # cleanup stop_amqp_receiver(receiver1, task1) stop_amqp_receiver(receiver2, task2) stop_amqp_receiver(receiver3, task3) s3_notification_conf.del_config() topic_conf1.del_config() topic_conf2.del_config() topic_conf3.del_config() for key in bucket.list(): key.delete() # delete the bucket master_zone.delete_bucket(bucket_name) clean_rabbitmq(proc) def test_ps_versioned_deletion(): """ test notification of deletion markers """ master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX # create topics topic_conf1 = PSTopic(ps_zone.conn, topic_name+'_1') _, status = topic_conf1.set_config() assert_equal(status/100, 2) topic_conf2 = PSTopic(ps_zone.conn, topic_name+'_2') _, status = topic_conf2.set_config() assert_equal(status/100, 2) # create bucket on the first of the rados zones bucket = master_zone.create_bucket(bucket_name) bucket.configure_versioning(True) # wait for sync zone_meta_checkpoint(ps_zone.zone) # create notifications event_type1 = 'OBJECT_DELETE' notification_conf1 = PSNotification(ps_zone.conn, bucket_name, topic_name+'_1', event_type1) _, status = notification_conf1.set_config() assert_equal(status/100, 2) event_type2 = 'DELETE_MARKER_CREATE' notification_conf2 = PSNotification(ps_zone.conn, bucket_name, topic_name+'_2', event_type2) _, status = notification_conf2.set_config() assert_equal(status/100, 2) # create subscriptions sub_conf1 = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_1', topic_name+'_1') _, status = sub_conf1.set_config() assert_equal(status/100, 2) sub_conf2 = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_2', topic_name+'_2') _, status = sub_conf2.set_config() assert_equal(status/100, 2) # create objects in the bucket key = bucket.new_key('foo') key.set_contents_from_string('bar') v1 = key.version_id key.set_contents_from_string('kaboom') v2 = key.version_id # create deletion marker delete_marker_key = bucket.delete_key(key.name) # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # delete the deletion marker delete_marker_key.delete() # delete versions bucket.delete_key(key.name, version_id=v2) bucket.delete_key(key.name, version_id=v1) # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # get the delete events from the subscription result, _ = sub_conf1.get_events() events = json.loads(result) for event in events['events']: log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') assert_equal(str(event['event']), event_type1) result, _ = sub_conf2.get_events() events = json.loads(result) for event in events['events']: log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"') assert_equal(str(event['event']), event_type2) # cleanup # follwing is needed for the cleanup in the case of 3-zones # see: http://tracker.ceph.com/issues/39142 realm = get_realm() zonegroup = realm.master_zonegroup() zonegroup_conns = ZonegroupConns(zonegroup) try: zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name) master_zone.delete_bucket(bucket_name) except: log.debug('zonegroup_bucket_checkpoint failed, cannot delete bucket') sub_conf1.del_config() sub_conf2.del_config() notification_conf1.del_config() notification_conf2.del_config() topic_conf1.del_config() topic_conf2.del_config() def test_ps_s3_metadata_on_master(): """ test s3 notification of metadata on master """ if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") hostname = get_ip() proc = init_rabbitmq() if proc is None: return SkipTest('end2end amqp tests require rabbitmq-server installed') master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() # create bucket bucket_name = gen_bucket_name() bucket = master_zone.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX # start amqp receiver exchange = 'ex1' task, receiver = create_amqp_receiver_thread(exchange, topic_name) task.start() # create s3 topic endpoint_address = 'amqp://' + hostname endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable' topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX meta_key = 'meta1' meta_value = 'This is my metadata value' meta_prefix = 'x-amz-meta-' topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'], 'Filter': { 'Metadata': { 'FilterRules': [{'Name': meta_prefix+meta_key, 'Value': meta_value}] } } }] s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) expected_keys = [] # create objects in the bucket key_name = 'foo' key = bucket.new_key(key_name) key.set_metadata(meta_key, meta_value) key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa') expected_keys.append(key_name) # create objects in the bucket using COPY key_name = 'copy_of_foo' bucket.copy_key(key_name, bucket.name, key.name) expected_keys.append(key_name) # create another objects in the bucket using COPY # but override the metadata value key_name = 'another_copy_of_foo' bucket.copy_key(key_name, bucket.name, key.name, metadata={meta_key: 'kaboom'}) # create objects in the bucket using multi-part upload fp = tempfile.NamedTemporaryFile(mode='w+b') chunk_size = 1024*1024*5 # 5MB object_size = 10*chunk_size content = bytearray(os.urandom(object_size)) fp.write(content) fp.flush() fp.seek(0) key_name = 'multipart_foo' uploader = bucket.initiate_multipart_upload(key_name, metadata={meta_key: meta_value}) for i in range(1,5): uploader.upload_part_from_file(fp, i, size=chunk_size) fp.seek(i*chunk_size) uploader.complete_upload() fp.close() expected_keys.append(key_name) print('wait for 5sec for the messages...') time.sleep(5) # check amqp receiver events = receiver.get_and_reset_events() assert_equal(len(events), 4) # PUT, COPY, Multipart start, Multipart End for event in events: assert(event['Records'][0]['s3']['object']['key'] in expected_keys) # delete objects for key in bucket.list(): key.delete() print('wait for 5sec for the messages...') time.sleep(5) # check amqp receiver #assert_equal(len(receiver.get_and_reset_events()), len(expected_keys)) # all 3 object has metadata when deleted assert_equal(event_count, 3) # cleanup stop_amqp_receiver(receiver, task) s3_notification_conf.del_config() topic_conf.del_config() # delete the bucket master_zone.delete_bucket(bucket_name) clean_rabbitmq(proc) def test_ps_s3_tags_on_master(): """ test s3 notification of tags on master """ if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") hostname = get_ip() proc = init_rabbitmq() if proc is None: return SkipTest('end2end amqp tests require rabbitmq-server installed') master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() # create bucket bucket_name = gen_bucket_name() bucket = master_zone.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX # start amqp receiver exchange = 'ex1' task, receiver = create_amqp_receiver_thread(exchange, topic_name) task.start() # create s3 topic endpoint_address = 'amqp://' + hostname endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable' topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'], 'Filter': { 'Tags': { 'FilterRules': [{'Name': 'hello', 'Value': 'world'}] } } }] s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket with tags tags = 'hello=world&ka=boom' key_name1 = 'key1' put_object_tagging(master_zone.conn, bucket_name, key_name1, tags) tags = 'foo=bar&ka=boom' key_name2 = 'key2' put_object_tagging(master_zone.conn, bucket_name, key_name2, tags) key_name3 = 'key3' key = bucket.new_key(key_name3) key.set_contents_from_string('bar') # create objects in the bucket using COPY bucket.copy_key('copy_of_'+key_name1, bucket.name, key_name1) print('wait for 5sec for the messages...') time.sleep(5) expected_tags = [{'val': 'world', 'key': 'hello'}, {'val': 'boom', 'key': 'ka'}] # check amqp receiver filtered_count = 0 for event in receiver.get_and_reset_events(): obj_tags = event['Records'][0]['s3']['object']['tags'] assert_equal(obj_tags[0], expected_tags[0]) filtered_count += 1 assert_equal(filtered_count, 2) # delete the objects for key in bucket.list(): key.delete() print('wait for 5sec for the messages...') time.sleep(5) # check amqp receiver filtered_count = 0 for event in receiver.get_and_reset_events(): obj_tags = event['Records'][0]['s3']['object']['tags'] assert_equal(obj_tags[0], expected_tags[0]) filtered_count += 1 assert_equal(filtered_count, 2) # cleanup stop_amqp_receiver(receiver, task) s3_notification_conf.del_config() topic_conf.del_config() # delete the bucket master_zone.delete_bucket(bucket_name) clean_rabbitmq(proc) def test_ps_s3_versioning_on_master(): """ test s3 notification of object versions """ if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") hostname = get_ip() proc = init_rabbitmq() if proc is None: return SkipTest('end2end amqp tests require rabbitmq-server installed') master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() # create bucket bucket_name = gen_bucket_name() bucket = master_zone.create_bucket(bucket_name) bucket.configure_versioning(True) topic_name = bucket_name + TOPIC_SUFFIX # start amqp receiver exchange = 'ex1' task, receiver = create_amqp_receiver_thread(exchange, topic_name) task.start() # create s3 topic endpoint_address = 'amqp://' + hostname endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': [] }] s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket key_value = 'foo' key = bucket.new_key(key_value) key.set_contents_from_string('hello') ver1 = key.version_id key.set_contents_from_string('world') ver2 = key.version_id print('wait for 5sec for the messages...') time.sleep(5) # check amqp receiver events = receiver.get_and_reset_events() num_of_versions = 0 for event_list in events: for event in event_list['Records']: assert_equal(event['s3']['object']['key'], key_value) version = event['s3']['object']['versionId'] num_of_versions += 1 if version not in (ver1, ver2): print('version mismatch: '+version+' not in: ('+ver1+', '+ver2+')') assert_equal(1, 0) else: print('version ok: '+version+' in: ('+ver1+', '+ver2+')') assert_equal(num_of_versions, 2) # cleanup stop_amqp_receiver(receiver, task) s3_notification_conf.del_config() topic_conf.del_config() # delete the bucket bucket.delete_key(key.name, version_id=ver2) bucket.delete_key(key.name, version_id=ver1) master_zone.delete_bucket(bucket_name) clean_rabbitmq(proc) def test_ps_s3_versioned_deletion_on_master(): """ test s3 notification of deletion markers on master """ if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") hostname = get_ip() proc = init_rabbitmq() if proc is None: return SkipTest('end2end amqp tests require rabbitmq-server installed') master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() # create bucket bucket_name = gen_bucket_name() bucket = master_zone.create_bucket(bucket_name) bucket.configure_versioning(True) topic_name = bucket_name + TOPIC_SUFFIX # start amqp receiver exchange = 'ex1' task, receiver = create_amqp_receiver_thread(exchange, topic_name) task.start() # create s3 topic endpoint_address = 'amqp://' + hostname endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn, 'Events': ['s3:ObjectRemoved:*'] }, {'Id': notification_name+'_2', 'TopicArn': topic_arn, 'Events': ['s3:ObjectRemoved:DeleteMarkerCreated'] }, {'Id': notification_name+'_3', 'TopicArn': topic_arn, 'Events': ['s3:ObjectRemoved:Delete'] }] s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket key = bucket.new_key('foo') content = str(os.urandom(512)) size1 = len(content) key.set_contents_from_string(content) ver1 = key.version_id content = str(os.urandom(511)) size2 = len(content) key.set_contents_from_string(content) ver2 = key.version_id # create delete marker (non versioned deletion) delete_marker_key = bucket.delete_key(key.name) time.sleep(1) # versioned deletion bucket.delete_key(key.name, version_id=ver2) bucket.delete_key(key.name, version_id=ver1) print('wait for 5sec for the messages...') time.sleep(5) # check amqp receiver events = receiver.get_and_reset_events() delete_events = 0 delete_marker_create_events = 0 for event_list in events: for event in event_list['Records']: size = event['s3']['object']['size'] if event['eventName'] == 's3:ObjectRemoved:Delete': delete_events += 1 assert size in [size1, size2] assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3'] if event['eventName'] == 's3:ObjectRemoved:DeleteMarkerCreated': delete_marker_create_events += 1 assert size == size2 assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2'] # 2 key versions were deleted # notified over the same topic via 2 notifications (1,3) assert_equal(delete_events, 2*2) # 1 deletion marker was created # notified over the same topic over 2 notifications (1,2) assert_equal(delete_marker_create_events, 1*2) # cleanup delete_marker_key.delete() stop_amqp_receiver(receiver, task) s3_notification_conf.del_config() topic_conf.del_config() # delete the bucket master_zone.delete_bucket(bucket_name) clean_rabbitmq(proc) def test_ps_push_http(): """ test pushing to http endpoint """ if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX # create random port for the http server host = get_ip() port = random.randint(10000, 20000) # start an http server in a separate thread http_server = StreamingHTTPServer(host, port) # create topic topic_conf = PSTopic(ps_zone.conn, topic_name) _, status = topic_conf.set_config() assert_equal(status/100, 2) # create bucket on the first of the rados zones bucket = master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) # create notifications notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name) _, status = notification_conf.set_config() assert_equal(status/100, 2) # create subscription sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX, topic_name, endpoint='http://'+host+':'+str(port)) _, status = sub_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket number_of_objects = 10 for i in range(number_of_objects): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # check http server keys = list(bucket.list()) # TODO: use exact match http_server.verify_events(keys, exact_match=False) # delete objects from the bucket for key in bucket.list(): key.delete() # wait for sync zone_meta_checkpoint(ps_zone.zone) zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # check http server # TODO: use exact match http_server.verify_events(keys, deletions=True, exact_match=False) # cleanup sub_conf.del_config() notification_conf.del_config() topic_conf.del_config() master_zone.delete_bucket(bucket_name) http_server.close() def test_ps_s3_push_http(): """ test pushing to http endpoint s3 record format""" if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX # create random port for the http server host = get_ip() port = random.randint(10000, 20000) # start an http server in a separate thread http_server = StreamingHTTPServer(host, port) # create topic topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint='http://'+host+':'+str(port)) result, status = topic_conf.set_config() assert_equal(status/100, 2) parsed_result = json.loads(result) topic_arn = parsed_result['arn'] # create bucket on the first of the rados zones bucket = master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'] }] s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket number_of_objects = 10 for i in range(number_of_objects): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # check http server keys = list(bucket.list()) # TODO: use exact match http_server.verify_s3_events(keys, exact_match=False) # delete objects from the bucket for key in bucket.list(): key.delete() # wait for sync zone_meta_checkpoint(ps_zone.zone) zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # check http server # TODO: use exact match http_server.verify_s3_events(keys, deletions=True, exact_match=False) # cleanup s3_notification_conf.del_config() topic_conf.del_config() master_zone.delete_bucket(bucket_name) http_server.close() def test_ps_push_amqp(): """ test pushing to amqp endpoint """ if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") hostname = get_ip() proc = init_rabbitmq() if proc is None: return SkipTest('end2end amqp tests require rabbitmq-server installed') master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX # create topic exchange = 'ex1' task, receiver = create_amqp_receiver_thread(exchange, topic_name) task.start() topic_conf = PSTopic(ps_zone.conn, topic_name) _, status = topic_conf.set_config() assert_equal(status/100, 2) # create bucket on the first of the rados zones bucket = master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) # create notifications notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name) _, status = notification_conf.set_config() assert_equal(status/100, 2) # create subscription sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX, topic_name, endpoint='amqp://'+hostname, endpoint_args='amqp-exchange='+exchange+'&amqp-ack-level=broker') _, status = sub_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket number_of_objects = 10 for i in range(number_of_objects): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # check amqp receiver keys = list(bucket.list()) # TODO: use exact match receiver.verify_events(keys, exact_match=False) # delete objects from the bucket for key in bucket.list(): key.delete() # wait for sync zone_meta_checkpoint(ps_zone.zone) zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # check amqp receiver # TODO: use exact match receiver.verify_events(keys, deletions=True, exact_match=False) # cleanup stop_amqp_receiver(receiver, task) sub_conf.del_config() notification_conf.del_config() topic_conf.del_config() master_zone.delete_bucket(bucket_name) clean_rabbitmq(proc) def test_ps_s3_push_amqp(): """ test pushing to amqp endpoint s3 record format""" if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") hostname = get_ip() proc = init_rabbitmq() if proc is None: return SkipTest('end2end amqp tests require rabbitmq-server installed') master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX # create topic exchange = 'ex1' task, receiver = create_amqp_receiver_thread(exchange, topic_name) task.start() topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint='amqp://' + hostname, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') result, status = topic_conf.set_config() assert_equal(status/100, 2) parsed_result = json.loads(result) topic_arn = parsed_result['arn'] # create bucket on the first of the rados zones bucket = master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'] }] s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket number_of_objects = 10 for i in range(number_of_objects): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # check amqp receiver keys = list(bucket.list()) # TODO: use exact match receiver.verify_s3_events(keys, exact_match=False) # delete objects from the bucket for key in bucket.list(): key.delete() # wait for sync zone_meta_checkpoint(ps_zone.zone) zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # check amqp receiver # TODO: use exact match receiver.verify_s3_events(keys, deletions=True, exact_match=False) # cleanup stop_amqp_receiver(receiver, task) s3_notification_conf.del_config() topic_conf.del_config() master_zone.delete_bucket(bucket_name) clean_rabbitmq(proc) def test_ps_delete_bucket(): """ test notification status upon bucket deletion """ master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() # create bucket on the first of the rados zones bucket = master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) topic_name = bucket_name + TOPIC_SUFFIX # create topic topic_name = bucket_name + TOPIC_SUFFIX topic_conf = PSTopic(ps_zone.conn, topic_name) response, status = topic_conf.set_config() assert_equal(status/100, 2) parsed_result = json.loads(response) topic_arn = parsed_result['arn'] # create one s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*'] }] s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create non-s3 notification notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name) _, status = notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket number_of_objects = 10 for i in range(number_of_objects): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for bucket sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) keys = list(bucket.list()) # delete objects from the bucket for key in bucket.list(): key.delete() # wait for bucket sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # delete the bucket master_zone.delete_bucket(bucket_name) # wait for meta sync zone_meta_checkpoint(ps_zone.zone) # get the events from the auto-generated subscription sub_conf = PSSubscription(ps_zone.conn, notification_name, topic_name) result, _ = sub_conf.get_events() records = json.loads(result) # TODO: use exact match verify_s3_records_by_elements(records, keys, exact_match=False) # s3 notification is deleted with bucket _, status = s3_notification_conf.get_config(notification=notification_name) assert_equal(status, 404) # non-s3 notification is deleted with bucket _, status = notification_conf.get_config() assert_equal(status, 404) # cleanup sub_conf.del_config() topic_conf.del_config() def test_ps_missing_topic(): """ test creating a subscription when no topic info exists""" master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX # create bucket on the first of the rados zones master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_arn = 'arn:aws:sns:::' + topic_name topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*'] }] s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) try: s3_notification_conf.set_config() except: log.info('missing topic is expected') else: assert 'missing topic is expected' # cleanup master_zone.delete_bucket(bucket_name) def test_ps_s3_topic_update(): """ test updating topic associated with a notification""" if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") rabbit_proc = init_rabbitmq() if rabbit_proc is None: return SkipTest('end2end amqp tests require rabbitmq-server installed') master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX # create amqp topic hostname = get_ip() exchange = 'ex1' amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name) amqp_task.start() topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint='amqp://' + hostname, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') result, status = topic_conf.set_config() assert_equal(status/100, 2) parsed_result = json.loads(result) topic_arn = parsed_result['arn'] # get topic result, _ = topic_conf.get_config() # verify topic content parsed_result = json.loads(result) assert_equal(parsed_result['topic']['name'], topic_name) assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint']) # create http server port = random.randint(10000, 20000) # start an http server in a separate thread http_server = StreamingHTTPServer(hostname, port) # create bucket on the first of the rados zones bucket = master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*'] }] s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket number_of_objects = 10 for i in range(number_of_objects): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) keys = list(bucket.list()) # TODO: use exact match receiver.verify_s3_events(keys, exact_match=False) # update the same topic with new endpoint topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint='http://'+ hostname + ':' + str(port)) _, status = topic_conf.set_config() assert_equal(status/100, 2) # get topic result, _ = topic_conf.get_config() # verify topic content parsed_result = json.loads(result) assert_equal(parsed_result['topic']['name'], topic_name) assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint']) # delete current objects and create new objects in the bucket for key in bucket.list(): key.delete() for i in range(number_of_objects): key = bucket.new_key(str(i+100)) key.set_contents_from_string('bar') # wait for sync zone_meta_checkpoint(ps_zone.zone) zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) keys = list(bucket.list()) # verify that notifications are still sent to amqp # TODO: use exact match receiver.verify_s3_events(keys, exact_match=False) # update notification to update the endpoint from the topic topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*'] }] s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # delete current objects and create new objects in the bucket for key in bucket.list(): key.delete() for i in range(number_of_objects): key = bucket.new_key(str(i+200)) key.set_contents_from_string('bar') # wait for sync zone_meta_checkpoint(ps_zone.zone) zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) keys = list(bucket.list()) # check that updates switched to http # TODO: use exact match http_server.verify_s3_events(keys, exact_match=False) # cleanup # delete objects from the bucket stop_amqp_receiver(receiver, amqp_task) for key in bucket.list(): key.delete() s3_notification_conf.del_config() topic_conf.del_config() master_zone.delete_bucket(bucket_name) http_server.close() clean_rabbitmq(rabbit_proc) def test_ps_s3_notification_update(): """ test updating the topic of a notification""" if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") hostname = get_ip() rabbit_proc = init_rabbitmq() if rabbit_proc is None: return SkipTest('end2end amqp tests require rabbitmq-server installed') master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX topic_name2 = bucket_name+'http'+TOPIC_SUFFIX # create topics # start amqp receiver in a separate thread exchange = 'ex1' amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1) amqp_task.start() # create random port for the http server http_port = random.randint(10000, 20000) # start an http server in a separate thread http_server = StreamingHTTPServer(hostname, http_port) topic_conf1 = PSTopic(ps_zone.conn, topic_name1, endpoint='amqp://' + hostname, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') result, status = topic_conf1.set_config() parsed_result = json.loads(result) topic_arn1 = parsed_result['arn'] assert_equal(status/100, 2) topic_conf2 = PSTopic(ps_zone.conn, topic_name2, endpoint='http://'+hostname+':'+str(http_port)) result, status = topic_conf2.set_config() parsed_result = json.loads(result) topic_arn2 = parsed_result['arn'] assert_equal(status/100, 2) # create bucket on the first of the rados zones bucket = master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) # create s3 notification with topic1 notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1, 'Events': ['s3:ObjectCreated:*'] }] s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket number_of_objects = 10 for i in range(number_of_objects): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) keys = list(bucket.list()) # TODO: use exact match receiver.verify_s3_events(keys, exact_match=False); # update notification to use topic2 topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2, 'Events': ['s3:ObjectCreated:*'] }] s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # delete current objects and create new objects in the bucket for key in bucket.list(): key.delete() for i in range(number_of_objects): key = bucket.new_key(str(i+100)) key.set_contents_from_string('bar') # wait for sync zone_meta_checkpoint(ps_zone.zone) zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) keys = list(bucket.list()) # check that updates switched to http # TODO: use exact match http_server.verify_s3_events(keys, exact_match=False) # cleanup # delete objects from the bucket stop_amqp_receiver(receiver, amqp_task) for key in bucket.list(): key.delete() s3_notification_conf.del_config() topic_conf1.del_config() topic_conf2.del_config() master_zone.delete_bucket(bucket_name) http_server.close() clean_rabbitmq(rabbit_proc) def test_ps_s3_multiple_topics_notification(): """ test notification creation with multiple topics""" if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") hostname = get_ip() rabbit_proc = init_rabbitmq() if rabbit_proc is None: return SkipTest('end2end amqp tests require rabbitmq-server installed') master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX topic_name2 = bucket_name+'http'+TOPIC_SUFFIX # create topics # start amqp receiver in a separate thread exchange = 'ex1' amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1) amqp_task.start() # create random port for the http server http_port = random.randint(10000, 20000) # start an http server in a separate thread http_server = StreamingHTTPServer(hostname, http_port) topic_conf1 = PSTopic(ps_zone.conn, topic_name1, endpoint='amqp://' + hostname, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') result, status = topic_conf1.set_config() parsed_result = json.loads(result) topic_arn1 = parsed_result['arn'] assert_equal(status/100, 2) topic_conf2 = PSTopic(ps_zone.conn, topic_name2, endpoint='http://'+hostname+':'+str(http_port)) result, status = topic_conf2.set_config() parsed_result = json.loads(result) topic_arn2 = parsed_result['arn'] assert_equal(status/100, 2) # create bucket on the first of the rados zones bucket = master_zone.create_bucket(bucket_name) # wait for sync zone_meta_checkpoint(ps_zone.zone) # create s3 notification notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1' notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2' topic_conf_list = [ { 'Id': notification_name1, 'TopicArn': topic_arn1, 'Events': ['s3:ObjectCreated:*'] }, { 'Id': notification_name2, 'TopicArn': topic_arn2, 'Events': ['s3:ObjectCreated:*'] }] s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) result, _ = s3_notification_conf.get_config() assert_equal(len(result['TopicConfigurations']), 2) assert_equal(result['TopicConfigurations'][0]['Id'], notification_name1) assert_equal(result['TopicConfigurations'][1]['Id'], notification_name2) # get auto-generated subscriptions sub_conf1 = PSSubscription(ps_zone.conn, notification_name1, topic_name1) _, status = sub_conf1.get_config() assert_equal(status/100, 2) sub_conf2 = PSSubscription(ps_zone.conn, notification_name2, topic_name2) _, status = sub_conf2.get_config() assert_equal(status/100, 2) # create objects in the bucket number_of_objects = 10 for i in range(number_of_objects): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # get the events from both of the subscription result, _ = sub_conf1.get_events() records = json.loads(result) for record in records['Records']: log.debug(record) keys = list(bucket.list()) # TODO: use exact match verify_s3_records_by_elements(records, keys, exact_match=False) receiver.verify_s3_events(keys, exact_match=False) result, _ = sub_conf2.get_events() parsed_result = json.loads(result) for record in parsed_result['Records']: log.debug(record) keys = list(bucket.list()) # TODO: use exact match verify_s3_records_by_elements(records, keys, exact_match=False) http_server.verify_s3_events(keys, exact_match=False) # cleanup stop_amqp_receiver(receiver, amqp_task) s3_notification_conf.del_config() topic_conf1.del_config() topic_conf2.del_config() # delete objects from the bucket for key in bucket.list(): key.delete() master_zone.delete_bucket(bucket_name) http_server.close() clean_rabbitmq(rabbit_proc)