summaryrefslogtreecommitdiffstats
path: root/src/test/rgw/bucket_notification/test_bn.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/rgw/bucket_notification/test_bn.py')
-rw-r--r--src/test/rgw/bucket_notification/test_bn.py65
1 files changed, 65 insertions, 0 deletions
diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py
index 87a2acb76..ee89d326d 100644
--- a/src/test/rgw/bucket_notification/test_bn.py
+++ b/src/test/rgw/bucket_notification/test_bn.py
@@ -2346,6 +2346,71 @@ def test_http_post_object_upload():
conn1.delete_bucket(Bucket=bucket_name)
+@attr('mpu_test')
+def test_ps_s3_multipart_on_master_http():
+ """ test http multipart object upload on master"""
+ conn = connection()
+ zonegroup = 'default'
+
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ http_server = StreamingHTTPServer(host, port, num_workers=10)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address
+ opaque_data = 'http://1.2.3.4:8888'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ client_threads = []
+ content = str(os.urandom(20*1024*1024))
+ key = bucket.new_key('obj')
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check http receiver
+ keys = list(bucket.list())
+ print('total number of objects: ' + str(len(keys)))
+ events = http_server.get_and_reset_events()
+ for event in events:
+ assert_equal(event['Records'][0]['opaqueData'], opaque_data)
+ assert_equal(event['Records'][0]['s3']['object']['eTag'] != '', True)
+ print(event['Records'][0]['s3']['object'])
+
+ # cleanup
+ for key in keys:
+ key.delete()
+ [thr.join() for thr in client_threads]
+ topic_conf.del_config()
+ s3_notification_conf.del_config(notification=notification_name)
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ http_server.close()
+
+
@attr('amqp_test')
def test_ps_s3_multipart_on_master():
""" test multipart object upload on master"""