summaryrefslogtreecommitdiffstats
path: root/src/test/rgw/rgw_multi
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/test/rgw/rgw_multi
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/test/rgw/rgw_multi')
-rw-r--r--src/test/rgw/rgw_multi/__init__.py0
-rw-r--r--src/test/rgw/rgw_multi/conn.py41
-rw-r--r--src/test/rgw/rgw_multi/multisite.py407
-rw-r--r--src/test/rgw/rgw_multi/tests.py2861
-rw-r--r--src/test/rgw/rgw_multi/tests_az.py597
-rw-r--r--src/test/rgw/rgw_multi/tests_es.py276
-rw-r--r--src/test/rgw/rgw_multi/tools.py97
-rw-r--r--src/test/rgw/rgw_multi/zone_az.py42
-rw-r--r--src/test/rgw/rgw_multi/zone_cloud.py326
-rw-r--r--src/test/rgw/rgw_multi/zone_es.py256
-rw-r--r--src/test/rgw/rgw_multi/zone_rados.py134
11 files changed, 5037 insertions, 0 deletions
diff --git a/src/test/rgw/rgw_multi/__init__.py b/src/test/rgw/rgw_multi/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/test/rgw/rgw_multi/__init__.py
diff --git a/src/test/rgw/rgw_multi/conn.py b/src/test/rgw/rgw_multi/conn.py
new file mode 100644
index 000000000..59bc2fdd3
--- /dev/null
+++ b/src/test/rgw/rgw_multi/conn.py
@@ -0,0 +1,41 @@
+import boto
+import boto.s3.connection
+import boto.iam.connection
+
+def get_gateway_connection(gateway, credentials):
+ """ connect to the given gateway """
+ if gateway.connection is None:
+ gateway.connection = boto.connect_s3(
+ aws_access_key_id = credentials.access_key,
+ aws_secret_access_key = credentials.secret,
+ host = gateway.host,
+ port = gateway.port,
+ is_secure = False,
+ calling_format = boto.s3.connection.OrdinaryCallingFormat())
+ return gateway.connection
+
+def get_gateway_secure_connection(gateway, credentials):
+ """ secure connect to the given gateway """
+ if gateway.ssl_port == 0:
+ return None
+ if gateway.secure_connection is None:
+ gateway.secure_connection = boto.connect_s3(
+ aws_access_key_id = credentials.access_key,
+ aws_secret_access_key = credentials.secret,
+ host = gateway.host,
+ port = gateway.ssl_port,
+ is_secure = True,
+ validate_certs=False,
+ calling_format = boto.s3.connection.OrdinaryCallingFormat())
+ return gateway.secure_connection
+
+def get_gateway_iam_connection(gateway, credentials):
+ """ connect to iam api of the given gateway """
+ if gateway.iam_connection is None:
+ gateway.iam_connection = boto.connect_iam(
+ aws_access_key_id = credentials.access_key,
+ aws_secret_access_key = credentials.secret,
+ host = gateway.host,
+ port = gateway.port,
+ is_secure = False)
+ return gateway.iam_connection
diff --git a/src/test/rgw/rgw_multi/multisite.py b/src/test/rgw/rgw_multi/multisite.py
new file mode 100644
index 000000000..5d4dcd1aa
--- /dev/null
+++ b/src/test/rgw/rgw_multi/multisite.py
@@ -0,0 +1,407 @@
+from abc import ABCMeta, abstractmethod
+from io import StringIO
+
+import json
+
+from .conn import get_gateway_connection, get_gateway_iam_connection, get_gateway_secure_connection
+
+class Cluster:
+ """ interface to run commands against a distinct ceph cluster """
+ __metaclass__ = ABCMeta
+
+ @abstractmethod
+ def admin(self, args = None, **kwargs):
+ """ execute a radosgw-admin command """
+ pass
+
+class Gateway:
+ """ interface to control a single radosgw instance """
+ __metaclass__ = ABCMeta
+
+ def __init__(self, host = None, port = None, cluster = None, zone = None, ssl_port = 0):
+ self.host = host
+ self.port = port
+ self.cluster = cluster
+ self.zone = zone
+ self.connection = None
+ self.secure_connection = None
+ self.ssl_port = ssl_port
+ self.iam_connection = None
+
+ @abstractmethod
+ def start(self, args = []):
+ """ start the gateway with the given args """
+ pass
+
+ @abstractmethod
+ def stop(self):
+ """ stop the gateway """
+ pass
+
+ def endpoint(self):
+ return 'http://%s:%d' % (self.host, self.port)
+
+class SystemObject:
+ """ interface for system objects, represented in json format and
+ manipulated with radosgw-admin commands """
+ __metaclass__ = ABCMeta
+
+ def __init__(self, data = None, uuid = None):
+ self.data = data
+ self.id = uuid
+ if data:
+ self.load_from_json(data)
+
+ @abstractmethod
+ def build_command(self, command):
+ """ return the command line for the given command, including arguments
+ to specify this object """
+ pass
+
+ @abstractmethod
+ def load_from_json(self, data):
+ """ update internal state based on json data """
+ pass
+
+ def command(self, cluster, cmd, args = None, **kwargs):
+ """ run the given command and return the output and retcode """
+ args = self.build_command(cmd) + (args or [])
+ return cluster.admin(args, **kwargs)
+
+ def json_command(self, cluster, cmd, args = None, **kwargs):
+ """ run the given command, parse the output and return the resulting
+ data and retcode """
+ s, r = self.command(cluster, cmd, args or [], **kwargs)
+ if r == 0:
+ data = json.loads(s)
+ self.load_from_json(data)
+ self.data = data
+ return self.data, r
+
+ # mixins for supported commands
+ class Create(object):
+ def create(self, cluster, args = None, **kwargs):
+ """ create the object with the given arguments """
+ return self.json_command(cluster, 'create', args, **kwargs)
+
+ class Delete(object):
+ def delete(self, cluster, args = None, **kwargs):
+ """ delete the object """
+ # not json_command() because delete has no output
+ _, r = self.command(cluster, 'delete', args, **kwargs)
+ if r == 0:
+ self.data = None
+ return r
+
+ class Get(object):
+ def get(self, cluster, args = None, **kwargs):
+ """ read the object from storage """
+ kwargs['read_only'] = True
+ return self.json_command(cluster, 'get', args, **kwargs)
+
+ class Set(object):
+ def set(self, cluster, data, args = None, **kwargs):
+ """ set the object by json """
+ kwargs['stdin'] = StringIO(json.dumps(data))
+ return self.json_command(cluster, 'set', args, **kwargs)
+
+ class Modify(object):
+ def modify(self, cluster, args = None, **kwargs):
+ """ modify the object with the given arguments """
+ return self.json_command(cluster, 'modify', args, **kwargs)
+
+ class CreateDelete(Create, Delete): pass
+ class GetSet(Get, Set): pass
+
+class Zone(SystemObject, SystemObject.CreateDelete, SystemObject.GetSet, SystemObject.Modify):
+ def __init__(self, name, zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
+ self.name = name
+ self.zonegroup = zonegroup
+ self.cluster = cluster
+ self.gateways = gateways or []
+ super(Zone, self).__init__(data, zone_id)
+
+ def zone_arg(self):
+ """ command-line argument to specify this zone """
+ return ['--rgw-zone', self.name]
+
+ def zone_args(self):
+ """ command-line arguments to specify this zone/zonegroup/realm """
+ args = self.zone_arg()
+ if self.zonegroup:
+ args += self.zonegroup.zonegroup_args()
+ return args
+
+ def build_command(self, command):
+ """ build a command line for the given command and args """
+ return ['zone', command] + self.zone_args()
+
+ def load_from_json(self, data):
+ """ load the zone from json """
+ self.id = data['id']
+ self.name = data['name']
+
+ def start(self, args = None):
+ """ start all gateways """
+ for g in self.gateways:
+ g.start(args)
+
+ def stop(self):
+ """ stop all gateways """
+ for g in self.gateways:
+ g.stop()
+
+ def period(self):
+ return self.zonegroup.period if self.zonegroup else None
+
+ def realm(self):
+ return self.zonegroup.realm() if self.zonegroup else None
+
+ def is_read_only(self):
+ return False
+
+ def tier_type(self):
+ raise NotImplementedError
+
+ def syncs_from(self, zone_name):
+ return zone_name != self.name
+
+ def has_buckets(self):
+ return True
+
+ def has_roles(self):
+ return True
+
+ def get_conn(self, credentials):
+ return ZoneConn(self, credentials) # not implemented, but can be used
+
+class ZoneConn(object):
+ def __init__(self, zone, credentials):
+ self.zone = zone
+ self.name = zone.name
+ """ connect to the zone's first gateway """
+ if isinstance(credentials, list):
+ self.credentials = credentials[0]
+ else:
+ self.credentials = credentials
+
+ if self.zone.gateways is not None:
+ self.conn = get_gateway_connection(self.zone.gateways[0], self.credentials)
+ self.secure_conn = get_gateway_secure_connection(self.zone.gateways[0], self.credentials)
+
+ self.iam_conn = get_gateway_iam_connection(self.zone.gateways[0], self.credentials)
+
+ # create connections for the rest of the gateways (if exist)
+ for gw in list(self.zone.gateways):
+ get_gateway_connection(gw, self.credentials)
+ get_gateway_secure_connection(gw, self.credentials)
+
+ get_gateway_iam_connection(gw, self.credentials)
+
+
+ def get_connection(self):
+ return self.conn
+
+ def get_iam_connection(self):
+ return self.iam_conn
+
+ def get_bucket(self, bucket_name, credentials):
+ raise NotImplementedError
+
+ def check_bucket_eq(self, zone, bucket_name):
+ raise NotImplementedError
+
+class ZoneGroup(SystemObject, SystemObject.CreateDelete, SystemObject.GetSet, SystemObject.Modify):
+ def __init__(self, name, period = None, data = None, zonegroup_id = None, zones = None, master_zone = None):
+ self.name = name
+ self.period = period
+ self.zones = zones or []
+ self.master_zone = master_zone
+ super(ZoneGroup, self).__init__(data, zonegroup_id)
+ self.rw_zones = []
+ self.ro_zones = []
+ self.zones_by_type = {}
+ for z in self.zones:
+ if z.is_read_only():
+ self.ro_zones.append(z)
+ else:
+ self.rw_zones.append(z)
+
+ def zonegroup_arg(self):
+ """ command-line argument to specify this zonegroup """
+ return ['--rgw-zonegroup', self.name]
+
+ def zonegroup_args(self):
+ """ command-line arguments to specify this zonegroup/realm """
+ args = self.zonegroup_arg()
+ realm = self.realm()
+ if realm:
+ args += realm.realm_arg()
+ return args
+
+ def build_command(self, command):
+ """ build a command line for the given command and args """
+ return ['zonegroup', command] + self.zonegroup_args()
+
+ def zone_by_id(self, zone_id):
+ """ return the matching zone by id """
+ for zone in self.zones:
+ if zone.id == zone_id:
+ return zone
+ return None
+
+ def load_from_json(self, data):
+ """ load the zonegroup from json """
+ self.id = data['id']
+ self.name = data['name']
+ master_id = data['master_zone']
+ if not self.master_zone or master_id != self.master_zone.id:
+ self.master_zone = self.zone_by_id(master_id)
+
+ def add(self, cluster, zone, args = None, **kwargs):
+ """ add an existing zone to the zonegroup """
+ args = zone.zone_arg() + (args or [])
+ data, r = self.json_command(cluster, 'add', args, **kwargs)
+ if r == 0:
+ zone.zonegroup = self
+ self.zones.append(zone)
+ return data, r
+
+ def remove(self, cluster, zone, args = None, **kwargs):
+ """ remove an existing zone from the zonegroup """
+ args = zone.zone_arg() + (args or [])
+ data, r = self.json_command(cluster, 'remove', args, **kwargs)
+ if r == 0:
+ zone.zonegroup = None
+ self.zones.remove(zone)
+ return data, r
+
+ def realm(self):
+ return self.period.realm if self.period else None
+
+class Period(SystemObject, SystemObject.Get):
+ def __init__(self, realm = None, data = None, period_id = None, zonegroups = None, master_zonegroup = None):
+ self.realm = realm
+ self.zonegroups = zonegroups or []
+ self.master_zonegroup = master_zonegroup
+ super(Period, self).__init__(data, period_id)
+
+ def zonegroup_by_id(self, zonegroup_id):
+ """ return the matching zonegroup by id """
+ for zonegroup in self.zonegroups:
+ if zonegroup.id == zonegroup_id:
+ return zonegroup
+ return None
+
+ def build_command(self, command):
+ """ build a command line for the given command and args """
+ return ['period', command]
+
+ def load_from_json(self, data):
+ """ load the period from json """
+ self.id = data['id']
+ master_id = data['master_zonegroup']
+ if not self.master_zonegroup or master_id != self.master_zonegroup.id:
+ self.master_zonegroup = self.zonegroup_by_id(master_id)
+
+ def update(self, zone, args = None, **kwargs):
+ """ run 'radosgw-admin period update' on the given zone """
+ assert(zone.cluster)
+ args = zone.zone_args() + (args or [])
+ if kwargs.pop('commit', False):
+ args.append('--commit')
+ return self.json_command(zone.cluster, 'update', args, **kwargs)
+
+ def commit(self, zone, args = None, **kwargs):
+ """ run 'radosgw-admin period commit' on the given zone """
+ assert(zone.cluster)
+ args = zone.zone_args() + (args or [])
+ return self.json_command(zone.cluster, 'commit', args, **kwargs)
+
+class Realm(SystemObject, SystemObject.CreateDelete, SystemObject.GetSet):
+ def __init__(self, name, period = None, data = None, realm_id = None):
+ self.name = name
+ self.current_period = period
+ super(Realm, self).__init__(data, realm_id)
+
+ def realm_arg(self):
+ """ return the command-line arguments that specify this realm """
+ return ['--rgw-realm', self.name]
+
+ def build_command(self, command):
+ """ build a command line for the given command and args """
+ return ['realm', command] + self.realm_arg()
+
+ def load_from_json(self, data):
+ """ load the realm from json """
+ self.id = data['id']
+
+ def pull(self, cluster, gateway, credentials, args = [], **kwargs):
+ """ pull an existing realm from the given gateway """
+ args += ['--url', gateway.endpoint()]
+ args += credentials.credential_args()
+ return self.json_command(cluster, 'pull', args, **kwargs)
+
+ def master_zonegroup(self):
+ """ return the current period's master zonegroup """
+ if self.current_period is None:
+ return None
+ return self.current_period.master_zonegroup
+
+ def meta_master_zone(self):
+ """ return the current period's metadata master zone """
+ zonegroup = self.master_zonegroup()
+ if zonegroup is None:
+ return None
+ return zonegroup.master_zone
+
+class Credentials:
+ def __init__(self, access_key, secret):
+ self.access_key = access_key
+ self.secret = secret
+
+ def credential_args(self):
+ return ['--access-key', self.access_key, '--secret', self.secret]
+
+class User(SystemObject):
+ def __init__(self, uid, data = None, name = None, credentials = None, tenant = None):
+ self.name = name
+ self.credentials = credentials or []
+ self.tenant = tenant
+ super(User, self).__init__(data, uid)
+
+ def user_arg(self):
+ """ command-line argument to specify this user """
+ args = ['--uid', self.id]
+ if self.tenant:
+ args += ['--tenant', self.tenant]
+ return args
+
+ def build_command(self, command):
+ """ build a command line for the given command and args """
+ return ['user', command] + self.user_arg()
+
+ def load_from_json(self, data):
+ """ load the user from json """
+ self.id = data['user_id']
+ self.name = data['display_name']
+ self.credentials = [Credentials(k['access_key'], k['secret_key']) for k in data['keys']]
+
+ def create(self, zone, args = None, **kwargs):
+ """ create the user with the given arguments """
+ assert(zone.cluster)
+ args = zone.zone_args() + (args or [])
+ return self.json_command(zone.cluster, 'create', args, **kwargs)
+
+ def info(self, zone, args = None, **kwargs):
+ """ read the user from storage """
+ assert(zone.cluster)
+ args = zone.zone_args() + (args or [])
+ kwargs['read_only'] = True
+ return self.json_command(zone.cluster, 'info', args, **kwargs)
+
+ def delete(self, zone, args = None, **kwargs):
+ """ delete the user """
+ assert(zone.cluster)
+ args = zone.zone_args() + (args or [])
+ return self.command(zone.cluster, 'delete', args, **kwargs)
diff --git a/src/test/rgw/rgw_multi/tests.py b/src/test/rgw/rgw_multi/tests.py
new file mode 100644
index 000000000..156fac12e
--- /dev/null
+++ b/src/test/rgw/rgw_multi/tests.py
@@ -0,0 +1,2861 @@
+import json
+import random
+import string
+import sys
+import time
+import logging
+import errno
+import dateutil.parser
+
+from itertools import combinations
+from itertools import zip_longest
+from io import StringIO
+
+import boto
+import boto.s3.connection
+from boto.s3.website import WebsiteConfiguration
+from boto.s3.cors import CORSConfiguration
+
+from nose.tools import eq_ as eq
+from nose.tools import assert_not_equal, assert_equal
+from nose.plugins.attrib import attr
+from nose.plugins.skip import SkipTest
+
+from .multisite import Zone, ZoneGroup, Credentials
+
+from .conn import get_gateway_connection
+from .tools import assert_raises
+
+class Config:
+ """ test configuration """
+ def __init__(self, **kwargs):
+ # by default, wait up to 5 minutes before giving up on a sync checkpoint
+ self.checkpoint_retries = kwargs.get('checkpoint_retries', 60)
+ self.checkpoint_delay = kwargs.get('checkpoint_delay', 5)
+ # allow some time for realm reconfiguration after changing master zone
+ self.reconfigure_delay = kwargs.get('reconfigure_delay', 5)
+ self.tenant = kwargs.get('tenant', '')
+
+# rgw multisite tests, written against the interfaces provided in rgw_multi.
+# these tests must be initialized and run by another module that provides
+# implementations of these interfaces by calling init_multi()
+realm = None
+user = None
+config = None
+def init_multi(_realm, _user, _config=None):
+ global realm
+ realm = _realm
+ global user
+ user = _user
+ global config
+ config = _config or Config()
+ realm_meta_checkpoint(realm)
+
+def get_user():
+ return user.id if user is not None else ''
+
+def get_tenant():
+ return config.tenant if config is not None and config.tenant is not None else ''
+
+def get_realm():
+ return realm
+
+log = logging.getLogger('rgw_multi.tests')
+
+num_buckets = 0
+run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
+
+num_roles = 0
+
+def get_zone_connection(zone, credentials):
+ """ connect to the zone's first gateway """
+ if isinstance(credentials, list):
+ credentials = credentials[0]
+ return get_gateway_connection(zone.gateways[0], credentials)
+
+def mdlog_list(zone, period = None):
+ cmd = ['mdlog', 'list']
+ if period:
+ cmd += ['--period', period]
+ (mdlog_json, _) = zone.cluster.admin(cmd, read_only=True)
+ return json.loads(mdlog_json)
+
+def mdlog_autotrim(zone):
+ zone.cluster.admin(['mdlog', 'autotrim'])
+
+def datalog_list(zone, args = None):
+ cmd = ['datalog', 'list'] + (args or [])
+ (datalog_json, _) = zone.cluster.admin(cmd, read_only=True)
+ return json.loads(datalog_json)
+
+def datalog_status(zone):
+ cmd = ['datalog', 'status']
+ (datalog_json, _) = zone.cluster.admin(cmd, read_only=True)
+ return json.loads(datalog_json)
+
+def datalog_autotrim(zone):
+ zone.cluster.admin(['datalog', 'autotrim'])
+
+def bilog_list(zone, bucket, args = None):
+ cmd = ['bilog', 'list', '--bucket', bucket] + (args or [])
+ cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else []
+ bilog, _ = zone.cluster.admin(cmd, read_only=True)
+ return json.loads(bilog)
+
+def bilog_autotrim(zone, args = None):
+ zone.cluster.admin(['bilog', 'autotrim'] + (args or []))
+
+def bucket_layout(zone, bucket, args = None):
+ (bl_output,_) = zone.cluster.admin(['bucket', 'layout', '--bucket', bucket] + (args or []))
+ return json.loads(bl_output)
+
+def parse_meta_sync_status(meta_sync_status_json):
+ log.debug('current meta sync status=%s', meta_sync_status_json)
+ sync_status = json.loads(meta_sync_status_json)
+
+ sync_info = sync_status['sync_status']['info']
+ global_sync_status = sync_info['status']
+ num_shards = sync_info['num_shards']
+ period = sync_info['period']
+ realm_epoch = sync_info['realm_epoch']
+
+ sync_markers=sync_status['sync_status']['markers']
+ log.debug('sync_markers=%s', sync_markers)
+ assert(num_shards == len(sync_markers))
+
+ markers={}
+ for i in range(num_shards):
+ # get marker, only if it's an incremental marker for the same realm epoch
+ if realm_epoch > sync_markers[i]['val']['realm_epoch'] or sync_markers[i]['val']['state'] == 0:
+ markers[i] = ''
+ else:
+ markers[i] = sync_markers[i]['val']['marker']
+
+ return period, realm_epoch, num_shards, markers
+
+def meta_sync_status(zone):
+ for _ in range(config.checkpoint_retries):
+ cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
+ meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
+ if retcode == 0:
+ return parse_meta_sync_status(meta_sync_status_json)
+ assert(retcode == 2) # ENOENT
+ time.sleep(config.checkpoint_delay)
+
+ assert False, 'failed to read metadata sync status for zone=%s' % zone.name
+
+def meta_master_log_status(master_zone):
+ cmd = ['mdlog', 'status'] + master_zone.zone_args()
+ mdlog_status_json, retcode = master_zone.cluster.admin(cmd, read_only=True)
+ mdlog_status = json.loads(mdlog_status_json)
+
+ markers = {i: s['marker'] for i, s in enumerate(mdlog_status)}
+ log.debug('master meta markers=%s', markers)
+ return markers
+
+def compare_meta_status(zone, log_status, sync_status):
+ if len(log_status) != len(sync_status):
+ log.error('len(log_status)=%d, len(sync_status)=%d', len(log_status), len(sync_status))
+ return False
+
+ msg = ''
+ for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
+ if l > s:
+ if len(msg):
+ msg += ', '
+ msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
+
+ if len(msg) > 0:
+ log.warning('zone %s behind master: %s', zone.name, msg)
+ return False
+
+ return True
+
+def zone_meta_checkpoint(zone, meta_master_zone = None, master_status = None):
+ if not meta_master_zone:
+ meta_master_zone = zone.realm().meta_master_zone()
+ if not master_status:
+ master_status = meta_master_log_status(meta_master_zone)
+
+ current_realm_epoch = realm.current_period.data['realm_epoch']
+
+ log.info('starting meta checkpoint for zone=%s', zone.name)
+
+ for _ in range(config.checkpoint_retries):
+ period, realm_epoch, num_shards, sync_status = meta_sync_status(zone)
+ if realm_epoch < current_realm_epoch:
+ log.warning('zone %s is syncing realm epoch=%d, behind current realm epoch=%d',
+ zone.name, realm_epoch, current_realm_epoch)
+ else:
+ log.debug('log_status=%s', master_status)
+ log.debug('sync_status=%s', sync_status)
+ if compare_meta_status(zone, master_status, sync_status):
+ log.info('finish meta checkpoint for zone=%s', zone.name)
+ return
+
+ time.sleep(config.checkpoint_delay)
+ assert False, 'failed meta checkpoint for zone=%s' % zone.name
+
+def zonegroup_meta_checkpoint(zonegroup, meta_master_zone = None, master_status = None):
+ if not meta_master_zone:
+ meta_master_zone = zonegroup.realm().meta_master_zone()
+ if not master_status:
+ master_status = meta_master_log_status(meta_master_zone)
+
+ for zone in zonegroup.zones:
+ if zone == meta_master_zone:
+ continue
+ zone_meta_checkpoint(zone, meta_master_zone, master_status)
+
+def realm_meta_checkpoint(realm):
+ log.info('meta checkpoint')
+
+ meta_master_zone = realm.meta_master_zone()
+ master_status = meta_master_log_status(meta_master_zone)
+
+ for zonegroup in realm.current_period.zonegroups:
+ zonegroup_meta_checkpoint(zonegroup, meta_master_zone, master_status)
+
+def parse_data_sync_status(data_sync_status_json):
+ log.debug('current data sync status=%s', data_sync_status_json)
+ sync_status = json.loads(data_sync_status_json)
+
+ global_sync_status=sync_status['sync_status']['info']['status']
+ num_shards=sync_status['sync_status']['info']['num_shards']
+
+ sync_markers=sync_status['sync_status']['markers']
+ log.debug('sync_markers=%s', sync_markers)
+ assert(num_shards == len(sync_markers))
+
+ markers={}
+ for i in range(num_shards):
+ markers[i] = sync_markers[i]['val']['marker']
+
+ return (num_shards, markers)
+
+def data_sync_status(target_zone, source_zone):
+ if target_zone == source_zone:
+ return None
+
+ for _ in range(config.checkpoint_retries):
+ cmd = ['data', 'sync', 'status'] + target_zone.zone_args()
+ cmd += ['--source-zone', source_zone.name]
+ data_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
+ if retcode == 0:
+ return parse_data_sync_status(data_sync_status_json)
+
+ assert(retcode == 2) # ENOENT
+ time.sleep(config.checkpoint_delay)
+
+ assert False, 'failed to read data sync status for target_zone=%s source_zone=%s' % \
+ (target_zone.name, source_zone.name)
+
+def bucket_sync_status(target_zone, source_zone, bucket_name):
+ if target_zone == source_zone:
+ return None
+
+ cmd = ['bucket', 'sync', 'markers'] + target_zone.zone_args()
+ cmd += ['--source-zone', source_zone.name]
+ cmd += ['--bucket', bucket_name]
+ cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else []
+ while True:
+ bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
+ if retcode == 0:
+ break
+
+ assert(retcode == 2) # ENOENT
+
+ sync_status = json.loads(bucket_sync_status_json)
+
+ markers={}
+ for entry in sync_status:
+ val = entry['val']
+ pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
+ markers[entry['key']] = pos
+
+ return markers
+
+def data_source_log_status(source_zone):
+ source_cluster = source_zone.cluster
+ cmd = ['datalog', 'status'] + source_zone.zone_args()
+ datalog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
+ datalog_status = json.loads(datalog_status_json)
+
+ markers = {i: s['marker'] for i, s in enumerate(datalog_status)}
+ log.debug('data markers for zone=%s markers=%s', source_zone.name, markers)
+ return markers
+
+def bucket_source_log_status(source_zone, bucket_name):
+ cmd = ['bilog', 'status'] + source_zone.zone_args()
+ cmd += ['--bucket', bucket_name]
+ cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else []
+ source_cluster = source_zone.cluster
+ bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
+ bilog_status = json.loads(bilog_status_json)
+
+ m={}
+ markers={}
+ try:
+ m = bilog_status['markers']
+ except:
+ pass
+
+ for s in m:
+ key = s['key']
+ val = s['val']
+ markers[key] = val
+
+ log.debug('bilog markers for zone=%s bucket=%s markers=%s', source_zone.name, bucket_name, markers)
+ return markers
+
+def compare_data_status(target_zone, source_zone, log_status, sync_status):
+ if len(log_status) != len(sync_status):
+ log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
+ return False
+
+ msg = ''
+ for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
+ if l > s:
+ if len(msg):
+ msg += ', '
+ msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
+
+ if len(msg) > 0:
+ log.warning('data of zone %s behind zone %s: %s', target_zone.name, source_zone.name, msg)
+ return False
+
+ return True
+
+def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
+ if len(log_status) != len(sync_status):
+ log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
+ return False
+
+ msg = ''
+ for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
+ if l > s:
+ if len(msg):
+ msg += ', '
+ msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
+
+ if len(msg) > 0:
+ log.warning('bucket %s zone %s behind zone %s: %s', bucket_name, target_zone.name, source_zone.name, msg)
+ return False
+
+ return True
+
+def zone_data_checkpoint(target_zone, source_zone):
+ if not target_zone.syncs_from(source_zone.name):
+ return
+
+ log_status = data_source_log_status(source_zone)
+ log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)
+
+ for _ in range(config.checkpoint_retries):
+ num_shards, sync_status = data_sync_status(target_zone, source_zone)
+
+ log.debug('log_status=%s', log_status)
+ log.debug('sync_status=%s', sync_status)
+
+ if compare_data_status(target_zone, source_zone, log_status, sync_status):
+ log.info('finished data checkpoint for target_zone=%s source_zone=%s',
+ target_zone.name, source_zone.name)
+ return
+ time.sleep(config.checkpoint_delay)
+
+ assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \
+ (target_zone.name, source_zone.name)
+
+def zonegroup_data_checkpoint(zonegroup_conns):
+ for source_conn in zonegroup_conns.rw_zones:
+ for target_conn in zonegroup_conns.zones:
+ if source_conn.zone == target_conn.zone:
+ continue
+ log.debug('data checkpoint: source=%s target=%s', source_conn.zone.name, target_conn.zone.name)
+ zone_data_checkpoint(target_conn.zone, source_conn.zone)
+
+def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
+ if not target_zone.syncs_from(source_zone.name):
+ return
+
+ cmd = ['bucket', 'sync', 'checkpoint']
+ cmd += ['--bucket', bucket_name, '--source-zone', source_zone.name]
+ retry_delay_ms = config.checkpoint_delay * 1000
+ timeout_sec = config.checkpoint_retries * config.checkpoint_delay
+ cmd += ['--retry-delay-ms', str(retry_delay_ms), '--timeout-sec', str(timeout_sec)]
+ cmd += target_zone.zone_args()
+ target_zone.cluster.admin(cmd, debug_rgw=1)
+
+def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name):
+ for source_conn in zonegroup_conns.rw_zones:
+ for target_conn in zonegroup_conns.zones:
+ if source_conn.zone == target_conn.zone:
+ continue
+ log.debug('bucket checkpoint: source=%s target=%s bucket=%s', source_conn.zone.name, target_conn.zone.name, bucket_name)
+ zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket_name)
+ for source_conn, target_conn in combinations(zonegroup_conns.zones, 2):
+ if target_conn.zone.has_buckets():
+ target_conn.check_bucket_eq(source_conn, bucket_name)
+
+def set_master_zone(zone):
+ zone.modify(zone.cluster, ['--master'])
+ zonegroup = zone.zonegroup
+ zonegroup.period.update(zone, commit=True)
+ zonegroup.master_zone = zone
+ log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, config.reconfigure_delay)
+ time.sleep(config.reconfigure_delay)
+
+def set_sync_from_all(zone, flag):
+ s = 'true' if flag else 'false'
+ zone.modify(zone.cluster, ['--sync-from-all={}'.format(s)])
+ zonegroup = zone.zonegroup
+ zonegroup.period.update(zone, commit=True)
+ log.info('Set sync_from_all flag on zone %s to %s', zone.name, s)
+ time.sleep(config.reconfigure_delay)
+
+def set_redirect_zone(zone, redirect_zone):
+ id_str = redirect_zone.id if redirect_zone else ''
+ zone.modify(zone.cluster, ['--redirect-zone={}'.format(id_str)])
+ zonegroup = zone.zonegroup
+ zonegroup.period.update(zone, commit=True)
+ log.info('Set redirect_zone zone %s to "%s"', zone.name, id_str)
+ time.sleep(config.reconfigure_delay)
+
+def enable_bucket_sync(zone, bucket_name):
+ cmd = ['bucket', 'sync', 'enable', '--bucket', bucket_name] + zone.zone_args()
+ zone.cluster.admin(cmd)
+
+def disable_bucket_sync(zone, bucket_name):
+ cmd = ['bucket', 'sync', 'disable', '--bucket', bucket_name] + zone.zone_args()
+ zone.cluster.admin(cmd)
+
+def check_buckets_sync_status_obj_not_exist(zone, buckets):
+ for _ in range(config.checkpoint_retries):
+ cmd = ['log', 'list'] + zone.zone_arg()
+ log_list, ret = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
+ for bucket in buckets:
+ if log_list.find(':'+bucket+":") >= 0:
+ break
+ else:
+ return
+ time.sleep(config.checkpoint_delay)
+ assert False
+
+def gen_bucket_name():
+ global num_buckets
+
+ num_buckets += 1
+ return run_prefix + '-' + str(num_buckets)
+
+def gen_role_name():
+ global num_roles
+
+ num_roles += 1
+ return "roles" + '-' + run_prefix + '-' + str(num_roles)
+
+class ZonegroupConns:
+ def __init__(self, zonegroup):
+ self.zonegroup = zonegroup
+ self.zones = []
+ self.ro_zones = []
+ self.rw_zones = []
+ self.master_zone = None
+
+ for z in zonegroup.zones:
+ zone_conn = z.get_conn(user.credentials)
+ self.zones.append(zone_conn)
+ if z.is_read_only():
+ self.ro_zones.append(zone_conn)
+ else:
+ self.rw_zones.append(zone_conn)
+
+ if z == zonegroup.master_zone:
+ self.master_zone = zone_conn
+
+def check_all_buckets_exist(zone_conn, buckets):
+ if not zone_conn.zone.has_buckets():
+ return True
+
+ for b in buckets:
+ try:
+ zone_conn.get_bucket(b)
+ except:
+ log.critical('zone %s does not contain bucket %s', zone_conn.zone.name, b)
+ return False
+
+ return True
+
+def check_all_buckets_dont_exist(zone_conn, buckets):
+ if not zone_conn.zone.has_buckets():
+ return True
+
+ for b in buckets:
+ try:
+ zone_conn.get_bucket(b)
+ except:
+ continue
+
+ log.critical('zone %s contains bucket %s', zone.zone, b)
+ return False
+
+ return True
+
+def create_role_per_zone(zonegroup_conns, roles_per_zone = 1):
+ roles = []
+ zone_role = []
+ for zone in zonegroup_conns.rw_zones:
+ for i in range(roles_per_zone):
+ role_name = gen_role_name()
+ log.info('create role zone=%s name=%s', zone.name, role_name)
+ policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/testuser\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
+ role = zone.create_role("", role_name, policy_document, "")
+ roles.append(role_name)
+ zone_role.append((zone, role))
+
+ return roles, zone_role
+
+def create_bucket_per_zone(zonegroup_conns, buckets_per_zone = 1):
+ buckets = []
+ zone_bucket = []
+ for zone in zonegroup_conns.rw_zones:
+ for i in range(buckets_per_zone):
+ bucket_name = gen_bucket_name()
+ log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
+ bucket = zone.create_bucket(bucket_name)
+ buckets.append(bucket_name)
+ zone_bucket.append((zone, bucket))
+
+ return buckets, zone_bucket
+
+def create_bucket_per_zone_in_realm():
+ buckets = []
+ zone_bucket = []
+ for zonegroup in realm.current_period.zonegroups:
+ zg_conn = ZonegroupConns(zonegroup)
+ b, z = create_bucket_per_zone(zg_conn)
+ buckets.extend(b)
+ zone_bucket.extend(z)
+ return buckets, zone_bucket
+
+def test_bucket_create():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, _ = create_bucket_per_zone(zonegroup_conns)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ for zone in zonegroup_conns.zones:
+ assert check_all_buckets_exist(zone, buckets)
+
+def test_bucket_recreate():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, _ = create_bucket_per_zone(zonegroup_conns)
+ zonegroup_meta_checkpoint(zonegroup)
+
+
+ for zone in zonegroup_conns.zones:
+ assert check_all_buckets_exist(zone, buckets)
+
+ # recreate buckets on all zones, make sure they weren't removed
+ for zone in zonegroup_conns.rw_zones:
+ for bucket_name in buckets:
+ bucket = zone.create_bucket(bucket_name)
+
+ for zone in zonegroup_conns.zones:
+ assert check_all_buckets_exist(zone, buckets)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ for zone in zonegroup_conns.zones:
+ assert check_all_buckets_exist(zone, buckets)
+
+def test_bucket_remove():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ for zone in zonegroup_conns.zones:
+ assert check_all_buckets_exist(zone, buckets)
+
+ for zone, bucket_name in zone_bucket:
+ zone.conn.delete_bucket(bucket_name)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ for zone in zonegroup_conns.zones:
+ assert check_all_buckets_dont_exist(zone, buckets)
+
+def get_bucket(zone, bucket_name):
+ return zone.conn.get_bucket(bucket_name)
+
+def get_key(zone, bucket_name, obj_name):
+ b = get_bucket(zone, bucket_name)
+ return b.get_key(obj_name)
+
+def new_key(zone, bucket_name, obj_name):
+ b = get_bucket(zone, bucket_name)
+ return b.new_key(obj_name)
+
+def check_bucket_eq(zone_conn1, zone_conn2, bucket):
+ if zone_conn2.zone.has_buckets():
+ zone_conn2.check_bucket_eq(zone_conn1, bucket.name)
+
+def check_role_eq(zone_conn1, zone_conn2, role):
+ if zone_conn2.zone.has_roles():
+ zone_conn2.check_role_eq(zone_conn1, role['create_role_response']['create_role_result']['role']['role_name'])
+
+def test_object_sync():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
+
+ objnames = [ 'myobj', '_myobj', ':', '&' ]
+ content = 'asdasd'
+
+ # don't wait for meta sync just yet
+ for zone, bucket_name in zone_bucket:
+ for objname in objnames:
+ k = new_key(zone, bucket_name, objname)
+ k.set_contents_from_string(content)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ for source_conn, bucket in zone_bucket:
+ for target_conn in zonegroup_conns.zones:
+ if source_conn.zone == target_conn.zone:
+ continue
+
+ zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
+ check_bucket_eq(source_conn, target_conn, bucket)
+
+def test_object_delete():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
+
+ objname = 'myobj'
+ content = 'asdasd'
+
+ # don't wait for meta sync just yet
+ for zone, bucket in zone_bucket:
+ k = new_key(zone, bucket, objname)
+ k.set_contents_from_string(content)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # check object exists
+ for source_conn, bucket in zone_bucket:
+ for target_conn in zonegroup_conns.zones:
+ if source_conn.zone == target_conn.zone:
+ continue
+
+ zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
+ check_bucket_eq(source_conn, target_conn, bucket)
+
+ # check object removal
+ for source_conn, bucket in zone_bucket:
+ k = get_key(source_conn, bucket, objname)
+ k.delete()
+ for target_conn in zonegroup_conns.zones:
+ if source_conn.zone == target_conn.zone:
+ continue
+
+ zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
+ check_bucket_eq(source_conn, target_conn, bucket)
+
+def get_latest_object_version(key):
+ for k in key.bucket.list_versions(key.name):
+ if k.is_latest:
+ return k
+ return None
+
+def test_versioned_object_incremental_sync():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
+
+ # enable versioning
+ for _, bucket in zone_bucket:
+ bucket.configure_versioning(True)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload a dummy object to each bucket and wait for sync. this forces each
+ # bucket to finish a full sync and switch to incremental
+ for source_conn, bucket in zone_bucket:
+ new_key(source_conn, bucket, 'dummy').set_contents_from_string('')
+ for target_conn in zonegroup_conns.zones:
+ if source_conn.zone == target_conn.zone:
+ continue
+ zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
+
+ for _, bucket in zone_bucket:
+ # create and delete multiple versions of an object from each zone
+ for zone_conn in zonegroup_conns.rw_zones:
+ obj = 'obj-' + zone_conn.name
+ k = new_key(zone_conn, bucket, obj)
+
+ k.set_contents_from_string('version1')
+ log.debug('version1 id=%s', k.version_id)
+ # don't delete version1 - this tests that the initial version
+ # doesn't get squashed into later versions
+
+ # create and delete the following object versions to test that
+ # the operations don't race with each other during sync
+ k.set_contents_from_string('version2')
+ log.debug('version2 id=%s', k.version_id)
+ k.bucket.delete_key(obj, version_id=k.version_id)
+
+ k.set_contents_from_string('version3')
+ log.debug('version3 id=%s', k.version_id)
+ k.bucket.delete_key(obj, version_id=k.version_id)
+
+ for _, bucket in zone_bucket:
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ for _, bucket in zone_bucket:
+ # overwrite the acls to test that metadata-only entries are applied
+ for zone_conn in zonegroup_conns.rw_zones:
+ obj = 'obj-' + zone_conn.name
+ k = new_key(zone_conn, bucket.name, obj)
+ v = get_latest_object_version(k)
+ v.make_public()
+
+ for _, bucket in zone_bucket:
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+def test_concurrent_versioned_object_incremental_sync():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ zone = zonegroup_conns.rw_zones[0]
+
+ # create a versioned bucket
+ bucket = zone.create_bucket(gen_bucket_name())
+ log.debug('created bucket=%s', bucket.name)
+ bucket.configure_versioning(True)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload a dummy object and wait for sync. this forces each zone to finish
+ # a full sync and switch to incremental
+ new_key(zone, bucket, 'dummy').set_contents_from_string('')
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ # create several concurrent versions on each zone and let them race to sync
+ obj = 'obj'
+ for i in range(10):
+ for zone_conn in zonegroup_conns.rw_zones:
+ k = new_key(zone_conn, bucket, obj)
+ k.set_contents_from_string('version1')
+ log.debug('zone=%s version=%s', zone_conn.zone.name, k.version_id)
+
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+ zonegroup_data_checkpoint(zonegroup_conns)
+
+def test_version_suspended_incremental_sync():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ zone = zonegroup_conns.rw_zones[0]
+
+ # create a non-versioned bucket
+ bucket = zone.create_bucket(gen_bucket_name())
+ log.debug('created bucket=%s', bucket.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload an initial object
+ key1 = new_key(zone, bucket, 'obj')
+ key1.set_contents_from_string('')
+ log.debug('created initial version id=%s', key1.version_id)
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ # enable versioning
+ bucket.configure_versioning(True)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # re-upload the object as a new version
+ key2 = new_key(zone, bucket, 'obj')
+ key2.set_contents_from_string('')
+ log.debug('created new version id=%s', key2.version_id)
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ # suspend versioning
+ bucket.configure_versioning(False)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # re-upload the object as a 'null' version
+ key3 = new_key(zone, bucket, 'obj')
+ key3.set_contents_from_string('')
+ log.debug('created null version id=%s', key3.version_id)
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+def test_delete_marker_full_sync():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
+
+ # enable versioning
+ for _, bucket in zone_bucket:
+ bucket.configure_versioning(True)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ for zone, bucket in zone_bucket:
+ # upload an initial object
+ key1 = new_key(zone, bucket, 'obj')
+ key1.set_contents_from_string('')
+
+ # create a delete marker
+ key2 = new_key(zone, bucket, 'obj')
+ key2.delete()
+
+ # wait for full sync
+ for _, bucket in zone_bucket:
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+def test_suspended_delete_marker_full_sync():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
+
+ # enable/suspend versioning
+ for _, bucket in zone_bucket:
+ bucket.configure_versioning(True)
+ bucket.configure_versioning(False)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ for zone, bucket in zone_bucket:
+ # upload an initial object
+ key1 = new_key(zone, bucket, 'obj')
+ key1.set_contents_from_string('')
+
+ # create a delete marker
+ key2 = new_key(zone, bucket, 'obj')
+ key2.delete()
+
+ # wait for full sync
+ for _, bucket in zone_bucket:
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+def test_bucket_versioning():
+ buckets, zone_bucket = create_bucket_per_zone_in_realm()
+ for _, bucket in zone_bucket:
+ bucket.configure_versioning(True)
+ res = bucket.get_versioning_status()
+ key = 'Versioning'
+ assert(key in res and res[key] == 'Enabled')
+
+def test_bucket_acl():
+ buckets, zone_bucket = create_bucket_per_zone_in_realm()
+ for _, bucket in zone_bucket:
+ assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner
+ bucket.set_acl('public-read')
+ assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers
+
+def test_bucket_cors():
+ buckets, zone_bucket = create_bucket_per_zone_in_realm()
+ for _, bucket in zone_bucket:
+ cors_cfg = CORSConfiguration()
+ cors_cfg.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000)
+ bucket.set_cors(cors_cfg)
+ assert(bucket.get_cors().to_xml() == cors_cfg.to_xml())
+
+def test_bucket_delete_notempty():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ for zone_conn, bucket_name in zone_bucket:
+ # upload an object to each bucket on its own zone
+ conn = zone_conn.get_connection()
+ bucket = conn.get_bucket(bucket_name)
+ k = bucket.new_key('foo')
+ k.set_contents_from_string('bar')
+ # attempt to delete the bucket before this object can sync
+ try:
+ conn.delete_bucket(bucket_name)
+ except boto.exception.S3ResponseError as e:
+ assert(e.error_code == 'BucketNotEmpty')
+ continue
+ assert False # expected 409 BucketNotEmpty
+
+ # assert that each bucket still exists on the master
+ c1 = zonegroup_conns.master_zone.conn
+ for _, bucket_name in zone_bucket:
+ assert c1.get_bucket(bucket_name)
+
+def test_multi_period_incremental_sync():
+ zonegroup = realm.master_zonegroup()
+ if len(zonegroup.zones) < 3:
+ raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")
+
+ # periods to include in mdlog comparison
+ mdlog_periods = [realm.current_period.id]
+
+ # create a bucket in each zone
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ z1, z2, z3 = zonegroup.zones[0:3]
+ assert(z1 == zonegroup.master_zone)
+
+ # kill zone 3 gateways to freeze sync status to incremental in first period
+ z3.stop()
+
+ # change master to zone 2 -> period 2
+ set_master_zone(z2)
+ mdlog_periods += [realm.current_period.id]
+
+ for zone_conn, _ in zone_bucket:
+ if zone_conn.zone == z3:
+ continue
+ bucket_name = gen_bucket_name()
+ log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
+ bucket = zone_conn.conn.create_bucket(bucket_name)
+ buckets.append(bucket_name)
+
+ # wait for zone 1 to sync
+ zone_meta_checkpoint(z1)
+
+ # change master back to zone 1 -> period 3
+ set_master_zone(z1)
+ mdlog_periods += [realm.current_period.id]
+
+ for zone_conn, bucket_name in zone_bucket:
+ if zone_conn.zone == z3:
+ continue
+ bucket_name = gen_bucket_name()
+ log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
+ zone_conn.conn.create_bucket(bucket_name)
+ buckets.append(bucket_name)
+
+ # restart zone 3 gateway and wait for sync
+ z3.start()
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # verify that we end up with the same objects
+ for bucket_name in buckets:
+ for source_conn, _ in zone_bucket:
+ for target_conn in zonegroup_conns.zones:
+ if source_conn.zone == target_conn.zone:
+ continue
+
+ if target_conn.zone.has_buckets():
+ target_conn.check_bucket_eq(source_conn, bucket_name)
+
+ # verify that mdlogs are not empty and match for each period
+ for period in mdlog_periods:
+ master_mdlog = mdlog_list(z1, period)
+ assert len(master_mdlog) > 0
+ for zone in zonegroup.zones:
+ if zone == z1:
+ continue
+ mdlog = mdlog_list(zone, period)
+ assert len(mdlog) == len(master_mdlog)
+
+ # autotrim mdlogs for master zone
+ mdlog_autotrim(z1)
+
+ # autotrim mdlogs for peers
+ for zone in zonegroup.zones:
+ if zone == z1:
+ continue
+ mdlog_autotrim(zone)
+
+ # verify that mdlogs are empty for each period
+ for period in mdlog_periods:
+ for zone in zonegroup.zones:
+ mdlog = mdlog_list(zone, period)
+ assert len(mdlog) == 0
+
+def test_datalog_autotrim():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
+
+ # upload an object to each zone to generate a datalog entry
+ for zone, bucket in zone_bucket:
+ k = new_key(zone, bucket.name, 'key')
+ k.set_contents_from_string('body')
+
+ # wait for metadata and data sync to catch up
+ zonegroup_meta_checkpoint(zonegroup)
+ zonegroup_data_checkpoint(zonegroup_conns)
+
+ # trim each datalog
+ for zone, _ in zone_bucket:
+ # read max markers for each shard
+ status = datalog_status(zone.zone)
+
+ datalog_autotrim(zone.zone)
+
+ for shard_id, shard_status in enumerate(status):
+ try:
+ before_trim = dateutil.parser.isoparse(shard_status['last_update'])
+ except: # empty timestamps look like "0.000000" and will fail here
+ continue
+ entries = datalog_list(zone.zone, ['--shard-id', str(shard_id), '--max-entries', '1'])
+ if not len(entries):
+ continue
+ after_trim = dateutil.parser.isoparse(entries[0]['timestamp'])
+ assert before_trim < after_trim, "any datalog entries must be newer than trim"
+
+def test_multi_zone_redirect():
+ zonegroup = realm.master_zonegroup()
+ if len(zonegroup.rw_zones) < 2:
+ raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")
+
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ (zc1, zc2) = zonegroup_conns.rw_zones[0:2]
+
+ z1, z2 = (zc1.zone, zc2.zone)
+
+ set_sync_from_all(z2, False)
+
+ # create a bucket on the first zone
+ bucket_name = gen_bucket_name()
+ log.info('create bucket zone=%s name=%s', z1.name, bucket_name)
+ bucket = zc1.conn.create_bucket(bucket_name)
+ obj = 'testredirect'
+
+ key = bucket.new_key(obj)
+ data = 'A'*512
+ key.set_contents_from_string(data)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # try to read object from second zone (should fail)
+ bucket2 = get_bucket(zc2, bucket_name)
+ assert_raises(boto.exception.S3ResponseError, bucket2.get_key, obj)
+
+ set_redirect_zone(z2, z1)
+
+ key2 = bucket2.get_key(obj)
+
+ eq(data, key2.get_contents_as_string(encoding='ascii'))
+
+ key = bucket.new_key(obj)
+
+ for x in ['a', 'b', 'c', 'd']:
+ data = x*512
+ key.set_contents_from_string(data)
+ eq(data, key2.get_contents_as_string(encoding='ascii'))
+
+ # revert config changes
+ set_sync_from_all(z2, True)
+ set_redirect_zone(z2, None)
+
+def test_zonegroup_remove():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ if len(zonegroup.zones) < 2:
+ raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")
+
+ zonegroup_meta_checkpoint(zonegroup)
+ z1, z2 = zonegroup.zones[0:2]
+ c1, c2 = (z1.cluster, z2.cluster)
+
+ # get admin credentials out of existing zone
+ system_key = z1.data['system_key']
+ admin_creds = Credentials(system_key['access_key'], system_key['secret_key'])
+
+ # create a new zone in zonegroup on c2 and commit
+ zone = Zone('remove', zonegroup, c2)
+ zone.create(c2, admin_creds.credential_args())
+ zonegroup.zones.append(zone)
+ zonegroup.period.update(zone, commit=True)
+
+ zonegroup.remove(c1, zone)
+
+ # another 'zonegroup remove' should fail with ENOENT
+ _, retcode = zonegroup.remove(c1, zone, check_retcode=False)
+ assert(retcode == 2) # ENOENT
+
+ # delete the new zone
+ zone.delete(c2)
+
+ # validate the resulting period
+ zonegroup.period.update(z1, commit=True)
+
+
+def test_zg_master_zone_delete():
+
+ master_zg = realm.master_zonegroup()
+ master_zone = master_zg.master_zone
+
+ assert(len(master_zg.zones) >= 1)
+ master_cluster = master_zg.zones[0].cluster
+
+ rm_zg = ZoneGroup('remove_zg')
+ rm_zg.create(master_cluster)
+
+ rm_zone = Zone('remove', rm_zg, master_cluster)
+ rm_zone.create(master_cluster)
+ master_zg.period.update(master_zone, commit=True)
+
+
+ rm_zone.delete(master_cluster)
+ # Period update: This should now fail as the zone will be the master zone
+ # in that zg
+ _, retcode = master_zg.period.update(master_zone, check_retcode=False)
+ assert(retcode == errno.EINVAL)
+
+ # Proceed to delete the zonegroup as well, previous period now does not
+ # contain a dangling master_zone, this must succeed
+ rm_zg.delete(master_cluster)
+ master_zg.period.update(master_zone, commit=True)
+
+def test_set_bucket_website():
+ buckets, zone_bucket = create_bucket_per_zone_in_realm()
+ for _, bucket in zone_bucket:
+ website_cfg = WebsiteConfiguration(suffix='index.html',error_key='error.html')
+ try:
+ bucket.set_website_configuration(website_cfg)
+ except boto.exception.S3ResponseError as e:
+ if e.error_code == 'MethodNotAllowed':
+ raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.")
+ assert(bucket.get_website_configuration_with_xml()[1] == website_cfg.to_xml())
+
+def test_set_bucket_policy():
+ policy = '''{
+ "Version": "2012-10-17",
+ "Statement": [{
+ "Effect": "Allow",
+ "Principal": "*"
+ }]
+}'''
+ buckets, zone_bucket = create_bucket_per_zone_in_realm()
+ for _, bucket in zone_bucket:
+ bucket.set_policy(policy)
+ assert(bucket.get_policy().decode('ascii') == policy)
+
+@attr('bucket_sync_disable')
+def test_bucket_sync_disable():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ for bucket_name in buckets:
+ disable_bucket_sync(realm.meta_master_zone(), bucket_name)
+
+ for zone in zonegroup.zones:
+ check_buckets_sync_status_obj_not_exist(zone, buckets)
+
+ zonegroup_data_checkpoint(zonegroup_conns)
+
+@attr('bucket_sync_disable')
+def test_bucket_sync_enable_right_after_disable():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
+
+ objnames = ['obj1', 'obj2', 'obj3', 'obj4']
+ content = 'asdasd'
+
+ for zone, bucket in zone_bucket:
+ for objname in objnames:
+ k = new_key(zone, bucket.name, objname)
+ k.set_contents_from_string(content)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ for bucket_name in buckets:
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
+
+ for bucket_name in buckets:
+ disable_bucket_sync(realm.meta_master_zone(), bucket_name)
+ enable_bucket_sync(realm.meta_master_zone(), bucket_name)
+
+ objnames_2 = ['obj5', 'obj6', 'obj7', 'obj8']
+
+ for zone, bucket in zone_bucket:
+ for objname in objnames_2:
+ k = new_key(zone, bucket.name, objname)
+ k.set_contents_from_string(content)
+
+ for bucket_name in buckets:
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
+
+ zonegroup_data_checkpoint(zonegroup_conns)
+
+@attr('bucket_sync_disable')
+def test_bucket_sync_disable_enable():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
+
+ objnames = [ 'obj1', 'obj2', 'obj3', 'obj4' ]
+ content = 'asdasd'
+
+ for zone, bucket in zone_bucket:
+ for objname in objnames:
+ k = new_key(zone, bucket.name, objname)
+ k.set_contents_from_string(content)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ for bucket_name in buckets:
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
+
+ for bucket_name in buckets:
+ disable_bucket_sync(realm.meta_master_zone(), bucket_name)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ objnames_2 = [ 'obj5', 'obj6', 'obj7', 'obj8' ]
+
+ for zone, bucket in zone_bucket:
+ for objname in objnames_2:
+ k = new_key(zone, bucket.name, objname)
+ k.set_contents_from_string(content)
+
+ for bucket_name in buckets:
+ enable_bucket_sync(realm.meta_master_zone(), bucket_name)
+
+ for bucket_name in buckets:
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
+
+ zonegroup_data_checkpoint(zonegroup_conns)
+
+def test_multipart_object_sync():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
+
+ _, bucket = zone_bucket[0]
+
+ # initiate a multipart upload
+ upload = bucket.initiate_multipart_upload('MULTIPART')
+ mp = boto.s3.multipart.MultiPartUpload(bucket)
+ mp.key_name = upload.key_name
+ mp.id = upload.id
+ part_size = 5 * 1024 * 1024 # 5M min part size
+ mp.upload_part_from_file(StringIO('a' * part_size), 1)
+ mp.upload_part_from_file(StringIO('b' * part_size), 2)
+ mp.upload_part_from_file(StringIO('c' * part_size), 3)
+ mp.upload_part_from_file(StringIO('d' * part_size), 4)
+ mp.complete_upload()
+
+ zonegroup_meta_checkpoint(zonegroup)
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+def test_encrypted_object_sync():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ if len(zonegroup.rw_zones) < 2:
+ raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")
+
+ (zone1, zone2) = zonegroup_conns.rw_zones[0:2]
+
+ # create a bucket on the first zone
+ bucket_name = gen_bucket_name()
+ log.info('create bucket zone=%s name=%s', zone1.name, bucket_name)
+ bucket = zone1.conn.create_bucket(bucket_name)
+
+ # upload an object with sse-c encryption
+ sse_c_headers = {
+ 'x-amz-server-side-encryption-customer-algorithm': 'AES256',
+ 'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
+ 'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
+ }
+ key = bucket.new_key('testobj-sse-c')
+ data = 'A'*512
+ key.set_contents_from_string(data, headers=sse_c_headers)
+
+ # upload an object with sse-kms encryption
+ sse_kms_headers = {
+ 'x-amz-server-side-encryption': 'aws:kms',
+ # testkey-1 must be present in 'rgw crypt s3 kms encryption keys' (vstart.sh adds this)
+ 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
+ }
+ key = bucket.new_key('testobj-sse-kms')
+ key.set_contents_from_string(data, headers=sse_kms_headers)
+
+ # wait for the bucket metadata and data to sync
+ zonegroup_meta_checkpoint(zonegroup)
+ zone_bucket_checkpoint(zone2.zone, zone1.zone, bucket_name)
+
+ # read the encrypted objects from the second zone
+ bucket2 = get_bucket(zone2, bucket_name)
+ key = bucket2.get_key('testobj-sse-c', headers=sse_c_headers)
+ eq(data, key.get_contents_as_string(headers=sse_c_headers, encoding='ascii'))
+
+ key = bucket2.get_key('testobj-sse-kms')
+ eq(data, key.get_contents_as_string(encoding='ascii'))
+
+def test_bucket_index_log_trim():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ zone = zonegroup_conns.rw_zones[0]
+
+ # create a test bucket, upload some objects, and wait for sync
+ def make_test_bucket():
+ name = gen_bucket_name()
+ log.info('create bucket zone=%s name=%s', zone.name, name)
+ bucket = zone.conn.create_bucket(name)
+ for objname in ('a', 'b', 'c', 'd'):
+ k = new_key(zone, name, objname)
+ k.set_contents_from_string('foo')
+ zonegroup_meta_checkpoint(zonegroup)
+ zonegroup_bucket_checkpoint(zonegroup_conns, name)
+ return bucket
+
+ # create a 'cold' bucket
+ cold_bucket = make_test_bucket()
+
+ # trim with max-buckets=0 to clear counters for cold bucket. this should
+ # prevent it from being considered 'active' by the next autotrim
+ bilog_autotrim(zone.zone, [
+ '--rgw-sync-log-trim-max-buckets', '0',
+ ])
+
+ # create an 'active' bucket
+ active_bucket = make_test_bucket()
+
+ # trim with max-buckets=1 min-cold-buckets=0 to trim active bucket only
+ bilog_autotrim(zone.zone, [
+ '--rgw-sync-log-trim-max-buckets', '1',
+ '--rgw-sync-log-trim-min-cold-buckets', '0',
+ ])
+
+ # verify active bucket has empty bilog
+ active_bilog = bilog_list(zone.zone, active_bucket.name)
+ assert(len(active_bilog) == 0)
+
+ # verify cold bucket has nonempty bilog
+ cold_bilog = bilog_list(zone.zone, cold_bucket.name)
+ assert(len(cold_bilog) > 0)
+
+ # trim with min-cold-buckets=999 to trim all buckets
+ bilog_autotrim(zone.zone, [
+ '--rgw-sync-log-trim-max-buckets', '999',
+ '--rgw-sync-log-trim-min-cold-buckets', '999',
+ ])
+
+ # verify cold bucket has empty bilog
+ cold_bilog = bilog_list(zone.zone, cold_bucket.name)
+ assert(len(cold_bilog) == 0)
+
+def test_bucket_reshard_index_log_trim():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ zone = zonegroup_conns.rw_zones[0]
+
+ # create a test bucket, upload some objects, and wait for sync
+ def make_test_bucket():
+ name = gen_bucket_name()
+ log.info('create bucket zone=%s name=%s', zone.name, name)
+ bucket = zone.conn.create_bucket(name)
+ for objname in ('a', 'b', 'c', 'd'):
+ k = new_key(zone, name, objname)
+ k.set_contents_from_string('foo')
+ zonegroup_meta_checkpoint(zonegroup)
+ zonegroup_bucket_checkpoint(zonegroup_conns, name)
+ return bucket
+
+ # create a 'test' bucket
+ test_bucket = make_test_bucket()
+
+ # checking bucket layout before resharding
+ json_obj_1 = bucket_layout(zone.zone, test_bucket.name)
+ assert(len(json_obj_1['layout']['logs']) == 1)
+
+ first_gen = json_obj_1['layout']['current_index']['gen']
+
+ before_reshard_bilog = bilog_list(zone.zone, test_bucket.name, ['--gen', str(first_gen)])
+ assert(len(before_reshard_bilog) == 4)
+
+ # Resharding the bucket
+ zone.zone.cluster.admin(['bucket', 'reshard',
+ '--bucket', test_bucket.name,
+ '--num-shards', '3',
+ '--yes-i-really-mean-it'])
+
+ # checking bucket layout after 1st resharding
+ json_obj_2 = bucket_layout(zone.zone, test_bucket.name)
+ assert(len(json_obj_2['layout']['logs']) == 2)
+
+ second_gen = json_obj_2['layout']['current_index']['gen']
+
+ after_reshard_bilog = bilog_list(zone.zone, test_bucket.name, ['--gen', str(second_gen)])
+ assert(len(after_reshard_bilog) == 0)
+
+ # upload more objects
+ for objname in ('e', 'f', 'g', 'h'):
+ k = new_key(zone, test_bucket.name, objname)
+ k.set_contents_from_string('foo')
+ zonegroup_bucket_checkpoint(zonegroup_conns, test_bucket.name)
+
+ # Resharding the bucket again
+ zone.zone.cluster.admin(['bucket', 'reshard',
+ '--bucket', test_bucket.name,
+ '--num-shards', '3',
+ '--yes-i-really-mean-it'])
+
+ # checking bucket layout after 2nd resharding
+ json_obj_3 = bucket_layout(zone.zone, test_bucket.name)
+ assert(len(json_obj_3['layout']['logs']) == 3)
+
+ zonegroup_bucket_checkpoint(zonegroup_conns, test_bucket.name)
+
+ bilog_autotrim(zone.zone)
+
+ # checking bucket layout after 1st bilog autotrim
+ json_obj_4 = bucket_layout(zone.zone, test_bucket.name)
+ assert(len(json_obj_4['layout']['logs']) == 2)
+
+ bilog_autotrim(zone.zone)
+
+ # checking bucket layout after 2nd bilog autotrim
+ json_obj_5 = bucket_layout(zone.zone, test_bucket.name)
+ assert(len(json_obj_5['layout']['logs']) == 1)
+
+ bilog_autotrim(zone.zone)
+
+ # upload more objects
+ for objname in ('i', 'j', 'k', 'l'):
+ k = new_key(zone, test_bucket.name, objname)
+ k.set_contents_from_string('foo')
+ zonegroup_bucket_checkpoint(zonegroup_conns, test_bucket.name)
+
+ # verify the bucket has non-empty bilog
+ test_bilog = bilog_list(zone.zone, test_bucket.name)
+ assert(len(test_bilog) > 0)
+
+@attr('bucket_reshard')
+def test_bucket_reshard_incremental():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ zone = zonegroup_conns.rw_zones[0]
+
+ # create a bucket
+ bucket = zone.create_bucket(gen_bucket_name())
+ log.debug('created bucket=%s', bucket.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload some objects
+ for objname in ('a', 'b', 'c', 'd'):
+ k = new_key(zone, bucket.name, objname)
+ k.set_contents_from_string('foo')
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ # reshard in each zone
+ for z in zonegroup_conns.rw_zones:
+ z.zone.cluster.admin(['bucket', 'reshard',
+ '--bucket', bucket.name,
+ '--num-shards', '3',
+ '--yes-i-really-mean-it'])
+
+ # upload more objects
+ for objname in ('e', 'f', 'g', 'h'):
+ k = new_key(zone, bucket.name, objname)
+ k.set_contents_from_string('foo')
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+@attr('bucket_reshard')
+def test_bucket_reshard_full():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ zone = zonegroup_conns.rw_zones[0]
+
+ # create a bucket
+ bucket = zone.create_bucket(gen_bucket_name())
+ log.debug('created bucket=%s', bucket.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # stop gateways in other zones so we can force the bucket to full sync
+ for z in zonegroup_conns.rw_zones[1:]:
+ z.zone.stop()
+
+ # use try-finally to restart gateways even if something fails
+ try:
+ # upload some objects
+ for objname in ('a', 'b', 'c', 'd'):
+ k = new_key(zone, bucket.name, objname)
+ k.set_contents_from_string('foo')
+
+ # reshard on first zone
+ zone.zone.cluster.admin(['bucket', 'reshard',
+ '--bucket', bucket.name,
+ '--num-shards', '3',
+ '--yes-i-really-mean-it'])
+
+ # upload more objects
+ for objname in ('e', 'f', 'g', 'h'):
+ k = new_key(zone, bucket.name, objname)
+ k.set_contents_from_string('foo')
+ finally:
+ for z in zonegroup_conns.rw_zones[1:]:
+ z.zone.start()
+
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+def test_bucket_creation_time():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ zone_buckets = [zone.get_connection().get_all_buckets() for zone in zonegroup_conns.rw_zones]
+ for z1, z2 in combinations(zone_buckets, 2):
+ for a, b in zip(z1, z2):
+ eq(a.name, b.name)
+ eq(a.creation_date, b.creation_date)
+
+def get_bucket_shard_objects(zone, num_shards):
+ """
+ Get one object for each shard of the bucket index log
+ """
+ cmd = ['bucket', 'shard', 'objects'] + zone.zone_args()
+ cmd += ['--num-shards', str(num_shards)]
+ shardobjs_json, ret = zone.cluster.admin(cmd, read_only=True)
+ assert ret == 0
+ shardobjs = json.loads(shardobjs_json)
+ return shardobjs['objs']
+
+def write_most_shards(zone, bucket_name, num_shards):
+ """
+ Write one object to most (but not all) bucket index shards.
+ """
+ objs = get_bucket_shard_objects(zone.zone, num_shards)
+ random.shuffle(objs)
+ del objs[-(len(objs)//10):]
+ for obj in objs:
+ k = new_key(zone, bucket_name, obj)
+ k.set_contents_from_string('foo')
+
+def reshard_bucket(zone, bucket_name, num_shards):
+ """
+ Reshard a bucket
+ """
+ cmd = ['bucket', 'reshard'] + zone.zone_args()
+ cmd += ['--bucket', bucket_name]
+ cmd += ['--num-shards', str(num_shards)]
+ cmd += ['--yes-i-really-mean-it']
+ zone.cluster.admin(cmd)
+
+def get_obj_names(zone, bucket_name, maxobjs):
+ """
+ Get names of objects in a bucket.
+ """
+ cmd = ['bucket', 'list'] + zone.zone_args()
+ cmd += ['--bucket', bucket_name]
+ cmd += ['--max-entries', str(maxobjs)]
+ objs_json, _ = zone.cluster.admin(cmd, read_only=True)
+ objs = json.loads(objs_json)
+ return [o['name'] for o in objs]
+
+def bucket_keys_eq(zone1, zone2, bucket_name):
+ """
+ Ensure that two buckets have the same keys, but get the lists through
+ radosgw-admin rather than S3 so it can be used when radosgw isn't running.
+ Only works for buckets of 10,000 objects since the tests calling it don't
+ need more, and the output from bucket list doesn't have an obvious marker
+ with which to continue.
+ """
+ keys1 = get_obj_names(zone1, bucket_name, 10000)
+ keys2 = get_obj_names(zone2, bucket_name, 10000)
+ for key1, key2 in zip_longest(keys1, keys2):
+ if key1 is None:
+ log.critical('key=%s is missing from zone=%s', key1.name,
+ zone1.name)
+ assert False
+ if key2 is None:
+ log.critical('key=%s is missing from zone=%s', key2.name,
+ zone2.name)
+ assert False
+
+@attr('bucket_reshard')
+def test_bucket_sync_run_basic_incremental():
+ """
+ Create several generations of objects, then run bucket sync
+ run to ensure they're all processed.
+ """
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ primary = zonegroup_conns.rw_zones[0]
+
+ # create a bucket write objects to it and wait for them to sync, ensuring
+ # we are in incremental.
+ bucket = primary.create_bucket(gen_bucket_name())
+ log.debug('created bucket=%s', bucket.name)
+ zonegroup_meta_checkpoint(zonegroup)
+ write_most_shards(primary, bucket.name, 11)
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ try:
+ # stop gateways in other zones so we can rely on bucket sync run
+ for secondary in zonegroup_conns.rw_zones[1:]:
+ secondary.zone.stop()
+
+ # build up multiple generations each with some objects written to
+ # them.
+ generations = [17, 19, 23, 29, 31, 37]
+ for num_shards in generations:
+ reshard_bucket(primary.zone, bucket.name, num_shards)
+ write_most_shards(primary, bucket.name, num_shards)
+
+ # bucket sync run on every secondary
+ for secondary in zonegroup_conns.rw_zones[1:]:
+ cmd = ['bucket', 'sync', 'run'] + secondary.zone.zone_args()
+ cmd += ['--bucket', bucket.name, '--source-zone', primary.name]
+ secondary.zone.cluster.admin(cmd)
+
+ bucket_keys_eq(primary.zone, secondary.zone, bucket.name)
+
+ finally:
+ # Restart so bucket_checkpoint can actually fetch things from the
+ # secondaries. Put this in a finally block so they restart even on
+ # error.
+ for secondary in zonegroup_conns.rw_zones[1:]:
+ secondary.zone.start()
+
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+def trash_bucket(zone, bucket_name):
+ """
+ Remove objects through radosgw-admin, zapping bilog to prevent the deletes
+ from replicating.
+ """
+ objs = get_obj_names(zone, bucket_name, 10000)
+ # Delete the objects
+ for obj in objs:
+ cmd = ['object', 'rm'] + zone.zone_args()
+ cmd += ['--bucket', bucket_name]
+ cmd += ['--object', obj]
+ zone.cluster.admin(cmd)
+
+ # Zap the bilog
+ cmd = ['bilog', 'trim'] + zone.zone_args()
+ cmd += ['--bucket', bucket_name]
+ zone.cluster.admin(cmd)
+
+@attr('bucket_reshard')
+def test_zap_init_bucket_sync_run():
+ """
+ Create several generations of objects, trash them, then run bucket sync init
+ and bucket sync run.
+ """
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ primary = zonegroup_conns.rw_zones[0]
+
+ bucket = primary.create_bucket(gen_bucket_name())
+ log.debug('created bucket=%s', bucket.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # Write zeroth generation
+ for obj in range(1, 6):
+ k = new_key(primary, bucket.name, f'obj{obj * 11}')
+ k.set_contents_from_string('foo')
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ # Write several more generations
+ generations = [17, 19, 23, 29, 31, 37]
+ for num_shards in generations:
+ reshard_bucket(primary.zone, bucket.name, num_shards)
+ for obj in range(1, 6):
+ k = new_key(primary, bucket.name, f'obj{obj * num_shards}')
+ k.set_contents_from_string('foo')
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+
+ # Stop gateways, trash bucket, init, sync, and restart for every secondary
+ for secondary in zonegroup_conns.rw_zones[1:]:
+ try:
+ secondary.zone.stop()
+
+ trash_bucket(secondary.zone, bucket.name)
+
+ cmd = ['bucket', 'sync', 'init'] + secondary.zone.zone_args()
+ cmd += ['--bucket', bucket.name]
+ cmd += ['--source-zone', primary.name]
+ secondary.zone.cluster.admin(cmd)
+
+ cmd = ['bucket', 'sync', 'run'] + secondary.zone.zone_args()
+ cmd += ['--bucket', bucket.name, '--source-zone', primary.name]
+ secondary.zone.cluster.admin(cmd)
+
+ bucket_keys_eq(primary.zone, secondary.zone, bucket.name)
+
+ finally:
+ # Do this as a finally so we bring the zone back up even on error.
+ secondary.zone.start()
+
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+def test_role_sync():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ roles, zone_role = create_role_per_zone(zonegroup_conns)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ for source_conn, role in zone_role:
+ for target_conn in zonegroup_conns.zones:
+ if source_conn.zone == target_conn.zone:
+ continue
+
+ check_role_eq(source_conn, target_conn, role)
+
+@attr('data_sync_init')
+def test_bucket_full_sync_after_data_sync_init():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ primary = zonegroup_conns.rw_zones[0]
+ secondary = zonegroup_conns.rw_zones[1]
+
+ bucket = primary.create_bucket(gen_bucket_name())
+ log.debug('created bucket=%s', bucket.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ try:
+ # stop secondary zone before it starts a bucket full sync
+ secondary.zone.stop()
+
+ # write some objects that don't sync yet
+ for obj in range(1, 6):
+ k = new_key(primary, bucket.name, f'obj{obj * 11}')
+ k.set_contents_from_string('foo')
+
+ cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
+ cmd += ['--source-zone', primary.name]
+ secondary.zone.cluster.admin(cmd)
+ finally:
+ # Do this as a finally so we bring the zone back up even on error.
+ secondary.zone.start()
+
+ # expect all objects to replicate via 'bucket full sync'
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+ zonegroup_data_checkpoint(zonegroup_conns)
+
+@attr('data_sync_init')
+@attr('bucket_reshard')
+def test_resharded_bucket_full_sync_after_data_sync_init():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ primary = zonegroup_conns.rw_zones[0]
+ secondary = zonegroup_conns.rw_zones[1]
+
+ bucket = primary.create_bucket(gen_bucket_name())
+ log.debug('created bucket=%s', bucket.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ try:
+ # stop secondary zone before it starts a bucket full sync
+ secondary.zone.stop()
+
+ # Write zeroth generation
+ for obj in range(1, 6):
+ k = new_key(primary, bucket.name, f'obj{obj * 11}')
+ k.set_contents_from_string('foo')
+
+ # Write several more generations
+ generations = [17, 19, 23, 29, 31, 37]
+ for num_shards in generations:
+ reshard_bucket(primary.zone, bucket.name, num_shards)
+ for obj in range(1, 6):
+ k = new_key(primary, bucket.name, f'obj{obj * num_shards}')
+ k.set_contents_from_string('foo')
+
+ cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
+ cmd += ['--source-zone', primary.name]
+ secondary.zone.cluster.admin(cmd)
+ finally:
+ # Do this as a finally so we bring the zone back up even on error.
+ secondary.zone.start()
+
+ # expect all objects to replicate via 'bucket full sync'
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+ zonegroup_data_checkpoint(zonegroup_conns)
+
+@attr('data_sync_init')
+def test_bucket_incremental_sync_after_data_sync_init():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ primary = zonegroup_conns.rw_zones[0]
+ secondary = zonegroup_conns.rw_zones[1]
+
+ bucket = primary.create_bucket(gen_bucket_name())
+ log.debug('created bucket=%s', bucket.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload a dummy object and wait for sync. this forces each zone to finish
+ # a full sync and switch to incremental
+ k = new_key(primary, bucket, 'dummy')
+ k.set_contents_from_string('foo')
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ try:
+ # stop secondary zone before it syncs the rest
+ secondary.zone.stop()
+
+ # Write more objects to primary
+ for obj in range(1, 6):
+ k = new_key(primary, bucket.name, f'obj{obj * 11}')
+ k.set_contents_from_string('foo')
+
+ cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
+ cmd += ['--source-zone', primary.name]
+ secondary.zone.cluster.admin(cmd)
+ finally:
+ # Do this as a finally so we bring the zone back up even on error.
+ secondary.zone.start()
+
+ # expect remaining objects to replicate via 'bucket incremental sync'
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+ zonegroup_data_checkpoint(zonegroup_conns)
+
+@attr('data_sync_init')
+@attr('bucket_reshard')
+def test_resharded_bucket_incremental_sync_latest_after_data_sync_init():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ primary = zonegroup_conns.rw_zones[0]
+ secondary = zonegroup_conns.rw_zones[1]
+
+ bucket = primary.create_bucket(gen_bucket_name())
+ log.debug('created bucket=%s', bucket.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # Write zeroth generation to primary
+ for obj in range(1, 6):
+ k = new_key(primary, bucket.name, f'obj{obj * 11}')
+ k.set_contents_from_string('foo')
+
+ # Write several more generations
+ generations = [17, 19, 23, 29, 31, 37]
+ for num_shards in generations:
+ reshard_bucket(primary.zone, bucket.name, num_shards)
+ for obj in range(1, 6):
+ k = new_key(primary, bucket.name, f'obj{obj * num_shards}')
+ k.set_contents_from_string('foo')
+
+ # wait for the secondary to catch up to the latest gen
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ try:
+ # stop secondary zone before it syncs the rest
+ secondary.zone.stop()
+
+ # write some more objects to the last gen
+ for obj in range(1, 6):
+ k = new_key(primary, bucket.name, f'obj{obj * generations[-1]}')
+ k.set_contents_from_string('foo')
+
+ cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
+ cmd += ['--source-zone', primary.name]
+ secondary.zone.cluster.admin(cmd)
+ finally:
+ # Do this as a finally so we bring the zone back up even on error.
+ secondary.zone.start()
+
+ # expect remaining objects in last gen to replicate via 'bucket incremental sync'
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+ zonegroup_data_checkpoint(zonegroup_conns)
+
+@attr('data_sync_init')
+@attr('bucket_reshard')
+def test_resharded_bucket_incremental_sync_oldest_after_data_sync_init():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ primary = zonegroup_conns.rw_zones[0]
+ secondary = zonegroup_conns.rw_zones[1]
+
+ bucket = primary.create_bucket(gen_bucket_name())
+ log.debug('created bucket=%s', bucket.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # Write zeroth generation to primary
+ for obj in range(1, 6):
+ k = new_key(primary, bucket.name, f'obj{obj * 11}')
+ k.set_contents_from_string('foo')
+
+ # wait for the secondary to catch up
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ try:
+ # stop secondary zone before it syncs later generations
+ secondary.zone.stop()
+
+ # Write several more generations
+ generations = [17, 19, 23, 29, 31, 37]
+ for num_shards in generations:
+ reshard_bucket(primary.zone, bucket.name, num_shards)
+ for obj in range(1, 6):
+ k = new_key(primary, bucket.name, f'obj{obj * num_shards}')
+ k.set_contents_from_string('foo')
+
+ cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
+ cmd += ['--source-zone', primary.name]
+ secondary.zone.cluster.admin(cmd)
+ finally:
+ # Do this as a finally so we bring the zone back up even on error.
+ secondary.zone.start()
+
+ # expect all generations to replicate via 'bucket incremental sync'
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+ zonegroup_data_checkpoint(zonegroup_conns)
+
+def sync_info(cluster, bucket = None):
+ cmd = ['sync', 'info']
+ if bucket:
+ cmd += ['--bucket', bucket]
+ (result_json, retcode) = cluster.admin(cmd)
+ if retcode != 0:
+ assert False, 'failed to get sync policy'
+
+ return json.loads(result_json)
+
+def get_sync_policy(cluster, bucket = None):
+ cmd = ['sync', 'policy', 'get']
+ if bucket:
+ cmd += ['--bucket', bucket]
+ (result_json, retcode) = cluster.admin(cmd)
+ if retcode != 0:
+ assert False, 'failed to get sync policy'
+
+ return json.loads(result_json)
+
+def create_sync_policy_group(cluster, group, status = "allowed", bucket = None):
+ cmd = ['sync', 'group', 'create', '--group-id', group, '--status' , status]
+ if bucket:
+ cmd += ['--bucket', bucket]
+ (result_json, retcode) = cluster.admin(cmd)
+ if retcode != 0:
+ assert False, 'failed to create sync policy group id=%s, bucket=%s' % (group, bucket)
+ return json.loads(result_json)
+
+def set_sync_policy_group_status(cluster, group, status, bucket = None):
+ cmd = ['sync', 'group', 'modify', '--group-id', group, '--status' , status]
+ if bucket:
+ cmd += ['--bucket', bucket]
+ (result_json, retcode) = cluster.admin(cmd)
+ if retcode != 0:
+ assert False, 'failed to set sync policy group id=%s, bucket=%s' % (group, bucket)
+ return json.loads(result_json)
+
+def get_sync_policy_group(cluster, group, bucket = None):
+ cmd = ['sync', 'group', 'get', '--group-id', group]
+ if bucket:
+ cmd += ['--bucket', bucket]
+ (result_json, retcode) = cluster.admin(cmd)
+ if retcode != 0:
+ assert False, 'failed to get sync policy group id=%s, bucket=%s' % (group, bucket)
+ return json.loads(result_json)
+
+def remove_sync_policy_group(cluster, group, bucket = None):
+ cmd = ['sync', 'group', 'remove', '--group-id', group]
+ if bucket:
+ cmd += ['--bucket', bucket]
+ (result_json, retcode) = cluster.admin(cmd)
+ if retcode != 0:
+ assert False, 'failed to remove sync policy group id=%s, bucket=%s' % (group, bucket)
+ return json.loads(result_json)
+
+def create_sync_group_flow_symmetrical(cluster, group, flow_id, zones, bucket = None):
+ cmd = ['sync', 'group', 'flow', 'create', '--group-id', group, '--flow-id' , flow_id, '--flow-type', 'symmetrical', '--zones=%s' % zones]
+ if bucket:
+ cmd += ['--bucket', bucket]
+ (result_json, retcode) = cluster.admin(cmd)
+ if retcode != 0:
+ assert False, 'failed to create sync group flow symmetrical groupid=%s, flow_id=%s, zones=%s, bucket=%s' % (group, flow_id, zones, bucket)
+ return json.loads(result_json)
+
+def create_sync_group_flow_directional(cluster, group, flow_id, src_zones, dest_zones, bucket = None):
+ cmd = ['sync', 'group', 'flow', 'create', '--group-id', group, '--flow-id' , flow_id, '--flow-type', 'directional', '--source-zone=%s' % src_zones, '--dest-zone=%s' % dest_zones]
+ if bucket:
+ cmd += ['--bucket', bucket]
+ (result_json, retcode) = cluster.admin(cmd)
+ if retcode != 0:
+ assert False, 'failed to create sync group flow directional groupid=%s, flow_id=%s, src_zones=%s, dest_zones=%s, bucket=%s' % (group, flow_id, src_zones, dest_zones, bucket)
+ return json.loads(result_json)
+
+def remove_sync_group_flow_symmetrical(cluster, group, flow_id, zones = None, bucket = None):
+ cmd = ['sync', 'group', 'flow', 'remove', '--group-id', group, '--flow-id' , flow_id, '--flow-type', 'symmetrical']
+ if zones:
+ cmd += ['--zones=%s' % zones]
+ if bucket:
+ cmd += ['--bucket', bucket]
+ (result_json, retcode) = cluster.admin(cmd)
+ if retcode != 0:
+ assert False, 'failed to remove sync group flow symmetrical groupid=%s, flow_id=%s, zones=%s, bucket=%s' % (group, flow_id, zones, bucket)
+ return json.loads(result_json)
+
+def remove_sync_group_flow_directional(cluster, group, flow_id, src_zones, dest_zones, bucket = None):
+ cmd = ['sync', 'group', 'flow', 'remove', '--group-id', group, '--flow-id' , flow_id, '--flow-type', 'directional', '--source-zone=%s' % src_zones, '--dest-zone=%s' % dest_zones]
+ if bucket:
+ cmd += ['--bucket', bucket]
+ (result_json, retcode) = cluster.admin(cmd)
+ if retcode != 0:
+ assert False, 'failed to remove sync group flow directional groupid=%s, flow_id=%s, src_zones=%s, dest_zones=%s, bucket=%s' % (group, flow_id, src_zones, dest_zones, bucket)
+ return json.loads(result_json)
+
+def create_sync_group_pipe(cluster, group, pipe_id, src_zones, dest_zones, bucket = None, args = []):
+ cmd = ['sync', 'group', 'pipe', 'create', '--group-id', group, '--pipe-id' , pipe_id, '--source-zones=%s' % src_zones, '--dest-zones=%s' % dest_zones]
+ if bucket:
+ b_args = '--bucket=' + bucket
+ cmd.append(b_args)
+ if args:
+ cmd += args
+ (result_json, retcode) = cluster.admin(cmd)
+ if retcode != 0:
+ assert False, 'failed to create sync group pipe groupid=%s, pipe_id=%s, src_zones=%s, dest_zones=%s, bucket=%s' % (group, pipe_id, src_zones, dest_zones, bucket)
+ return json.loads(result_json)
+
+def remove_sync_group_pipe(cluster, group, pipe_id, bucket = None, args = None):
+ cmd = ['sync', 'group', 'pipe', 'remove', '--group-id', group, '--pipe-id' , pipe_id]
+ if bucket:
+ b_args = '--bucket=' + bucket
+ cmd.append(b_args)
+ if args:
+ cmd.append(args)
+ (result_json, retcode) = cluster.admin(cmd)
+ if retcode != 0:
+ assert False, 'failed to remove sync group pipe groupid=%s, pipe_id=%s, src_zones=%s, dest_zones=%s, bucket=%s' % (group, pipe_id, src_zones, dest_zones, bucket)
+ return json.loads(result_json)
+
+def create_zone_bucket(zone):
+ b_name = gen_bucket_name()
+ log.info('create bucket zone=%s name=%s', zone.name, b_name)
+ bucket = zone.create_bucket(b_name)
+ return bucket
+
+def create_object(zone_conn, bucket, objname, content):
+ k = new_key(zone_conn, bucket.name, objname)
+ k.set_contents_from_string(content)
+
+def create_objects(zone_conn, bucket, obj_arr, content):
+ for objname in obj_arr:
+ create_object(zone_conn, bucket, objname, content)
+
+def check_object_exists(bucket, objname, content = None):
+ k = bucket.get_key(objname)
+ assert_not_equal(k, None)
+ if (content != None):
+ assert_equal(k.get_contents_as_string(encoding='ascii'), content)
+
+def check_objects_exist(bucket, obj_arr, content = None):
+ for objname in obj_arr:
+ check_object_exists(bucket, objname, content)
+
+def check_object_not_exists(bucket, objname):
+ k = bucket.get_key(objname)
+ assert_equal(k, None)
+
+def check_objects_not_exist(bucket, obj_arr):
+ for objname in obj_arr:
+ check_object_not_exists(bucket, objname)
+
+@attr('sync_policy')
+def test_sync_policy_config_zonegroup():
+ """
+ test_sync_policy_config_zonegroup:
+ test configuration of all sync commands
+ """
+ zonegroup = realm.master_zonegroup()
+ zonegroup_meta_checkpoint(zonegroup)
+
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ z1, z2 = zonegroup.zones[0:2]
+ c1, c2 = (z1.cluster, z2.cluster)
+
+ zones = z1.name+","+z2.name
+
+ c1.admin(['sync', 'policy', 'get'])
+
+ # (a) zonegroup level
+ create_sync_policy_group(c1, "sync-group")
+ set_sync_policy_group_status(c1, "sync-group", "enabled")
+ get_sync_policy_group(c1, "sync-group")
+
+ get_sync_policy(c1)
+
+ create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
+ create_sync_group_flow_directional(c1, "sync-group", "sync-flow2", z1.name, z2.name)
+
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+ get_sync_policy_group(c1, "sync-group")
+
+ zonegroup.period.update(z1, commit=True)
+
+ # (b) bucket level
+ zc1, zc2 = zonegroup_conns.zones[0:2]
+ bucket = create_zone_bucket(zc1)
+ bucket_name = bucket.name
+
+ create_sync_policy_group(c1, "sync-bucket", "allowed", bucket_name)
+ set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucket_name)
+ get_sync_policy_group(c1, "sync-bucket", bucket_name)
+
+ get_sync_policy(c1, bucket_name)
+
+ create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow1", zones, bucket_name)
+ create_sync_group_flow_directional(c1, "sync-bucket", "sync-flow2", z1.name, z2.name, bucket_name)
+
+ create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucket_name)
+ get_sync_policy_group(c1, "sync-bucket", bucket_name)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ remove_sync_group_pipe(c1, "sync-bucket", "sync-pipe", bucket_name)
+ remove_sync_group_flow_directional(c1, "sync-bucket", "sync-flow2", z1.name, z2.name, bucket_name)
+ remove_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow1", zones, bucket_name)
+ remove_sync_policy_group(c1, "sync-bucket", bucket_name)
+
+ get_sync_policy(c1, bucket_name)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
+ remove_sync_group_flow_directional(c1, "sync-group", "sync-flow2", z1.name, z2.name)
+ remove_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1")
+ remove_sync_policy_group(c1, "sync-group")
+
+ get_sync_policy(c1)
+
+ zonegroup.period.update(z1, commit=True)
+
+ return
+
+@attr('sync_policy')
+def test_sync_flow_symmetrical_zonegroup_all():
+ """
+ test_sync_flow_symmetrical_zonegroup_all:
+ allows sync from all the zones to all other zones (default case)
+ """
+
+ zonegroup = realm.master_zonegroup()
+ zonegroup_meta_checkpoint(zonegroup)
+
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ (zoneA, zoneB) = zonegroup.zones[0:2]
+ (zcA, zcB) = zonegroup_conns.zones[0:2]
+
+ c1 = zoneA.cluster
+
+ c1.admin(['sync', 'policy', 'get'])
+
+ zones = zoneA.name + ',' + zoneB.name
+ create_sync_policy_group(c1, "sync-group")
+ create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+ set_sync_policy_group_status(c1, "sync-group", "enabled")
+
+ zonegroup.period.update(zoneA, commit=True)
+ get_sync_policy(c1)
+
+ objnames = [ 'obj1', 'obj2' ]
+ content = 'asdasd'
+ buckets = []
+
+ # create bucket & object in all zones
+ bucketA = create_zone_bucket(zcA)
+ buckets.append(bucketA)
+ create_object(zcA, bucketA, objnames[0], content)
+
+ bucketB = create_zone_bucket(zcB)
+ buckets.append(bucketB)
+ create_object(zcB, bucketB, objnames[1], content)
+
+ zonegroup_meta_checkpoint(zonegroup)
+ # 'zonegroup_data_checkpoint' currently fails for the zones not
+ # allowed to sync. So as a workaround, data checkpoint is done
+ # for only the ones configured.
+ zone_data_checkpoint(zoneB, zoneA)
+
+ # verify if objects are synced accross the zone
+ bucket = get_bucket(zcB, bucketA.name)
+ check_object_exists(bucket, objnames[0], content)
+
+ bucket = get_bucket(zcA, bucketB.name)
+ check_object_exists(bucket, objnames[1], content)
+
+ remove_sync_policy_group(c1, "sync-group")
+ return
+
+@attr('sync_policy')
+def test_sync_flow_symmetrical_zonegroup_select():
+ """
+ test_sync_flow_symmetrical_zonegroup_select:
+ allow sync between zoneA & zoneB
+ verify zoneC doesnt sync the data
+ """
+
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ if len(zonegroup.zones) < 3:
+ raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ (zoneA, zoneB, zoneC) = zonegroup.zones[0:3]
+ (zcA, zcB, zcC) = zonegroup_conns.zones[0:3]
+
+ c1 = zoneA.cluster
+
+ # configure sync policy
+ zones = zoneA.name + ',' + zoneB.name
+ c1.admin(['sync', 'policy', 'get'])
+ create_sync_policy_group(c1, "sync-group")
+ create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones)
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+ set_sync_policy_group_status(c1, "sync-group", "enabled")
+
+ zonegroup.period.update(zoneA, commit=True)
+ get_sync_policy(c1)
+
+ buckets = []
+ content = 'asdasd'
+
+ # create bucketA & objects in zoneA
+ objnamesA = [ 'obj1', 'obj2', 'obj3' ]
+ bucketA = create_zone_bucket(zcA)
+ buckets.append(bucketA)
+ create_objects(zcA, bucketA, objnamesA, content)
+
+ # create bucketB & objects in zoneB
+ objnamesB = [ 'obj4', 'obj5', 'obj6' ]
+ bucketB = create_zone_bucket(zcB)
+ buckets.append(bucketB)
+ create_objects(zcB, bucketB, objnamesB, content)
+
+ zonegroup_meta_checkpoint(zonegroup)
+ zone_data_checkpoint(zoneB, zoneA)
+ zone_data_checkpoint(zoneA, zoneB)
+
+ # verify if objnamesA synced to only zoneB but not zoneC
+ bucket = get_bucket(zcB, bucketA.name)
+ check_objects_exist(bucket, objnamesA, content)
+
+ bucket = get_bucket(zcC, bucketA.name)
+ check_objects_not_exist(bucket, objnamesA)
+
+ # verify if objnamesB synced to only zoneA but not zoneC
+ bucket = get_bucket(zcA, bucketB.name)
+ check_objects_exist(bucket, objnamesB, content)
+
+ bucket = get_bucket(zcC, bucketB.name)
+ check_objects_not_exist(bucket, objnamesB)
+
+ remove_sync_policy_group(c1, "sync-group")
+ return
+
+@attr('sync_policy')
+def test_sync_flow_directional_zonegroup_select():
+ """
+ test_sync_flow_directional_zonegroup_select:
+ allow sync from only zoneA to zoneB
+
+ verify that data doesn't get synced to zoneC and
+ zoneA shouldn't sync data from zoneB either
+ """
+
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ if len(zonegroup.zones) < 3:
+ raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.")
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ (zoneA, zoneB, zoneC) = zonegroup.zones[0:3]
+ (zcA, zcB, zcC) = zonegroup_conns.zones[0:3]
+
+ c1 = zoneA.cluster
+
+ # configure sync policy
+ zones = zoneA.name + ',' + zoneB.name
+ c1.admin(['sync', 'policy', 'get'])
+ create_sync_policy_group(c1, "sync-group")
+ create_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name)
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name)
+ set_sync_policy_group_status(c1, "sync-group", "enabled")
+
+ zonegroup.period.update(zoneA, commit=True)
+ get_sync_policy(c1)
+
+ buckets = []
+ content = 'asdasd'
+
+ # create bucketA & objects in zoneA
+ objnamesA = [ 'obj1', 'obj2', 'obj3' ]
+ bucketA = create_zone_bucket(zcA)
+ buckets.append(bucketA)
+ create_objects(zcA, bucketA, objnamesA, content)
+
+ # create bucketB & objects in zoneB
+ objnamesB = [ 'obj4', 'obj5', 'obj6' ]
+ bucketB = create_zone_bucket(zcB)
+ buckets.append(bucketB)
+ create_objects(zcB, bucketB, objnamesB, content)
+
+ zonegroup_meta_checkpoint(zonegroup)
+ zone_data_checkpoint(zoneB, zoneA)
+
+ # verify if objnamesA synced to only zoneB but not zoneC
+ bucket = get_bucket(zcB, bucketA.name)
+ check_objects_exist(bucket, objnamesA, content)
+
+ bucket = get_bucket(zcC, bucketA.name)
+ check_objects_not_exist(bucket, objnamesA)
+
+ # verify if objnamesB are not synced to either zoneA or zoneC
+ bucket = get_bucket(zcA, bucketB.name)
+ check_objects_not_exist(bucket, objnamesB)
+
+ bucket = get_bucket(zcC, bucketB.name)
+ check_objects_not_exist(bucket, objnamesB)
+
+ """
+ verify the same at bucketA level
+ configure another policy at bucketA level with src and dest
+ zones specified to zoneA and zoneB resp.
+
+ verify zoneA bucketA syncs to zoneB BucketA but not viceversa.
+ """
+ # reconfigure zonegroup pipe & flow
+ remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
+ remove_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name)
+ create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+
+ # change state to allowed
+ set_sync_policy_group_status(c1, "sync-group", "allowed")
+
+ zonegroup.period.update(zoneA, commit=True)
+ get_sync_policy(c1)
+
+ # configure sync policy for only bucketA and enable it
+ create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+ create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
+ args = ['--source-bucket=*', '--dest-bucket=*']
+ create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zoneA.name, zoneB.name, bucketA.name, args)
+ set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
+
+ get_sync_policy(c1, bucketA.name)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # create objects in bucketA in zoneA and zoneB
+ objnamesC = [ 'obj7', 'obj8', 'obj9' ]
+ objnamesD = [ 'obj10', 'obj11', 'obj12' ]
+ create_objects(zcA, bucketA, objnamesC, content)
+ create_objects(zcB, bucketA, objnamesD, content)
+
+ zonegroup_meta_checkpoint(zonegroup)
+ zone_data_checkpoint(zoneB, zoneA)
+
+ # verify that objnamesC are synced to bucketA in zoneB
+ bucket = get_bucket(zcB, bucketA.name)
+ check_objects_exist(bucket, objnamesC, content)
+
+ # verify that objnamesD are not synced to bucketA in zoneA
+ bucket = get_bucket(zcA, bucketA.name)
+ check_objects_not_exist(bucket, objnamesD)
+
+ remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
+ remove_sync_policy_group(c1, "sync-group")
+ return
+
+@attr('sync_policy')
+def test_sync_single_bucket():
+ """
+ test_sync_single_bucket:
+ Allow data sync for only bucketA but not for other buckets via
+ below 2 methods
+
+ (a) zonegroup: symmetrical flow but configure pipe for only bucketA.
+ (b) bucket level: configure policy for bucketA
+ """
+
+ zonegroup = realm.master_zonegroup()
+ zonegroup_meta_checkpoint(zonegroup)
+
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ (zoneA, zoneB) = zonegroup.zones[0:2]
+ (zcA, zcB) = zonegroup_conns.zones[0:2]
+
+ c1 = zoneA.cluster
+
+ c1.admin(['sync', 'policy', 'get'])
+
+ zones = zoneA.name + ',' + zoneB.name
+ get_sync_policy(c1)
+
+ objnames = [ 'obj1', 'obj2', 'obj3' ]
+ content = 'asdasd'
+ buckets = []
+
+ # create bucketA & bucketB in zoneA
+ bucketA = create_zone_bucket(zcA)
+ buckets.append(bucketA)
+ bucketB = create_zone_bucket(zcA)
+ buckets.append(bucketB)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ """
+ Method (a): configure pipe for only bucketA
+ """
+ # configure sync policy & pipe for only bucketA
+ create_sync_policy_group(c1, "sync-group")
+ create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
+ args = ['--source-bucket=' + bucketA.name, '--dest-bucket=' + bucketA.name]
+
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones, None, args)
+ set_sync_policy_group_status(c1, "sync-group", "enabled")
+ get_sync_policy(c1)
+ zonegroup.period.update(zoneA, commit=True)
+
+ sync_info(c1)
+
+ # create objects in bucketA & bucketB
+ create_objects(zcA, bucketA, objnames, content)
+ create_object(zcA, bucketB, objnames, content)
+
+ zonegroup_meta_checkpoint(zonegroup)
+ zone_data_checkpoint(zoneB, zoneA)
+
+ # verify if bucketA objects are synced
+ bucket = get_bucket(zcB, bucketA.name)
+ check_objects_exist(bucket, objnames, content)
+
+ # bucketB objects should not be synced
+ bucket = get_bucket(zcB, bucketB.name)
+ check_objects_not_exist(bucket, objnames)
+
+
+ """
+ Method (b): configure policy at only bucketA level
+ """
+ # reconfigure group pipe
+ remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+
+ # change state to allowed
+ set_sync_policy_group_status(c1, "sync-group", "allowed")
+
+ zonegroup.period.update(zoneA, commit=True)
+ get_sync_policy(c1)
+
+
+ # configure sync policy for only bucketA and enable it
+ create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+ create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
+ create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucketA.name)
+ set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
+
+ get_sync_policy(c1, bucketA.name)
+
+ # create object in bucketA
+ create_object(zcA, bucketA, objnames[2], content)
+
+ # create object in bucketA too
+ create_object(zcA, bucketB, objnames[2], content)
+
+ zonegroup_meta_checkpoint(zonegroup)
+ zone_data_checkpoint(zoneB, zoneA)
+
+ # verify if bucketA objects are synced
+ bucket = get_bucket(zcB, bucketA.name)
+ check_object_exists(bucket, objnames[2], content)
+
+ # bucketB objects should not be synced
+ bucket = get_bucket(zcB, bucketB.name)
+ check_object_not_exists(bucket, objnames[2])
+
+ remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
+ remove_sync_policy_group(c1, "sync-group")
+ return
+
+@attr('sync_policy')
+def test_sync_different_buckets():
+ """
+ test_sync_different_buckets:
+ sync zoneA bucketA to zoneB bucketB via below methods
+
+ (a) zonegroup: directional flow but configure pipe for zoneA bucketA to zoneB bucketB
+ (b) bucket: configure another policy at bucketA level with pipe set to
+ another bucket(bucketB) in target zone.
+
+ sync zoneA bucketA from zoneB bucketB
+ (c) configure another policy at bucketA level with pipe set from
+ another bucket(bucketB) in source zone.
+
+ """
+
+ zonegroup = realm.master_zonegroup()
+ zonegroup_meta_checkpoint(zonegroup)
+
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ (zoneA, zoneB) = zonegroup.zones[0:2]
+ (zcA, zcB) = zonegroup_conns.zones[0:2]
+ zones = zoneA.name + ',' + zoneB.name
+
+ c1 = zoneA.cluster
+
+ c1.admin(['sync', 'policy', 'get'])
+
+ objnames = [ 'obj1', 'obj2' ]
+ objnamesB = [ 'obj3', 'obj4' ]
+ content = 'asdasd'
+ buckets = []
+
+ # create bucketA & bucketB in zoneA
+ bucketA = create_zone_bucket(zcA)
+ buckets.append(bucketA)
+ bucketB = create_zone_bucket(zcA)
+ buckets.append(bucketB)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ """
+ Method (a): zonegroup - configure pipe for only bucketA
+ """
+ # configure pipe from zoneA bucketA to zoneB bucketB
+ create_sync_policy_group(c1, "sync-group")
+ create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
+ args = ['--source-bucket=' + bucketA.name, '--dest-bucket=' + bucketB.name]
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name, None, args)
+ set_sync_policy_group_status(c1, "sync-group", "enabled")
+ zonegroup.period.update(zoneA, commit=True)
+ get_sync_policy(c1)
+
+ # create objects in bucketA
+ create_objects(zcA, bucketA, objnames, content)
+
+ zonegroup_meta_checkpoint(zonegroup)
+ zone_data_checkpoint(zoneB, zoneA)
+
+ # verify that objects are synced to bucketB in zoneB
+ # but not to bucketA
+ bucket = get_bucket(zcB, bucketA.name)
+ check_objects_not_exist(bucket, objnames)
+
+ bucket = get_bucket(zcB, bucketB.name)
+ check_objects_exist(bucket, objnames, content)
+ """
+ Method (b): configure policy at only bucketA level with pipe
+ set to bucketB in target zone
+ """
+
+ remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+
+ # change state to allowed
+ set_sync_policy_group_status(c1, "sync-group", "allowed")
+
+ zonegroup.period.update(zoneA, commit=True)
+ get_sync_policy(c1)
+
+ # configure sync policy for only bucketA and enable it
+ create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+ create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
+ args = ['--source-bucket=*', '--dest-bucket=' + bucketB.name]
+ create_sync_group_pipe(c1, "sync-bucket", "sync-pipeA", zones, zones, bucketA.name, args)
+ set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
+
+ get_sync_policy(c1, bucketA.name)
+
+ objnamesC = [ 'obj5', 'obj6' ]
+
+ zonegroup_meta_checkpoint(zonegroup)
+ # create objects in bucketA
+ create_objects(zcA, bucketA, objnamesC, content)
+
+ zonegroup_meta_checkpoint(zonegroup)
+ zone_data_checkpoint(zoneB, zoneA)
+
+ """
+ # verify that objects are synced to bucketB in zoneB
+ # but not to bucketA
+ """
+ bucket = get_bucket(zcB, bucketA.name)
+ check_objects_not_exist(bucket, objnamesC)
+
+ bucket = get_bucket(zcB, bucketB.name)
+ check_objects_exist(bucket, objnamesC, content)
+
+ remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
+ zonegroup_meta_checkpoint(zonegroup)
+ get_sync_policy(c1, bucketA.name)
+
+ """
+ Method (c): configure policy at only bucketA level with pipe
+ set from bucketB in source zone
+ verify zoneA bucketA syncs from zoneB BucketB but not bucketA
+ """
+
+ # configure sync policy for only bucketA and enable it
+ create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+ create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
+ args = ['--source-bucket=' + bucketB.name, '--dest-bucket=' + '*']
+ create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucketA.name, args)
+ set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
+
+ get_sync_policy(c1, bucketA.name)
+
+ # create objects in bucketA & B in ZoneB
+ objnamesD = [ 'obj7', 'obj8' ]
+ objnamesE = [ 'obj9', 'obj10' ]
+
+ create_objects(zcB, bucketA, objnamesD, content)
+ create_objects(zcB, bucketB, objnamesE, content)
+
+ zonegroup_meta_checkpoint(zonegroup)
+ zone_data_checkpoint(zoneA, zoneB)
+ """
+ # verify that objects from only bucketB are synced to
+ # bucketA in zoneA
+ """
+ bucket = get_bucket(zcA, bucketA.name)
+ check_objects_not_exist(bucket, objnamesD)
+ check_objects_exist(bucket, objnamesE, content)
+
+ remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
+ remove_sync_policy_group(c1, "sync-group")
+ return
+
+@attr('sync_policy')
+def test_sync_multiple_buckets_to_single():
+ """
+ test_sync_multiple_buckets_to_single:
+ directional flow
+ (a) pipe: sync zoneA bucketA,bucketB to zoneB bucketB
+
+ (b) configure another policy at bucketA level with pipe configured
+ to sync from multiple buckets (bucketA & bucketB)
+
+ verify zoneA bucketA & bucketB syncs to zoneB BucketB
+ """
+
+ zonegroup = realm.master_zonegroup()
+ zonegroup_meta_checkpoint(zonegroup)
+
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ (zoneA, zoneB) = zonegroup.zones[0:2]
+ (zcA, zcB) = zonegroup_conns.zones[0:2]
+ zones = zoneA.name + ',' + zoneB.name
+
+ c1 = zoneA.cluster
+
+ c1.admin(['sync', 'policy', 'get'])
+
+ objnamesA = [ 'obj1', 'obj2' ]
+ objnamesB = [ 'obj3', 'obj4' ]
+ content = 'asdasd'
+ buckets = []
+
+ # create bucketA & bucketB in zoneA
+ bucketA = create_zone_bucket(zcA)
+ buckets.append(bucketA)
+ bucketB = create_zone_bucket(zcA)
+ buckets.append(bucketB)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # configure pipe from zoneA bucketA,bucketB to zoneB bucketB
+ create_sync_policy_group(c1, "sync-group")
+ create_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name)
+ source_buckets = [ bucketA.name, bucketB.name ]
+ for source_bucket in source_buckets:
+ args = ['--source-bucket=' + source_bucket, '--dest-bucket=' + bucketB.name]
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe-%s" % source_bucket, zoneA.name, zoneB.name, None, args)
+
+ set_sync_policy_group_status(c1, "sync-group", "enabled")
+ zonegroup.period.update(zoneA, commit=True)
+ get_sync_policy(c1)
+
+ # create objects in bucketA & bucketB
+ create_objects(zcA, bucketA, objnamesA, content)
+ create_objects(zcA, bucketB, objnamesB, content)
+
+ zonegroup_meta_checkpoint(zonegroup)
+ zone_data_checkpoint(zoneB, zoneA)
+
+ # verify that both zoneA bucketA & bucketB objects are synced to
+ # bucketB in zoneB but not to bucketA
+ bucket = get_bucket(zcB, bucketA.name)
+ check_objects_not_exist(bucket, objnamesA)
+ check_objects_not_exist(bucket, objnamesB)
+
+ bucket = get_bucket(zcB, bucketB.name)
+ check_objects_exist(bucket, objnamesA, content)
+ check_objects_exist(bucket, objnamesB, content)
+
+ """
+ Method (b): configure at bucket level
+ """
+ # reconfigure pipe & flow
+ for source_bucket in source_buckets:
+ remove_sync_group_pipe(c1, "sync-group", "sync-pipe-%s" % source_bucket)
+ remove_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name)
+ create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones)
+
+ # change state to allowed
+ set_sync_policy_group_status(c1, "sync-group", "allowed")
+
+ zonegroup.period.update(zoneA, commit=True)
+ get_sync_policy(c1)
+
+ objnamesC = [ 'obj5', 'obj6' ]
+ objnamesD = [ 'obj7', 'obj8' ]
+
+ # configure sync policy for only bucketA and enable it
+ create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+ create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
+ source_buckets = [ bucketA.name, bucketB.name ]
+ for source_bucket in source_buckets:
+ args = ['--source-bucket=' + source_bucket, '--dest-bucket=' + '*']
+ create_sync_group_pipe(c1, "sync-bucket", "sync-pipe-%s" % source_bucket, zoneA.name, zoneB.name, bucketA.name, args)
+
+ set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
+
+ get_sync_policy(c1)
+
+ zonegroup_meta_checkpoint(zonegroup)
+ # create objects in bucketA
+ create_objects(zcA, bucketA, objnamesC, content)
+ create_objects(zcA, bucketB, objnamesD, content)
+
+ zonegroup_meta_checkpoint(zonegroup)
+ zone_data_checkpoint(zoneB, zoneA)
+
+ # verify that both zoneA bucketA & bucketB objects are synced to
+ # bucketA in zoneB but not to bucketB
+ bucket = get_bucket(zcB, bucketB.name)
+ check_objects_not_exist(bucket, objnamesC)
+ check_objects_not_exist(bucket, objnamesD)
+
+ bucket = get_bucket(zcB, bucketA.name)
+ check_objects_exist(bucket, objnamesD, content)
+ check_objects_exist(bucket, objnamesD, content)
+
+ remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
+ remove_sync_policy_group(c1, "sync-group")
+ return
+
+@attr('sync_policy')
+def test_sync_single_bucket_to_multiple():
+ """
+ test_sync_single_bucket_to_multiple:
+ directional flow
+ (a) pipe: sync zoneA bucketA to zoneB bucketA & bucketB
+
+ (b) configure another policy at bucketA level with pipe configured
+ to sync to multiple buckets (bucketA & bucketB)
+
+ verify zoneA bucketA syncs to zoneB bucketA & bucketB
+ """
+
+ zonegroup = realm.master_zonegroup()
+ zonegroup_meta_checkpoint(zonegroup)
+
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ (zoneA, zoneB) = zonegroup.zones[0:2]
+ (zcA, zcB) = zonegroup_conns.zones[0:2]
+ zones = zoneA.name + ',' + zoneB.name
+
+ c1 = zoneA.cluster
+
+ c1.admin(['sync', 'policy', 'get'])
+
+ objnamesA = [ 'obj1', 'obj2' ]
+ content = 'asdasd'
+ buckets = []
+
+ # create bucketA & bucketB in zoneA
+ bucketA = create_zone_bucket(zcA)
+ buckets.append(bucketA)
+ bucketB = create_zone_bucket(zcA)
+ buckets.append(bucketB)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # configure pipe from zoneA bucketA to zoneB bucketA, bucketB
+ create_sync_policy_group(c1, "sync-group")
+ create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones)
+
+ dest_buckets = [ bucketA.name, bucketB.name ]
+ for dest_bucket in dest_buckets:
+ args = ['--source-bucket=' + bucketA.name, '--dest-bucket=' + dest_bucket]
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe-%s" % dest_bucket, zoneA.name, zoneB.name, None, args)
+
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name, None, args)
+ set_sync_policy_group_status(c1, "sync-group", "enabled")
+ zonegroup.period.update(zoneA, commit=True)
+ get_sync_policy(c1)
+
+ # create objects in bucketA
+ create_objects(zcA, bucketA, objnamesA, content)
+
+ zonegroup_meta_checkpoint(zonegroup)
+ zone_data_checkpoint(zoneB, zoneA)
+
+ # verify that objects from zoneA bucketA are synced to both
+ # bucketA & bucketB in zoneB
+ bucket = get_bucket(zcB, bucketA.name)
+ check_objects_exist(bucket, objnamesA, content)
+
+ bucket = get_bucket(zcB, bucketB.name)
+ check_objects_exist(bucket, objnamesA, content)
+
+ """
+ Method (b): configure at bucket level
+ """
+ remove_sync_group_pipe(c1, "sync-group", "sync-pipe")
+ create_sync_group_pipe(c1, "sync-group", "sync-pipe", '*', '*')
+
+ # change state to allowed
+ set_sync_policy_group_status(c1, "sync-group", "allowed")
+
+ zonegroup.period.update(zoneA, commit=True)
+ get_sync_policy(c1)
+
+ objnamesB = [ 'obj3', 'obj4' ]
+
+ # configure sync policy for only bucketA and enable it
+ create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name)
+ create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name)
+ dest_buckets = [ bucketA.name, bucketB.name ]
+ for dest_bucket in dest_buckets:
+ args = ['--source-bucket=' + '*', '--dest-bucket=' + dest_bucket]
+ create_sync_group_pipe(c1, "sync-bucket", "sync-pipe-%s" % dest_bucket, zoneA.name, zoneB.name, bucketA.name, args)
+
+ set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name)
+
+ get_sync_policy(c1)
+
+ zonegroup_meta_checkpoint(zonegroup)
+ # create objects in bucketA
+ create_objects(zcA, bucketA, objnamesB, content)
+
+ zonegroup_meta_checkpoint(zonegroup)
+ zone_data_checkpoint(zoneB, zoneA)
+
+ # verify that objects from zoneA bucketA are synced to both
+ # bucketA & bucketB in zoneB
+ bucket = get_bucket(zcB, bucketA.name)
+ check_objects_exist(bucket, objnamesB, content)
+
+ bucket = get_bucket(zcB, bucketB.name)
+ check_objects_exist(bucket, objnamesB, content)
+
+ remove_sync_policy_group(c1, "sync-bucket", bucketA.name)
+ remove_sync_policy_group(c1, "sync-group")
+ return
diff --git a/src/test/rgw/rgw_multi/tests_az.py b/src/test/rgw/rgw_multi/tests_az.py
new file mode 100644
index 000000000..13ec832a2
--- /dev/null
+++ b/src/test/rgw/rgw_multi/tests_az.py
@@ -0,0 +1,597 @@
+import logging
+
+from nose import SkipTest
+from nose.tools import assert_not_equal, assert_equal
+
+from boto.s3.deletemarker import DeleteMarker
+
+from .tests import get_realm, \
+ ZonegroupConns, \
+ zonegroup_meta_checkpoint, \
+ zone_meta_checkpoint, \
+ zone_bucket_checkpoint, \
+ zone_data_checkpoint, \
+ zonegroup_bucket_checkpoint, \
+ check_bucket_eq, \
+ gen_bucket_name, \
+ get_user, \
+ get_tenant
+
+from .zone_az import print_connection_info
+
+
+# configure logging for the tests module
+log = logging.getLogger(__name__)
+
+
+##########################################
+# utility functions for archive zone tests
+##########################################
+
+def check_az_configured():
+ """check if at least one archive zone exist"""
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+
+ az_zones = zonegroup.zones_by_type.get("archive")
+ if az_zones is None or len(az_zones) != 1:
+ raise SkipTest("Requires one archive zone")
+
+
+def is_az_zone(zone_conn):
+ """check if a specific zone is archive zone"""
+ if not zone_conn:
+ return False
+ return zone_conn.zone.tier_type() == "archive"
+
+
+def init_env():
+ """initialize the environment"""
+ check_az_configured()
+
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ az_zones = []
+ zones = []
+ for conn in zonegroup_conns.zones:
+ if is_az_zone(conn):
+ zone_meta_checkpoint(conn.zone)
+ az_zones.append(conn)
+ elif not conn.zone.is_read_only():
+ zones.append(conn)
+
+ assert_not_equal(len(zones), 0)
+ assert_not_equal(len(az_zones), 0)
+ return zones, az_zones
+
+
+def zone_full_checkpoint(target_zone, source_zone):
+ zone_meta_checkpoint(target_zone)
+ zone_data_checkpoint(target_zone, source_zone)
+
+
+def check_bucket_exists_on_zone(zone, bucket_name):
+ try:
+ zone.conn.get_bucket(bucket_name)
+ except:
+ return False
+ return True
+
+
+def check_key_exists(key):
+ try:
+ key.get_contents_as_string()
+ except:
+ return False
+ return True
+
+
+def get_versioning_status(bucket):
+ res = bucket.get_versioning_status()
+ key = 'Versioning'
+ if not key in res:
+ return None
+ else:
+ return res[key]
+
+
+def get_versioned_objs(bucket):
+ b = []
+ for b_entry in bucket.list_versions():
+ if isinstance(b_entry, DeleteMarker):
+ continue
+ d = {}
+ d['version_id'] = b_entry.version_id
+ d['size'] = b_entry.size
+ d['etag'] = b_entry.etag
+ d['is_latest'] = b_entry.is_latest
+ b.append({b_entry.key:d})
+ return b
+
+
+def get_versioned_entries(bucket):
+ dm = []
+ ver = []
+ for b_entry in bucket.list_versions():
+ if isinstance(b_entry, DeleteMarker):
+ d = {}
+ d['version_id'] = b_entry.version_id
+ d['is_latest'] = b_entry.is_latest
+ dm.append({b_entry.name:d})
+ else:
+ d = {}
+ d['version_id'] = b_entry.version_id
+ d['size'] = b_entry.size
+ d['etag'] = b_entry.etag
+ d['is_latest'] = b_entry.is_latest
+ ver.append({b_entry.key:d})
+ return (dm, ver)
+
+
+def get_number_buckets_by_zone(zone):
+ return len(zone.conn.get_all_buckets())
+
+
+def get_bucket_names_by_zone(zone):
+ return [b.name for b in zone.conn.get_all_buckets()]
+
+
+def get_full_bucket_name(partial_bucket_name, bucket_names_az):
+ full_bucket_name = None
+ for bucket_name in bucket_names_az:
+ if bucket_name.startswith(partial_bucket_name):
+ full_bucket_name = bucket_name
+ break
+ return full_bucket_name
+
+
+####################
+# archive zone tests
+####################
+
+
+def test_az_info():
+ """ log information for manual testing """
+ return SkipTest("only used in manual testing")
+ zones, az_zones = init_env()
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+ bucket_name = gen_bucket_name()
+ # create bucket on the first of the rados zones
+ bucket = zones[0].create_bucket(bucket_name)
+ # create objects in the bucket
+ number_of_objects = 3
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ key.set_contents_from_string('bar')
+ print('Zonegroup: ' + zonegroup.name)
+ print('user: ' + get_user())
+ print('tenant: ' + get_tenant())
+ print('Master Zone')
+ print_connection_info(zones[0].conn)
+ print('Archive Zone')
+ print_connection_info(az_zones[0].conn)
+ print('Bucket: ' + bucket_name)
+
+
+def test_az_create_empty_bucket():
+ """ test empty bucket replication """
+ zones, az_zones = init_env()
+ bucket_name = gen_bucket_name()
+ # create bucket on the non archive zone
+ zones[0].create_bucket(bucket_name)
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # bucket exist on the archive zone
+ p = check_bucket_exists_on_zone(az_zones[0], bucket_name)
+ assert_equal(p, True)
+
+
+def test_az_check_empty_bucket_versioning():
+ """ test bucket vesioning with empty bucket """
+ zones, az_zones = init_env()
+ bucket_name = gen_bucket_name()
+ # create bucket on the non archive zone
+ bucket = zones[0].create_bucket(bucket_name)
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # get bucket on archive zone
+ bucket_az = az_zones[0].conn.get_bucket(bucket_name)
+ # check for non bucket versioning
+ p1 = get_versioning_status(bucket) is None
+ assert_equal(p1, True)
+ p2 = get_versioning_status(bucket_az) is None
+ assert_equal(p2, True)
+
+
+def test_az_object_replication():
+ """ test object replication """
+ zones, az_zones = init_env()
+ bucket_name = gen_bucket_name()
+ # create bucket on the non archive zone
+ bucket = zones[0].create_bucket(bucket_name)
+ key = bucket.new_key("foo")
+ key.set_contents_from_string("bar")
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # check object on archive zone
+ bucket_az = az_zones[0].conn.get_bucket(bucket_name)
+ key_az = bucket_az.get_key("foo")
+ p1 = key_az.get_contents_as_string(encoding='ascii') == "bar"
+ assert_equal(p1, True)
+
+
+def test_az_object_replication_versioning():
+ """ test object replication versioning """
+ zones, az_zones = init_env()
+ bucket_name = gen_bucket_name()
+ # create object on the non archive zone
+ bucket = zones[0].create_bucket(bucket_name)
+ key = bucket.new_key("foo")
+ key.set_contents_from_string("bar")
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # check object content on archive zone
+ bucket_az = az_zones[0].conn.get_bucket(bucket_name)
+ key_az = bucket_az.get_key("foo")
+ p1 = key_az.get_contents_as_string(encoding='ascii') == "bar"
+ assert_equal(p1, True)
+ # grab object versioning and etag
+ for b_version in bucket.list_versions():
+ b_version_id = b_version.version_id
+ b_version_etag = b_version.etag
+ for b_az_version in bucket_az.list_versions():
+ b_az_version_id = b_az_version.version_id
+ b_az_version_etag = b_az_version.etag
+ # check
+ p2 = b_version_id == 'null'
+ assert_equal(p2, True)
+ p3 = b_az_version_id != 'null'
+ assert_equal(p3, True)
+ p4 = b_version_etag == b_az_version_etag
+ assert_equal(p4, True)
+
+
+def test_az_lazy_activation_of_versioned_bucket():
+ """ test lazy activation of versioned bucket """
+ zones, az_zones = init_env()
+ bucket_name = gen_bucket_name()
+ # create object on the non archive zone
+ bucket = zones[0].create_bucket(bucket_name)
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # get bucket on archive zone
+ bucket_az = az_zones[0].conn.get_bucket(bucket_name)
+ # check for non bucket versioning
+ p1 = get_versioning_status(bucket) is None
+ assert_equal(p1, True)
+ p2 = get_versioning_status(bucket_az) is None
+ assert_equal(p2, True)
+ # create object on non archive zone
+ key = bucket.new_key("foo")
+ key.set_contents_from_string("bar")
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # check lazy versioned buckets
+ p3 = get_versioning_status(bucket) is None
+ assert_equal(p3, True)
+ p4 = get_versioning_status(bucket_az) == 'Enabled'
+ assert_equal(p4, True)
+
+
+def test_az_archive_zone_double_object_replication_versioning():
+ """ test archive zone double object replication versioning """
+ zones, az_zones = init_env()
+ bucket_name = gen_bucket_name()
+ # create object on the non archive zone
+ bucket = zones[0].create_bucket(bucket_name)
+ key = bucket.new_key("foo")
+ key.set_contents_from_string("bar")
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # get bucket on archive zone
+ bucket_az = az_zones[0].conn.get_bucket(bucket_name)
+ # check for non bucket versioning
+ p1 = get_versioning_status(bucket) is None
+ assert_equal(p1, True)
+ p2 = get_versioning_status(bucket_az) == 'Enabled'
+ assert_equal(p2, True)
+ # overwrite object on non archive zone
+ key = bucket.new_key("foo")
+ key.set_contents_from_string("ouch")
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # check lazy versioned buckets
+ p3 = get_versioning_status(bucket) is None
+ assert_equal(p3, True)
+ p4 = get_versioning_status(bucket_az) == 'Enabled'
+ assert_equal(p4, True)
+ # get versioned objects
+ objs = get_versioned_objs(bucket)
+ objs_az = get_versioned_objs(bucket_az)
+ # check version_id, size, and is_latest on non archive zone
+ p5 = objs[0]['foo']['version_id'] == 'null'
+ assert_equal(p5, True)
+ p6 = objs[0]['foo']['size'] == 4
+ assert_equal(p6, True)
+ p7 = objs[0]['foo']['is_latest'] == True
+ assert_equal(p7, True)
+ # check version_id, size, is_latest on archive zone
+ latest_obj_az_etag = None
+ for obj_az in objs_az:
+ current_obj_az = obj_az['foo']
+ if current_obj_az['is_latest'] == True:
+ p8 = current_obj_az['size'] == 4
+ assert_equal(p8, True)
+ latest_obj_az_etag = current_obj_az['etag']
+ else:
+ p9 = current_obj_az['size'] == 3
+ assert_equal(p9, True)
+ assert_not_equal(current_obj_az['version_id'], 'null')
+ # check last versions' etags
+ p10 = objs[0]['foo']['etag'] == latest_obj_az_etag
+ assert_equal(p10, True)
+
+
+def test_az_deleted_object_replication():
+ """ test zone deleted object replication """
+ zones, az_zones = init_env()
+ bucket_name = gen_bucket_name()
+ # create object on the non archive zone
+ bucket = zones[0].create_bucket(bucket_name)
+ key = bucket.new_key("foo")
+ key.set_contents_from_string("bar")
+ p1 = key.get_contents_as_string(encoding='ascii') == "bar"
+ assert_equal(p1, True)
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # update object on non archive zone
+ key.set_contents_from_string("soup")
+ p2 = key.get_contents_as_string(encoding='ascii') == "soup"
+ assert_equal(p2, True)
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # delete object on non archive zone
+ key.delete()
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # check object on non archive zone
+ p3 = check_key_exists(key) == False
+ assert_equal(p3, True)
+ # check objects on archive zone
+ bucket_az = az_zones[0].conn.get_bucket(bucket_name)
+ key_az = bucket_az.get_key("foo")
+ p4 = check_key_exists(key_az) == True
+ assert_equal(p4, True)
+ p5 = key_az.get_contents_as_string(encoding='ascii') == "soup"
+ assert_equal(p5, True)
+ b_ver_az = get_versioned_objs(bucket_az)
+ p6 = len(b_ver_az) == 2
+ assert_equal(p6, True)
+
+
+def test_az_bucket_renaming_on_empty_bucket_deletion():
+ """ test bucket renaming on empty bucket deletion """
+ zones, az_zones = init_env()
+ bucket_name = gen_bucket_name()
+ # grab number of buckets on non archive zone
+ num_buckets = get_number_buckets_by_zone(zones[0])
+ # grab number of buckets on archive zone
+ num_buckets_az = get_number_buckets_by_zone(az_zones[0])
+ # create bucket on non archive zone
+ bucket = zones[0].create_bucket(bucket_name)
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # delete bucket in non archive zone
+ zones[0].delete_bucket(bucket_name)
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # check no new buckets on non archive zone
+ p1 = get_number_buckets_by_zone(zones[0]) == num_buckets
+ assert_equal(p1, True)
+ # check non deletion on bucket on archive zone
+ p2 = get_number_buckets_by_zone(az_zones[0]) == (num_buckets_az + 1)
+ assert_equal(p2, True)
+ # check bucket renaming
+ bucket_names_az = get_bucket_names_by_zone(az_zones[0])
+ new_bucket_name = bucket_name + '-deleted-'
+ p3 = any(bucket_name.startswith(new_bucket_name) for bucket_name in bucket_names_az)
+ assert_equal(p3, True)
+
+
+def test_az_old_object_version_in_archive_zone():
+ """ test old object version in archive zone """
+ zones, az_zones = init_env()
+ bucket_name = gen_bucket_name()
+ # grab number of buckets on non archive zone
+ num_buckets = get_number_buckets_by_zone(zones[0])
+ # grab number of buckets on archive zone
+ num_buckets_az = get_number_buckets_by_zone(az_zones[0])
+ # create bucket on non archive zone
+ bucket = zones[0].create_bucket(bucket_name)
+ # create object on non archive zone
+ key = bucket.new_key("foo")
+ key.set_contents_from_string("zero")
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # save object version on archive zone
+ bucket_az = az_zones[0].conn.get_bucket(bucket_name)
+ b_ver_az = get_versioned_objs(bucket_az)
+ obj_az_version_id = b_ver_az[0]['foo']['version_id']
+ # update object on non archive zone
+ key.set_contents_from_string("one")
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # delete object on non archive zone
+ key.delete()
+ # delete bucket on non archive zone
+ zones[0].delete_bucket(bucket_name)
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # check same buckets on non archive zone
+ p1 = get_number_buckets_by_zone(zones[0]) == num_buckets
+ assert_equal(p1, True)
+ # check for new bucket on archive zone
+ p2 = get_number_buckets_by_zone(az_zones[0]) == (num_buckets_az + 1)
+ assert_equal(p2, True)
+ # get new bucket name on archive zone
+ bucket_names_az = get_bucket_names_by_zone(az_zones[0])
+ new_bucket_name_az = get_full_bucket_name(bucket_name + '-deleted-', bucket_names_az)
+ p3 = new_bucket_name_az is not None
+ assert_equal(p3, True)
+ # check number of objects on archive zone
+ new_bucket_az = az_zones[0].conn.get_bucket(new_bucket_name_az)
+ new_b_ver_az = get_versioned_objs(new_bucket_az)
+ p4 = len(new_b_ver_az) == 2
+ assert_equal(p4, True)
+ # check versioned objects on archive zone
+ new_key_az = new_bucket_az.get_key("foo", version_id=obj_az_version_id)
+ p5 = new_key_az.get_contents_as_string(encoding='ascii') == "zero"
+ assert_equal(p5, True)
+ new_key_latest_az = new_bucket_az.get_key("foo")
+ p6 = new_key_latest_az.get_contents_as_string(encoding='ascii') == "one"
+ assert_equal(p6, True)
+
+
+def test_az_force_bucket_renaming_if_same_bucket_name():
+ """ test force bucket renaming if same bucket name """
+ zones, az_zones = init_env()
+ bucket_name = gen_bucket_name()
+ # grab number of buckets on non archive zone
+ num_buckets = get_number_buckets_by_zone(zones[0])
+ # grab number of buckets on archive zone
+ num_buckets_az = get_number_buckets_by_zone(az_zones[0])
+ # create bucket on non archive zone
+ bucket = zones[0].create_bucket(bucket_name)
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # check same buckets on non archive zone
+ p1 = get_number_buckets_by_zone(zones[0]) == (num_buckets + 1)
+ assert_equal(p1, True)
+ # check for new bucket on archive zone
+ p2 = get_number_buckets_by_zone(az_zones[0]) == (num_buckets_az + 1)
+ assert_equal(p2, True)
+ # delete bucket on non archive zone
+ zones[0].delete_bucket(bucket_name)
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # check number of buckets on non archive zone
+ p3 = get_number_buckets_by_zone(zones[0]) == num_buckets
+ assert_equal(p3, True)
+ # check number of buckets on archive zone
+ p4 = get_number_buckets_by_zone(az_zones[0]) == (num_buckets_az + 1)
+ assert_equal(p4, True)
+ # get new bucket name on archive zone
+ bucket_names_az = get_bucket_names_by_zone(az_zones[0])
+ new_bucket_name_az = get_full_bucket_name(bucket_name + '-deleted-', bucket_names_az)
+ p5 = new_bucket_name_az is not None
+ assert_equal(p5, True)
+ # create bucket on non archive zone
+ _ = zones[0].create_bucket(new_bucket_name_az)
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # check number of buckets on non archive zone
+ p6 = get_number_buckets_by_zone(zones[0]) == (num_buckets + 1)
+ assert_equal(p6, True)
+ # check number of buckets on archive zone
+ p7 = get_number_buckets_by_zone(az_zones[0]) == (num_buckets_az + 2)
+ assert_equal(p7, True)
+
+
+def test_az_versioning_support_in_zones():
+ """ test versioning support on zones """
+ zones, az_zones = init_env()
+ bucket_name = gen_bucket_name()
+ # create bucket on non archive zone
+ bucket = zones[0].create_bucket(bucket_name)
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # get bucket on archive zone
+ bucket_az = az_zones[0].conn.get_bucket(bucket_name)
+ # check non versioned buckets
+ p1 = get_versioning_status(bucket) is None
+ assert_equal(p1, True)
+ p2 = get_versioning_status(bucket_az) is None
+ assert_equal(p2, True)
+ # create object on non archive zone
+ key = bucket.new_key("foo")
+ key.set_contents_from_string("zero")
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # check bucket versioning
+ p3 = get_versioning_status(bucket) is None
+ assert_equal(p3, True)
+ p4 = get_versioning_status(bucket_az) == 'Enabled'
+ assert_equal(p4, True)
+ # enable bucket versioning on non archive zone
+ bucket.configure_versioning(True)
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # check bucket versioning
+ p5 = get_versioning_status(bucket) == 'Enabled'
+ assert_equal(p5, True)
+ p6 = get_versioning_status(bucket_az) == 'Enabled'
+ assert_equal(p6, True)
+ # delete object on non archive zone
+ key.delete()
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # check delete-markers and versions on non archive zone
+ (b_dm, b_ver) = get_versioned_entries(bucket)
+ p7 = len(b_dm) == 1
+ assert_equal(p7, True)
+ p8 = len(b_ver) == 1
+ assert_equal(p8, True)
+ # check delete-markers and versions on archive zone
+ (b_dm_az, b_ver_az) = get_versioned_entries(bucket_az)
+ p9 = len(b_dm_az) == 1
+ assert_equal(p9, True)
+ p10 = len(b_ver_az) == 1
+ assert_equal(p10, True)
+ # delete delete-marker on non archive zone
+ dm_version_id = b_dm[0]['foo']['version_id']
+ bucket.delete_key("foo", version_id=dm_version_id)
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # check delete-markers and versions on non archive zone
+ (b_dm, b_ver) = get_versioned_entries(bucket)
+ p11 = len(b_dm) == 0
+ assert_equal(p11, True)
+ p12 = len(b_ver) == 1
+ assert_equal(p12, True)
+ # check delete-markers and versions on archive zone
+ (b_dm_az, b_ver_az) = get_versioned_entries(bucket_az)
+ p13 = len(b_dm_az) == 1
+ assert_equal(p13, True)
+ p14 = len(b_ver_az) == 1
+ assert_equal(p14, True)
+ # delete delete-marker on archive zone
+ dm_az_version_id = b_dm_az[0]['foo']['version_id']
+ bucket_az.delete_key("foo", version_id=dm_az_version_id)
+ # sync
+ zone_full_checkpoint(az_zones[0].zone, zones[0].zone)
+ # check delete-markers and versions on non archive zone
+ (b_dm, b_ver) = get_versioned_entries(bucket)
+ p15 = len(b_dm) == 0
+ assert_equal(p15, True)
+ p16 = len(b_ver) == 1
+ assert_equal(p16, True)
+ # check delete-markers and versions on archive zone
+ (b_dm_az, b_ver_az) = get_versioned_entries(bucket_az)
+ p17 = len(b_dm_az) == 0
+ assert_equal(p17, True)
+ p17 = len(b_ver_az) == 1
+ assert_equal(p17, True)
+ # check body in zones
+ obj_version_id = b_ver[0]['foo']['version_id']
+ key = bucket.get_key("foo", version_id=obj_version_id)
+ p18 = key.get_contents_as_string(encoding='ascii') == "zero"
+ assert_equal(p18, True)
+ obj_az_version_id = b_ver_az[0]['foo']['version_id']
+ key_az = bucket_az.get_key("foo", version_id=obj_az_version_id)
+ p19 = key_az.get_contents_as_string(encoding='ascii') == "zero"
+ assert_equal(p19, True)
diff --git a/src/test/rgw/rgw_multi/tests_es.py b/src/test/rgw/rgw_multi/tests_es.py
new file mode 100644
index 000000000..08c11718b
--- /dev/null
+++ b/src/test/rgw/rgw_multi/tests_es.py
@@ -0,0 +1,276 @@
+import json
+import logging
+
+import boto
+import boto.s3.connection
+
+import datetime
+import dateutil
+
+from itertools import zip_longest # type: ignore
+
+from nose.tools import eq_ as eq
+
+from .multisite import *
+from .tests import *
+from .zone_es import *
+
+log = logging.getLogger(__name__)
+
+
+def check_es_configured():
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+
+ es_zones = zonegroup.zones_by_type.get("elasticsearch")
+ if not es_zones:
+ raise SkipTest("Requires at least one ES zone")
+
+def is_es_zone(zone_conn):
+ if not zone_conn:
+ return False
+
+ return zone_conn.zone.tier_type() == "elasticsearch"
+
+def verify_search(bucket_name, src_keys, result_keys, f):
+ check_keys = []
+ for k in src_keys:
+ if bucket_name:
+ if bucket_name != k.bucket.name:
+ continue
+ if f(k):
+ check_keys.append(k)
+ check_keys.sort(key = lambda l: (l.bucket.name, l.name, l.version_id))
+
+ log.debug('check keys:' + dump_json(check_keys))
+ log.debug('result keys:' + dump_json(result_keys))
+
+ for k1, k2 in zip_longest(check_keys, result_keys):
+ assert k1
+ assert k2
+ check_object_eq(k1, k2)
+
+def do_check_mdsearch(conn, bucket, src_keys, req_str, src_filter):
+ if bucket:
+ bucket_name = bucket.name
+ else:
+ bucket_name = ''
+ req = MDSearch(conn, bucket_name, req_str)
+ result_keys = req.search(sort_key = lambda k: (k.bucket.name, k.name, k.version_id))
+ verify_search(bucket_name, src_keys, result_keys, src_filter)
+
+def init_env(create_obj, num_keys = 5, buckets_per_zone = 1, bucket_init_cb = None):
+ check_es_configured()
+
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns, buckets_per_zone = buckets_per_zone)
+
+ if bucket_init_cb:
+ for zone_conn, bucket in zone_bucket:
+ bucket_init_cb(zone_conn, bucket)
+
+ src_keys = []
+
+ owner = None
+
+ obj_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
+
+ # don't wait for meta sync just yet
+ for zone, bucket in zone_bucket:
+ for count in range(num_keys):
+ objname = obj_prefix + str(count)
+ k = new_key(zone, bucket.name, objname)
+ # k.set_contents_from_string(content + 'x' * count)
+ if not create_obj:
+ continue
+
+ create_obj(k, count)
+
+ if not owner:
+ for list_key in bucket.list_versions():
+ owner = list_key.owner
+ break
+
+ k = bucket.get_key(k.name, version_id = k.version_id)
+ k.owner = owner # owner is not set when doing get_key()
+
+ src_keys.append(k)
+
+ zonegroup_meta_checkpoint(zonegroup)
+
+ sources = []
+ targets = []
+ for target_conn in zonegroup_conns.zones:
+ if not is_es_zone(target_conn):
+ sources.append(target_conn)
+ continue
+
+ targets.append(target_conn)
+
+ buckets = []
+ # make sure all targets are synced
+ for source_conn, bucket in zone_bucket:
+ buckets.append(bucket)
+ for target_conn in targets:
+ zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
+
+ return targets, sources, buckets, src_keys
+
+def test_es_object_search():
+ min_size = 10
+ content = 'a' * min_size
+
+ def create_obj(k, i):
+ k.set_contents_from_string(content + 'x' * i)
+
+ targets, _, buckets, src_keys = init_env(create_obj, num_keys = 5, buckets_per_zone = 2)
+
+ for target_conn in targets:
+
+ # bucket checks
+ for bucket in buckets:
+ # check name
+ do_check_mdsearch(target_conn.conn, None, src_keys , 'bucket == ' + bucket.name, lambda k: k.bucket.name == bucket.name)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'bucket == ' + bucket.name, lambda k: k.bucket.name == bucket.name)
+
+ # check on all buckets
+ for key in src_keys:
+ # limiting to checking specific key name, otherwise could get results from
+ # other runs / tests
+ do_check_mdsearch(target_conn.conn, None, src_keys , 'name == ' + key.name, lambda k: k.name == key.name)
+
+ # check on specific bucket
+ for bucket in buckets:
+ for key in src_keys:
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name < ' + key.name, lambda k: k.name < key.name)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name <= ' + key.name, lambda k: k.name <= key.name)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name == ' + key.name, lambda k: k.name == key.name)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name >= ' + key.name, lambda k: k.name >= key.name)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name > ' + key.name, lambda k: k.name > key.name)
+
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name == ' + src_keys[0].name + ' or name >= ' + src_keys[2].name,
+ lambda k: k.name == src_keys[0].name or k.name >= src_keys[2].name)
+
+ # check etag
+ for key in src_keys:
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'etag < ' + key.etag[1:-1], lambda k: k.etag < key.etag)
+ for key in src_keys:
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'etag == ' + key.etag[1:-1], lambda k: k.etag == key.etag)
+ for key in src_keys:
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'etag > ' + key.etag[1:-1], lambda k: k.etag > key.etag)
+
+ # check size
+ for key in src_keys:
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'size < ' + str(key.size), lambda k: k.size < key.size)
+ for key in src_keys:
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'size <= ' + str(key.size), lambda k: k.size <= key.size)
+ for key in src_keys:
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'size == ' + str(key.size), lambda k: k.size == key.size)
+ for key in src_keys:
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'size >= ' + str(key.size), lambda k: k.size >= key.size)
+ for key in src_keys:
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'size > ' + str(key.size), lambda k: k.size > key.size)
+
+def date_from_str(s):
+ return dateutil.parser.parse(s)
+
+def test_es_object_search_custom():
+ min_size = 10
+ content = 'a' * min_size
+
+ def bucket_init(zone_conn, bucket):
+ req = MDSearchConfig(zone_conn.conn, bucket.name)
+ req.set_config('x-amz-meta-foo-str; string, x-amz-meta-foo-int; int, x-amz-meta-foo-date; date')
+
+ def create_obj(k, i):
+ date = datetime.datetime.now() + datetime.timedelta(seconds=1) * i
+ date_str = date.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
+ k.set_contents_from_string(content + 'x' * i, headers = { 'X-Amz-Meta-Foo-Str': str(i * 5),
+ 'X-Amz-Meta-Foo-Int': str(i * 5),
+ 'X-Amz-Meta-Foo-Date': date_str})
+
+ targets, _, buckets, src_keys = init_env(create_obj, num_keys = 5, buckets_per_zone = 1, bucket_init_cb = bucket_init)
+
+
+ for target_conn in targets:
+
+ # bucket checks
+ for bucket in buckets:
+ str_vals = []
+ for key in src_keys:
+ # check string values
+ val = key.get_metadata('foo-str')
+ str_vals.append(val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str < ' + val, lambda k: k.get_metadata('foo-str') < val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str <= ' + val, lambda k: k.get_metadata('foo-str') <= val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str == ' + val, lambda k: k.get_metadata('foo-str') == val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str >= ' + val, lambda k: k.get_metadata('foo-str') >= val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str > ' + val, lambda k: k.get_metadata('foo-str') > val)
+
+ # check int values
+ sval = key.get_metadata('foo-int')
+ val = int(sval)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-int < ' + sval, lambda k: int(k.get_metadata('foo-int')) < val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-int <= ' + sval, lambda k: int(k.get_metadata('foo-int')) <= val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-int == ' + sval, lambda k: int(k.get_metadata('foo-int')) == val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-int >= ' + sval, lambda k: int(k.get_metadata('foo-int')) >= val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-int > ' + sval, lambda k: int(k.get_metadata('foo-int')) > val)
+
+ # check int values
+ sval = key.get_metadata('foo-date')
+ val = date_from_str(sval)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-date < ' + sval, lambda k: date_from_str(k.get_metadata('foo-date')) < val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-date <= ' + sval, lambda k: date_from_str(k.get_metadata('foo-date')) <= val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-date == ' + sval, lambda k: date_from_str(k.get_metadata('foo-date')) == val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-date >= ' + sval, lambda k: date_from_str(k.get_metadata('foo-date')) >= val)
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-date > ' + sval, lambda k: date_from_str(k.get_metadata('foo-date')) > val)
+
+ # 'or' query
+ for i in range(len(src_keys) // 2):
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str <= ' + str_vals[i] + ' or x-amz-meta-foo-str >= ' + str_vals[-i],
+ lambda k: k.get_metadata('foo-str') <= str_vals[i] or k.get_metadata('foo-str') >= str_vals[-i] )
+
+ # 'and' query
+ for i in range(len(src_keys) // 2):
+ do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str >= ' + str_vals[i] + ' and x-amz-meta-foo-str <= ' + str_vals[i + 1],
+ lambda k: k.get_metadata('foo-str') >= str_vals[i] and k.get_metadata('foo-str') <= str_vals[i + 1] )
+ # more complicated query
+ for i in range(len(src_keys) // 2):
+ do_check_mdsearch(target_conn.conn, None, src_keys , 'bucket == ' + bucket.name + ' and x-amz-meta-foo-str >= ' + str_vals[i] +
+ ' and (x-amz-meta-foo-str <= ' + str_vals[i + 1] + ')',
+ lambda k: k.bucket.name == bucket.name and (k.get_metadata('foo-str') >= str_vals[i] and
+ k.get_metadata('foo-str') <= str_vals[i + 1]) )
+
+def test_es_bucket_conf():
+ min_size = 0
+
+ def bucket_init(zone_conn, bucket):
+ req = MDSearchConfig(zone_conn.conn, bucket.name)
+ req.set_config('x-amz-meta-foo-str; string, x-amz-meta-foo-int; int, x-amz-meta-foo-date; date')
+
+ targets, sources, buckets, _ = init_env(None, num_keys = 5, buckets_per_zone = 1, bucket_init_cb = bucket_init)
+
+ for source_conn in sources:
+ for bucket in buckets:
+ req = MDSearchConfig(source_conn.conn, bucket.name)
+ conf = req.get_config()
+
+ d = {}
+
+ for entry in conf:
+ d[entry['Key']] = entry['Type']
+
+ eq(len(d), 3)
+ eq(d['x-amz-meta-foo-str'], 'str')
+ eq(d['x-amz-meta-foo-int'], 'int')
+ eq(d['x-amz-meta-foo-date'], 'date')
+
+ req.del_config()
+
+ conf = req.get_config()
+
+ eq(len(conf), 0)
+
+ break # no need to iterate over all zones
diff --git a/src/test/rgw/rgw_multi/tools.py b/src/test/rgw/rgw_multi/tools.py
new file mode 100644
index 000000000..dd7f91ade
--- /dev/null
+++ b/src/test/rgw/rgw_multi/tools.py
@@ -0,0 +1,97 @@
+import json
+import boto
+
+def append_attr_value(d, attr, attrv):
+ if attrv and len(str(attrv)) > 0:
+ d[attr] = attrv
+
+def append_attr(d, k, attr):
+ try:
+ attrv = getattr(k, attr)
+ except:
+ return
+ append_attr_value(d, attr, attrv)
+
+def get_attrs(k, attrs):
+ d = {}
+ for a in attrs:
+ append_attr(d, k, a)
+
+ return d
+
+def append_query_arg(s, n, v):
+ if not v:
+ return s
+ nv = '{n}={v}'.format(n=n, v=v)
+ if not s:
+ return nv
+ return '{s}&{nv}'.format(s=s, nv=nv)
+
+class KeyJSONEncoder(boto.s3.key.Key):
+ @staticmethod
+ def default(k, versioned=False):
+ attrs = ['bucket', 'name', 'size', 'last_modified', 'metadata', 'cache_control',
+ 'content_type', 'content_disposition', 'content_language',
+ 'owner', 'storage_class', 'md5', 'version_id', 'encrypted',
+ 'delete_marker', 'expiry_date', 'VersionedEpoch', 'RgwxTag']
+ d = get_attrs(k, attrs)
+ d['etag'] = k.etag[1:-1]
+ if versioned:
+ d['is_latest'] = k.is_latest
+ return d
+
+class DeleteMarkerJSONEncoder(boto.s3.key.Key):
+ @staticmethod
+ def default(k):
+ attrs = ['name', 'version_id', 'last_modified', 'owner']
+ d = get_attrs(k, attrs)
+ d['delete_marker'] = True
+ d['is_latest'] = k.is_latest
+ return d
+
+class UserJSONEncoder(boto.s3.user.User):
+ @staticmethod
+ def default(k):
+ attrs = ['id', 'display_name']
+ return get_attrs(k, attrs)
+
+class BucketJSONEncoder(boto.s3.bucket.Bucket):
+ @staticmethod
+ def default(k):
+ attrs = ['name', 'creation_date']
+ return get_attrs(k, attrs)
+
+class BotoJSONEncoder(json.JSONEncoder):
+ def default(self, obj):
+ if isinstance(obj, boto.s3.key.Key):
+ return KeyJSONEncoder.default(obj)
+ if isinstance(obj, boto.s3.deletemarker.DeleteMarker):
+ return DeleteMarkerJSONEncoder.default(obj)
+ if isinstance(obj, boto.s3.user.User):
+ return UserJSONEncoder.default(obj)
+ if isinstance(obj, boto.s3.prefix.Prefix):
+ return (lambda x: {'prefix': x.name})(obj)
+ if isinstance(obj, boto.s3.bucket.Bucket):
+ return BucketJSONEncoder.default(obj)
+ return json.JSONEncoder.default(self, obj)
+
+
+def dump_json(o, cls=BotoJSONEncoder):
+ return json.dumps(o, cls=cls, indent=4)
+
+def assert_raises(excClass, callableObj, *args, **kwargs):
+ """
+ Like unittest.TestCase.assertRaises, but returns the exception.
+ """
+ try:
+ callableObj(*args, **kwargs)
+ except excClass as e:
+ return e
+ else:
+ if hasattr(excClass, '__name__'):
+ excName = excClass.__name__
+ else:
+ excName = str(excClass)
+ raise AssertionError("%s not raised" % excName)
+
+
diff --git a/src/test/rgw/rgw_multi/zone_az.py b/src/test/rgw/rgw_multi/zone_az.py
new file mode 100644
index 000000000..f9cd43574
--- /dev/null
+++ b/src/test/rgw/rgw_multi/zone_az.py
@@ -0,0 +1,42 @@
+import logging
+
+from .multisite import Zone
+
+
+log = logging.getLogger('rgw_multi.tests')
+
+
+class AZone(Zone): # pylint: disable=too-many-ancestors
+ """ archive zone class """
+ def __init__(self, name, zonegroup=None, cluster=None, data=None, zone_id=None, gateways=None):
+ super(AZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)
+
+ def is_read_only(self):
+ return False
+
+ def tier_type(self):
+ return "archive"
+
+ def create(self, cluster, args=None, **kwargs):
+ if args is None:
+ args = ''
+ args += ['--tier-type', self.tier_type()]
+ return self.json_command(cluster, 'create', args)
+
+ def has_buckets(self):
+ return False
+
+ def has_roles(self):
+ return True
+
+class AZoneConfig:
+ """ archive zone configuration """
+ def __init__(self, cfg, section):
+ pass
+
+
+def print_connection_info(conn):
+ """print info of connection"""
+ print("Host: " + conn.host+':'+str(conn.port))
+ print("AWS Secret Key: " + conn.aws_secret_access_key)
+ print("AWS Access Key: " + conn.aws_access_key_id)
diff --git a/src/test/rgw/rgw_multi/zone_cloud.py b/src/test/rgw/rgw_multi/zone_cloud.py
new file mode 100644
index 000000000..dd5640cf2
--- /dev/null
+++ b/src/test/rgw/rgw_multi/zone_cloud.py
@@ -0,0 +1,326 @@
+import json
+import requests.compat
+import logging
+
+import boto
+import boto.s3.connection
+
+import dateutil.parser
+import datetime
+
+import re
+
+from nose.tools import eq_ as eq
+from itertools import zip_longest # type: ignore
+from urllib.parse import urlparse
+
+from .multisite import *
+from .tools import *
+
+log = logging.getLogger(__name__)
+
+def get_key_ver(k):
+ if not k.version_id:
+ return 'null'
+ return k.version_id
+
+def unquote(s):
+ if s[0] == '"' and s[-1] == '"':
+ return s[1:-1]
+ return s
+
+def check_object_eq(k1, k2, check_extra = True):
+ assert k1
+ assert k2
+ log.debug('comparing key name=%s', k1.name)
+ eq(k1.name, k2.name)
+ eq(k1.metadata, k2.metadata)
+ # eq(k1.cache_control, k2.cache_control)
+ eq(k1.content_type, k2.content_type)
+ eq(k1.content_encoding, k2.content_encoding)
+ eq(k1.content_disposition, k2.content_disposition)
+ eq(k1.content_language, k2.content_language)
+
+ eq(unquote(k1.etag), unquote(k2.etag))
+
+ mtime1 = dateutil.parser.parse(k1.last_modified)
+ mtime2 = dateutil.parser.parse(k2.last_modified)
+ log.debug('k1.last_modified=%s k2.last_modified=%s', k1.last_modified, k2.last_modified)
+ assert abs((mtime1 - mtime2).total_seconds()) < 1 # handle different time resolution
+ # if check_extra:
+ # eq(k1.owner.id, k2.owner.id)
+ # eq(k1.owner.display_name, k2.owner.display_name)
+ # eq(k1.storage_class, k2.storage_class)
+ eq(k1.size, k2.size)
+ eq(get_key_ver(k1), get_key_ver(k2))
+ # eq(k1.encrypted, k2.encrypted)
+
+def make_request(conn, method, bucket, key, query_args, headers):
+ result = conn.make_request(method, bucket=bucket, key=key, query_args=query_args, headers=headers)
+ if result.status // 100 != 2:
+ raise boto.exception.S3ResponseError(result.status, result.reason, result.read())
+ return result
+
+class CloudKey:
+ def __init__(self, zone_bucket, k):
+ self.zone_bucket = zone_bucket
+
+ # we need two keys: when listing buckets, we get keys that only contain partial data
+ # but we need to have the full data so that we could use all the meta-rgwx- headers
+ # that are needed in order to create a correct representation of the object
+ self.key = k
+ self.rgwx_key = k # assuming k has all the meta info on, if not then we'll update it in update()
+ self.update()
+
+ def update(self):
+ k = self.key
+ rk = self.rgwx_key
+
+ self.size = rk.size
+ orig_name = rk.metadata.get('rgwx-source-key')
+ if not orig_name:
+ self.rgwx_key = self.zone_bucket.bucket.get_key(k.name, version_id = k.version_id)
+ rk = self.rgwx_key
+ orig_name = rk.metadata.get('rgwx-source-key')
+
+ self.name = orig_name
+ self.version_id = rk.metadata.get('rgwx-source-version-id')
+
+ ve = rk.metadata.get('rgwx-versioned-epoch')
+ if ve:
+ self.versioned_epoch = int(ve)
+ else:
+ self.versioned_epoch = 0
+
+ mt = rk.metadata.get('rgwx-source-mtime')
+ if mt:
+ self.last_modified = datetime.datetime.utcfromtimestamp(float(mt)).strftime('%a, %d %b %Y %H:%M:%S GMT')
+ else:
+ self.last_modified = k.last_modified
+
+ et = rk.metadata.get('rgwx-source-etag')
+ if rk.etag.find('-') >= 0 or et.find('-') >= 0:
+ # in this case we will use the source etag as it was uploaded via multipart upload
+ # in one of the zones, so there's no way to make sure etags are calculated the same
+ # way. In the other case we'd just want to keep the etag that was generated in the
+ # regular upload mechanism, which should be consistent in both ends
+ self.etag = et
+ else:
+ self.etag = rk.etag
+
+ if k.etag[0] == '"' and self.etag[0] != '"': # inconsistent etag quoting when listing bucket vs object get
+ self.etag = '"' + self.etag + '"'
+
+ new_meta = {}
+ for meta_key, meta_val in k.metadata.items():
+ if not meta_key.startswith('rgwx-'):
+ new_meta[meta_key] = meta_val
+
+ self.metadata = new_meta
+
+ self.cache_control = k.cache_control
+ self.content_type = k.content_type
+ self.content_encoding = k.content_encoding
+ self.content_disposition = k.content_disposition
+ self.content_language = k.content_language
+
+
+ def get_contents_as_string(self, encoding=None):
+ r = self.key.get_contents_as_string(encoding=encoding)
+
+ # the previous call changed the status of the source object, as it loaded
+ # its metadata
+
+ self.rgwx_key = self.key
+ self.update()
+
+ return r
+
+
+class CloudZoneBucket:
+ def __init__(self, zone_conn, target_path, name):
+ self.zone_conn = zone_conn
+ self.name = name
+ self.cloud_conn = zone_conn.zone.cloud_conn
+
+ target_path = target_path[:]
+ if target_path[-1] != '/':
+ target_path += '/'
+ target_path = target_path.replace('${bucket}', name)
+
+ tp = target_path.split('/', 1)
+
+ if len(tp) == 1:
+ self.target_bucket = target_path
+ self.target_prefix = ''
+ else:
+ self.target_bucket = tp[0]
+ self.target_prefix = tp[1]
+
+ log.debug('target_path=%s target_bucket=%s target_prefix=%s', target_path, self.target_bucket, self.target_prefix)
+ self.bucket = self.cloud_conn.get_bucket(self.target_bucket)
+
+ def get_all_versions(self):
+ l = []
+
+ for k in self.bucket.get_all_keys(prefix=self.target_prefix):
+ new_key = CloudKey(self, k)
+
+ log.debug('appending o=[\'%s\', \'%s\', \'%d\']', new_key.name, new_key.version_id, new_key.versioned_epoch)
+ l.append(new_key)
+
+
+ sort_key = lambda k: (k.name, -k.versioned_epoch)
+ l.sort(key = sort_key)
+
+ for new_key in l:
+ yield new_key
+
+ def get_key(self, name, version_id=None):
+ return CloudKey(self, self.bucket.get_key(name, version_id=version_id))
+
+
+def parse_endpoint(endpoint):
+ o = urlparse(endpoint)
+
+ netloc = o.netloc.split(':')
+
+ host = netloc[0]
+
+ if len(netloc) > 1:
+ port = int(netloc[1])
+ else:
+ port = o.port
+
+ is_secure = False
+
+ if o.scheme == 'https':
+ is_secure = True
+
+ if not port:
+ if is_secure:
+ port = 443
+ else:
+ port = 80
+
+ return host, port, is_secure
+
+
+class CloudZone(Zone):
+ def __init__(self, name, cloud_endpoint, credentials, source_bucket, target_path,
+ zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
+ self.cloud_endpoint = cloud_endpoint
+ self.credentials = credentials
+ self.source_bucket = source_bucket
+ self.target_path = target_path
+
+ self.target_path = self.target_path.replace('${zone}', name)
+ # self.target_path = self.target_path.replace('${zone_id}', zone_id)
+ self.target_path = self.target_path.replace('${zonegroup}', zonegroup.name)
+ self.target_path = self.target_path.replace('${zonegroup_id}', zonegroup.id)
+
+ log.debug('target_path=%s', self.target_path)
+
+ host, port, is_secure = parse_endpoint(cloud_endpoint)
+
+ self.cloud_conn = boto.connect_s3(
+ aws_access_key_id = credentials.access_key,
+ aws_secret_access_key = credentials.secret,
+ host = host,
+ port = port,
+ is_secure = is_secure,
+ calling_format = boto.s3.connection.OrdinaryCallingFormat())
+ super(CloudZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)
+
+
+ def is_read_only(self):
+ return True
+
+ def tier_type(self):
+ return "cloud"
+
+ def create(self, cluster, args = None, check_retcode = True):
+ """ create the object with the given arguments """
+
+ if args is None:
+ args = ''
+
+ tier_config = ','.join([ 'connection.endpoint=' + self.cloud_endpoint,
+ 'connection.access_key=' + self.credentials.access_key,
+ 'connection.secret=' + self.credentials.secret,
+ 'target_path=' + re.escape(self.target_path)])
+
+ args += [ '--tier-type', self.tier_type(), '--tier-config', tier_config ]
+
+ return self.json_command(cluster, 'create', args, check_retcode=check_retcode)
+
+ def has_buckets(self):
+ return False
+
+ def has_roles(self):
+ return False
+
+ class Conn(ZoneConn):
+ def __init__(self, zone, credentials):
+ super(CloudZone.Conn, self).__init__(zone, credentials)
+
+ def get_bucket(self, bucket_name):
+ return CloudZoneBucket(self, self.zone.target_path, bucket_name)
+
+ def create_bucket(self, name):
+ # should not be here, a bug in the test suite
+ log.critical('Conn.create_bucket() should not be called in cloud zone')
+ assert False
+
+ def check_bucket_eq(self, zone_conn, bucket_name):
+ assert(zone_conn.zone.tier_type() == "rados")
+
+ log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, self.name)
+ b1 = self.get_bucket(bucket_name)
+ b2 = zone_conn.get_bucket(bucket_name)
+
+ log.debug('bucket1 objects:')
+ for o in b1.get_all_versions():
+ log.debug('o=%s', o.name)
+ log.debug('bucket2 objects:')
+ for o in b2.get_all_versions():
+ log.debug('o=%s', o.name)
+
+ for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
+ if k1 is None:
+ log.critical('key=%s is missing from zone=%s', k2.name, self.name)
+ assert False
+ if k2 is None:
+ log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name)
+ assert False
+
+ check_object_eq(k1, k2)
+
+
+ log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
+
+ return True
+
+ def create_role(self, path, rolename, policy_document, tag_list):
+ assert False
+
+ def get_conn(self, credentials):
+ return self.Conn(self, credentials)
+
+
+class CloudZoneConfig:
+ def __init__(self, cfg, section):
+ self.endpoint = cfg.get(section, 'endpoint')
+ access_key = cfg.get(section, 'access_key')
+ secret = cfg.get(section, 'secret')
+ self.credentials = Credentials(access_key, secret)
+ try:
+ self.target_path = cfg.get(section, 'target_path')
+ except:
+ self.target_path = 'rgw-${zonegroup_id}/${bucket}'
+
+ try:
+ self.source_bucket = cfg.get(section, 'source_bucket')
+ except:
+ self.source_bucket = '*'
+
diff --git a/src/test/rgw/rgw_multi/zone_es.py b/src/test/rgw/rgw_multi/zone_es.py
new file mode 100644
index 000000000..e98b3fdd8
--- /dev/null
+++ b/src/test/rgw/rgw_multi/zone_es.py
@@ -0,0 +1,256 @@
+import json
+import requests.compat
+import logging
+
+import boto
+import boto.s3.connection
+
+import dateutil.parser
+
+from nose.tools import eq_ as eq
+from itertools import zip_longest # type: ignore
+
+from .multisite import *
+from .tools import *
+
+log = logging.getLogger(__name__)
+
+def get_key_ver(k):
+ if not k.version_id:
+ return 'null'
+ return k.version_id
+
+def check_object_eq(k1, k2, check_extra = True):
+ assert k1
+ assert k2
+ log.debug('comparing key name=%s', k1.name)
+ eq(k1.name, k2.name)
+ eq(k1.metadata, k2.metadata)
+ # eq(k1.cache_control, k2.cache_control)
+ eq(k1.content_type, k2.content_type)
+ # eq(k1.content_encoding, k2.content_encoding)
+ # eq(k1.content_disposition, k2.content_disposition)
+ # eq(k1.content_language, k2.content_language)
+ eq(k1.etag, k2.etag)
+ mtime1 = dateutil.parser.parse(k1.last_modified)
+ mtime2 = dateutil.parser.parse(k2.last_modified)
+ assert abs((mtime1 - mtime2).total_seconds()) < 1 # handle different time resolution
+ if check_extra:
+ eq(k1.owner.id, k2.owner.id)
+ eq(k1.owner.display_name, k2.owner.display_name)
+ # eq(k1.storage_class, k2.storage_class)
+ eq(k1.size, k2.size)
+ eq(get_key_ver(k1), get_key_ver(k2))
+ # eq(k1.encrypted, k2.encrypted)
+
+def make_request(conn, method, bucket, key, query_args, headers):
+ result = conn.make_request(method, bucket=bucket, key=key, query_args=query_args, headers=headers)
+ if result.status // 100 != 2:
+ raise boto.exception.S3ResponseError(result.status, result.reason, result.read())
+ return result
+
+
+class MDSearch:
+ def __init__(self, conn, bucket_name, query, query_args = None, marker = None):
+ self.conn = conn
+ self.bucket_name = bucket_name or ''
+ if bucket_name:
+ self.bucket = boto.s3.bucket.Bucket(name=bucket_name)
+ else:
+ self.bucket = None
+ self.query = query
+ self.query_args = query_args
+ self.max_keys = None
+ self.marker = marker
+
+ def raw_search(self):
+ q = self.query or ''
+ query_args = append_query_arg(self.query_args, 'query', requests.compat.quote_plus(q))
+ if self.max_keys is not None:
+ query_args = append_query_arg(query_args, 'max-keys', self.max_keys)
+ if self.marker:
+ query_args = append_query_arg(query_args, 'marker', self.marker)
+
+ query_args = append_query_arg(query_args, 'format', 'json')
+
+ headers = {}
+
+ result = make_request(self.conn, "GET", bucket=self.bucket_name, key='', query_args=query_args, headers=headers)
+
+ l = []
+
+ result_dict = json.loads(result.read())
+
+ for entry in result_dict['Objects']:
+ bucket = self.conn.get_bucket(entry['Bucket'], validate = False)
+ k = boto.s3.key.Key(bucket, entry['Key'])
+
+ k.version_id = entry['Instance']
+ k.etag = entry['ETag']
+ k.owner = boto.s3.user.User(id=entry['Owner']['ID'], display_name=entry['Owner']['DisplayName'])
+ k.last_modified = entry['LastModified']
+ k.size = entry['Size']
+ k.content_type = entry['ContentType']
+ k.versioned_epoch = entry['VersionedEpoch']
+
+ k.metadata = {}
+ for e in entry['CustomMetadata']:
+ k.metadata[e['Name']] = str(e['Value']) # int values will return as int, cast to string for compatibility with object meta response
+
+ l.append(k)
+
+ return result_dict, l
+
+ def search(self, drain = True, sort = True, sort_key = None):
+ l = []
+
+ is_done = False
+
+ while not is_done:
+ result, result_keys = self.raw_search()
+
+ l = l + result_keys
+
+ is_done = not (drain and (result['IsTruncated'] == "true"))
+ marker = result['Marker']
+
+ if sort:
+ if not sort_key:
+ sort_key = lambda k: (k.name, -k.versioned_epoch)
+ l.sort(key = sort_key)
+
+ return l
+
+
+class MDSearchConfig:
+ def __init__(self, conn, bucket_name):
+ self.conn = conn
+ self.bucket_name = bucket_name or ''
+ if bucket_name:
+ self.bucket = boto.s3.bucket.Bucket(name=bucket_name)
+ else:
+ self.bucket = None
+
+ def send_request(self, conf, method):
+ query_args = 'mdsearch'
+ headers = None
+ if conf:
+ headers = { 'X-Amz-Meta-Search': conf }
+
+ query_args = append_query_arg(query_args, 'format', 'json')
+
+ return make_request(self.conn, method, bucket=self.bucket_name, key='', query_args=query_args, headers=headers)
+
+ def get_config(self):
+ result = self.send_request(None, 'GET')
+ return json.loads(result.read())
+
+ def set_config(self, conf):
+ self.send_request(conf, 'POST')
+
+ def del_config(self):
+ self.send_request(None, 'DELETE')
+
+
+class ESZoneBucket:
+ def __init__(self, zone_conn, name, conn):
+ self.zone_conn = zone_conn
+ self.name = name
+ self.conn = conn
+
+ self.bucket = boto.s3.bucket.Bucket(name=name)
+
+ def get_all_versions(self):
+
+ marker = None
+ is_done = False
+
+ req = MDSearch(self.conn, self.name, 'bucket == ' + self.name, marker=marker)
+
+ for k in req.search():
+ yield k
+
+
+
+
+class ESZone(Zone):
+ def __init__(self, name, es_endpoint, zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
+ self.es_endpoint = es_endpoint
+ super(ESZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)
+
+ def is_read_only(self):
+ return True
+
+ def tier_type(self):
+ return "elasticsearch"
+
+ def create(self, cluster, args = None, check_retcode = True):
+ """ create the object with the given arguments """
+
+ if args is None:
+ args = ''
+
+ tier_config = ','.join([ 'endpoint=' + self.es_endpoint, 'explicit_custom_meta=false' ])
+
+ args += [ '--tier-type', self.tier_type(), '--tier-config', tier_config ]
+
+ return self.json_command(cluster, 'create', args, check_retcode=check_retcode)
+
+ def has_buckets(self):
+ return False
+
+ def has_roles(self):
+ return False
+
+ class Conn(ZoneConn):
+ def __init__(self, zone, credentials):
+ super(ESZone.Conn, self).__init__(zone, credentials)
+
+ def get_bucket(self, bucket_name):
+ return ESZoneBucket(self, bucket_name, self.conn)
+
+ def create_bucket(self, name):
+ # should not be here, a bug in the test suite
+ log.critical('Conn.create_bucket() should not be called in ES zone')
+ assert False
+
+ def check_bucket_eq(self, zone_conn, bucket_name):
+ assert(zone_conn.zone.tier_type() == "rados")
+
+ log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, self.name)
+ b1 = self.get_bucket(bucket_name)
+ b2 = zone_conn.get_bucket(bucket_name)
+
+ log.debug('bucket1 objects:')
+ for o in b1.get_all_versions():
+ log.debug('o=%s', o.name)
+ log.debug('bucket2 objects:')
+ for o in b2.get_all_versions():
+ log.debug('o=%s', o.name)
+
+ for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
+ if k1 is None:
+ log.critical('key=%s is missing from zone=%s', k2.name, self.name)
+ assert False
+ if k2 is None:
+ log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name)
+ assert False
+
+ check_object_eq(k1, k2)
+
+
+ log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
+
+ return True
+
+ def create_role(self, path, rolename, policy_document, tag_list):
+ assert False
+
+ def get_conn(self, credentials):
+ return self.Conn(self, credentials)
+
+
+class ESZoneConfig:
+ def __init__(self, cfg, section):
+ self.endpoint = cfg.get(section, 'endpoint')
+
diff --git a/src/test/rgw/rgw_multi/zone_rados.py b/src/test/rgw/rgw_multi/zone_rados.py
new file mode 100644
index 000000000..ac4edd004
--- /dev/null
+++ b/src/test/rgw/rgw_multi/zone_rados.py
@@ -0,0 +1,134 @@
+import logging
+from boto.s3.deletemarker import DeleteMarker
+
+from itertools import zip_longest # type: ignore
+
+from nose.tools import eq_ as eq
+
+from .multisite import *
+
+log = logging.getLogger(__name__)
+
+def check_object_eq(k1, k2, check_extra = True):
+ assert k1
+ assert k2
+ log.debug('comparing key name=%s', k1.name)
+ eq(k1.name, k2.name)
+ eq(k1.version_id, k2.version_id)
+ eq(k1.is_latest, k2.is_latest)
+ eq(k1.last_modified, k2.last_modified)
+ if isinstance(k1, DeleteMarker):
+ assert isinstance(k2, DeleteMarker)
+ return
+
+ eq(k1.get_contents_as_string(), k2.get_contents_as_string())
+ eq(k1.metadata, k2.metadata)
+ eq(k1.cache_control, k2.cache_control)
+ eq(k1.content_type, k2.content_type)
+ eq(k1.content_encoding, k2.content_encoding)
+ eq(k1.content_disposition, k2.content_disposition)
+ eq(k1.content_language, k2.content_language)
+ eq(k1.etag, k2.etag)
+ if check_extra:
+ eq(k1.owner.id, k2.owner.id)
+ eq(k1.owner.display_name, k2.owner.display_name)
+ eq(k1.storage_class, k2.storage_class)
+ eq(k1.size, k2.size)
+ eq(k1.encrypted, k2.encrypted)
+
+class RadosZone(Zone):
+ def __init__(self, name, zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
+ super(RadosZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)
+
+ def tier_type(self):
+ return "rados"
+
+
+ class Conn(ZoneConn):
+ def __init__(self, zone, credentials):
+ super(RadosZone.Conn, self).__init__(zone, credentials)
+
+ def get_bucket(self, name):
+ return self.conn.get_bucket(name)
+
+ def create_bucket(self, name):
+ return self.conn.create_bucket(name)
+
+ def delete_bucket(self, name):
+ return self.conn.delete_bucket(name)
+
+ def check_bucket_eq(self, zone_conn, bucket_name):
+ log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
+ b1 = self.get_bucket(bucket_name)
+ b2 = zone_conn.get_bucket(bucket_name)
+
+ b1_versions = b1.list_versions()
+ log.debug('bucket1 objects:')
+ for o in b1_versions:
+ log.debug('o=%s', o.name)
+
+ b2_versions = b2.list_versions()
+ log.debug('bucket2 objects:')
+ for o in b2_versions:
+ log.debug('o=%s', o.name)
+
+ for k1, k2 in zip_longest(b1_versions, b2_versions):
+ if k1 is None:
+ log.critical('key=%s is missing from zone=%s', k2.name, self.name)
+ assert False
+ if k2 is None:
+ log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name)
+ assert False
+
+ check_object_eq(k1, k2)
+
+ if isinstance(k1, DeleteMarker):
+ # verify that HEAD sees a delete marker
+ assert b1.get_key(k1.name) is None
+ assert b2.get_key(k2.name) is None
+ else:
+ # now get the keys through a HEAD operation, verify that the available data is the same
+ k1_head = b1.get_key(k1.name, version_id=k1.version_id)
+ k2_head = b2.get_key(k2.name, version_id=k2.version_id)
+ check_object_eq(k1_head, k2_head, False)
+
+ if k1.version_id:
+ # compare the olh to make sure they agree about the current version
+ k1_olh = b1.get_key(k1.name)
+ k2_olh = b2.get_key(k2.name)
+ # if there's a delete marker, HEAD will return None
+ if k1_olh or k2_olh:
+ check_object_eq(k1_olh, k2_olh, False)
+
+ log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
+
+ return True
+
+ def get_role(self, role_name):
+ return self.iam_conn.get_role(role_name)
+
+ def check_role_eq(self, zone_conn, role_name):
+ log.info('comparing role=%s zones={%s, %s}', role_name, self.name, zone_conn.name)
+ r1 = self.get_role(role_name)
+ r2 = zone_conn.get_role(role_name)
+
+ assert r1
+ assert r2
+ log.debug('comparing role name=%s', r1['get_role_response']['get_role_result']['role']['role_name'])
+ eq(r1['get_role_response']['get_role_result']['role']['role_name'], r2['get_role_response']['get_role_result']['role']['role_name'])
+ eq(r1['get_role_response']['get_role_result']['role']['role_id'], r2['get_role_response']['get_role_result']['role']['role_id'])
+ eq(r1['get_role_response']['get_role_result']['role']['path'], r2['get_role_response']['get_role_result']['role']['path'])
+ eq(r1['get_role_response']['get_role_result']['role']['arn'], r2['get_role_response']['get_role_result']['role']['arn'])
+ eq(r1['get_role_response']['get_role_result']['role']['max_session_duration'], r2['get_role_response']['get_role_result']['role']['max_session_duration'])
+ eq(r1['get_role_response']['get_role_result']['role']['assume_role_policy_document'], r2['get_role_response']['get_role_result']['role']['assume_role_policy_document'])
+
+ log.info('success, role identical: role=%s zones={%s, %s}', role_name, self.name, zone_conn.name)
+
+ return True
+
+ def create_role(self, path, rolename, policy_document, tag_list):
+ return self.iam_conn.create_role(rolename, policy_document, path)
+
+ def get_conn(self, credentials):
+ return self.Conn(self, credentials)
+