summaryrefslogtreecommitdiffstats
path: root/src/test/rgw/bucket_notification
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/test/rgw/bucket_notification
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/test/rgw/bucket_notification')
-rw-r--r--src/test/rgw/bucket_notification/README.rst96
-rw-r--r--src/test/rgw/bucket_notification/__init__.py48
-rw-r--r--src/test/rgw/bucket_notification/api.py234
-rw-r--r--src/test/rgw/bucket_notification/bntests.conf.SAMPLE10
-rwxr-xr-xsrc/test/rgw/bucket_notification/bootstrap45
-rwxr-xr-xsrc/test/rgw/bucket_notification/kafka-security.sh49
-rw-r--r--src/test/rgw/bucket_notification/requirements.txt8
-rw-r--r--src/test/rgw/bucket_notification/setup.py19
-rw-r--r--src/test/rgw/bucket_notification/test_bn.py4128
9 files changed, 4637 insertions, 0 deletions
diff --git a/src/test/rgw/bucket_notification/README.rst b/src/test/rgw/bucket_notification/README.rst
new file mode 100644
index 000000000..9686bef71
--- /dev/null
+++ b/src/test/rgw/bucket_notification/README.rst
@@ -0,0 +1,96 @@
+==========================
+ Bucket Notification Tests
+==========================
+
+You will need to use the sample configuration file named ``bntests.conf.SAMPLE``
+that has been provided at ``/path/to/ceph/src/test/rgw/bucket_notification/``. You can also copy this file to the directory where you are
+running the tests and modify it if needed. This file can be used to run the bucket notification tests on a Ceph cluster started
+with vstart.
+For the tests covering Kafka and RabbitMQ security, the RGW will need to accept use/password without TLS connection between the client and the RGW.
+So, the cluster will have to be started with the following ``rgw_allow_notification_secrets_in_cleartext`` parameter set to ``true``.
+For example::
+
+ MON=1 OSD=1 MDS=0 MGR=1 RGW=1 ../src/vstart.sh -n -d -o "rgw_allow_notification_secrets_in_cleartext=true"
+
+===========
+Kafka Tests
+===========
+
+You also need to install Kafka which can be downloaded from: https://kafka.apache.org/downloads
+
+To test Kafka security, you should first run the ``kafka-security.sh`` script inside the Kafka directory.
+
+Then edit the Kafka server properties file (``/path/to/kafka/config/server.properties``)
+to have the following lines::
+
+ listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094
+ ssl.keystore.location=/home/ylifshit/kafka-3.3.1-src/server.keystore.jks
+ ssl.keystore.password=mypassword
+ ssl.key.password=mypassword
+ ssl.truststore.location=/home/ylifshit/kafka-3.3.1-src/server.truststore.jks
+ ssl.truststore.password=mypassword
+ sasl.enabled.mechanisms=PLAIN
+ listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+ username="alice" \
+ password="alice-secret" \
+ user_alice="alice-secret";
+
+After following the above steps, start the Zookeeper and Kafka services.
+For starting Zookeeper service run::
+
+ bin/zookeeper-server-start.sh config/zookeeper.properties
+
+and then start the Kafka service::
+
+ bin/kafka-server-start.sh config/server.properties
+
+If you want to run Zookeeper and Kafka services in the background add ``-daemon`` at the end of the command like::
+
+ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
+
+and::
+
+ bin/kafka-server-start.sh -daemon config/server.properties
+
+After running vstart, Zookeeper, and Kafka services you're ready to run the Kafka tests::
+
+ BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'kafka_test'
+
+To run the Kafka security test, you also need to provide the test with the location of the Kafka directory::
+
+ KAFKA_DIR=/path/to/kafkaBNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'kafka_ssl_test'
+
+==============
+RabbitMQ Tests
+==============
+
+You need to install RabbitMQ in the following way::
+
+ sudo dnf install rabbitmq-server
+
+Then you need to run the following command::
+
+ sudo chkconfig rabbitmq-server on
+
+Finally, to start the RabbitMQ server you need to run the following command::
+
+ sudo /sbin/service rabbitmq-server start
+
+To confirm that the RabbitMQ server is running you can run the following command to check the status of the server::
+
+ sudo /sbin/service rabbitmq-server status
+
+After running vstart and RabbitMQ server you're ready to run the AMQP tests::
+
+ BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'amqp_test'
+
+After running the tests you need to stop the vstart cluster (``/path/to/ceph/src/stop.sh``) and the RabbitMQ server by running the following command::
+
+ sudo /sbin/service rabbitmq-server stop
+
+To run the RabbitMQ SSL security tests use the following::
+
+ BNTESTS_CONF=bntests.conf python -m nose -s /path/to/ceph/src/test/rgw/bucket_notification/test_bn.py -v -a 'amqp_ssl_test'
+
+During these tests, the test script will restart the RabbitMQ server with the correct security configuration (``sudo`` privileges will be needed).
+
diff --git a/src/test/rgw/bucket_notification/__init__.py b/src/test/rgw/bucket_notification/__init__.py
new file mode 100644
index 000000000..6785fce92
--- /dev/null
+++ b/src/test/rgw/bucket_notification/__init__.py
@@ -0,0 +1,48 @@
+import configparser
+import os
+
+def setup():
+ cfg = configparser.RawConfigParser()
+ try:
+ path = os.environ['BNTESTS_CONF']
+ except KeyError:
+ raise RuntimeError(
+ 'To run tests, point environment '
+ + 'variable BNTESTS_CONF to a config file.',
+ )
+ cfg.read(path)
+
+ if not cfg.defaults():
+ raise RuntimeError('Your config file is missing the DEFAULT section!')
+ if not cfg.has_section("s3 main"):
+ raise RuntimeError('Your config file is missing the "s3 main" section!')
+
+ defaults = cfg.defaults()
+
+ global default_host
+ default_host = defaults.get("host")
+
+ global default_port
+ default_port = int(defaults.get("port"))
+
+ global main_access_key
+ main_access_key = cfg.get('s3 main',"access_key")
+
+ global main_secret_key
+ main_secret_key = cfg.get('s3 main',"secret_key")
+
+def get_config_host():
+ global default_host
+ return default_host
+
+def get_config_port():
+ global default_port
+ return default_port
+
+def get_access_key():
+ global main_access_key
+ return main_access_key
+
+def get_secret_key():
+ global main_secret_key
+ return main_secret_key
diff --git a/src/test/rgw/bucket_notification/api.py b/src/test/rgw/bucket_notification/api.py
new file mode 100644
index 000000000..fe38576fb
--- /dev/null
+++ b/src/test/rgw/bucket_notification/api.py
@@ -0,0 +1,234 @@
+import logging
+import ssl
+import urllib
+import hmac
+import hashlib
+import base64
+import xmltodict
+from http import client as http_client
+from urllib import parse as urlparse
+from time import gmtime, strftime
+import boto3
+from botocore.client import Config
+import os
+import subprocess
+
+log = logging.getLogger('bucket_notification.tests')
+
+NO_HTTP_BODY = ''
+
+def put_object_tagging(conn, bucket_name, key, tags):
+ client = boto3.client('s3',
+ endpoint_url='http://'+conn.host+':'+str(conn.port),
+ aws_access_key_id=conn.aws_access_key_id,
+ aws_secret_access_key=conn.aws_secret_access_key)
+ return client.put_object(Body='aaaaaaaaaaa', Bucket=bucket_name, Key=key, Tagging=tags)
+
+def make_request(conn, method, resource, parameters=None, sign_parameters=False, extra_parameters=None):
+ """generic request sending to pubsub radogw
+ should cover: topics, notificatios and subscriptions
+ """
+ url_params = ''
+ if parameters is not None:
+ url_params = urlparse.urlencode(parameters)
+ # remove 'None' from keys with no values
+ url_params = url_params.replace('=None', '')
+ url_params = '?' + url_params
+ if extra_parameters is not None:
+ url_params = url_params + '&' + extra_parameters
+ string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
+ string_to_sign = method + '\n\n\n' + string_date + '\n' + resource
+ if sign_parameters:
+ string_to_sign += url_params
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key.encode('utf-8'),
+ string_to_sign.encode('utf-8'),
+ hashlib.sha1).digest()).decode('ascii')
+ headers = {'Authorization': 'AWS '+conn.aws_access_key_id+':'+signature,
+ 'Date': string_date,
+ 'Host': conn.host+':'+str(conn.port)}
+ http_conn = http_client.HTTPConnection(conn.host, conn.port)
+ if log.getEffectiveLevel() <= 10:
+ http_conn.set_debuglevel(5)
+ http_conn.request(method, resource+url_params, NO_HTTP_BODY, headers)
+ response = http_conn.getresponse()
+ data = response.read()
+ status = response.status
+ http_conn.close()
+ return data.decode('utf-8'), status
+
+
+def delete_all_objects(conn, bucket_name):
+ client = boto3.client('s3',
+ endpoint_url='http://'+conn.host+':'+str(conn.port),
+ aws_access_key_id=conn.aws_access_key_id,
+ aws_secret_access_key=conn.aws_secret_access_key)
+
+ objects = []
+ for key in client.list_objects(Bucket=bucket_name)['Contents']:
+ objects.append({'Key': key['Key']})
+ # delete objects from the bucket
+ response = client.delete_objects(Bucket=bucket_name,
+ Delete={'Objects': objects})
+
+
+class PSTopicS3:
+ """class to set/list/get/delete a topic
+ POST ?Action=CreateTopic&Name=<topic name>[&OpaqueData=<data>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]]
+ POST ?Action=ListTopics
+ POST ?Action=GetTopic&TopicArn=<topic-arn>
+ POST ?Action=DeleteTopic&TopicArn=<topic-arn>
+ """
+ def __init__(self, conn, topic_name, region, endpoint_args=None, opaque_data=None):
+ self.conn = conn
+ self.topic_name = topic_name.strip()
+ assert self.topic_name
+ self.topic_arn = ''
+ self.attributes = {}
+ if endpoint_args is not None:
+ self.attributes = {nvp[0] : nvp[1] for nvp in urlparse.parse_qsl(endpoint_args, keep_blank_values=True)}
+ if opaque_data is not None:
+ self.attributes['OpaqueData'] = opaque_data
+ protocol = 'https' if conn.is_secure else 'http'
+ self.client = boto3.client('sns',
+ endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
+ aws_access_key_id=conn.aws_access_key_id,
+ aws_secret_access_key=conn.aws_secret_access_key,
+ region_name=region,
+ verify='./cert.pem')
+
+ def get_config(self):
+ """get topic info"""
+ parameters = {'Action': 'GetTopic', 'TopicArn': self.topic_arn}
+ body = urlparse.urlencode(parameters)
+ string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
+ content_type = 'application/x-www-form-urlencoded; charset=utf-8'
+ resource = '/'
+ method = 'POST'
+ string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
+ log.debug('StringTosign: %s', string_to_sign)
+ signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key.encode('utf-8'),
+ string_to_sign.encode('utf-8'),
+ hashlib.sha1).digest()).decode('ascii')
+ headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
+ 'Date': string_date,
+ 'Host': self.conn.host+':'+str(self.conn.port),
+ 'Content-Type': content_type}
+ if self.conn.is_secure:
+ http_conn = http_client.HTTPSConnection(self.conn.host, self.conn.port,
+ context=ssl.create_default_context(cafile='./cert.pem'))
+ else:
+ http_conn = http_client.HTTPConnection(self.conn.host, self.conn.port)
+ http_conn.request(method, resource, body, headers)
+ response = http_conn.getresponse()
+ data = response.read()
+ status = response.status
+ http_conn.close()
+ dict_response = xmltodict.parse(data)
+ return dict_response, status
+
+ def set_config(self):
+ """set topic"""
+ result = self.client.create_topic(Name=self.topic_name, Attributes=self.attributes)
+ self.topic_arn = result['TopicArn']
+ return self.topic_arn
+
+ def del_config(self, topic_arn=None):
+ """delete topic"""
+ result = self.client.delete_topic(TopicArn=(topic_arn if topic_arn is not None else self.topic_arn))
+ return result['ResponseMetadata']['HTTPStatusCode']
+
+ def get_list(self):
+ """list all topics"""
+ # note that boto3 supports list_topics(), however, the result only show ARNs
+ parameters = {'Action': 'ListTopics'}
+ body = urlparse.urlencode(parameters)
+ string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
+ content_type = 'application/x-www-form-urlencoded; charset=utf-8'
+ resource = '/'
+ method = 'POST'
+ string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
+ log.debug('StringTosign: %s', string_to_sign)
+ signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key.encode('utf-8'),
+ string_to_sign.encode('utf-8'),
+ hashlib.sha1).digest()).decode('ascii')
+ headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
+ 'Date': string_date,
+ 'Host': self.conn.host+':'+str(self.conn.port),
+ 'Content-Type': content_type}
+ if self.conn.is_secure:
+ http_conn = http_client.HTTPSConnection(self.conn.host, self.conn.port,
+ context=ssl.create_default_context(cafile='./cert.pem'))
+ else:
+ http_conn = http_client.HTTPConnection(self.conn.host, self.conn.port)
+ http_conn.request(method, resource, body, headers)
+ response = http_conn.getresponse()
+ data = response.read()
+ status = response.status
+ http_conn.close()
+ dict_response = xmltodict.parse(data)
+ return dict_response, status
+
+class PSNotificationS3:
+ """class to set/get/delete an S3 notification
+ PUT /<bucket>?notification
+ GET /<bucket>?notification[=<notification>]
+ DELETE /<bucket>?notification[=<notification>]
+ """
+ def __init__(self, conn, bucket_name, topic_conf_list):
+ self.conn = conn
+ assert bucket_name.strip()
+ self.bucket_name = bucket_name
+ self.resource = '/'+bucket_name
+ self.topic_conf_list = topic_conf_list
+ self.client = boto3.client('s3',
+ endpoint_url='http://'+conn.host+':'+str(conn.port),
+ aws_access_key_id=conn.aws_access_key_id,
+ aws_secret_access_key=conn.aws_secret_access_key)
+
+ def send_request(self, method, parameters=None):
+ """send request to radosgw"""
+ return make_request(self.conn, method, self.resource,
+ parameters=parameters, sign_parameters=True)
+
+ def get_config(self, notification=None):
+ """get notification info"""
+ parameters = None
+ if notification is None:
+ response = self.client.get_bucket_notification_configuration(Bucket=self.bucket_name)
+ status = response['ResponseMetadata']['HTTPStatusCode']
+ return response, status
+ parameters = {'notification': notification}
+ response, status = self.send_request('GET', parameters=parameters)
+ dict_response = xmltodict.parse(response)
+ return dict_response, status
+
+ def set_config(self):
+ """set notification"""
+ response = self.client.put_bucket_notification_configuration(Bucket=self.bucket_name,
+ NotificationConfiguration={
+ 'TopicConfigurations': self.topic_conf_list
+ })
+ status = response['ResponseMetadata']['HTTPStatusCode']
+ return response, status
+
+ def del_config(self, notification=None):
+ """delete notification"""
+ parameters = {'notification': notification}
+
+ return self.send_request('DELETE', parameters)
+
+
+test_path = os.path.normpath(os.path.dirname(os.path.realpath(__file__))) + '/../'
+
+def bash(cmd, **kwargs):
+ log.debug('running command: %s', ' '.join(cmd))
+ kwargs['stdout'] = subprocess.PIPE
+ process = subprocess.Popen(cmd, **kwargs)
+ s = process.communicate()[0].decode('utf-8')
+ return (s, process.returncode)
+
+def admin(args, **kwargs):
+ """ radosgw-admin command """
+ cmd = [test_path + 'test-rgw-call.sh', 'call_rgw_admin', 'noname'] + args
+ return bash(cmd, **kwargs)
+
diff --git a/src/test/rgw/bucket_notification/bntests.conf.SAMPLE b/src/test/rgw/bucket_notification/bntests.conf.SAMPLE
new file mode 100644
index 000000000..eb3291daf
--- /dev/null
+++ b/src/test/rgw/bucket_notification/bntests.conf.SAMPLE
@@ -0,0 +1,10 @@
+[DEFAULT]
+port = 8000
+host = localhost
+
+[s3 main]
+access_key = 0555b35654ad1656d804
+secret_key = h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q==
+display_name = M. Tester
+user_id = testid
+email = tester@ceph.com
diff --git a/src/test/rgw/bucket_notification/bootstrap b/src/test/rgw/bucket_notification/bootstrap
new file mode 100755
index 000000000..4d4a5a748
--- /dev/null
+++ b/src/test/rgw/bucket_notification/bootstrap
@@ -0,0 +1,45 @@
+#!/bin/sh
+set -e
+
+if [ -f /etc/debian_version ]; then
+ for package in python3-pip python3-dev python3-xmltodict python3-pika libevent-dev libxml2-dev libxslt-dev zlib1g-dev; do
+ if [ "$(dpkg --status -- $package 2>/dev/null|sed -n 's/^Status: //p')" != "install ok installed" ]; then
+ # add a space after old values
+ missing="${missing:+$missing }$package"
+ fi
+ done
+ if [ -n "$missing" ]; then
+ echo "$0: missing required DEB packages. Installing via sudo." 1>&2
+ sudo apt-get -y install $missing
+ fi
+fi
+if [ -f /etc/redhat-release ]; then
+ for package in python3-pip python3-devel python3-xmltodict python3-pika libevent-devel libxml2-devel libxslt-devel zlib-devel; do
+ if [ "$(rpm -qa $package 2>/dev/null)" == "" ]; then
+ missing="${missing:+$missing }$package"
+ fi
+ done
+ if [ -n "$missing" ]; then
+ echo "$0: missing required RPM packages. Installing via sudo." 1>&2
+ sudo yum -y install $missing
+ fi
+fi
+
+python3 -m venv --system-site-packages virtualenv
+
+# avoid pip bugs
+./virtualenv/bin/pip install --upgrade pip
+#pip3 install --upgrade setuptools cffi # address pip issue: https://github.com/pypa/pip/issues/6264
+
+# work-around change in pip 1.5
+#./virtualenv/bin/pip install six
+#./virtualenv/bin/pip install -I nose
+#./virtualenv/bin/pip install setuptools
+
+./virtualenv/bin/pip install -U -r requirements.txt
+
+# forbid setuptools from using the network because it'll try to use
+# easy_install, and we really wanted pip; next line will fail if pip
+# requirements.txt does not match setup.py requirements -- sucky but
+# good enough for now
+./virtualenv/bin/python setup.py develop
diff --git a/src/test/rgw/bucket_notification/kafka-security.sh b/src/test/rgw/bucket_notification/kafka-security.sh
new file mode 100755
index 000000000..6c6f3e261
--- /dev/null
+++ b/src/test/rgw/bucket_notification/kafka-security.sh
@@ -0,0 +1,49 @@
+FQDN=localhost
+KEYFILE=server.keystore.jks
+TRUSTFILE=server.truststore.jks
+CAFILE=y-ca.crt
+CAKEYFILE=y-ca.key
+REQFILE=$FQDN.req
+CERTFILE=$FQDN.crt
+MYPW=mypassword
+VALIDITY=36500
+
+rm -f $KEYFILE
+rm -f $TRUSTFILE
+rm -f $CAFILE
+rm -f $REQFILE
+rm -f $CERTFILE
+
+echo "########## create the request in key store '$KEYFILE'"
+keytool -keystore $KEYFILE -alias localhost \
+ -dname "CN=$FQDN, OU=Michigan Engineering, O=Red Hat Inc, \
+ L=Ann Arbor, ST=Michigan, C=US" \
+ -storepass $MYPW -keypass $MYPW \
+ -validity $VALIDITY -genkey -keyalg RSA -ext SAN=DNS:"$FQDN"
+
+echo "########## create the CA '$CAFILE'"
+openssl req -new -nodes -x509 -keyout $CAKEYFILE -out $CAFILE \
+ -days $VALIDITY -subj \
+ '/C=US/ST=Michigan/L=Ann Arbor/O=Red Hat Inc/OU=Michigan Engineering/CN=yuval-1'
+
+echo "########## store the CA in trust store '$TRUSTFILE'"
+keytool -keystore $TRUSTFILE -storepass $MYPW -alias CARoot \
+ -noprompt -importcert -file $CAFILE
+
+echo "########## create a request '$REQFILE' for signing in key store '$KEYFILE'"
+keytool -storepass $MYPW -keystore $KEYFILE \
+ -alias localhost -certreq -file $REQFILE
+
+echo "########## sign and create certificate '$CERTFILE'"
+openssl x509 -req -CA $CAFILE -CAkey $CAKEYFILE -CAcreateserial \
+ -days $VALIDITY \
+ -in $REQFILE -out $CERTFILE
+
+echo "########## store CA '$CAFILE' in key store '$KEYFILE'"
+keytool -storepass $MYPW -keystore $KEYFILE -alias CARoot \
+ -noprompt -importcert -file $CAFILE
+
+echo "########## store certificate '$CERTFILE' in key store '$KEYFILE'"
+keytool -storepass $MYPW -keystore $KEYFILE -alias localhost \
+ -import -file $CERTFILE
+
diff --git a/src/test/rgw/bucket_notification/requirements.txt b/src/test/rgw/bucket_notification/requirements.txt
new file mode 100644
index 000000000..a3cff2bed
--- /dev/null
+++ b/src/test/rgw/bucket_notification/requirements.txt
@@ -0,0 +1,8 @@
+nose >=1.0.0
+boto >=2.6.0
+boto3 >=1.0.0
+configparser >=5.0.0
+kafka-python >=2.0.0
+pika
+cloudevents
+xmltodict
diff --git a/src/test/rgw/bucket_notification/setup.py b/src/test/rgw/bucket_notification/setup.py
new file mode 100644
index 000000000..189ab27b4
--- /dev/null
+++ b/src/test/rgw/bucket_notification/setup.py
@@ -0,0 +1,19 @@
+#!/usr/bin/python
+from setuptools import setup, find_packages
+
+setup(
+ name='bn_tests',
+ version='0.0.1',
+ packages=find_packages(),
+
+ author='Kalpesh Pandya',
+ author_email='kapandya@redhat.com',
+ description='Bucket Notification compatibility tests',
+ license='MIT',
+ keywords='bn web testing',
+
+ install_requires=[
+ 'boto >=2.0b4',
+ 'boto3 >=1.0.0'
+ ],
+ )
diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py
new file mode 100644
index 000000000..87a2acb76
--- /dev/null
+++ b/src/test/rgw/bucket_notification/test_bn.py
@@ -0,0 +1,4128 @@
+import logging
+import json
+import tempfile
+import random
+import threading
+import subprocess
+import socket
+import time
+import os
+import string
+import boto
+from botocore.exceptions import ClientError
+from http import server as http_server
+from random import randint
+import hashlib
+from nose.plugins.attrib import attr
+import boto3
+import datetime
+from cloudevents.http import from_http
+from dateutil import parser
+
+from boto.s3.connection import S3Connection
+
+from . import(
+ get_config_host,
+ get_config_port,
+ get_access_key,
+ get_secret_key
+ )
+
+from .api import PSTopicS3, \
+ PSNotificationS3, \
+ delete_all_objects, \
+ put_object_tagging, \
+ admin
+
+from nose import SkipTest
+from nose.tools import assert_not_equal, assert_equal, assert_in
+import boto.s3.tagging
+
+# configure logging for the tests module
+log = logging.getLogger(__name__)
+
+TOPIC_SUFFIX = "_topic"
+NOTIFICATION_SUFFIX = "_notif"
+
+
+num_buckets = 0
+run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
+
+def gen_bucket_name():
+ global num_buckets
+
+ num_buckets += 1
+ return run_prefix + '-' + str(num_buckets)
+
+
+def set_contents_from_string(key, content):
+ try:
+ key.set_contents_from_string(content)
+ except Exception as e:
+ print('Error: ' + str(e))
+
+
+class HTTPPostHandler(http_server.BaseHTTPRequestHandler):
+ """HTTP POST hanler class storing the received events in its http server"""
+ def do_POST(self):
+ """implementation of POST handler"""
+ content_length = int(self.headers['Content-Length'])
+ body = self.rfile.read(content_length)
+ if self.server.cloudevents:
+ event = from_http(self.headers, body)
+ record = json.loads(body)['Records'][0]
+ assert_equal(event['specversion'], '1.0')
+ assert_equal(event['id'], record['responseElements']['x-amz-request-id'] + '.' + record['responseElements']['x-amz-id-2'])
+ assert_equal(event['source'], 'ceph:s3.' + record['awsRegion'] + '.' + record['s3']['bucket']['name'])
+ assert_equal(event['type'], 'com.amazonaws.' + record['eventName'])
+ assert_equal(event['datacontenttype'], 'application/json')
+ assert_equal(event['subject'], record['s3']['object']['key'])
+ assert_equal(parser.parse(event['time']), parser.parse(record['eventTime']))
+ log.info('HTTP Server (%d) received event: %s', self.server.worker_id, str(body))
+ self.server.append(json.loads(body))
+ if self.headers.get('Expect') == '100-continue':
+ self.send_response(100)
+ else:
+ self.send_response(200)
+ if self.server.delay > 0:
+ time.sleep(self.server.delay)
+ self.end_headers()
+
+
+class HTTPServerWithEvents(http_server.HTTPServer):
+ """HTTP server used by the handler to store events"""
+ def __init__(self, addr, handler, worker_id, delay=0, cloudevents=False):
+ http_server.HTTPServer.__init__(self, addr, handler, False)
+ self.worker_id = worker_id
+ self.events = []
+ self.delay = delay
+ self.cloudevents = cloudevents
+
+ def append(self, event):
+ self.events.append(event)
+
+class HTTPServerThread(threading.Thread):
+ """thread for running the HTTP server. reusing the same socket for all threads"""
+ def __init__(self, i, sock, addr, delay=0, cloudevents=False):
+ threading.Thread.__init__(self)
+ self.i = i
+ self.daemon = True
+ self.httpd = HTTPServerWithEvents(addr, HTTPPostHandler, i, delay, cloudevents)
+ self.httpd.socket = sock
+ # prevent the HTTP server from re-binding every handler
+ self.httpd.server_bind = self.server_close = lambda self: None
+ self.start()
+
+ def run(self):
+ try:
+ log.info('HTTP Server (%d) started on: %s', self.i, self.httpd.server_address)
+ self.httpd.serve_forever()
+ log.info('HTTP Server (%d) ended', self.i)
+ except Exception as error:
+ # could happen if the server r/w to a closing socket during shutdown
+ log.info('HTTP Server (%d) ended unexpectedly: %s', self.i, str(error))
+
+ def close(self):
+ self.httpd.shutdown()
+
+ def get_events(self):
+ return self.httpd.events
+
+ def reset_events(self):
+ self.httpd.events = []
+
+class StreamingHTTPServer:
+ """multi-threaded http server class also holding list of events received into the handler
+ each thread has its own server, and all servers share the same socket"""
+ def __init__(self, host, port, num_workers=100, delay=0, cloudevents=False):
+ addr = (host, port)
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.sock.bind(addr)
+ self.sock.listen(num_workers)
+ self.workers = [HTTPServerThread(i, self.sock, addr, delay, cloudevents) for i in range(num_workers)]
+
+ def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}):
+ """verify stored s3 records agains a list of keys"""
+ events = []
+ for worker in self.workers:
+ events += worker.get_events()
+ worker.reset_events()
+ verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes)
+
+ def verify_events(self, keys, exact_match=False, deletions=False):
+ """verify stored events agains a list of keys"""
+ events = []
+ for worker in self.workers:
+ events += worker.get_events()
+ worker.reset_events()
+ verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions)
+
+ def get_and_reset_events(self):
+ events = []
+ for worker in self.workers:
+ events += worker.get_events()
+ worker.reset_events()
+ return events
+
+ def close(self):
+ """close all workers in the http server and wait for it to finish"""
+ # make sure that the shared socket is closed
+ # this is needed in case that one of the threads is blocked on the socket
+ self.sock.shutdown(socket.SHUT_RDWR)
+ self.sock.close()
+ # wait for server threads to finish
+ for worker in self.workers:
+ worker.close()
+ worker.join()
+
+# AMQP endpoint functions
+
+class AMQPReceiver(object):
+ """class for receiving and storing messages on a topic from the AMQP broker"""
+ def __init__(self, exchange, topic, external_endpoint_address=None, ca_location=None):
+ import pika
+ import ssl
+
+ if ca_location:
+ ssl_context = ssl.create_default_context()
+ ssl_context.load_verify_locations(cafile=ca_location)
+ ssl_options = pika.SSLOptions(ssl_context)
+ rabbitmq_port = 5671
+ else:
+ rabbitmq_port = 5672
+ ssl_options = None
+
+ if external_endpoint_address:
+ if ssl_options:
+ # this is currently not working due to: https://github.com/pika/pika/issues/1192
+ params = pika.URLParameters(external_endpoint_address, ssl_options=ssl_options)
+ else:
+ params = pika.URLParameters(external_endpoint_address)
+ else:
+ hostname = get_ip()
+ params = pika.ConnectionParameters(host=hostname, port=rabbitmq_port, ssl_options=ssl_options)
+
+ remaining_retries = 10
+ while remaining_retries > 0:
+ try:
+ connection = pika.BlockingConnection(params)
+ break
+ except Exception as error:
+ remaining_retries -= 1
+ print('failed to connect to rabbitmq (remaining retries '
+ + str(remaining_retries) + '): ' + str(error))
+ time.sleep(1)
+
+ if remaining_retries == 0:
+ raise Exception('failed to connect to rabbitmq - no retries left')
+
+ self.channel = connection.channel()
+ self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
+ result = self.channel.queue_declare('', exclusive=True)
+ queue_name = result.method.queue
+ self.channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=topic)
+ self.channel.basic_consume(queue=queue_name,
+ on_message_callback=self.on_message,
+ auto_ack=True)
+ self.events = []
+ self.topic = topic
+
+ def on_message(self, ch, method, properties, body):
+ """callback invoked when a new message arrive on the topic"""
+ log.info('AMQP received event for topic %s:\n %s', self.topic, body)
+ self.events.append(json.loads(body))
+
+ # TODO create a base class for the AMQP and HTTP cases
+ def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}):
+ """verify stored s3 records agains a list of keys"""
+ verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes)
+ self.events = []
+
+ def verify_events(self, keys, exact_match=False, deletions=False):
+ """verify stored events agains a list of keys"""
+ verify_events_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
+ self.events = []
+
+ def get_and_reset_events(self):
+ tmp = self.events
+ self.events = []
+ return tmp
+
+
+def amqp_receiver_thread_runner(receiver):
+ """main thread function for the amqp receiver"""
+ try:
+ log.info('AMQP receiver started')
+ receiver.channel.start_consuming()
+ log.info('AMQP receiver ended')
+ except Exception as error:
+ log.info('AMQP receiver ended unexpectedly: %s', str(error))
+
+
+def create_amqp_receiver_thread(exchange, topic, external_endpoint_address=None, ca_location=None):
+ """create amqp receiver and thread"""
+ receiver = AMQPReceiver(exchange, topic, external_endpoint_address, ca_location)
+ task = threading.Thread(target=amqp_receiver_thread_runner, args=(receiver,))
+ task.daemon = True
+ return task, receiver
+
+def stop_amqp_receiver(receiver, task):
+ """stop the receiver thread and wait for it to finis"""
+ try:
+ receiver.channel.stop_consuming()
+ log.info('stopping AMQP receiver')
+ except Exception as error:
+ log.info('failed to gracefuly stop AMQP receiver: %s', str(error))
+ task.join(5)
+
+
+def init_rabbitmq():
+ """ start a rabbitmq broker """
+ hostname = get_ip()
+ try:
+ # first try to stop any existing process
+ subprocess.call(['sudo', 'rabbitmqctl', 'stop'])
+ time.sleep(5)
+ proc = subprocess.Popen(['sudo', '--preserve-env=RABBITMQ_CONFIG_FILE', 'rabbitmq-server'])
+ except Exception as error:
+ log.info('failed to execute rabbitmq-server: %s', str(error))
+ print('failed to execute rabbitmq-server: %s' % str(error))
+ return None
+ # TODO add rabbitmq checkpoint instead of sleep
+ time.sleep(5)
+ return proc
+
+
+def clean_rabbitmq(proc):
+ """ stop the rabbitmq broker """
+ try:
+ subprocess.call(['sudo', 'rabbitmqctl', 'stop'])
+ time.sleep(5)
+ proc.terminate()
+ except:
+ log.info('rabbitmq server already terminated')
+
+
+def verify_events_by_elements(events, keys, exact_match=False, deletions=False):
+ """ verify there is at least one event per element """
+ err = ''
+ for key in keys:
+ key_found = False
+ if type(events) is list:
+ for event_list in events:
+ if key_found:
+ break
+ for event in event_list['events']:
+ if event['info']['bucket']['name'] == key.bucket.name and \
+ event['info']['key']['name'] == key.name:
+ if deletions and event['event'] == 'OBJECT_DELETE':
+ key_found = True
+ break
+ elif not deletions and event['event'] == 'OBJECT_CREATE':
+ key_found = True
+ break
+ else:
+ for event in events['events']:
+ if event['info']['bucket']['name'] == key.bucket.name and \
+ event['info']['key']['name'] == key.name:
+ if deletions and event['event'] == 'OBJECT_DELETE':
+ key_found = True
+ break
+ elif not deletions and event['event'] == 'OBJECT_CREATE':
+ key_found = True
+ break
+
+ if not key_found:
+ err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
+ log.error(events)
+ assert False, err
+
+ if not len(events) == len(keys):
+ err = 'superfluous events are found'
+ log.debug(err)
+ if exact_match:
+ log.error(events)
+ assert False, err
+
+META_PREFIX = 'x-amz-meta-'
+
+def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False, expected_sizes={}, etags=[]):
+ """ verify there is at least one record per element """
+ err = ''
+ for key in keys:
+ key_found = False
+ object_size = 0
+ if type(records) is list:
+ for record_list in records:
+ if key_found:
+ break
+ for record in record_list['Records']:
+ assert_in('eTag', record['s3']['object'])
+ if record['s3']['bucket']['name'] == key.bucket.name and \
+ record['s3']['object']['key'] == key.name:
+ # Assertion Error needs to be fixed
+ #assert_equal(key.etag[1:-1], record['s3']['object']['eTag'])
+ if etags:
+ assert_in(key.etag[1:-1], etags)
+ if len(record['s3']['object']['metadata']) > 0:
+ for meta in record['s3']['object']['metadata']:
+ assert(meta['key'].startswith(META_PREFIX))
+ if deletions and record['eventName'].startswith('ObjectRemoved'):
+ key_found = True
+ object_size = record['s3']['object']['size']
+ break
+ elif not deletions and record['eventName'].startswith('ObjectCreated'):
+ key_found = True
+ object_size = record['s3']['object']['size']
+ break
+ else:
+ for record in records['Records']:
+ assert_in('eTag', record['s3']['object'])
+ if record['s3']['bucket']['name'] == key.bucket.name and \
+ record['s3']['object']['key'] == key.name:
+ assert_equal(key.etag, record['s3']['object']['eTag'])
+ if etags:
+ assert_in(key.etag[1:-1], etags)
+ if len(record['s3']['object']['metadata']) > 0:
+ for meta in record['s3']['object']['metadata']:
+ assert(meta['key'].startswith(META_PREFIX))
+ if deletions and record['eventName'].startswith('ObjectRemoved'):
+ key_found = True
+ object_size = record['s3']['object']['size']
+ break
+ elif not deletions and record['eventName'].startswith('ObjectCreated'):
+ key_found = True
+ object_size = record['s3']['object']['size']
+ break
+
+ if not key_found:
+ err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
+ assert False, err
+ elif expected_sizes:
+ assert_equal(object_size, expected_sizes.get(key.name))
+
+ if not len(records) == len(keys):
+ err = 'superfluous records are found'
+ log.warning(err)
+ if exact_match:
+ for record_list in records:
+ for record in record_list['Records']:
+ log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))
+ assert False, err
+
+
+# Kafka endpoint functions
+
+kafka_server = 'localhost'
+
+class KafkaReceiver(object):
+ """class for receiving and storing messages on a topic from the kafka broker"""
+ def __init__(self, topic, security_type):
+ from kafka import KafkaConsumer
+ remaining_retries = 10
+ port = 9092
+ if security_type != 'PLAINTEXT':
+ security_type = 'SSL'
+ port = 9093
+ while remaining_retries > 0:
+ try:
+ self.consumer = KafkaConsumer(topic,
+ bootstrap_servers = kafka_server+':'+str(port),
+ security_protocol=security_type,
+ consumer_timeout_ms=16000)
+ print('Kafka consumer created on topic: '+topic)
+ break
+ except Exception as error:
+ remaining_retries -= 1
+ print('failed to connect to kafka (remaining retries '
+ + str(remaining_retries) + '): ' + str(error))
+ time.sleep(1)
+
+ if remaining_retries == 0:
+ raise Exception('failed to connect to kafka - no retries left')
+
+ self.events = []
+ self.topic = topic
+ self.stop = False
+
+ def verify_s3_events(self, keys, exact_match=False, deletions=False, etags=[]):
+ """verify stored s3 records agains a list of keys"""
+ verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, etags=etags)
+ self.events = []
+
+def kafka_receiver_thread_runner(receiver):
+ """main thread function for the kafka receiver"""
+ try:
+ log.info('Kafka receiver started')
+ print('Kafka receiver started')
+ while not receiver.stop:
+ for msg in receiver.consumer:
+ receiver.events.append(json.loads(msg.value))
+ time.sleep(0.1)
+ log.info('Kafka receiver ended')
+ print('Kafka receiver ended')
+ except Exception as error:
+ log.info('Kafka receiver ended unexpectedly: %s', str(error))
+ print('Kafka receiver ended unexpectedly: ' + str(error))
+
+
+def create_kafka_receiver_thread(topic, security_type='PLAINTEXT'):
+ """create kafka receiver and thread"""
+ receiver = KafkaReceiver(topic, security_type)
+ task = threading.Thread(target=kafka_receiver_thread_runner, args=(receiver,))
+ task.daemon = True
+ return task, receiver
+
+def stop_kafka_receiver(receiver, task):
+ """stop the receiver thread and wait for it to finis"""
+ receiver.stop = True
+ task.join(1)
+ try:
+ receiver.consumer.unsubscribe()
+ receiver.consumer.close()
+ except Exception as error:
+ log.info('failed to gracefuly stop Kafka receiver: %s', str(error))
+
+
+def get_ip():
+ return 'localhost'
+
+
+def get_ip_http():
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ try:
+ # address should not be reachable
+ s.connect(('10.255.255.255', 1))
+ ip = s.getsockname()[0]
+ finally:
+ s.close()
+ return ip
+
+
+def connection():
+ hostname = get_config_host()
+ port_no = get_config_port()
+ vstart_access_key = get_access_key()
+ vstart_secret_key = get_secret_key()
+
+ conn = S3Connection(aws_access_key_id=vstart_access_key,
+ aws_secret_access_key=vstart_secret_key,
+ is_secure=False, port=port_no, host=hostname,
+ calling_format='boto.s3.connection.OrdinaryCallingFormat')
+
+ return conn
+
+
+def connection2():
+ hostname = get_config_host()
+ port_no = 8001
+ vstart_access_key = get_access_key()
+ vstart_secret_key = get_secret_key()
+
+ conn = S3Connection(aws_access_key_id=vstart_access_key,
+ aws_secret_access_key=vstart_secret_key,
+ is_secure=False, port=port_no, host=hostname,
+ calling_format='boto.s3.connection.OrdinaryCallingFormat')
+
+ return conn
+
+
+def another_user(tenant=None):
+ access_key = str(time.time())
+ secret_key = str(time.time())
+ uid = 'superman' + str(time.time())
+ if tenant:
+ _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
+ else:
+ _, result = admin(['user', 'create', '--uid', uid, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
+
+ assert_equal(result, 0)
+ conn = S3Connection(aws_access_key_id=access_key,
+ aws_secret_access_key=secret_key,
+ is_secure=False, port=get_config_port(), host=get_config_host(),
+ calling_format='boto.s3.connection.OrdinaryCallingFormat')
+ return conn
+
+##############
+# bucket notifications tests
+##############
+
+
+@attr('basic_test')
+def test_ps_s3_topic_on_master():
+ """ test s3 topics set/get/delete on master """
+ tenant = 'kaboom'
+ conn = another_user(tenant)
+ zonegroup = 'default'
+ bucket_name = gen_bucket_name()
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topics
+ endpoint_address = 'amqp://127.0.0.1:7001/vhost_1'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+ topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+ # clean all topics
+ try:
+ result = topic_conf1.get_list()[0]['ListTopicsResponse']['ListTopicsResult']['Topics']
+ topics = []
+ if result is not None:
+ topics = result['member']
+ for topic in topics:
+ topic_conf1.del_config(topic_arn=topic['TopicArn'])
+ except Exception as err:
+ print('failed to do topic cleanup: ' + str(err))
+
+ topic_arn = topic_conf1.set_config()
+ assert_equal(topic_arn,
+ 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1')
+
+ endpoint_address = 'http://127.0.0.1:9001'
+ endpoint_args = 'push-endpoint='+endpoint_address
+ topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf2.set_config()
+ assert_equal(topic_arn,
+ 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2')
+ endpoint_address = 'http://127.0.0.1:9002'
+ endpoint_args = 'push-endpoint='+endpoint_address
+ topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf3.set_config()
+ assert_equal(topic_arn,
+ 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3')
+
+ # get topic 3
+ result, status = topic_conf3.get_config()
+ assert_equal(status, 200)
+ assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
+ assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
+
+ # Note that endpoint args may be ordered differently in the result
+ # delete topic 1
+ result = topic_conf1.del_config()
+ assert_equal(status, 200)
+
+ # try to get a deleted topic
+ _, status = topic_conf1.get_config()
+ assert_equal(status, 404)
+
+ # get the remaining 2 topics
+ result, status = topic_conf1.get_list()
+ assert_equal(status, 200)
+ assert_equal(len(result['ListTopicsResponse']['ListTopicsResult']['Topics']['member']), 2)
+
+ # delete topics
+ result = topic_conf2.del_config()
+ assert_equal(status, 200)
+ result = topic_conf3.del_config()
+ assert_equal(status, 200)
+
+ # get topic list, make sure it is empty
+ result, status = topic_conf1.get_list()
+ assert_equal(result['ListTopicsResponse']['ListTopicsResult']['Topics'], None)
+
+
+@attr('basic_test')
+def test_ps_s3_topic_admin_on_master():
+ """ test s3 topics set/get/delete on master """
+ tenant = 'kaboom'
+ conn = another_user(tenant)
+ zonegroup = 'default'
+ bucket_name = gen_bucket_name()
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topics
+ endpoint_address = 'amqp://127.0.0.1:7001/vhost_1'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+ topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+ # clean all topics
+ try:
+ result = topic_conf1.get_list()[0]['ListTopicsResponse']['ListTopicsResult']['Topics']
+ topics = []
+ if result is not None:
+ topics = result['member']
+ for topic in topics:
+ topic_conf1.del_config(topic_arn=topic['TopicArn'])
+ except Exception as err:
+ print('failed to do topic cleanup: ' + str(err))
+
+ topic_arn1 = topic_conf1.set_config()
+ assert_equal(topic_arn1,
+ 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_1')
+
+ endpoint_address = 'http://127.0.0.1:9001'
+ endpoint_args = 'push-endpoint='+endpoint_address
+ topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
+ topic_arn2 = topic_conf2.set_config()
+ assert_equal(topic_arn2,
+ 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2')
+ endpoint_address = 'http://127.0.0.1:9002'
+ endpoint_args = 'push-endpoint='+endpoint_address
+ topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args)
+ topic_arn3 = topic_conf3.set_config()
+ assert_equal(topic_arn3,
+ 'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3')
+
+ # get topic 3 via commandline
+ result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant])
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['arn'], topic_arn3)
+
+ # delete topic 3
+ _, result = admin(['topic', 'rm', '--topic', topic_name+'_3', '--tenant', tenant])
+ assert_equal(result, 0)
+
+ # try to get a deleted topic
+ _, result = admin(['topic', 'get', '--topic', topic_name+'_3', '--tenant', tenant])
+ print('"topic not found" error is expected')
+ assert_equal(result, 2)
+
+ # get the remaining 2 topics
+ result = admin(['topic', 'list', '--tenant', tenant])
+ parsed_result = json.loads(result[0])
+ assert_equal(len(parsed_result['topics']), 2)
+
+ # delete topics
+ _, result = admin(['topic', 'rm', '--topic', topic_name+'_1', '--tenant', tenant])
+ assert_equal(result, 0)
+ _, result = admin(['topic', 'rm', '--topic', topic_name+'_2', '--tenant', tenant])
+ assert_equal(result, 0)
+
+ # get topic list, make sure it is empty
+ result = admin(['topic', 'list', '--tenant', tenant])
+ parsed_result = json.loads(result[0])
+ assert_equal(len(parsed_result['topics']), 0)
+
+
+@attr('basic_test')
+def test_ps_s3_notification_configuration_admin_on_master():
+ """ test s3 notification list/get/delete on master """
+ conn = connection()
+ zonegroup = 'default'
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topics
+ endpoint_address = 'amqp://127.0.0.1:7001/vhost_1'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+ topic_conf = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+ # clean all topics
+ try:
+ result = topic_conf.get_list()[0]['ListTopicsResponse']['ListTopicsResult']['Topics']
+ topics = []
+ if result is not None:
+ topics = result['member']
+ for topic in topics:
+ topic_conf.del_config(topic_arn=topic['TopicArn'])
+ except Exception as err:
+ print('failed to do topic cleanup: ' + str(err))
+
+ topic_arn = topic_conf.set_config()
+ assert_equal(topic_arn,
+ 'arn:aws:sns:' + zonegroup + '::' + topic_name + '_1')
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name+'_1',
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*']
+ },
+ {'Id': notification_name+'_2',
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectRemoved:*']
+ },
+ {'Id': notification_name+'_3',
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # list notification
+ result = admin(['notification', 'list', '--bucket', bucket_name])
+ parsed_result = json.loads(result[0])
+ assert_equal(len(parsed_result['notifications']), 3)
+ assert_equal(result[1], 0)
+
+ # get notification 1
+ result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name+'_1'])
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Id'], notification_name+'_1')
+ assert_equal(result[1], 0)
+
+ # remove notification 3
+ _, result = admin(['notification', 'rm', '--bucket', bucket_name, '--notification-id', notification_name+'_3'])
+ assert_equal(result, 0)
+
+ # list notification
+ result = admin(['notification', 'list', '--bucket', bucket_name])
+ parsed_result = json.loads(result[0])
+ assert_equal(len(parsed_result['notifications']), 2)
+ assert_equal(result[1], 0)
+
+ # delete notifications
+ _, result = admin(['notification', 'rm', '--bucket', bucket_name])
+ assert_equal(result, 0)
+
+ # list notification, make sure it is empty
+ result = admin(['notification', 'list', '--bucket', bucket_name])
+ parsed_result = json.loads(result[0])
+ assert_equal(len(parsed_result['notifications']), 0)
+ assert_equal(result[1], 0)
+
+
+@attr('modification_required')
+def test_ps_s3_topic_with_secret_on_master():
+ """ test s3 topics with secret set/get/delete on master """
+ return SkipTest('secure connection is needed to test topic with secrets')
+
+ conn = connection1()
+ if conn.secure_conn is None:
+ return SkipTest('secure connection is needed to test topic with secrets')
+
+ zonegroup = 'default'
+ bucket_name = gen_bucket_name()
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # clean all topics
+ delete_all_s3_topics(conn, zonegroup)
+
+ # create s3 topics
+ endpoint_address = 'amqp://user:password@127.0.0.1:7001'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+ bad_topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ try:
+ result = bad_topic_conf.set_config()
+ except Exception as err:
+ print('Error is expected: ' + str(err))
+ else:
+ assert False, 'user password configuration set allowed only over HTTPS'
+ topic_conf = PSTopicS3(conn.secure_conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+
+ assert_equal(topic_arn,
+ 'arn:aws:sns:' + zonegroup + ':' + get_tenant() + ':' + topic_name)
+
+ _, status = bad_topic_conf.get_config()
+ assert_equal(status/100, 4)
+
+ # get topic
+ result, status = topic_conf.get_config()
+ assert_equal(status, 200)
+ assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
+ assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
+
+ _, status = bad_topic_conf.get_config()
+ assert_equal(status/100, 4)
+
+ _, status = topic_conf.get_list()
+ assert_equal(status/100, 2)
+
+ # delete topics
+ result = topic_conf.del_config()
+
+
+@attr('basic_test')
+def test_ps_s3_notification_on_master():
+ """ test s3 notification set/get/delete on master """
+ conn = connection()
+ zonegroup = 'default'
+ bucket_name = gen_bucket_name()
+ # create bucket
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+ # create s3 topic
+ endpoint_address = 'amqp://127.0.0.1:7001'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name+'_1',
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*']
+ },
+ {'Id': notification_name+'_2',
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectRemoved:*']
+ },
+ {'Id': notification_name+'_3',
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # get notifications on a bucket
+ response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
+ assert_equal(status/100, 2)
+ assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
+
+ # delete specific notifications
+ _, status = s3_notification_conf.del_config(notification=notification_name+'_1')
+ assert_equal(status/100, 2)
+
+ # get the remaining 2 notifications on a bucket
+ response, status = s3_notification_conf.get_config()
+ assert_equal(status/100, 2)
+ assert_equal(len(response['TopicConfigurations']), 2)
+ assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn)
+ assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn)
+
+ # delete remaining notifications
+ _, status = s3_notification_conf.del_config()
+ assert_equal(status/100, 2)
+
+ # make sure that the notifications are now deleted
+ _, status = s3_notification_conf.get_config()
+
+ # cleanup
+ topic_conf.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+
+
+@attr('basic_test')
+def test_ps_s3_notification_on_master_empty_config():
+ """ test s3 notification set/get/delete on master with empty config """
+ hostname = get_ip()
+
+ conn = connection()
+
+ zonegroup = 'default'
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'amqp://127.0.0.1:7001'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name+'_1',
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # get notifications on a bucket
+ response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
+ assert_equal(status/100, 2)
+ assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
+
+ # create s3 notification again with empty configuration to check if it deletes or not
+ topic_conf_list = []
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # make sure that the notification is now deleted
+ response, status = s3_notification_conf.get_config()
+ try:
+ check = response['NotificationConfiguration']
+ except KeyError as e:
+ assert_equal(status/100, 2)
+ else:
+ assert False
+
+ # cleanup
+ topic_conf.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+
+
+@attr('amqp_test')
+def test_ps_s3_notification_filter_on_master():
+ """ test s3 notification filter on master """
+
+ hostname = get_ip()
+
+ conn = connection()
+ ps_zone = conn
+
+ zonegroup = 'default'
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # start amqp receivers
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+
+ # create s3 topic
+ endpoint_address = 'amqp://' + hostname
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name+'_1',
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*'],
+ 'Filter': {
+ 'Key': {
+ 'FilterRules': [{'Name': 'prefix', 'Value': 'hello'}]
+ }
+ }
+ },
+ {'Id': notification_name+'_2',
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*'],
+ 'Filter': {
+ 'Key': {
+ 'FilterRules': [{'Name': 'prefix', 'Value': 'world'},
+ {'Name': 'suffix', 'Value': 'log'}]
+ }
+ }
+ },
+ {'Id': notification_name+'_3',
+ 'TopicArn': topic_arn,
+ 'Events': [],
+ 'Filter': {
+ 'Key': {
+ 'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)\\.txt'}]
+ }
+ }
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ result, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ topic_conf_list = [{'Id': notification_name+'_4',
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
+ 'Filter': {
+ 'Metadata': {
+ 'FilterRules': [{'Name': 'x-amz-meta-foo', 'Value': 'bar'},
+ {'Name': 'x-amz-meta-hello', 'Value': 'world'}]
+ },
+ 'Key': {
+ 'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)'}]
+ }
+ }
+ }]
+
+ try:
+ s3_notification_conf4 = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf4.set_config()
+ assert_equal(status/100, 2)
+ skip_notif4 = False
+ except Exception as error:
+ print('note: metadata filter is not supported by boto3 - skipping test')
+ skip_notif4 = True
+
+
+ # get all notifications
+ result, status = s3_notification_conf.get_config()
+ assert_equal(status/100, 2)
+ for conf in result['TopicConfigurations']:
+ filter_name = conf['Filter']['Key']['FilterRules'][0]['Name']
+ assert filter_name == 'prefix' or filter_name == 'suffix' or filter_name == 'regex', filter_name
+
+ if not skip_notif4:
+ result, status = s3_notification_conf4.get_config(notification=notification_name+'_4')
+ assert_equal(status/100, 2)
+ filter_name = result['NotificationConfiguration']['TopicConfiguration']['Filter']['S3Metadata']['FilterRule'][0]['Name']
+ assert filter_name == 'x-amz-meta-foo' or filter_name == 'x-amz-meta-hello'
+
+ expected_in1 = ['hello.kaboom', 'hello.txt', 'hello123.txt', 'hello']
+ expected_in2 = ['world1.log', 'world2log', 'world3.log']
+ expected_in3 = ['hello.txt', 'hell.txt', 'worldlog.txt']
+ expected_in4 = ['foo', 'bar', 'hello', 'world']
+ filtered = ['hell.kaboom', 'world.og', 'world.logg', 'he123ll.txt', 'wo', 'log', 'h', 'txt', 'world.log.txt']
+ filtered_with_attr = ['nofoo', 'nobar', 'nohello', 'noworld']
+ # create objects in bucket
+ for key_name in expected_in1:
+ key = bucket.new_key(key_name)
+ key.set_contents_from_string('bar')
+ for key_name in expected_in2:
+ key = bucket.new_key(key_name)
+ key.set_contents_from_string('bar')
+ for key_name in expected_in3:
+ key = bucket.new_key(key_name)
+ key.set_contents_from_string('bar')
+ if not skip_notif4:
+ for key_name in expected_in4:
+ key = bucket.new_key(key_name)
+ key.set_metadata('foo', 'bar')
+ key.set_metadata('hello', 'world')
+ key.set_metadata('goodbye', 'cruel world')
+ key.set_contents_from_string('bar')
+ for key_name in filtered:
+ key = bucket.new_key(key_name)
+ key.set_contents_from_string('bar')
+ for key_name in filtered_with_attr:
+ key.set_metadata('foo', 'nobar')
+ key.set_metadata('hello', 'noworld')
+ key.set_metadata('goodbye', 'cruel world')
+ key = bucket.new_key(key_name)
+ key.set_contents_from_string('bar')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ found_in1 = []
+ found_in2 = []
+ found_in3 = []
+ found_in4 = []
+
+ for event in receiver.get_and_reset_events():
+ notif_id = event['Records'][0]['s3']['configurationId']
+ key_name = event['Records'][0]['s3']['object']['key']
+ awsRegion = event['Records'][0]['awsRegion']
+ assert_equal(awsRegion, zonegroup)
+ bucket_arn = event['Records'][0]['s3']['bucket']['arn']
+ assert_equal(bucket_arn, "arn:aws:s3:"+awsRegion+"::"+bucket_name)
+ if notif_id == notification_name+'_1':
+ found_in1.append(key_name)
+ elif notif_id == notification_name+'_2':
+ found_in2.append(key_name)
+ elif notif_id == notification_name+'_3':
+ found_in3.append(key_name)
+ elif not skip_notif4 and notif_id == notification_name+'_4':
+ found_in4.append(key_name)
+ else:
+ assert False, 'invalid notification: ' + notif_id
+
+ assert_equal(set(found_in1), set(expected_in1))
+ assert_equal(set(found_in2), set(expected_in2))
+ assert_equal(set(found_in3), set(expected_in3))
+ if not skip_notif4:
+ assert_equal(set(found_in4), set(expected_in4))
+
+ # cleanup
+ s3_notification_conf.del_config()
+ if not skip_notif4:
+ s3_notification_conf4.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ for key in bucket.list():
+ key.delete()
+ conn.delete_bucket(bucket_name)
+ stop_amqp_receiver(receiver, task)
+
+
+@attr('basic_test')
+def test_ps_s3_notification_errors_on_master():
+ """ test s3 notification set/get/delete on master """
+ conn = connection()
+ zonegroup = 'default'
+ bucket_name = gen_bucket_name()
+ # create bucket
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+ # create s3 topic
+ endpoint_address = 'amqp://127.0.0.1:7001'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+
+ # create s3 notification with invalid event name
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:Kaboom']
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ try:
+ result, status = s3_notification_conf.set_config()
+ except Exception as error:
+ print(str(error) + ' - is expected')
+ else:
+ assert False, 'invalid event name is expected to fail'
+
+ # create s3 notification with missing name
+ topic_conf_list = [{'Id': '',
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:Put']
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ try:
+ _, _ = s3_notification_conf.set_config()
+ except Exception as error:
+ print(str(error) + ' - is expected')
+ else:
+ assert False, 'missing notification name is expected to fail'
+
+ # create s3 notification with invalid topic ARN
+ invalid_topic_arn = 'kaboom'
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': invalid_topic_arn,
+ 'Events': ['s3:ObjectCreated:Put']
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ try:
+ _, _ = s3_notification_conf.set_config()
+ except Exception as error:
+ print(str(error) + ' - is expected')
+ else:
+ assert False, 'invalid ARN is expected to fail'
+
+ # create s3 notification with unknown topic ARN
+ invalid_topic_arn = 'arn:aws:sns:a::kaboom'
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': invalid_topic_arn ,
+ 'Events': ['s3:ObjectCreated:Put']
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ try:
+ _, _ = s3_notification_conf.set_config()
+ except Exception as error:
+ print(str(error) + ' - is expected')
+ else:
+ assert False, 'unknown topic is expected to fail'
+
+ # create s3 notification with wrong bucket
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:Put']
+ }]
+ s3_notification_conf = PSNotificationS3(conn, 'kaboom', topic_conf_list)
+ try:
+ _, _ = s3_notification_conf.set_config()
+ except Exception as error:
+ print(str(error) + ' - is expected')
+ else:
+ assert False, 'unknown bucket is expected to fail'
+
+ topic_conf.del_config()
+
+ status = topic_conf.del_config()
+ # deleting an unknown notification is not considered an error
+ assert_equal(status, 200)
+
+ _, status = topic_conf.get_config()
+ assert_equal(status, 404)
+
+ # cleanup
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+
+@attr('basic_test')
+def test_ps_s3_notification_permissions():
+ """ test s3 notification set/get/delete permissions """
+ conn1 = connection()
+ conn2 = another_user()
+ zonegroup = 'default'
+ bucket_name = gen_bucket_name()
+ # create bucket
+ bucket = conn1.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+ # create s3 topic
+ endpoint_address = 'amqp://127.0.0.1:7001'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+ topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+
+ # one user create a notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf1 = PSNotificationS3(conn1, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf1.set_config()
+ assert_equal(status, 200)
+ # another user try to fetch it
+ s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
+ try:
+ _, _ = s3_notification_conf2.get_config()
+ assert False, "'AccessDenied' error is expected"
+ except ClientError as error:
+ assert_equal(error.response['Error']['Code'], 'AccessDenied')
+ # other user try to delete the notification
+ _, status = s3_notification_conf2.del_config()
+ assert_equal(status, 403)
+
+ # bucket policy is added by the 1st user
+ client = boto3.client('s3',
+ endpoint_url='http://'+conn1.host+':'+str(conn1.port),
+ aws_access_key_id=conn1.aws_access_key_id,
+ aws_secret_access_key=conn1.aws_secret_access_key)
+ bucket_policy = json.dumps({
+ "Version": "2012-10-17",
+ "Statement": [
+ {
+ "Sid": "Statement",
+ "Effect": "Allow",
+ "Principal": "*",
+ "Action": ["s3:GetBucketNotification", "s3:PutBucketNotification"],
+ "Resource": f"arn:aws:s3:::{bucket_name}"
+ }
+ ]
+ })
+ response = client.put_bucket_policy(Bucket=bucket_name, Policy=bucket_policy)
+ assert_equal(int(response['ResponseMetadata']['HTTPStatusCode']/100), 2)
+ result = client.get_bucket_policy(Bucket=bucket_name)
+ print(result['Policy'])
+
+ # 2nd user try to fetch it again
+ _, status = s3_notification_conf2.get_config()
+ assert_equal(status, 200)
+
+ # 2nd user try to delete it again
+ result, status = s3_notification_conf2.del_config()
+ assert_equal(status, 200)
+
+ # 2nd user try to add another notification
+ topic_conf_list = [{'Id': notification_name+"2",
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
+ result, status = s3_notification_conf2.set_config()
+ assert_equal(status, 200)
+
+ # cleanup
+ s3_notification_conf1.del_config()
+ s3_notification_conf2.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ conn1.delete_bucket(bucket_name)
+
+@attr('amqp_test')
+def test_ps_s3_notification_push_amqp_on_master():
+ """ test pushing amqp s3 notification on master """
+
+ hostname = get_ip()
+ conn = connection()
+ zonegroup = 'default'
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
+ topic_name2 = bucket_name + TOPIC_SUFFIX + '_2'
+
+ # start amqp receivers
+ exchange = 'ex1'
+ task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1)
+ task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name2)
+ task1.start()
+ task2.start()
+
+ # create two s3 topic
+ endpoint_address = 'amqp://' + hostname
+ # with acks from broker
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+ topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf1.set_config()
+ # without acks from broker
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
+ topic_conf2 = PSTopicS3(conn, topic_name2, zonegroup, endpoint_args=endpoint_args)
+ topic_arn2 = topic_conf2.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
+ 'Events': []
+ },
+ {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
+ 'Events': ['s3:ObjectCreated:*']
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket (async)
+ number_of_objects = 100
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for creation + qmqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check amqp receiver
+ keys = list(bucket.list())
+ print('total number of objects: ' + str(len(keys)))
+ receiver1.verify_s3_events(keys, exact_match=True)
+ receiver2.verify_s3_events(keys, exact_match=True)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check amqp receiver 1 for deletions
+ receiver1.verify_s3_events(keys, exact_match=True, deletions=True)
+ # check amqp receiver 2 has no deletions
+ try:
+ receiver1.verify_s3_events(keys, exact_match=False, deletions=True)
+ except:
+ pass
+ else:
+ err = 'amqp receiver 2 should have no deletions'
+ assert False, err
+
+ # cleanup
+ stop_amqp_receiver(receiver1, task1)
+ stop_amqp_receiver(receiver2, task2)
+ s3_notification_conf.del_config()
+ topic_conf1.del_config()
+ topic_conf2.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+
+
+@attr('manual_test')
+def test_ps_s3_notification_push_amqp_idleness_check():
+ """ test pushing amqp s3 notification and checking for connection idleness """
+ return SkipTest("only used in manual testing")
+ hostname = get_ip()
+ conn = connection()
+ zonegroup = 'default'
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
+
+ # start amqp receivers
+ exchange = 'ex1'
+ task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1)
+ task1.start()
+
+ # create two s3 topic
+ endpoint_address = 'amqp://' + hostname
+ # with acks from broker
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+ topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf1.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket (async)
+ number_of_objects = 10
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for creation + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check amqp receiver
+ keys = list(bucket.list())
+ print('total number of objects: ' + str(len(keys)))
+ receiver1.verify_s3_events(keys, exact_match=True)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check amqp receiver 1 for deletions
+ receiver1.verify_s3_events(keys, exact_match=True, deletions=True)
+
+ print('waiting for 40sec for checking idleness')
+ time.sleep(40)
+
+ os.system("netstat -nnp | grep 5672");
+
+ # do the process of uploading an object and checking for notification again
+ number_of_objects = 10
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for creation + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check amqp receiver
+ keys = list(bucket.list())
+ print('total number of objects: ' + str(len(keys)))
+ receiver1.verify_s3_events(keys, exact_match=True)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for deletion + amqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check amqp receiver 1 for deletions
+ receiver1.verify_s3_events(keys, exact_match=True, deletions=True)
+
+ os.system("netstat -nnp | grep 5672");
+
+ # cleanup
+ stop_amqp_receiver(receiver1, task1)
+ s3_notification_conf.del_config()
+ topic_conf1.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+
+
+@attr('kafka_test')
+def test_ps_s3_notification_push_kafka_on_master():
+ """ test pushing kafka s3 notification on master """
+ conn = connection()
+ zonegroup = 'default'
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ # name is constant for manual testing
+ topic_name = bucket_name+'_topic'
+ # create consumer on the topic
+
+ try:
+ s3_notification_conf = None
+ topic_conf1 = None
+ topic_conf2 = None
+ receiver = None
+ task, receiver = create_kafka_receiver_thread(topic_name+'_1')
+ task.start()
+
+ # create s3 topic
+ endpoint_address = 'kafka://' + kafka_server
+ # without acks from broker
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
+ topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf1.set_config()
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none'
+ topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
+ topic_arn2 = topic_conf2.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1,
+ 'Events': []
+ },
+ {'Id': notification_name + '_2', 'TopicArn': topic_arn2,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket (async)
+ number_of_objects = 10
+ client_threads = []
+ etags = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = str(os.urandom(1024*1024))
+ etag = hashlib.md5(content.encode()).hexdigest()
+ etags.append(etag)
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ keys = list(bucket.list())
+ receiver.verify_s3_events(keys, exact_match=True, etags=etags)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)
+ except Exception as e:
+ print(e)
+ assert False
+ finally:
+ # cleanup
+ if s3_notification_conf is not None:
+ s3_notification_conf.del_config()
+ if topic_conf1 is not None:
+ topic_conf1.del_config()
+ if topic_conf2 is not None:
+ topic_conf2.del_config()
+ # delete the bucket
+ for key in bucket.list():
+ key.delete()
+ conn.delete_bucket(bucket_name)
+ if receiver is not None:
+ stop_kafka_receiver(receiver, task)
+
+
+@attr('http_test')
+def test_ps_s3_notification_multi_delete_on_master():
+ """ test deletion of multiple keys on master """
+ hostname = get_ip()
+ conn = connection()
+ zonegroup = 'default'
+
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ number_of_objects = 10
+ http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectRemoved:*']
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ client_threads = []
+ objects_size = {}
+ for i in range(number_of_objects):
+ content = str(os.urandom(randint(1, 1024)))
+ object_size = len(content)
+ key = bucket.new_key(str(i))
+ objects_size[key.name] = object_size
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ keys = list(bucket.list())
+
+ start_time = time.time()
+ delete_all_objects(conn, bucket_name)
+ time_diff = time.time() - start_time
+ print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check http receiver
+ http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size)
+
+ # cleanup
+ topic_conf.del_config()
+ s3_notification_conf.del_config(notification=notification_name)
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ http_server.close()
+
+
+@attr('http_test')
+def test_ps_s3_notification_push_http_on_master():
+ """ test pushing http s3 notification on master """
+ hostname = get_ip_http()
+ conn = connection()
+ zonegroup = 'default'
+
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ number_of_objects = 10
+ http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ client_threads = []
+ objects_size = {}
+ start_time = time.time()
+ for i in range(number_of_objects):
+ content = str(os.urandom(randint(1, 1024)))
+ object_size = len(content)
+ key = bucket.new_key(str(i))
+ objects_size[key.name] = object_size
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check http receiver
+ keys = list(bucket.list())
+ http_server.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check http receiver
+ http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size)
+
+ # cleanup
+ topic_conf.del_config()
+ s3_notification_conf.del_config(notification=notification_name)
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ http_server.close()
+
+
+@attr('http_test')
+def test_ps_s3_notification_push_cloudevents_on_master():
+ """ test pushing cloudevents notification on master """
+ hostname = get_ip_http()
+ conn = connection()
+ zonegroup = 'default'
+
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ number_of_objects = 10
+ http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects, cloudevents=True)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&cloudevents=true'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ client_threads = []
+ objects_size = {}
+ start_time = time.time()
+ for i in range(number_of_objects):
+ content = str(os.urandom(randint(1, 1024)))
+ object_size = len(content)
+ key = bucket.new_key(str(i))
+ objects_size[key.name] = object_size
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check http receiver
+ keys = list(bucket.list())
+ http_server.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check http receiver
+ http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size)
+
+ # cleanup
+ topic_conf.del_config()
+ s3_notification_conf.del_config(notification=notification_name)
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ http_server.close()
+
+
+@attr('http_test')
+def test_ps_s3_opaque_data_on_master():
+ """ test that opaque id set in topic, is sent in notification on master """
+ hostname = get_ip()
+ conn = connection()
+ zonegroup = 'default'
+
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ number_of_objects = 10
+ http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address
+ opaque_data = 'http://1.2.3.4:8888'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ client_threads = []
+ start_time = time.time()
+ content = 'bar'
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check http receiver
+ keys = list(bucket.list())
+ print('total number of objects: ' + str(len(keys)))
+ events = http_server.get_and_reset_events()
+ for event in events:
+ assert_equal(event['Records'][0]['opaqueData'], opaque_data)
+
+ # cleanup
+ for key in keys:
+ key.delete()
+ [thr.join() for thr in client_threads]
+ topic_conf.del_config()
+ s3_notification_conf.del_config(notification=notification_name)
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ http_server.close()
+
+@attr('http_test')
+def test_ps_s3_lifecycle_on_master():
+ """ test that when object is deleted due to lifecycle policy, notification is sent on master """
+ hostname = get_ip()
+ conn = connection()
+ zonegroup = 'default'
+
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ number_of_objects = 10
+ http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address
+ opaque_data = 'http://1.2.3.4:8888'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectLifecycle:Expiration:*']
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ obj_prefix = 'ooo'
+ client_threads = []
+ start_time = time.time()
+ content = 'bar'
+ for i in range(number_of_objects):
+ key = bucket.new_key(obj_prefix + str(i))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ # create lifecycle policy
+ client = boto3.client('s3',
+ endpoint_url='http://'+conn.host+':'+str(conn.port),
+ aws_access_key_id=conn.aws_access_key_id,
+ aws_secret_access_key=conn.aws_secret_access_key)
+ yesterday = datetime.date.today() - datetime.timedelta(days=1)
+ response = client.put_bucket_lifecycle_configuration(Bucket=bucket_name,
+ LifecycleConfiguration={'Rules': [
+ {
+ 'ID': 'rule1',
+ 'Expiration': {'Date': yesterday.isoformat()},
+ 'Filter': {'Prefix': obj_prefix},
+ 'Status': 'Enabled',
+ }
+ ]
+ }
+ )
+
+ # start lifecycle processing
+ admin(['lc', 'process'])
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check http receiver does not have messages
+ keys = list(bucket.list())
+ print('total number of objects: ' + str(len(keys)))
+ event_keys = []
+ events = http_server.get_and_reset_events()
+ for event in events:
+ assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:Current')
+ event_keys.append(event['Records'][0]['s3']['object']['key'])
+ for key in keys:
+ key_found = False
+ for event_key in event_keys:
+ if event_key == key:
+ key_found = True
+ break
+ if not key_found:
+ err = 'no lifecycle event found for key: ' + str(key)
+ log.error(events)
+ assert False, err
+
+ # cleanup
+ for key in keys:
+ key.delete()
+ [thr.join() for thr in client_threads]
+ topic_conf.del_config()
+ s3_notification_conf.del_config(notification=notification_name)
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ http_server.close()
+
+
+def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
+ """ test object creation s3 notifications in using put/copy/post on master"""
+
+ if not external_endpoint_address:
+ hostname = 'localhost'
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
+ else:
+ proc = None
+
+ conn = connection()
+ hostname = 'localhost'
+ zonegroup = 'default'
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # start amqp receiver
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name, external_endpoint_address, ca_location)
+ task.start()
+
+ # create s3 topic
+ if external_endpoint_address:
+ endpoint_address = external_endpoint_address
+ elif ca_location:
+ endpoint_address = 'amqps://' + hostname
+ else:
+ endpoint_address = 'amqp://' + hostname
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker&verify-ssl='+verify_ssl
+ if ca_location:
+ endpoint_args += '&ca-location={}'.format(ca_location)
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy', 's3:ObjectCreated:CompleteMultipartUpload']
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ objects_size = {}
+ # create objects in the bucket using PUT
+ content = str(os.urandom(randint(1, 1024)))
+ key_name = 'put'
+ key = bucket.new_key(key_name)
+ objects_size[key_name] = len(content)
+ key.set_contents_from_string(content)
+ # create objects in the bucket using COPY
+ key_name = 'copy'
+ bucket.copy_key(key_name, bucket.name, key.name)
+ objects_size[key_name] = len(content)
+
+ # create objects in the bucket using multi-part upload
+ fp = tempfile.NamedTemporaryFile(mode='w+b')
+ content = bytearray(os.urandom(10*1024*1024))
+ key_name = 'multipart'
+ objects_size[key_name] = len(content)
+ fp.write(content)
+ fp.flush()
+ fp.seek(0)
+ uploader = bucket.initiate_multipart_upload(key_name)
+ uploader.upload_part_from_file(fp, 1)
+ uploader.complete_upload()
+ fp.close()
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check amqp receiver
+ keys = list(bucket.list())
+ receiver.verify_s3_events(keys, exact_match=True, expected_sizes=objects_size)
+
+ # cleanup
+ stop_amqp_receiver(receiver, task)
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ for key in bucket.list():
+ key.delete()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ if proc:
+ clean_rabbitmq(proc)
+
+
+@attr('amqp_test')
+def test_ps_s3_creation_triggers_on_master():
+ ps_s3_creation_triggers_on_master(external_endpoint_address="amqp://localhost:5672")
+
+
+@attr('amqp_ssl_test')
+def test_ps_s3_creation_triggers_on_master_external():
+
+ from distutils.util import strtobool
+
+ if 'AMQP_EXTERNAL_ENDPOINT' in os.environ:
+ try:
+ if strtobool(os.environ['AMQP_VERIFY_SSL']):
+ verify_ssl = 'true'
+ else:
+ verify_ssl = 'false'
+ except Exception as e:
+ verify_ssl = 'true'
+
+ ps_s3_creation_triggers_on_master(
+ external_endpoint_address=os.environ['AMQP_EXTERNAL_ENDPOINT'],
+ verify_ssl=verify_ssl)
+ else:
+ return SkipTest("Set AMQP_EXTERNAL_ENDPOINT to a valid external AMQP endpoint url for this test to run")
+
+
+def generate_private_key(tempdir):
+
+ import datetime
+ import stat
+ from cryptography import x509
+ from cryptography.x509.oid import NameOID
+ from cryptography.hazmat.primitives import hashes
+ from cryptography.hazmat.backends import default_backend
+ from cryptography.hazmat.primitives import serialization
+ from cryptography.hazmat.primitives.asymmetric import rsa
+
+ # modify permissions to ensure that the broker user can access them
+ os.chmod(tempdir, mode=stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
+ CACERTFILE = os.path.join(tempdir, 'ca_certificate.pem')
+ CERTFILE = os.path.join(tempdir, 'server_certificate.pem')
+ KEYFILE = os.path.join(tempdir, 'server_key.pem')
+
+ root_key = rsa.generate_private_key(
+ public_exponent=65537,
+ key_size=2048,
+ backend=default_backend()
+ )
+ subject = issuer = x509.Name([
+ x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"),
+ x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"),
+ x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"),
+ x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"),
+ x509.NameAttribute(NameOID.COMMON_NAME, u"RFI CA"),
+ ])
+ root_cert = x509.CertificateBuilder().subject_name(
+ subject
+ ).issuer_name(
+ issuer
+ ).public_key(
+ root_key.public_key()
+ ).serial_number(
+ x509.random_serial_number()
+ ).not_valid_before(
+ datetime.datetime.utcnow()
+ ).not_valid_after(
+ datetime.datetime.utcnow() + datetime.timedelta(days=3650)
+ ).add_extension(
+ x509.BasicConstraints(ca=True, path_length=None), critical=True
+ ).sign(root_key, hashes.SHA256(), default_backend())
+ with open(CACERTFILE, "wb") as f:
+ f.write(root_cert.public_bytes(serialization.Encoding.PEM))
+
+ # Now we want to generate a cert from that root
+ cert_key = rsa.generate_private_key(
+ public_exponent=65537,
+ key_size=2048,
+ backend=default_backend(),
+ )
+ with open(KEYFILE, "wb") as f:
+ f.write(cert_key.private_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.PKCS8,
+ encryption_algorithm=serialization.NoEncryption(),
+ ))
+ new_subject = x509.Name([
+ x509.NameAttribute(NameOID.COUNTRY_NAME, u"UK"),
+ x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Oxfordshire"),
+ x509.NameAttribute(NameOID.LOCALITY_NAME, u"Harwell"),
+ x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Rosalind Franklin Institute"),
+ ])
+ cert = x509.CertificateBuilder().subject_name(
+ new_subject
+ ).issuer_name(
+ root_cert.issuer
+ ).public_key(
+ cert_key.public_key()
+ ).serial_number(
+ x509.random_serial_number()
+ ).not_valid_before(
+ datetime.datetime.utcnow()
+ ).not_valid_after(
+ datetime.datetime.utcnow() + datetime.timedelta(days=30)
+ ).add_extension(
+ x509.SubjectAlternativeName([x509.DNSName(u"localhost")]),
+ critical=False,
+ ).sign(root_key, hashes.SHA256(), default_backend())
+ # Write our certificate out to disk.
+ with open(CERTFILE, "wb") as f:
+ f.write(cert.public_bytes(serialization.Encoding.PEM))
+
+ print("\n\n********private key generated********")
+ print(CACERTFILE, CERTFILE, KEYFILE)
+ print("\n\n")
+ return CACERTFILE, CERTFILE, KEYFILE
+
+
+@attr('amqp_ssl_test')
+def test_ps_s3_creation_triggers_on_master_ssl():
+
+ import textwrap
+ from tempfile import TemporaryDirectory
+
+ with TemporaryDirectory() as tempdir:
+ CACERTFILE, CERTFILE, KEYFILE = generate_private_key(tempdir)
+ RABBITMQ_CONF_FILE = os.path.join(tempdir, 'rabbitmq.config')
+ with open(RABBITMQ_CONF_FILE, "w") as f:
+ # use the old style config format to ensure it also runs on older RabbitMQ versions.
+ f.write(textwrap.dedent(f'''
+ [
+ {{rabbit, [
+ {{ssl_listeners, [5671]}},
+ {{ssl_options, [{{cacertfile, "{CACERTFILE}"}},
+ {{certfile, "{CERTFILE}"}},
+ {{keyfile, "{KEYFILE}"}},
+ {{verify, verify_peer}},
+ {{fail_if_no_peer_cert, false}}]}}]}}
+ ].
+ '''))
+ os.environ['RABBITMQ_CONFIG_FILE'] = os.path.splitext(RABBITMQ_CONF_FILE)[0]
+
+ ps_s3_creation_triggers_on_master(ca_location=CACERTFILE)
+
+ del os.environ['RABBITMQ_CONFIG_FILE']
+
+
+@attr('amqp_test')
+def test_http_post_object_upload():
+ """ test that uploads object using HTTP POST """
+
+ import boto3
+ from collections import OrderedDict
+ import requests
+
+ hostname = get_ip()
+ zonegroup = 'default'
+ conn = connection()
+
+ endpoint = "http://%s:%d" % (get_config_host(), get_config_port())
+
+ conn1 = boto3.client(service_name='s3',
+ aws_access_key_id=get_access_key(),
+ aws_secret_access_key=get_secret_key(),
+ endpoint_url=endpoint,
+ )
+
+ bucket_name = gen_bucket_name()
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ key_name = 'foo.txt'
+
+ resp = conn1.generate_presigned_post(Bucket=bucket_name, Key=key_name,)
+
+ url = resp['url']
+
+ bucket = conn1.create_bucket(ACL='public-read-write', Bucket=bucket_name)
+
+ # start amqp receivers
+ exchange = 'ex1'
+ task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1')
+ task1.start()
+
+ # create s3 topics
+ endpoint_address = 'amqp://' + hostname
+ endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
+ topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf1.set_config()
+
+ # create s3 notifications
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
+ 'Events': ['s3:ObjectCreated:Post']
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ payload = OrderedDict([("key" , "foo.txt"),("acl" , "public-read"),\
+ ("Content-Type" , "text/plain"),('file', ('bar'))])
+
+ # POST upload
+ r = requests.post(url, files=payload, verify=True)
+ assert_equal(r.status_code, 204)
+
+ # check amqp receiver
+ events = receiver1.get_and_reset_events()
+ assert_equal(len(events), 1)
+
+ # cleanup
+ stop_amqp_receiver(receiver1, task1)
+ s3_notification_conf.del_config()
+ topic_conf1.del_config()
+ conn1.delete_object(Bucket=bucket_name, Key=key_name)
+ # delete the bucket
+ conn1.delete_bucket(Bucket=bucket_name)
+
+
+@attr('amqp_test')
+def test_ps_s3_multipart_on_master():
+ """ test multipart object upload on master"""
+
+ hostname = get_ip()
+ conn = connection()
+ zonegroup = 'default'
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # start amqp receivers
+ exchange = 'ex1'
+ task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name+'_1')
+ task1.start()
+ task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name+'_2')
+ task2.start()
+ task3, receiver3 = create_amqp_receiver_thread(exchange, topic_name+'_3')
+ task3.start()
+
+ # create s3 topics
+ endpoint_address = 'amqp://' + hostname
+ endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
+ topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf1.set_config()
+ topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
+ topic_arn2 = topic_conf2.set_config()
+ topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args)
+ topic_arn3 = topic_conf3.set_config()
+
+ # create s3 notifications
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
+ 'Events': ['s3:ObjectCreated:*']
+ },
+ {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
+ 'Events': ['s3:ObjectCreated:Post']
+ },
+ {'Id': notification_name+'_3', 'TopicArn': topic_arn3,
+ 'Events': ['s3:ObjectCreated:CompleteMultipartUpload']
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket using multi-part upload
+ fp = tempfile.NamedTemporaryFile(mode='w+b')
+ object_size = 1024
+ content = bytearray(os.urandom(object_size))
+ fp.write(content)
+ fp.flush()
+ fp.seek(0)
+ uploader = bucket.initiate_multipart_upload('multipart')
+ uploader.upload_part_from_file(fp, 1)
+ uploader.complete_upload()
+ fp.close()
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check amqp receiver
+ events = receiver1.get_and_reset_events()
+ assert_equal(len(events), 1)
+
+ events = receiver2.get_and_reset_events()
+ assert_equal(len(events), 0)
+
+ events = receiver3.get_and_reset_events()
+ assert_equal(len(events), 1)
+ assert_equal(events[0]['Records'][0]['eventName'], 'ObjectCreated:CompleteMultipartUpload')
+ assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_3')
+ assert_equal(events[0]['Records'][0]['s3']['object']['size'], object_size)
+ assert events[0]['Records'][0]['eventTime'] != '0.000000', 'invalid eventTime'
+
+ # cleanup
+ stop_amqp_receiver(receiver1, task1)
+ stop_amqp_receiver(receiver2, task2)
+ stop_amqp_receiver(receiver3, task3)
+ s3_notification_conf.del_config()
+ topic_conf1.del_config()
+ topic_conf2.del_config()
+ topic_conf3.del_config()
+ for key in bucket.list():
+ key.delete()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+
+@attr('amqp_test')
+def test_ps_s3_metadata_filter_on_master():
+ """ test s3 notification of metadata on master """
+
+ hostname = get_ip()
+ conn = connection()
+ zonegroup = 'default'
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # start amqp receivers
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+
+ # create s3 topic
+ endpoint_address = 'amqp://' + hostname
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ meta_key = 'meta1'
+ meta_value = 'This is my metadata value'
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
+ 'Filter': {
+ 'Metadata': {
+ 'FilterRules': [{'Name': META_PREFIX+meta_key, 'Value': meta_value}]
+ }
+ }
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ expected_keys = []
+ # create objects in the bucket
+ key_name = 'foo'
+ key = bucket.new_key(key_name)
+ key.set_metadata(meta_key, meta_value)
+ key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
+ expected_keys.append(key_name)
+
+ # create objects in the bucket using COPY
+ key_name = 'copy_of_foo'
+ bucket.copy_key(key_name, bucket.name, key.name)
+ expected_keys.append(key_name)
+
+ # create another objects in the bucket using COPY
+ # but override the metadata value
+ key_name = 'another_copy_of_foo'
+ bucket.copy_key(key_name, bucket.name, key.name, metadata={meta_key: 'kaboom'})
+ # this key is not in the expected keys due to the different meta value
+
+ # create objects in the bucket using multi-part upload
+ fp = tempfile.NamedTemporaryFile(mode='w+b')
+ chunk_size = 1024*1024*5 # 5MB
+ object_size = 10*chunk_size
+ content = bytearray(os.urandom(object_size))
+ fp.write(content)
+ fp.flush()
+ fp.seek(0)
+ key_name = 'multipart_foo'
+ uploader = bucket.initiate_multipart_upload(key_name,
+ metadata={meta_key: meta_value})
+ for i in range(1,5):
+ uploader.upload_part_from_file(fp, i, size=chunk_size)
+ fp.seek(i*chunk_size)
+ uploader.complete_upload()
+ fp.close()
+ expected_keys.append(key_name)
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ # check amqp receiver
+ events = receiver.get_and_reset_events()
+ assert_equal(len(events), len(expected_keys))
+ for event in events:
+ assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
+
+ # delete objects
+ for key in bucket.list():
+ key.delete()
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ # check amqp receiver
+ events = receiver.get_and_reset_events()
+ assert_equal(len(events), len(expected_keys))
+ for event in events:
+ assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
+
+ # cleanup
+ stop_amqp_receiver(receiver, task)
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+
+
+@attr('amqp_test')
+def test_ps_s3_metadata_on_master():
+ """ test s3 notification of metadata on master """
+
+ hostname = get_ip()
+ conn = connection()
+ zonegroup = 'default'
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # start amqp receivers
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+
+ # create s3 topic
+ endpoint_address = 'amqp://' + hostname
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ meta_key = 'meta1'
+ meta_value = 'This is my metadata value'
+ meta_prefix = META_PREFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ key_name = 'foo'
+ key = bucket.new_key(key_name)
+ key.set_metadata(meta_key, meta_value)
+ key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
+ # update the object
+ another_meta_key = 'meta2'
+ key.set_metadata(another_meta_key, meta_value)
+ key.set_contents_from_string('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
+
+ # create objects in the bucket using COPY
+ key_name = 'copy_of_foo'
+ bucket.copy_key(key_name, bucket.name, key.name)
+
+ # create objects in the bucket using multi-part upload
+ fp = tempfile.NamedTemporaryFile(mode='w+b')
+ chunk_size = 1024*1024*5 # 5MB
+ object_size = 10*chunk_size
+ content = bytearray(os.urandom(object_size))
+ fp.write(content)
+ fp.flush()
+ fp.seek(0)
+ key_name = 'multipart_foo'
+ uploader = bucket.initiate_multipart_upload(key_name,
+ metadata={meta_key: meta_value})
+ for i in range(1,5):
+ uploader.upload_part_from_file(fp, i, size=chunk_size)
+ fp.seek(i*chunk_size)
+ uploader.complete_upload()
+ fp.close()
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ # check amqp receiver
+ events = receiver.get_and_reset_events()
+ for event in events:
+ value = [x['val'] for x in event['Records'][0]['s3']['object']['metadata'] if x['key'] == META_PREFIX+meta_key]
+ assert_equal(value[0], meta_value)
+
+ # delete objects
+ for key in bucket.list():
+ key.delete()
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ # check amqp receiver
+ events = receiver.get_and_reset_events()
+ for event in events:
+ value = [x['val'] for x in event['Records'][0]['s3']['object']['metadata'] if x['key'] == META_PREFIX+meta_key]
+ assert_equal(value[0], meta_value)
+
+ # cleanup
+ stop_amqp_receiver(receiver, task)
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+
+
+@attr('amqp_test')
+def test_ps_s3_tags_on_master():
+ """ test s3 notification of tags on master """
+
+ hostname = get_ip()
+ conn = connection()
+ zonegroup = 'default'
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # start amqp receiver
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+
+ # create s3 topic
+ endpoint_address = 'amqp://' + hostname
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
+ 'Filter': {
+ 'Tags': {
+ 'FilterRules': [{'Name': 'hello', 'Value': 'world'}, {'Name': 'ka', 'Value': 'boom'}]
+ }
+ }
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ expected_keys = []
+ # create objects in the bucket with tags
+ # key 1 has all the tags in the filter
+ tags = 'hello=world&ka=boom&hello=helloworld'
+ key_name1 = 'key1'
+ put_object_tagging(conn, bucket_name, key_name1, tags)
+ expected_keys.append(key_name1)
+ # key 2 has an additional tag not in the filter
+ tags = 'hello=world&foo=bar&ka=boom&hello=helloworld'
+ key_name = 'key2'
+ put_object_tagging(conn, bucket_name, key_name, tags)
+ expected_keys.append(key_name)
+ # key 3 has no tags
+ key_name3 = 'key3'
+ key = bucket.new_key(key_name3)
+ key.set_contents_from_string('bar')
+ # key 4 has the wrong of the multi value tags
+ tags = 'hello=helloworld&ka=boom'
+ key_name = 'key4'
+ put_object_tagging(conn, bucket_name, key_name, tags)
+ # key 5 has the right of the multi value tags
+ tags = 'hello=world&ka=boom'
+ key_name = 'key5'
+ put_object_tagging(conn, bucket_name, key_name, tags)
+ expected_keys.append(key_name)
+ # key 6 is missing a tag
+ tags = 'hello=world'
+ key_name = 'key6'
+ put_object_tagging(conn, bucket_name, key_name, tags)
+ # create objects in the bucket using COPY
+ key_name = 'copy_of_'+key_name1
+ bucket.copy_key(key_name, bucket.name, key_name1)
+ expected_keys.append(key_name)
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ event_count = 0
+ expected_tags1 = [{'key': 'hello', 'val': 'world'}, {'key': 'hello', 'val': 'helloworld'}, {'key': 'ka', 'val': 'boom'}]
+ expected_tags1 = sorted(expected_tags1, key=lambda k: k['key']+k['val'])
+ for event in receiver.get_and_reset_events():
+ key = event['Records'][0]['s3']['object']['key']
+ if (key == key_name1):
+ obj_tags = sorted(event['Records'][0]['s3']['object']['tags'], key=lambda k: k['key']+k['val'])
+ assert_equal(obj_tags, expected_tags1)
+ event_count += 1
+ assert(key in expected_keys)
+
+ assert_equal(event_count, len(expected_keys))
+
+ # delete the objects
+ for key in bucket.list():
+ key.delete()
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ event_count = 0
+ # check amqp receiver
+ for event in receiver.get_and_reset_events():
+ key = event['Records'][0]['s3']['object']['key']
+ if (key == key_name1):
+ obj_tags = sorted(event['Records'][0]['s3']['object']['tags'], key=lambda k: k['key']+k['val'])
+ assert_equal(obj_tags, expected_tags1)
+ event_count += 1
+ assert(key in expected_keys)
+
+ assert(event_count == len(expected_keys))
+
+ # cleanup
+ stop_amqp_receiver(receiver, task)
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+
+@attr('amqp_test')
+def test_ps_s3_versioning_on_master():
+ """ test s3 notification of object versions """
+
+ hostname = get_ip()
+ conn = connection()
+ zonegroup = 'default'
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ bucket.configure_versioning(True)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # start amqp receiver
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+
+ # create s3 topic
+ endpoint_address = 'amqp://' + hostname
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ key_name = 'foo'
+ key = bucket.new_key(key_name)
+ key.set_contents_from_string('hello')
+ ver1 = key.version_id
+ key.set_contents_from_string('world')
+ ver2 = key.version_id
+ copy_of_key = bucket.copy_key('copy_of_foo', bucket.name, key_name, src_version_id=ver1)
+ ver3 = copy_of_key.version_id
+ versions = [ver1, ver2, ver3]
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check amqp receiver
+ events = receiver.get_and_reset_events()
+ num_of_versions = 0
+ for event_list in events:
+ for event in event_list['Records']:
+ assert event['s3']['object']['key'] in (key_name, copy_of_key.name)
+ version = event['s3']['object']['versionId']
+ num_of_versions += 1
+ if version not in versions:
+ print('version mismatch: '+version+' not in: '+str(versions))
+ # TODO: copy_key() does not return the version of the copied object
+ #assert False
+ else:
+ print('version ok: '+version+' in: '+str(versions))
+
+ assert_equal(num_of_versions, 3)
+
+ # cleanup
+ stop_amqp_receiver(receiver, task)
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ bucket.delete_key(copy_of_key, version_id=ver3)
+ bucket.delete_key(key.name, version_id=ver2)
+ bucket.delete_key(key.name, version_id=ver1)
+ #conn.delete_bucket(bucket_name)
+
+
+@attr('amqp_test')
+def test_ps_s3_versioned_deletion_on_master():
+ """ test s3 notification of deletion markers on master """
+
+ hostname = get_ip()
+ conn = connection()
+ zonegroup = 'default'
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ bucket.configure_versioning(True)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # start amqp receiver
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+
+ # create s3 topic
+ endpoint_address = 'amqp://' + hostname
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectRemoved:*']
+ },
+ {'Id': notification_name+'_2', 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectRemoved:DeleteMarkerCreated']
+ },
+ {'Id': notification_name+'_3', 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectRemoved:Delete']
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ key = bucket.new_key('foo')
+ content = str(os.urandom(512))
+ size1 = len(content)
+ key.set_contents_from_string(content)
+ ver1 = key.version_id
+ content = str(os.urandom(511))
+ size2 = len(content)
+ key.set_contents_from_string(content)
+ ver2 = key.version_id
+ # create delete marker (non versioned deletion)
+ delete_marker_key = bucket.delete_key(key.name)
+ versions = [ver1, ver2, delete_marker_key.version_id]
+
+ time.sleep(1)
+
+ # versioned deletion
+ bucket.delete_key(key.name, version_id=ver2)
+ bucket.delete_key(key.name, version_id=ver1)
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check amqp receiver
+ events = receiver.get_and_reset_events()
+ delete_events = 0
+ delete_marker_create_events = 0
+ for event_list in events:
+ for event in event_list['Records']:
+ version = event['s3']['object']['versionId']
+ size = event['s3']['object']['size']
+ if version not in versions:
+ print('version mismatch: '+version+' not in: '+str(versions))
+ assert False
+ else:
+ print('version ok: '+version+' in: '+str(versions))
+ if event['eventName'] == 'ObjectRemoved:Delete':
+ delete_events += 1
+ assert size in [size1, size2]
+ assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3']
+ if event['eventName'] == 'ObjectRemoved:DeleteMarkerCreated':
+ delete_marker_create_events += 1
+ assert size == size2
+ assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2']
+
+ # 2 key versions were deleted
+ # notified over the same topic via 2 notifications (1,3)
+ assert_equal(delete_events, 2*2)
+ # 1 deletion marker was created
+ # notified over the same topic over 2 notifications (1,2)
+ assert_equal(delete_marker_create_events, 1*2)
+
+ # cleanup
+ delete_marker_key.delete()
+ stop_amqp_receiver(receiver, task)
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+
+
+@attr('manual_test')
+def test_ps_s3_persistent_cleanup():
+ """ test reservation cleanup after gateway crash """
+ return SkipTest("only used in manual testing")
+ conn = connection()
+ zonegroup = 'default'
+
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ number_of_objects = 200
+ http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+
+ gw = conn
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = gw.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+ topic_conf = PSTopicS3(gw, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:Put']
+ }]
+ s3_notification_conf = PSNotificationS3(gw, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ # stop gateway while clients are sending
+ os.system("killall -9 radosgw");
+ print('wait for 10 sec for before restarting the gateway')
+ time.sleep(10)
+ # TODO: start the radosgw
+ [thr.join() for thr in client_threads]
+
+ keys = list(bucket.list())
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ # check http receiver
+ events = http_server.get_and_reset_events()
+
+ print(str(len(events) ) + " events found out of " + str(number_of_objects))
+
+ # make sure that things are working now
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ keys = list(bucket.list())
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ print('wait for 180 sec for reservations to be stale before queue deletion')
+ time.sleep(180)
+
+ # check http receiver
+ events = http_server.get_and_reset_events()
+
+ print(str(len(events)) + " events found out of " + str(number_of_objects))
+
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ gw.delete_bucket(bucket_name)
+ http_server.close()
+
+
+@attr('manual_test')
+def test_ps_s3_persistent_notification_pushback():
+ """ test pushing persistent notification pushback """
+ return SkipTest("only used in manual testing")
+ conn = connection()
+ zonegroup = 'default'
+
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ http_server = StreamingHTTPServer(host, port, num_workers=10, delay=0.5)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket (async)
+ for j in range(100):
+ number_of_objects = randint(500, 1000)
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(j)+'-'+str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ keys = list(bucket.list())
+
+ delay = 30
+ print('wait for '+str(delay)+'sec for the messages...')
+ time.sleep(delay)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ count = 0
+ for key in bucket.list():
+ count += 1
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ if count%100 == 0:
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+ client_threads = []
+ start_time = time.time()
+
+ print('wait for '+str(delay)+'sec for the messages...')
+ time.sleep(delay)
+
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ time.sleep(delay)
+ http_server.close()
+
+
+@attr('kafka_test')
+def test_ps_s3_notification_kafka_idle_behaviour():
+ """ test pushing kafka s3 notification idle behaviour check """
+ # TODO convert this test to actual running test by changing
+ # os.system call to verify the process idleness
+ conn = connection()
+ zonegroup = 'default'
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ # name is constant for manual testing
+ topic_name = bucket_name+'_topic'
+ # create consumer on the topic
+
+ task, receiver = create_kafka_receiver_thread(topic_name+'_1')
+ task.start()
+
+ # create s3 topic
+ endpoint_address = 'kafka://' + kafka_server
+ # with acks from broker
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
+ topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf1.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name + '_1', 'TopicArn': topic_arn1,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket (async)
+ number_of_objects = 10
+ client_threads = []
+ etags = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = str(os.urandom(1024*1024))
+ etag = hashlib.md5(content.encode()).hexdigest()
+ etags.append(etag)
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ keys = list(bucket.list())
+ receiver.verify_s3_events(keys, exact_match=True, etags=etags)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)
+
+ is_idle = False
+
+ while not is_idle:
+ print('waiting for 10sec for checking idleness')
+ time.sleep(10)
+ cmd = "netstat -nnp | grep 9092 | grep radosgw"
+ proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
+ out = proc.communicate()[0]
+ if len(out) == 0:
+ is_idle = True
+ else:
+ print("radosgw<->kafka connection is not idle")
+ print(out.decode('utf-8'))
+
+ # do the process of uploading an object and checking for notification again
+ number_of_objects = 10
+ client_threads = []
+ etags = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = str(os.urandom(1024*1024))
+ etag = hashlib.md5(content.encode()).hexdigest()
+ etags.append(etag)
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ keys = list(bucket.list())
+ receiver.verify_s3_events(keys, exact_match=True, etags=etags)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ receiver.verify_s3_events(keys, exact_match=True, deletions=True, etags=etags)
+
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf1.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ stop_kafka_receiver(receiver, task)
+
+
+@attr('modification_required')
+def test_ps_s3_persistent_gateways_recovery():
+ """ test gateway recovery of persistent notifications """
+ return SkipTest('This test requires two gateways.')
+
+ conn = connection()
+ zonegroup = 'default'
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ number_of_objects = 10
+ http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+ gw1 = conn
+ gw2 = connection2()
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = gw1.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+ # create two s3 topics
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+ topic_conf1 = PSTopicS3(gw1, topic_name+'_1', zonegroup, endpoint_args=endpoint_args+'&OpaqueData=fromgw1')
+ topic_arn1 = topic_conf1.set_config()
+ topic_conf2 = PSTopicS3(gw2, topic_name+'_2', zonegroup, endpoint_args=endpoint_args+'&OpaqueData=fromgw2')
+ topic_arn2 = topic_conf2.set_config()
+ # create two s3 notifications
+ notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1'
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1,
+ 'Events': ['s3:ObjectCreated:Put']
+ }]
+ s3_notification_conf1 = PSNotificationS3(gw1, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf1.set_config()
+ assert_equal(status/100, 2)
+ notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2'
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2,
+ 'Events': ['s3:ObjectRemoved:Delete']
+ }]
+ s3_notification_conf2 = PSNotificationS3(gw2, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf2.set_config()
+ assert_equal(status/100, 2)
+ # stop gateway 2
+ print('stopping gateway2...')
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ keys = list(bucket.list())
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ print('wait for 60 sec for before restarting the gateway')
+ time.sleep(60)
+ # check http receiver
+ events = http_server.get_and_reset_events()
+ for key in keys:
+ creations = 0
+ deletions = 0
+ for event in events:
+ if event['Records'][0]['eventName'] == 'ObjectCreated:Put' and \
+ key.name == event['Records'][0]['s3']['object']['key']:
+ creations += 1
+ elif event['Records'][0]['eventName'] == 'ObjectRemoved:Delete' and \
+ key.name == event['Records'][0]['s3']['object']['key']:
+ deletions += 1
+ assert_equal(creations, 1)
+ assert_equal(deletions, 1)
+ # cleanup
+ s3_notification_conf1.del_config()
+ topic_conf1.del_config()
+ gw1.delete_bucket(bucket_name)
+ time.sleep(10)
+ s3_notification_conf2.del_config()
+ topic_conf2.del_config()
+ http_server.close()
+
+
+@attr('modification_required')
+def test_ps_s3_persistent_multiple_gateways():
+ """ test pushing persistent notification via two gateways """
+ return SkipTest('This test requires two gateways.')
+
+ conn = connection()
+ zonegroup = 'default'
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ number_of_objects = 10
+ http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+ gw1 = conn
+ gw2 = connection2()
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket1 = gw1.create_bucket(bucket_name)
+ bucket2 = gw2.get_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+ # create two s3 topics
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+ topic1_opaque = 'fromgw1'
+ topic_conf1 = PSTopicS3(gw1, topic_name+'_1', zonegroup, endpoint_args=endpoint_args+'&OpaqueData='+topic1_opaque)
+ topic_arn1 = topic_conf1.set_config()
+ topic2_opaque = 'fromgw2'
+ topic_conf2 = PSTopicS3(gw2, topic_name+'_2', zonegroup, endpoint_args=endpoint_args+'&OpaqueData='+topic2_opaque)
+ topic_arn2 = topic_conf2.set_config()
+ # create two s3 notifications
+ notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1'
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1,
+ 'Events': []
+ }]
+ s3_notification_conf1 = PSNotificationS3(gw1, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf1.set_config()
+ assert_equal(status/100, 2)
+ notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2'
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2,
+ 'Events': []
+ }]
+ s3_notification_conf2 = PSNotificationS3(gw2, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf2.set_config()
+ assert_equal(status/100, 2)
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket1.new_key('gw1_'+str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ key = bucket2.new_key('gw2_'+str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ keys = list(bucket1.list())
+ delay = 30
+ print('wait for '+str(delay)+'sec for the messages...')
+ time.sleep(delay)
+ events = http_server.get_and_reset_events()
+ for key in keys:
+ topic1_count = 0
+ topic2_count = 0
+ for event in events:
+ if event['Records'][0]['eventName'] == 'ObjectCreated:Put' and \
+ key.name == event['Records'][0]['s3']['object']['key'] and \
+ topic1_opaque == event['Records'][0]['opaqueData']:
+ topic1_count += 1
+ elif event['Records'][0]['eventName'] == 'ObjectCreated:Put' and \
+ key.name == event['Records'][0]['s3']['object']['key'] and \
+ topic2_opaque == event['Records'][0]['opaqueData']:
+ topic2_count += 1
+ assert_equal(topic1_count, 1)
+ assert_equal(topic2_count, 1)
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket1.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ print('wait for '+str(delay)+'sec for the messages...')
+ time.sleep(delay)
+ events = http_server.get_and_reset_events()
+ for key in keys:
+ topic1_count = 0
+ topic2_count = 0
+ for event in events:
+ if event['Records'][0]['eventName'] == 'ObjectRemoved:Delete' and \
+ key.name == event['Records'][0]['s3']['object']['key'] and \
+ topic1_opaque == event['Records'][0]['opaqueData']:
+ topic1_count += 1
+ elif event['Records'][0]['eventName'] == 'ObjectRemoved:Delete' and \
+ key.name == event['Records'][0]['s3']['object']['key'] and \
+ topic2_opaque == event['Records'][0]['opaqueData']:
+ topic2_count += 1
+ assert_equal(topic1_count, 1)
+ assert_equal(topic2_count, 1)
+ # cleanup
+ s3_notification_conf1.del_config()
+ topic_conf1.del_config()
+ s3_notification_conf2.del_config()
+ topic_conf2.del_config()
+ gw1.delete_bucket(bucket_name)
+ http_server.close()
+
+
+@attr('http_test')
+def test_ps_s3_persistent_multiple_endpoints():
+ """ test pushing persistent notification when one of the endpoints has error """
+ conn = connection()
+ zonegroup = 'default'
+
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ number_of_objects = 10
+ http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create two s3 topics
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+ topic_conf1 = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf1.set_config()
+ endpoint_address = 'http://kaboom:9999'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+ topic_conf2 = PSTopicS3(conn, topic_name+'_2', zonegroup, endpoint_args=endpoint_args)
+ topic_arn2 = topic_conf2.set_config()
+
+ # create two s3 notifications
+ notification_name = bucket_name + NOTIFICATION_SUFFIX+'_1'
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1,
+ 'Events': []
+ }]
+ s3_notification_conf1 = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf1.set_config()
+ assert_equal(status/100, 2)
+ notification_name = bucket_name + NOTIFICATION_SUFFIX+'_2'
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn2,
+ 'Events': []
+ }]
+ s3_notification_conf2 = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf2.set_config()
+ assert_equal(status/100, 2)
+
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ keys = list(bucket.list())
+
+ delay = 30
+ print('wait for '+str(delay)+'sec for the messages...')
+ time.sleep(delay)
+
+ http_server.verify_s3_events(keys, exact_match=False, deletions=False)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ print('wait for '+str(delay)+'sec for the messages...')
+ time.sleep(delay)
+
+ http_server.verify_s3_events(keys, exact_match=False, deletions=True)
+
+ # cleanup
+ s3_notification_conf1.del_config()
+ topic_conf1.del_config()
+ s3_notification_conf2.del_config()
+ topic_conf2.del_config()
+ conn.delete_bucket(bucket_name)
+ http_server.close()
+
+def persistent_notification(endpoint_type):
+ """ test pushing persistent notification """
+ conn = connection()
+ zonegroup = 'default'
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ receiver = {}
+ host = get_ip()
+ if endpoint_type == 'http':
+ # create random port for the http server
+ host = get_ip_http()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ receiver = StreamingHTTPServer(host, port, num_workers=10)
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+ # the http server does not guarantee order, so duplicates are expected
+ exact_match = False
+ elif endpoint_type == 'amqp':
+ # start amqp receiver
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+ endpoint_address = 'amqp://' + host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange='+exchange+'&amqp-ack-level=broker'+'&persistent=true'
+ # amqp broker guarantee ordering
+ exact_match = True
+ elif endpoint_type == 'kafka':
+ # start amqp receiver
+ task, receiver = create_kafka_receiver_thread(topic_name)
+ task.start()
+ endpoint_address = 'kafka://' + host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'+'&persistent=true'
+ # amqp broker guarantee ordering
+ exact_match = True
+ else:
+ return SkipTest('Unknown endpoint type: ' + endpoint_type)
+
+
+ # create s3 topic
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket (async)
+ number_of_objects = 100
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ keys = list(bucket.list())
+
+ delay = 40
+ print('wait for '+str(delay)+'sec for the messages...')
+ time.sleep(delay)
+
+ receiver.verify_s3_events(keys, exact_match=exact_match, deletions=False)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for '+str(delay)+'sec for the messages...')
+ time.sleep(delay)
+
+ receiver.verify_s3_events(keys, exact_match=exact_match, deletions=True)
+
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ if endpoint_type == 'http':
+ receiver.close()
+ else:
+ stop_amqp_receiver(receiver, task)
+
+
+@attr('http_test')
+def test_ps_s3_persistent_notification_http():
+ """ test pushing persistent notification http """
+ persistent_notification('http')
+
+
+@attr('amqp_test')
+def test_ps_s3_persistent_notification_amqp():
+ """ test pushing persistent notification amqp """
+ persistent_notification('amqp')
+
+
+@attr('kafka_test')
+def test_ps_s3_persistent_notification_kafka():
+ """ test pushing persistent notification kafka """
+ persistent_notification('kafka')
+
+
+def random_string(length):
+ import string
+ letters = string.ascii_letters
+ return ''.join(random.choice(letters) for i in range(length))
+
+
+@attr('amqp_test')
+def test_ps_s3_persistent_notification_large():
+ """ test pushing persistent notification of large notifications """
+
+ conn = connection()
+ zonegroup = 'default'
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ receiver = {}
+ host = get_ip()
+ # start amqp receiver
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+ endpoint_address = 'amqp://' + host
+ opaque_data = random_string(1024*2)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data+'&amqp-exchange='+exchange+'&amqp-ack-level=broker'+'&persistent=true'
+ # amqp broker guarantee ordering
+ exact_match = True
+
+ # create s3 topic
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket (async)
+ number_of_objects = 100
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key_value = random_string(63)
+ key = bucket.new_key(key_value)
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ keys = list(bucket.list())
+
+ delay = 40
+ print('wait for '+str(delay)+'sec for the messages...')
+ time.sleep(delay)
+
+ receiver.verify_s3_events(keys, exact_match=exact_match, deletions=False)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for '+str(delay)+'sec for the messages...')
+ time.sleep(delay)
+
+ receiver.verify_s3_events(keys, exact_match=exact_match, deletions=True)
+
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ stop_amqp_receiver(receiver, task)
+
+
+@attr('modification_required')
+def test_ps_s3_topic_update():
+ """ test updating topic associated with a notification"""
+ return SkipTest('This test is yet to be modified.')
+
+ conn = connection()
+ ps_zone = None
+ bucket_name = gen_bucket_name()
+ topic_name = bucket_name+TOPIC_SUFFIX
+ # create amqp topic
+ hostname = get_ip()
+ exchange = 'ex1'
+ amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ amqp_task.start()
+ #topic_conf = PSTopic(ps_zone.conn, topic_name,endpoint='amqp://' + hostname,endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
+
+ topic_arn = topic_conf.set_config()
+ #result, status = topic_conf.set_config()
+ #assert_equal(status/100, 2)
+ parsed_result = json.loads(result)
+ topic_arn = parsed_result['arn']
+ # get topic
+ result, _ = topic_conf.get_config()
+ # verify topic content
+ parsed_result = json.loads(result)
+ assert_equal(parsed_result['topic']['name'], topic_name)
+ assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])
+ # create http server
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ http_server = StreamingHTTPServer(hostname, port)
+ # create bucket on the first of the rados zones
+ bucket = conn.create_bucket(bucket_name)
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*']
+ }]
+ s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+ # create objects in the bucket
+ number_of_objects = 10
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ key.set_contents_from_string('bar')
+ # wait for sync
+ #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
+ keys = list(bucket.list())
+ # TODO: use exact match
+ receiver.verify_s3_events(keys, exact_match=False)
+ # update the same topic with new endpoint
+ #topic_conf = PSTopic(ps_zone.conn, topic_name,endpoint='http://'+ hostname + ':' + str(port))
+ topic_conf = PSTopicS3(conn, topic_name, endpoint_args='http://'+ hostname + ':' + str(port))
+ _, status = topic_conf.set_config()
+ assert_equal(status/100, 2)
+ # get topic
+ result, _ = topic_conf.get_config()
+ # verify topic content
+ parsed_result = json.loads(result)
+ assert_equal(parsed_result['topic']['name'], topic_name)
+ assert_equal(parsed_result['topic']['dest']['push_endpoint'], topic_conf.parameters['push-endpoint'])
+ # delete current objects and create new objects in the bucket
+ for key in bucket.list():
+ key.delete()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i+100))
+ key.set_contents_from_string('bar')
+ # wait for sync
+ #zone_meta_checkpoint(ps_zone.zone)
+ #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
+ keys = list(bucket.list())
+ # verify that notifications are still sent to amqp
+ # TODO: use exact match
+ receiver.verify_s3_events(keys, exact_match=False)
+ # update notification to update the endpoint from the topic
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*']
+ }]
+ s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+ # delete current objects and create new objects in the bucket
+ for key in bucket.list():
+ key.delete()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i+200))
+ key.set_contents_from_string('bar')
+ # wait for sync
+ #zone_meta_checkpoint(ps_zone.zone)
+ #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
+ keys = list(bucket.list())
+ # check that updates switched to http
+ # TODO: use exact match
+ http_server.verify_s3_events(keys, exact_match=False)
+ # cleanup
+ # delete objects from the bucket
+ stop_amqp_receiver(receiver, amqp_task)
+ for key in bucket.list():
+ key.delete()
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ conn.delete_bucket(bucket_name)
+ http_server.close()
+
+
+@attr('modification_required')
+def test_ps_s3_notification_update():
+ """ test updating the topic of a notification"""
+ return SkipTest('This test is yet to be modified.')
+
+ hostname = get_ip()
+ conn = connection()
+ ps_zone = None
+ bucket_name = gen_bucket_name()
+ topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
+ topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
+ zonegroup = 'default'
+ # create topics
+ # start amqp receiver in a separate thread
+ exchange = 'ex1'
+ amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
+ amqp_task.start()
+ # create random port for the http server
+ http_port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ http_server = StreamingHTTPServer(hostname, http_port)
+ #topic_conf1 = PSTopic(ps_zone.conn, topic_name1,endpoint='amqp://' + hostname,endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
+ topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
+ result, status = topic_conf1.set_config()
+ parsed_result = json.loads(result)
+ topic_arn1 = parsed_result['arn']
+ assert_equal(status/100, 2)
+ #topic_conf2 = PSTopic(ps_zone.conn, topic_name2,endpoint='http://'+hostname+':'+str(http_port))
+ topic_conf2 = PSTopicS3(conn, topic_name2, endpoint_args='http://'+hostname+':'+str(http_port))
+ result, status = topic_conf2.set_config()
+ parsed_result = json.loads(result)
+ topic_arn2 = parsed_result['arn']
+ assert_equal(status/100, 2)
+ # create bucket on the first of the rados zones
+ bucket = conn.create_bucket(bucket_name)
+ # wait for sync
+ #zone_meta_checkpoint(ps_zone.zone)
+ # create s3 notification with topic1
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn1,
+ 'Events': ['s3:ObjectCreated:*']
+ }]
+ s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+ # create objects in the bucket
+ number_of_objects = 10
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ key.set_contents_from_string('bar')
+ # wait for sync
+ #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
+ keys = list(bucket.list())
+ # TODO: use exact match
+ receiver.verify_s3_events(keys, exact_match=False);
+ # update notification to use topic2
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn2,
+ 'Events': ['s3:ObjectCreated:*']
+ }]
+ s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+ # delete current objects and create new objects in the bucket
+ for key in bucket.list():
+ key.delete()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i+100))
+ key.set_contents_from_string('bar')
+ # wait for sync
+ #zone_meta_checkpoint(ps_zone.zone)
+ #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
+ keys = list(bucket.list())
+ # check that updates switched to http
+ # TODO: use exact match
+ http_server.verify_s3_events(keys, exact_match=False)
+ # cleanup
+ # delete objects from the bucket
+ stop_amqp_receiver(receiver, amqp_task)
+ for key in bucket.list():
+ key.delete()
+ s3_notification_conf.del_config()
+ topic_conf1.del_config()
+ topic_conf2.del_config()
+ conn.delete_bucket(bucket_name)
+ http_server.close()
+
+
+@attr('modification_required')
+def test_ps_s3_multiple_topics_notification():
+ """ test notification creation with multiple topics"""
+ return SkipTest('This test is yet to be modified.')
+
+ hostname = get_ip()
+ zonegroup = 'default'
+ conn = connection()
+ ps_zone = None
+ bucket_name = gen_bucket_name()
+ topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX
+ topic_name2 = bucket_name+'http'+TOPIC_SUFFIX
+ # create topics
+ # start amqp receiver in a separate thread
+ exchange = 'ex1'
+ amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name1)
+ amqp_task.start()
+ # create random port for the http server
+ http_port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ http_server = StreamingHTTPServer(hostname, http_port)
+ #topic_conf1 = PSTopic(ps_zone.conn, topic_name1,endpoint='amqp://' + hostname,endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
+ topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none')
+ result, status = topic_conf1.set_config()
+ parsed_result = json.loads(result)
+ topic_arn1 = parsed_result['arn']
+ assert_equal(status/100, 2)
+ #topic_conf2 = PSTopic(ps_zone.conn, topic_name2,endpoint='http://'+hostname+':'+str(http_port))
+ topic_conf2 = PSTopicS3(conn, topic_name2, zonegroup, endpoint_args='http://'+hostname+':'+str(http_port))
+ result, status = topic_conf2.set_config()
+ parsed_result = json.loads(result)
+ topic_arn2 = parsed_result['arn']
+ assert_equal(status/100, 2)
+ # create bucket on the first of the rados zones
+ bucket = conn.create_bucket(bucket_name)
+ # wait for sync
+ #zone_meta_checkpoint(ps_zone.zone)
+ # create s3 notification
+ notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1'
+ notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2'
+ topic_conf_list = [
+ {
+ 'Id': notification_name1,
+ 'TopicArn': topic_arn1,
+ 'Events': ['s3:ObjectCreated:*']
+ },
+ {
+ 'Id': notification_name2,
+ 'TopicArn': topic_arn2,
+ 'Events': ['s3:ObjectCreated:*']
+ }]
+ s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+ result, _ = s3_notification_conf.get_config()
+ assert_equal(len(result['TopicConfigurations']), 2)
+ assert_equal(result['TopicConfigurations'][0]['Id'], notification_name1)
+ assert_equal(result['TopicConfigurations'][1]['Id'], notification_name2)
+ # get auto-generated subscriptions
+ sub_conf1 = PSSubscription(ps_zone.conn, notification_name1,
+ topic_name1)
+ _, status = sub_conf1.get_config()
+ assert_equal(status/100, 2)
+ sub_conf2 = PSSubscription(ps_zone.conn, notification_name2,
+ topic_name2)
+ _, status = sub_conf2.get_config()
+ assert_equal(status/100, 2)
+ # create objects in the bucket
+ number_of_objects = 10
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ key.set_contents_from_string('bar')
+ # wait for sync
+ #zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
+ # get the events from both of the subscription
+ result, _ = sub_conf1.get_events()
+ records = json.loads(result)
+ for record in records['Records']:
+ log.debug(record)
+ keys = list(bucket.list())
+ # TODO: use exact match
+ verify_s3_records_by_elements(records, keys, exact_match=False)
+ receiver.verify_s3_events(keys, exact_match=False)
+ result, _ = sub_conf2.get_events()
+ parsed_result = json.loads(result)
+ for record in parsed_result['Records']:
+ log.debug(record)
+ keys = list(bucket.list())
+ # TODO: use exact match
+ verify_s3_records_by_elements(records, keys, exact_match=False)
+ http_server.verify_s3_events(keys, exact_match=False)
+ # cleanup
+ stop_amqp_receiver(receiver, amqp_task)
+ s3_notification_conf.del_config()
+ topic_conf1.del_config()
+ topic_conf2.del_config()
+ # delete objects from the bucket
+ for key in bucket.list():
+ key.delete()
+ conn.delete_bucket(bucket_name)
+ http_server.close()
+
+
+def kafka_security(security_type):
+ """ test pushing kafka s3 notification securly to master """
+ conn = connection()
+ zonegroup = 'default'
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ # name is constant for manual testing
+ topic_name = bucket_name+'_topic'
+ # create s3 topic
+ if security_type == 'SSL_SASL':
+ endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
+ elif security_type == 'SSL':
+ endpoint_address = 'kafka://' + kafka_server + ':9093'
+ else:
+ assert False, 'unknown security method '+security_type
+
+ KAFKA_DIR = os.environ['KAFKA_DIR']
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+"/y-ca.crt"
+
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+
+ # create consumer on the topic
+ task, receiver = create_kafka_receiver_thread(topic_name)
+ task.start()
+
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ s3_notification_conf.set_config()
+ # create objects in the bucket (async)
+ number_of_objects = 10
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for creation + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+ try:
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ keys = list(bucket.list())
+ receiver.verify_s3_events(keys, exact_match=True)
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for deletion + kafka notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ receiver.verify_s3_events(keys, exact_match=True, deletions=True)
+ except Exception as err:
+ assert False, str(err)
+ finally:
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ for key in bucket.list():
+ key.delete()
+ conn.delete_bucket(bucket_name)
+ stop_kafka_receiver(receiver, task)
+
+
+@attr('kafka_ssl_test')
+def test_ps_s3_notification_push_kafka_security_ssl():
+ kafka_security('SSL')
+
+
+@attr('kafka_ssl_test')
+def test_ps_s3_notification_push_kafka_security_ssl_sasl():
+ kafka_security('SSL_SASL')
+