diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/test/rgw/rgw_multi | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/test/rgw/rgw_multi')
-rw-r--r-- | src/test/rgw/rgw_multi/__init__.py | 0 | ||||
-rw-r--r-- | src/test/rgw/rgw_multi/conn.py | 41 | ||||
-rw-r--r-- | src/test/rgw/rgw_multi/multisite.py | 407 | ||||
-rw-r--r-- | src/test/rgw/rgw_multi/tests.py | 2861 | ||||
-rw-r--r-- | src/test/rgw/rgw_multi/tests_az.py | 597 | ||||
-rw-r--r-- | src/test/rgw/rgw_multi/tests_es.py | 276 | ||||
-rw-r--r-- | src/test/rgw/rgw_multi/tools.py | 97 | ||||
-rw-r--r-- | src/test/rgw/rgw_multi/zone_az.py | 42 | ||||
-rw-r--r-- | src/test/rgw/rgw_multi/zone_cloud.py | 326 | ||||
-rw-r--r-- | src/test/rgw/rgw_multi/zone_es.py | 256 | ||||
-rw-r--r-- | src/test/rgw/rgw_multi/zone_rados.py | 134 |
11 files changed, 5037 insertions, 0 deletions
diff --git a/src/test/rgw/rgw_multi/__init__.py b/src/test/rgw/rgw_multi/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/test/rgw/rgw_multi/__init__.py diff --git a/src/test/rgw/rgw_multi/conn.py b/src/test/rgw/rgw_multi/conn.py new file mode 100644 index 000000000..59bc2fdd3 --- /dev/null +++ b/src/test/rgw/rgw_multi/conn.py @@ -0,0 +1,41 @@ +import boto +import boto.s3.connection +import boto.iam.connection + +def get_gateway_connection(gateway, credentials): + """ connect to the given gateway """ + if gateway.connection is None: + gateway.connection = boto.connect_s3( + aws_access_key_id = credentials.access_key, + aws_secret_access_key = credentials.secret, + host = gateway.host, + port = gateway.port, + is_secure = False, + calling_format = boto.s3.connection.OrdinaryCallingFormat()) + return gateway.connection + +def get_gateway_secure_connection(gateway, credentials): + """ secure connect to the given gateway """ + if gateway.ssl_port == 0: + return None + if gateway.secure_connection is None: + gateway.secure_connection = boto.connect_s3( + aws_access_key_id = credentials.access_key, + aws_secret_access_key = credentials.secret, + host = gateway.host, + port = gateway.ssl_port, + is_secure = True, + validate_certs=False, + calling_format = boto.s3.connection.OrdinaryCallingFormat()) + return gateway.secure_connection + +def get_gateway_iam_connection(gateway, credentials): + """ connect to iam api of the given gateway """ + if gateway.iam_connection is None: + gateway.iam_connection = boto.connect_iam( + aws_access_key_id = credentials.access_key, + aws_secret_access_key = credentials.secret, + host = gateway.host, + port = gateway.port, + is_secure = False) + return gateway.iam_connection diff --git a/src/test/rgw/rgw_multi/multisite.py b/src/test/rgw/rgw_multi/multisite.py new file mode 100644 index 000000000..5d4dcd1aa --- /dev/null +++ b/src/test/rgw/rgw_multi/multisite.py @@ -0,0 +1,407 @@ +from abc import ABCMeta, abstractmethod +from io import StringIO + +import json + +from .conn import get_gateway_connection, get_gateway_iam_connection, get_gateway_secure_connection + +class Cluster: + """ interface to run commands against a distinct ceph cluster """ + __metaclass__ = ABCMeta + + @abstractmethod + def admin(self, args = None, **kwargs): + """ execute a radosgw-admin command """ + pass + +class Gateway: + """ interface to control a single radosgw instance """ + __metaclass__ = ABCMeta + + def __init__(self, host = None, port = None, cluster = None, zone = None, ssl_port = 0): + self.host = host + self.port = port + self.cluster = cluster + self.zone = zone + self.connection = None + self.secure_connection = None + self.ssl_port = ssl_port + self.iam_connection = None + + @abstractmethod + def start(self, args = []): + """ start the gateway with the given args """ + pass + + @abstractmethod + def stop(self): + """ stop the gateway """ + pass + + def endpoint(self): + return 'http://%s:%d' % (self.host, self.port) + +class SystemObject: + """ interface for system objects, represented in json format and + manipulated with radosgw-admin commands """ + __metaclass__ = ABCMeta + + def __init__(self, data = None, uuid = None): + self.data = data + self.id = uuid + if data: + self.load_from_json(data) + + @abstractmethod + def build_command(self, command): + """ return the command line for the given command, including arguments + to specify this object """ + pass + + @abstractmethod + def load_from_json(self, data): + """ update internal state based on json data """ + pass + + def command(self, cluster, cmd, args = None, **kwargs): + """ run the given command and return the output and retcode """ + args = self.build_command(cmd) + (args or []) + return cluster.admin(args, **kwargs) + + def json_command(self, cluster, cmd, args = None, **kwargs): + """ run the given command, parse the output and return the resulting + data and retcode """ + s, r = self.command(cluster, cmd, args or [], **kwargs) + if r == 0: + data = json.loads(s) + self.load_from_json(data) + self.data = data + return self.data, r + + # mixins for supported commands + class Create(object): + def create(self, cluster, args = None, **kwargs): + """ create the object with the given arguments """ + return self.json_command(cluster, 'create', args, **kwargs) + + class Delete(object): + def delete(self, cluster, args = None, **kwargs): + """ delete the object """ + # not json_command() because delete has no output + _, r = self.command(cluster, 'delete', args, **kwargs) + if r == 0: + self.data = None + return r + + class Get(object): + def get(self, cluster, args = None, **kwargs): + """ read the object from storage """ + kwargs['read_only'] = True + return self.json_command(cluster, 'get', args, **kwargs) + + class Set(object): + def set(self, cluster, data, args = None, **kwargs): + """ set the object by json """ + kwargs['stdin'] = StringIO(json.dumps(data)) + return self.json_command(cluster, 'set', args, **kwargs) + + class Modify(object): + def modify(self, cluster, args = None, **kwargs): + """ modify the object with the given arguments """ + return self.json_command(cluster, 'modify', args, **kwargs) + + class CreateDelete(Create, Delete): pass + class GetSet(Get, Set): pass + +class Zone(SystemObject, SystemObject.CreateDelete, SystemObject.GetSet, SystemObject.Modify): + def __init__(self, name, zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None): + self.name = name + self.zonegroup = zonegroup + self.cluster = cluster + self.gateways = gateways or [] + super(Zone, self).__init__(data, zone_id) + + def zone_arg(self): + """ command-line argument to specify this zone """ + return ['--rgw-zone', self.name] + + def zone_args(self): + """ command-line arguments to specify this zone/zonegroup/realm """ + args = self.zone_arg() + if self.zonegroup: + args += self.zonegroup.zonegroup_args() + return args + + def build_command(self, command): + """ build a command line for the given command and args """ + return ['zone', command] + self.zone_args() + + def load_from_json(self, data): + """ load the zone from json """ + self.id = data['id'] + self.name = data['name'] + + def start(self, args = None): + """ start all gateways """ + for g in self.gateways: + g.start(args) + + def stop(self): + """ stop all gateways """ + for g in self.gateways: + g.stop() + + def period(self): + return self.zonegroup.period if self.zonegroup else None + + def realm(self): + return self.zonegroup.realm() if self.zonegroup else None + + def is_read_only(self): + return False + + def tier_type(self): + raise NotImplementedError + + def syncs_from(self, zone_name): + return zone_name != self.name + + def has_buckets(self): + return True + + def has_roles(self): + return True + + def get_conn(self, credentials): + return ZoneConn(self, credentials) # not implemented, but can be used + +class ZoneConn(object): + def __init__(self, zone, credentials): + self.zone = zone + self.name = zone.name + """ connect to the zone's first gateway """ + if isinstance(credentials, list): + self.credentials = credentials[0] + else: + self.credentials = credentials + + if self.zone.gateways is not None: + self.conn = get_gateway_connection(self.zone.gateways[0], self.credentials) + self.secure_conn = get_gateway_secure_connection(self.zone.gateways[0], self.credentials) + + self.iam_conn = get_gateway_iam_connection(self.zone.gateways[0], self.credentials) + + # create connections for the rest of the gateways (if exist) + for gw in list(self.zone.gateways): + get_gateway_connection(gw, self.credentials) + get_gateway_secure_connection(gw, self.credentials) + + get_gateway_iam_connection(gw, self.credentials) + + + def get_connection(self): + return self.conn + + def get_iam_connection(self): + return self.iam_conn + + def get_bucket(self, bucket_name, credentials): + raise NotImplementedError + + def check_bucket_eq(self, zone, bucket_name): + raise NotImplementedError + +class ZoneGroup(SystemObject, SystemObject.CreateDelete, SystemObject.GetSet, SystemObject.Modify): + def __init__(self, name, period = None, data = None, zonegroup_id = None, zones = None, master_zone = None): + self.name = name + self.period = period + self.zones = zones or [] + self.master_zone = master_zone + super(ZoneGroup, self).__init__(data, zonegroup_id) + self.rw_zones = [] + self.ro_zones = [] + self.zones_by_type = {} + for z in self.zones: + if z.is_read_only(): + self.ro_zones.append(z) + else: + self.rw_zones.append(z) + + def zonegroup_arg(self): + """ command-line argument to specify this zonegroup """ + return ['--rgw-zonegroup', self.name] + + def zonegroup_args(self): + """ command-line arguments to specify this zonegroup/realm """ + args = self.zonegroup_arg() + realm = self.realm() + if realm: + args += realm.realm_arg() + return args + + def build_command(self, command): + """ build a command line for the given command and args """ + return ['zonegroup', command] + self.zonegroup_args() + + def zone_by_id(self, zone_id): + """ return the matching zone by id """ + for zone in self.zones: + if zone.id == zone_id: + return zone + return None + + def load_from_json(self, data): + """ load the zonegroup from json """ + self.id = data['id'] + self.name = data['name'] + master_id = data['master_zone'] + if not self.master_zone or master_id != self.master_zone.id: + self.master_zone = self.zone_by_id(master_id) + + def add(self, cluster, zone, args = None, **kwargs): + """ add an existing zone to the zonegroup """ + args = zone.zone_arg() + (args or []) + data, r = self.json_command(cluster, 'add', args, **kwargs) + if r == 0: + zone.zonegroup = self + self.zones.append(zone) + return data, r + + def remove(self, cluster, zone, args = None, **kwargs): + """ remove an existing zone from the zonegroup """ + args = zone.zone_arg() + (args or []) + data, r = self.json_command(cluster, 'remove', args, **kwargs) + if r == 0: + zone.zonegroup = None + self.zones.remove(zone) + return data, r + + def realm(self): + return self.period.realm if self.period else None + +class Period(SystemObject, SystemObject.Get): + def __init__(self, realm = None, data = None, period_id = None, zonegroups = None, master_zonegroup = None): + self.realm = realm + self.zonegroups = zonegroups or [] + self.master_zonegroup = master_zonegroup + super(Period, self).__init__(data, period_id) + + def zonegroup_by_id(self, zonegroup_id): + """ return the matching zonegroup by id """ + for zonegroup in self.zonegroups: + if zonegroup.id == zonegroup_id: + return zonegroup + return None + + def build_command(self, command): + """ build a command line for the given command and args """ + return ['period', command] + + def load_from_json(self, data): + """ load the period from json """ + self.id = data['id'] + master_id = data['master_zonegroup'] + if not self.master_zonegroup or master_id != self.master_zonegroup.id: + self.master_zonegroup = self.zonegroup_by_id(master_id) + + def update(self, zone, args = None, **kwargs): + """ run 'radosgw-admin period update' on the given zone """ + assert(zone.cluster) + args = zone.zone_args() + (args or []) + if kwargs.pop('commit', False): + args.append('--commit') + return self.json_command(zone.cluster, 'update', args, **kwargs) + + def commit(self, zone, args = None, **kwargs): + """ run 'radosgw-admin period commit' on the given zone """ + assert(zone.cluster) + args = zone.zone_args() + (args or []) + return self.json_command(zone.cluster, 'commit', args, **kwargs) + +class Realm(SystemObject, SystemObject.CreateDelete, SystemObject.GetSet): + def __init__(self, name, period = None, data = None, realm_id = None): + self.name = name + self.current_period = period + super(Realm, self).__init__(data, realm_id) + + def realm_arg(self): + """ return the command-line arguments that specify this realm """ + return ['--rgw-realm', self.name] + + def build_command(self, command): + """ build a command line for the given command and args """ + return ['realm', command] + self.realm_arg() + + def load_from_json(self, data): + """ load the realm from json """ + self.id = data['id'] + + def pull(self, cluster, gateway, credentials, args = [], **kwargs): + """ pull an existing realm from the given gateway """ + args += ['--url', gateway.endpoint()] + args += credentials.credential_args() + return self.json_command(cluster, 'pull', args, **kwargs) + + def master_zonegroup(self): + """ return the current period's master zonegroup """ + if self.current_period is None: + return None + return self.current_period.master_zonegroup + + def meta_master_zone(self): + """ return the current period's metadata master zone """ + zonegroup = self.master_zonegroup() + if zonegroup is None: + return None + return zonegroup.master_zone + +class Credentials: + def __init__(self, access_key, secret): + self.access_key = access_key + self.secret = secret + + def credential_args(self): + return ['--access-key', self.access_key, '--secret', self.secret] + +class User(SystemObject): + def __init__(self, uid, data = None, name = None, credentials = None, tenant = None): + self.name = name + self.credentials = credentials or [] + self.tenant = tenant + super(User, self).__init__(data, uid) + + def user_arg(self): + """ command-line argument to specify this user """ + args = ['--uid', self.id] + if self.tenant: + args += ['--tenant', self.tenant] + return args + + def build_command(self, command): + """ build a command line for the given command and args """ + return ['user', command] + self.user_arg() + + def load_from_json(self, data): + """ load the user from json """ + self.id = data['user_id'] + self.name = data['display_name'] + self.credentials = [Credentials(k['access_key'], k['secret_key']) for k in data['keys']] + + def create(self, zone, args = None, **kwargs): + """ create the user with the given arguments """ + assert(zone.cluster) + args = zone.zone_args() + (args or []) + return self.json_command(zone.cluster, 'create', args, **kwargs) + + def info(self, zone, args = None, **kwargs): + """ read the user from storage """ + assert(zone.cluster) + args = zone.zone_args() + (args or []) + kwargs['read_only'] = True + return self.json_command(zone.cluster, 'info', args, **kwargs) + + def delete(self, zone, args = None, **kwargs): + """ delete the user """ + assert(zone.cluster) + args = zone.zone_args() + (args or []) + return self.command(zone.cluster, 'delete', args, **kwargs) diff --git a/src/test/rgw/rgw_multi/tests.py b/src/test/rgw/rgw_multi/tests.py new file mode 100644 index 000000000..156fac12e --- /dev/null +++ b/src/test/rgw/rgw_multi/tests.py @@ -0,0 +1,2861 @@ +import json +import random +import string +import sys +import time +import logging +import errno +import dateutil.parser + +from itertools import combinations +from itertools import zip_longest +from io import StringIO + +import boto +import boto.s3.connection +from boto.s3.website import WebsiteConfiguration +from boto.s3.cors import CORSConfiguration + +from nose.tools import eq_ as eq +from nose.tools import assert_not_equal, assert_equal +from nose.plugins.attrib import attr +from nose.plugins.skip import SkipTest + +from .multisite import Zone, ZoneGroup, Credentials + +from .conn import get_gateway_connection +from .tools import assert_raises + +class Config: + """ test configuration """ + def __init__(self, **kwargs): + # by default, wait up to 5 minutes before giving up on a sync checkpoint + self.checkpoint_retries = kwargs.get('checkpoint_retries', 60) + self.checkpoint_delay = kwargs.get('checkpoint_delay', 5) + # allow some time for realm reconfiguration after changing master zone + self.reconfigure_delay = kwargs.get('reconfigure_delay', 5) + self.tenant = kwargs.get('tenant', '') + +# rgw multisite tests, written against the interfaces provided in rgw_multi. +# these tests must be initialized and run by another module that provides +# implementations of these interfaces by calling init_multi() +realm = None +user = None +config = None +def init_multi(_realm, _user, _config=None): + global realm + realm = _realm + global user + user = _user + global config + config = _config or Config() + realm_meta_checkpoint(realm) + +def get_user(): + return user.id if user is not None else '' + +def get_tenant(): + return config.tenant if config is not None and config.tenant is not None else '' + +def get_realm(): + return realm + +log = logging.getLogger('rgw_multi.tests') + +num_buckets = 0 +run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6)) + +num_roles = 0 + +def get_zone_connection(zone, credentials): + """ connect to the zone's first gateway """ + if isinstance(credentials, list): + credentials = credentials[0] + return get_gateway_connection(zone.gateways[0], credentials) + +def mdlog_list(zone, period = None): + cmd = ['mdlog', 'list'] + if period: + cmd += ['--period', period] + (mdlog_json, _) = zone.cluster.admin(cmd, read_only=True) + return json.loads(mdlog_json) + +def mdlog_autotrim(zone): + zone.cluster.admin(['mdlog', 'autotrim']) + +def datalog_list(zone, args = None): + cmd = ['datalog', 'list'] + (args or []) + (datalog_json, _) = zone.cluster.admin(cmd, read_only=True) + return json.loads(datalog_json) + +def datalog_status(zone): + cmd = ['datalog', 'status'] + (datalog_json, _) = zone.cluster.admin(cmd, read_only=True) + return json.loads(datalog_json) + +def datalog_autotrim(zone): + zone.cluster.admin(['datalog', 'autotrim']) + +def bilog_list(zone, bucket, args = None): + cmd = ['bilog', 'list', '--bucket', bucket] + (args or []) + cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else [] + bilog, _ = zone.cluster.admin(cmd, read_only=True) + return json.loads(bilog) + +def bilog_autotrim(zone, args = None): + zone.cluster.admin(['bilog', 'autotrim'] + (args or [])) + +def bucket_layout(zone, bucket, args = None): + (bl_output,_) = zone.cluster.admin(['bucket', 'layout', '--bucket', bucket] + (args or [])) + return json.loads(bl_output) + +def parse_meta_sync_status(meta_sync_status_json): + log.debug('current meta sync status=%s', meta_sync_status_json) + sync_status = json.loads(meta_sync_status_json) + + sync_info = sync_status['sync_status']['info'] + global_sync_status = sync_info['status'] + num_shards = sync_info['num_shards'] + period = sync_info['period'] + realm_epoch = sync_info['realm_epoch'] + + sync_markers=sync_status['sync_status']['markers'] + log.debug('sync_markers=%s', sync_markers) + assert(num_shards == len(sync_markers)) + + markers={} + for i in range(num_shards): + # get marker, only if it's an incremental marker for the same realm epoch + if realm_epoch > sync_markers[i]['val']['realm_epoch'] or sync_markers[i]['val']['state'] == 0: + markers[i] = '' + else: + markers[i] = sync_markers[i]['val']['marker'] + + return period, realm_epoch, num_shards, markers + +def meta_sync_status(zone): + for _ in range(config.checkpoint_retries): + cmd = ['metadata', 'sync', 'status'] + zone.zone_args() + meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True) + if retcode == 0: + return parse_meta_sync_status(meta_sync_status_json) + assert(retcode == 2) # ENOENT + time.sleep(config.checkpoint_delay) + + assert False, 'failed to read metadata sync status for zone=%s' % zone.name + +def meta_master_log_status(master_zone): + cmd = ['mdlog', 'status'] + master_zone.zone_args() + mdlog_status_json, retcode = master_zone.cluster.admin(cmd, read_only=True) + mdlog_status = json.loads(mdlog_status_json) + + markers = {i: s['marker'] for i, s in enumerate(mdlog_status)} + log.debug('master meta markers=%s', markers) + return markers + +def compare_meta_status(zone, log_status, sync_status): + if len(log_status) != len(sync_status): + log.error('len(log_status)=%d, len(sync_status)=%d', len(log_status), len(sync_status)) + return False + + msg = '' + for i, l, s in zip(log_status, log_status.values(), sync_status.values()): + if l > s: + if len(msg): + msg += ', ' + msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s + + if len(msg) > 0: + log.warning('zone %s behind master: %s', zone.name, msg) + return False + + return True + +def zone_meta_checkpoint(zone, meta_master_zone = None, master_status = None): + if not meta_master_zone: + meta_master_zone = zone.realm().meta_master_zone() + if not master_status: + master_status = meta_master_log_status(meta_master_zone) + + current_realm_epoch = realm.current_period.data['realm_epoch'] + + log.info('starting meta checkpoint for zone=%s', zone.name) + + for _ in range(config.checkpoint_retries): + period, realm_epoch, num_shards, sync_status = meta_sync_status(zone) + if realm_epoch < current_realm_epoch: + log.warning('zone %s is syncing realm epoch=%d, behind current realm epoch=%d', + zone.name, realm_epoch, current_realm_epoch) + else: + log.debug('log_status=%s', master_status) + log.debug('sync_status=%s', sync_status) + if compare_meta_status(zone, master_status, sync_status): + log.info('finish meta checkpoint for zone=%s', zone.name) + return + + time.sleep(config.checkpoint_delay) + assert False, 'failed meta checkpoint for zone=%s' % zone.name + +def zonegroup_meta_checkpoint(zonegroup, meta_master_zone = None, master_status = None): + if not meta_master_zone: + meta_master_zone = zonegroup.realm().meta_master_zone() + if not master_status: + master_status = meta_master_log_status(meta_master_zone) + + for zone in zonegroup.zones: + if zone == meta_master_zone: + continue + zone_meta_checkpoint(zone, meta_master_zone, master_status) + +def realm_meta_checkpoint(realm): + log.info('meta checkpoint') + + meta_master_zone = realm.meta_master_zone() + master_status = meta_master_log_status(meta_master_zone) + + for zonegroup in realm.current_period.zonegroups: + zonegroup_meta_checkpoint(zonegroup, meta_master_zone, master_status) + +def parse_data_sync_status(data_sync_status_json): + log.debug('current data sync status=%s', data_sync_status_json) + sync_status = json.loads(data_sync_status_json) + + global_sync_status=sync_status['sync_status']['info']['status'] + num_shards=sync_status['sync_status']['info']['num_shards'] + + sync_markers=sync_status['sync_status']['markers'] + log.debug('sync_markers=%s', sync_markers) + assert(num_shards == len(sync_markers)) + + markers={} + for i in range(num_shards): + markers[i] = sync_markers[i]['val']['marker'] + + return (num_shards, markers) + +def data_sync_status(target_zone, source_zone): + if target_zone == source_zone: + return None + + for _ in range(config.checkpoint_retries): + cmd = ['data', 'sync', 'status'] + target_zone.zone_args() + cmd += ['--source-zone', source_zone.name] + data_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True) + if retcode == 0: + return parse_data_sync_status(data_sync_status_json) + + assert(retcode == 2) # ENOENT + time.sleep(config.checkpoint_delay) + + assert False, 'failed to read data sync status for target_zone=%s source_zone=%s' % \ + (target_zone.name, source_zone.name) + +def bucket_sync_status(target_zone, source_zone, bucket_name): + if target_zone == source_zone: + return None + + cmd = ['bucket', 'sync', 'markers'] + target_zone.zone_args() + cmd += ['--source-zone', source_zone.name] + cmd += ['--bucket', bucket_name] + cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else [] + while True: + bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True) + if retcode == 0: + break + + assert(retcode == 2) # ENOENT + + sync_status = json.loads(bucket_sync_status_json) + + markers={} + for entry in sync_status: + val = entry['val'] + pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3 + markers[entry['key']] = pos + + return markers + +def data_source_log_status(source_zone): + source_cluster = source_zone.cluster + cmd = ['datalog', 'status'] + source_zone.zone_args() + datalog_status_json, retcode = source_cluster.admin(cmd, read_only=True) + datalog_status = json.loads(datalog_status_json) + + markers = {i: s['marker'] for i, s in enumerate(datalog_status)} + log.debug('data markers for zone=%s markers=%s', source_zone.name, markers) + return markers + +def bucket_source_log_status(source_zone, bucket_name): + cmd = ['bilog', 'status'] + source_zone.zone_args() + cmd += ['--bucket', bucket_name] + cmd += ['--tenant', config.tenant, '--uid', user.name] if config.tenant else [] + source_cluster = source_zone.cluster + bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True) + bilog_status = json.loads(bilog_status_json) + + m={} + markers={} + try: + m = bilog_status['markers'] + except: + pass + + for s in m: + key = s['key'] + val = s['val'] + markers[key] = val + + log.debug('bilog markers for zone=%s bucket=%s markers=%s', source_zone.name, bucket_name, markers) + return markers + +def compare_data_status(target_zone, source_zone, log_status, sync_status): + if len(log_status) != len(sync_status): + log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status)) + return False + + msg = '' + for i, l, s in zip(log_status, log_status.values(), sync_status.values()): + if l > s: + if len(msg): + msg += ', ' + msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s + + if len(msg) > 0: + log.warning('data of zone %s behind zone %s: %s', target_zone.name, source_zone.name, msg) + return False + + return True + +def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status): + if len(log_status) != len(sync_status): + log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status)) + return False + + msg = '' + for i, l, s in zip(log_status, log_status.values(), sync_status.values()): + if l > s: + if len(msg): + msg += ', ' + msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s + + if len(msg) > 0: + log.warning('bucket %s zone %s behind zone %s: %s', bucket_name, target_zone.name, source_zone.name, msg) + return False + + return True + +def zone_data_checkpoint(target_zone, source_zone): + if not target_zone.syncs_from(source_zone.name): + return + + log_status = data_source_log_status(source_zone) + log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name) + + for _ in range(config.checkpoint_retries): + num_shards, sync_status = data_sync_status(target_zone, source_zone) + + log.debug('log_status=%s', log_status) + log.debug('sync_status=%s', sync_status) + + if compare_data_status(target_zone, source_zone, log_status, sync_status): + log.info('finished data checkpoint for target_zone=%s source_zone=%s', + target_zone.name, source_zone.name) + return + time.sleep(config.checkpoint_delay) + + assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \ + (target_zone.name, source_zone.name) + +def zonegroup_data_checkpoint(zonegroup_conns): + for source_conn in zonegroup_conns.rw_zones: + for target_conn in zonegroup_conns.zones: + if source_conn.zone == target_conn.zone: + continue + log.debug('data checkpoint: source=%s target=%s', source_conn.zone.name, target_conn.zone.name) + zone_data_checkpoint(target_conn.zone, source_conn.zone) + +def zone_bucket_checkpoint(target_zone, source_zone, bucket_name): + if not target_zone.syncs_from(source_zone.name): + return + + cmd = ['bucket', 'sync', 'checkpoint'] + cmd += ['--bucket', bucket_name, '--source-zone', source_zone.name] + retry_delay_ms = config.checkpoint_delay * 1000 + timeout_sec = config.checkpoint_retries * config.checkpoint_delay + cmd += ['--retry-delay-ms', str(retry_delay_ms), '--timeout-sec', str(timeout_sec)] + cmd += target_zone.zone_args() + target_zone.cluster.admin(cmd, debug_rgw=1) + +def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name): + for source_conn in zonegroup_conns.rw_zones: + for target_conn in zonegroup_conns.zones: + if source_conn.zone == target_conn.zone: + continue + log.debug('bucket checkpoint: source=%s target=%s bucket=%s', source_conn.zone.name, target_conn.zone.name, bucket_name) + zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket_name) + for source_conn, target_conn in combinations(zonegroup_conns.zones, 2): + if target_conn.zone.has_buckets(): + target_conn.check_bucket_eq(source_conn, bucket_name) + +def set_master_zone(zone): + zone.modify(zone.cluster, ['--master']) + zonegroup = zone.zonegroup + zonegroup.period.update(zone, commit=True) + zonegroup.master_zone = zone + log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, config.reconfigure_delay) + time.sleep(config.reconfigure_delay) + +def set_sync_from_all(zone, flag): + s = 'true' if flag else 'false' + zone.modify(zone.cluster, ['--sync-from-all={}'.format(s)]) + zonegroup = zone.zonegroup + zonegroup.period.update(zone, commit=True) + log.info('Set sync_from_all flag on zone %s to %s', zone.name, s) + time.sleep(config.reconfigure_delay) + +def set_redirect_zone(zone, redirect_zone): + id_str = redirect_zone.id if redirect_zone else '' + zone.modify(zone.cluster, ['--redirect-zone={}'.format(id_str)]) + zonegroup = zone.zonegroup + zonegroup.period.update(zone, commit=True) + log.info('Set redirect_zone zone %s to "%s"', zone.name, id_str) + time.sleep(config.reconfigure_delay) + +def enable_bucket_sync(zone, bucket_name): + cmd = ['bucket', 'sync', 'enable', '--bucket', bucket_name] + zone.zone_args() + zone.cluster.admin(cmd) + +def disable_bucket_sync(zone, bucket_name): + cmd = ['bucket', 'sync', 'disable', '--bucket', bucket_name] + zone.zone_args() + zone.cluster.admin(cmd) + +def check_buckets_sync_status_obj_not_exist(zone, buckets): + for _ in range(config.checkpoint_retries): + cmd = ['log', 'list'] + zone.zone_arg() + log_list, ret = zone.cluster.admin(cmd, check_retcode=False, read_only=True) + for bucket in buckets: + if log_list.find(':'+bucket+":") >= 0: + break + else: + return + time.sleep(config.checkpoint_delay) + assert False + +def gen_bucket_name(): + global num_buckets + + num_buckets += 1 + return run_prefix + '-' + str(num_buckets) + +def gen_role_name(): + global num_roles + + num_roles += 1 + return "roles" + '-' + run_prefix + '-' + str(num_roles) + +class ZonegroupConns: + def __init__(self, zonegroup): + self.zonegroup = zonegroup + self.zones = [] + self.ro_zones = [] + self.rw_zones = [] + self.master_zone = None + + for z in zonegroup.zones: + zone_conn = z.get_conn(user.credentials) + self.zones.append(zone_conn) + if z.is_read_only(): + self.ro_zones.append(zone_conn) + else: + self.rw_zones.append(zone_conn) + + if z == zonegroup.master_zone: + self.master_zone = zone_conn + +def check_all_buckets_exist(zone_conn, buckets): + if not zone_conn.zone.has_buckets(): + return True + + for b in buckets: + try: + zone_conn.get_bucket(b) + except: + log.critical('zone %s does not contain bucket %s', zone_conn.zone.name, b) + return False + + return True + +def check_all_buckets_dont_exist(zone_conn, buckets): + if not zone_conn.zone.has_buckets(): + return True + + for b in buckets: + try: + zone_conn.get_bucket(b) + except: + continue + + log.critical('zone %s contains bucket %s', zone.zone, b) + return False + + return True + +def create_role_per_zone(zonegroup_conns, roles_per_zone = 1): + roles = [] + zone_role = [] + for zone in zonegroup_conns.rw_zones: + for i in range(roles_per_zone): + role_name = gen_role_name() + log.info('create role zone=%s name=%s', zone.name, role_name) + policy_document = "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam:::user/testuser\"]},\"Action\":[\"sts:AssumeRole\"]}]}" + role = zone.create_role("", role_name, policy_document, "") + roles.append(role_name) + zone_role.append((zone, role)) + + return roles, zone_role + +def create_bucket_per_zone(zonegroup_conns, buckets_per_zone = 1): + buckets = [] + zone_bucket = [] + for zone in zonegroup_conns.rw_zones: + for i in range(buckets_per_zone): + bucket_name = gen_bucket_name() + log.info('create bucket zone=%s name=%s', zone.name, bucket_name) + bucket = zone.create_bucket(bucket_name) + buckets.append(bucket_name) + zone_bucket.append((zone, bucket)) + + return buckets, zone_bucket + +def create_bucket_per_zone_in_realm(): + buckets = [] + zone_bucket = [] + for zonegroup in realm.current_period.zonegroups: + zg_conn = ZonegroupConns(zonegroup) + b, z = create_bucket_per_zone(zg_conn) + buckets.extend(b) + zone_bucket.extend(z) + return buckets, zone_bucket + +def test_bucket_create(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, _ = create_bucket_per_zone(zonegroup_conns) + zonegroup_meta_checkpoint(zonegroup) + + for zone in zonegroup_conns.zones: + assert check_all_buckets_exist(zone, buckets) + +def test_bucket_recreate(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, _ = create_bucket_per_zone(zonegroup_conns) + zonegroup_meta_checkpoint(zonegroup) + + + for zone in zonegroup_conns.zones: + assert check_all_buckets_exist(zone, buckets) + + # recreate buckets on all zones, make sure they weren't removed + for zone in zonegroup_conns.rw_zones: + for bucket_name in buckets: + bucket = zone.create_bucket(bucket_name) + + for zone in zonegroup_conns.zones: + assert check_all_buckets_exist(zone, buckets) + + zonegroup_meta_checkpoint(zonegroup) + + for zone in zonegroup_conns.zones: + assert check_all_buckets_exist(zone, buckets) + +def test_bucket_remove(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) + zonegroup_meta_checkpoint(zonegroup) + + for zone in zonegroup_conns.zones: + assert check_all_buckets_exist(zone, buckets) + + for zone, bucket_name in zone_bucket: + zone.conn.delete_bucket(bucket_name) + + zonegroup_meta_checkpoint(zonegroup) + + for zone in zonegroup_conns.zones: + assert check_all_buckets_dont_exist(zone, buckets) + +def get_bucket(zone, bucket_name): + return zone.conn.get_bucket(bucket_name) + +def get_key(zone, bucket_name, obj_name): + b = get_bucket(zone, bucket_name) + return b.get_key(obj_name) + +def new_key(zone, bucket_name, obj_name): + b = get_bucket(zone, bucket_name) + return b.new_key(obj_name) + +def check_bucket_eq(zone_conn1, zone_conn2, bucket): + if zone_conn2.zone.has_buckets(): + zone_conn2.check_bucket_eq(zone_conn1, bucket.name) + +def check_role_eq(zone_conn1, zone_conn2, role): + if zone_conn2.zone.has_roles(): + zone_conn2.check_role_eq(zone_conn1, role['create_role_response']['create_role_result']['role']['role_name']) + +def test_object_sync(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) + + objnames = [ 'myobj', '_myobj', ':', '&' ] + content = 'asdasd' + + # don't wait for meta sync just yet + for zone, bucket_name in zone_bucket: + for objname in objnames: + k = new_key(zone, bucket_name, objname) + k.set_contents_from_string(content) + + zonegroup_meta_checkpoint(zonegroup) + + for source_conn, bucket in zone_bucket: + for target_conn in zonegroup_conns.zones: + if source_conn.zone == target_conn.zone: + continue + + zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name) + check_bucket_eq(source_conn, target_conn, bucket) + +def test_object_delete(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) + + objname = 'myobj' + content = 'asdasd' + + # don't wait for meta sync just yet + for zone, bucket in zone_bucket: + k = new_key(zone, bucket, objname) + k.set_contents_from_string(content) + + zonegroup_meta_checkpoint(zonegroup) + + # check object exists + for source_conn, bucket in zone_bucket: + for target_conn in zonegroup_conns.zones: + if source_conn.zone == target_conn.zone: + continue + + zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name) + check_bucket_eq(source_conn, target_conn, bucket) + + # check object removal + for source_conn, bucket in zone_bucket: + k = get_key(source_conn, bucket, objname) + k.delete() + for target_conn in zonegroup_conns.zones: + if source_conn.zone == target_conn.zone: + continue + + zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name) + check_bucket_eq(source_conn, target_conn, bucket) + +def get_latest_object_version(key): + for k in key.bucket.list_versions(key.name): + if k.is_latest: + return k + return None + +def test_versioned_object_incremental_sync(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) + + # enable versioning + for _, bucket in zone_bucket: + bucket.configure_versioning(True) + + zonegroup_meta_checkpoint(zonegroup) + + # upload a dummy object to each bucket and wait for sync. this forces each + # bucket to finish a full sync and switch to incremental + for source_conn, bucket in zone_bucket: + new_key(source_conn, bucket, 'dummy').set_contents_from_string('') + for target_conn in zonegroup_conns.zones: + if source_conn.zone == target_conn.zone: + continue + zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name) + + for _, bucket in zone_bucket: + # create and delete multiple versions of an object from each zone + for zone_conn in zonegroup_conns.rw_zones: + obj = 'obj-' + zone_conn.name + k = new_key(zone_conn, bucket, obj) + + k.set_contents_from_string('version1') + log.debug('version1 id=%s', k.version_id) + # don't delete version1 - this tests that the initial version + # doesn't get squashed into later versions + + # create and delete the following object versions to test that + # the operations don't race with each other during sync + k.set_contents_from_string('version2') + log.debug('version2 id=%s', k.version_id) + k.bucket.delete_key(obj, version_id=k.version_id) + + k.set_contents_from_string('version3') + log.debug('version3 id=%s', k.version_id) + k.bucket.delete_key(obj, version_id=k.version_id) + + for _, bucket in zone_bucket: + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + for _, bucket in zone_bucket: + # overwrite the acls to test that metadata-only entries are applied + for zone_conn in zonegroup_conns.rw_zones: + obj = 'obj-' + zone_conn.name + k = new_key(zone_conn, bucket.name, obj) + v = get_latest_object_version(k) + v.make_public() + + for _, bucket in zone_bucket: + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + +def test_concurrent_versioned_object_incremental_sync(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + zone = zonegroup_conns.rw_zones[0] + + # create a versioned bucket + bucket = zone.create_bucket(gen_bucket_name()) + log.debug('created bucket=%s', bucket.name) + bucket.configure_versioning(True) + + zonegroup_meta_checkpoint(zonegroup) + + # upload a dummy object and wait for sync. this forces each zone to finish + # a full sync and switch to incremental + new_key(zone, bucket, 'dummy').set_contents_from_string('') + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + # create several concurrent versions on each zone and let them race to sync + obj = 'obj' + for i in range(10): + for zone_conn in zonegroup_conns.rw_zones: + k = new_key(zone_conn, bucket, obj) + k.set_contents_from_string('version1') + log.debug('zone=%s version=%s', zone_conn.zone.name, k.version_id) + + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + zonegroup_data_checkpoint(zonegroup_conns) + +def test_version_suspended_incremental_sync(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + zone = zonegroup_conns.rw_zones[0] + + # create a non-versioned bucket + bucket = zone.create_bucket(gen_bucket_name()) + log.debug('created bucket=%s', bucket.name) + zonegroup_meta_checkpoint(zonegroup) + + # upload an initial object + key1 = new_key(zone, bucket, 'obj') + key1.set_contents_from_string('') + log.debug('created initial version id=%s', key1.version_id) + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + # enable versioning + bucket.configure_versioning(True) + zonegroup_meta_checkpoint(zonegroup) + + # re-upload the object as a new version + key2 = new_key(zone, bucket, 'obj') + key2.set_contents_from_string('') + log.debug('created new version id=%s', key2.version_id) + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + # suspend versioning + bucket.configure_versioning(False) + zonegroup_meta_checkpoint(zonegroup) + + # re-upload the object as a 'null' version + key3 = new_key(zone, bucket, 'obj') + key3.set_contents_from_string('') + log.debug('created null version id=%s', key3.version_id) + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + +def test_delete_marker_full_sync(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) + + # enable versioning + for _, bucket in zone_bucket: + bucket.configure_versioning(True) + zonegroup_meta_checkpoint(zonegroup) + + for zone, bucket in zone_bucket: + # upload an initial object + key1 = new_key(zone, bucket, 'obj') + key1.set_contents_from_string('') + + # create a delete marker + key2 = new_key(zone, bucket, 'obj') + key2.delete() + + # wait for full sync + for _, bucket in zone_bucket: + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + +def test_suspended_delete_marker_full_sync(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) + + # enable/suspend versioning + for _, bucket in zone_bucket: + bucket.configure_versioning(True) + bucket.configure_versioning(False) + zonegroup_meta_checkpoint(zonegroup) + + for zone, bucket in zone_bucket: + # upload an initial object + key1 = new_key(zone, bucket, 'obj') + key1.set_contents_from_string('') + + # create a delete marker + key2 = new_key(zone, bucket, 'obj') + key2.delete() + + # wait for full sync + for _, bucket in zone_bucket: + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + +def test_bucket_versioning(): + buckets, zone_bucket = create_bucket_per_zone_in_realm() + for _, bucket in zone_bucket: + bucket.configure_versioning(True) + res = bucket.get_versioning_status() + key = 'Versioning' + assert(key in res and res[key] == 'Enabled') + +def test_bucket_acl(): + buckets, zone_bucket = create_bucket_per_zone_in_realm() + for _, bucket in zone_bucket: + assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner + bucket.set_acl('public-read') + assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers + +def test_bucket_cors(): + buckets, zone_bucket = create_bucket_per_zone_in_realm() + for _, bucket in zone_bucket: + cors_cfg = CORSConfiguration() + cors_cfg.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000) + bucket.set_cors(cors_cfg) + assert(bucket.get_cors().to_xml() == cors_cfg.to_xml()) + +def test_bucket_delete_notempty(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) + zonegroup_meta_checkpoint(zonegroup) + + for zone_conn, bucket_name in zone_bucket: + # upload an object to each bucket on its own zone + conn = zone_conn.get_connection() + bucket = conn.get_bucket(bucket_name) + k = bucket.new_key('foo') + k.set_contents_from_string('bar') + # attempt to delete the bucket before this object can sync + try: + conn.delete_bucket(bucket_name) + except boto.exception.S3ResponseError as e: + assert(e.error_code == 'BucketNotEmpty') + continue + assert False # expected 409 BucketNotEmpty + + # assert that each bucket still exists on the master + c1 = zonegroup_conns.master_zone.conn + for _, bucket_name in zone_bucket: + assert c1.get_bucket(bucket_name) + +def test_multi_period_incremental_sync(): + zonegroup = realm.master_zonegroup() + if len(zonegroup.zones) < 3: + raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.") + + # periods to include in mdlog comparison + mdlog_periods = [realm.current_period.id] + + # create a bucket in each zone + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) + + zonegroup_meta_checkpoint(zonegroup) + + z1, z2, z3 = zonegroup.zones[0:3] + assert(z1 == zonegroup.master_zone) + + # kill zone 3 gateways to freeze sync status to incremental in first period + z3.stop() + + # change master to zone 2 -> period 2 + set_master_zone(z2) + mdlog_periods += [realm.current_period.id] + + for zone_conn, _ in zone_bucket: + if zone_conn.zone == z3: + continue + bucket_name = gen_bucket_name() + log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name) + bucket = zone_conn.conn.create_bucket(bucket_name) + buckets.append(bucket_name) + + # wait for zone 1 to sync + zone_meta_checkpoint(z1) + + # change master back to zone 1 -> period 3 + set_master_zone(z1) + mdlog_periods += [realm.current_period.id] + + for zone_conn, bucket_name in zone_bucket: + if zone_conn.zone == z3: + continue + bucket_name = gen_bucket_name() + log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name) + zone_conn.conn.create_bucket(bucket_name) + buckets.append(bucket_name) + + # restart zone 3 gateway and wait for sync + z3.start() + zonegroup_meta_checkpoint(zonegroup) + + # verify that we end up with the same objects + for bucket_name in buckets: + for source_conn, _ in zone_bucket: + for target_conn in zonegroup_conns.zones: + if source_conn.zone == target_conn.zone: + continue + + if target_conn.zone.has_buckets(): + target_conn.check_bucket_eq(source_conn, bucket_name) + + # verify that mdlogs are not empty and match for each period + for period in mdlog_periods: + master_mdlog = mdlog_list(z1, period) + assert len(master_mdlog) > 0 + for zone in zonegroup.zones: + if zone == z1: + continue + mdlog = mdlog_list(zone, period) + assert len(mdlog) == len(master_mdlog) + + # autotrim mdlogs for master zone + mdlog_autotrim(z1) + + # autotrim mdlogs for peers + for zone in zonegroup.zones: + if zone == z1: + continue + mdlog_autotrim(zone) + + # verify that mdlogs are empty for each period + for period in mdlog_periods: + for zone in zonegroup.zones: + mdlog = mdlog_list(zone, period) + assert len(mdlog) == 0 + +def test_datalog_autotrim(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) + + # upload an object to each zone to generate a datalog entry + for zone, bucket in zone_bucket: + k = new_key(zone, bucket.name, 'key') + k.set_contents_from_string('body') + + # wait for metadata and data sync to catch up + zonegroup_meta_checkpoint(zonegroup) + zonegroup_data_checkpoint(zonegroup_conns) + + # trim each datalog + for zone, _ in zone_bucket: + # read max markers for each shard + status = datalog_status(zone.zone) + + datalog_autotrim(zone.zone) + + for shard_id, shard_status in enumerate(status): + try: + before_trim = dateutil.parser.isoparse(shard_status['last_update']) + except: # empty timestamps look like "0.000000" and will fail here + continue + entries = datalog_list(zone.zone, ['--shard-id', str(shard_id), '--max-entries', '1']) + if not len(entries): + continue + after_trim = dateutil.parser.isoparse(entries[0]['timestamp']) + assert before_trim < after_trim, "any datalog entries must be newer than trim" + +def test_multi_zone_redirect(): + zonegroup = realm.master_zonegroup() + if len(zonegroup.rw_zones) < 2: + raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.") + + zonegroup_conns = ZonegroupConns(zonegroup) + (zc1, zc2) = zonegroup_conns.rw_zones[0:2] + + z1, z2 = (zc1.zone, zc2.zone) + + set_sync_from_all(z2, False) + + # create a bucket on the first zone + bucket_name = gen_bucket_name() + log.info('create bucket zone=%s name=%s', z1.name, bucket_name) + bucket = zc1.conn.create_bucket(bucket_name) + obj = 'testredirect' + + key = bucket.new_key(obj) + data = 'A'*512 + key.set_contents_from_string(data) + + zonegroup_meta_checkpoint(zonegroup) + + # try to read object from second zone (should fail) + bucket2 = get_bucket(zc2, bucket_name) + assert_raises(boto.exception.S3ResponseError, bucket2.get_key, obj) + + set_redirect_zone(z2, z1) + + key2 = bucket2.get_key(obj) + + eq(data, key2.get_contents_as_string(encoding='ascii')) + + key = bucket.new_key(obj) + + for x in ['a', 'b', 'c', 'd']: + data = x*512 + key.set_contents_from_string(data) + eq(data, key2.get_contents_as_string(encoding='ascii')) + + # revert config changes + set_sync_from_all(z2, True) + set_redirect_zone(z2, None) + +def test_zonegroup_remove(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + if len(zonegroup.zones) < 2: + raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.") + + zonegroup_meta_checkpoint(zonegroup) + z1, z2 = zonegroup.zones[0:2] + c1, c2 = (z1.cluster, z2.cluster) + + # get admin credentials out of existing zone + system_key = z1.data['system_key'] + admin_creds = Credentials(system_key['access_key'], system_key['secret_key']) + + # create a new zone in zonegroup on c2 and commit + zone = Zone('remove', zonegroup, c2) + zone.create(c2, admin_creds.credential_args()) + zonegroup.zones.append(zone) + zonegroup.period.update(zone, commit=True) + + zonegroup.remove(c1, zone) + + # another 'zonegroup remove' should fail with ENOENT + _, retcode = zonegroup.remove(c1, zone, check_retcode=False) + assert(retcode == 2) # ENOENT + + # delete the new zone + zone.delete(c2) + + # validate the resulting period + zonegroup.period.update(z1, commit=True) + + +def test_zg_master_zone_delete(): + + master_zg = realm.master_zonegroup() + master_zone = master_zg.master_zone + + assert(len(master_zg.zones) >= 1) + master_cluster = master_zg.zones[0].cluster + + rm_zg = ZoneGroup('remove_zg') + rm_zg.create(master_cluster) + + rm_zone = Zone('remove', rm_zg, master_cluster) + rm_zone.create(master_cluster) + master_zg.period.update(master_zone, commit=True) + + + rm_zone.delete(master_cluster) + # Period update: This should now fail as the zone will be the master zone + # in that zg + _, retcode = master_zg.period.update(master_zone, check_retcode=False) + assert(retcode == errno.EINVAL) + + # Proceed to delete the zonegroup as well, previous period now does not + # contain a dangling master_zone, this must succeed + rm_zg.delete(master_cluster) + master_zg.period.update(master_zone, commit=True) + +def test_set_bucket_website(): + buckets, zone_bucket = create_bucket_per_zone_in_realm() + for _, bucket in zone_bucket: + website_cfg = WebsiteConfiguration(suffix='index.html',error_key='error.html') + try: + bucket.set_website_configuration(website_cfg) + except boto.exception.S3ResponseError as e: + if e.error_code == 'MethodNotAllowed': + raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.") + assert(bucket.get_website_configuration_with_xml()[1] == website_cfg.to_xml()) + +def test_set_bucket_policy(): + policy = '''{ + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Principal": "*" + }] +}''' + buckets, zone_bucket = create_bucket_per_zone_in_realm() + for _, bucket in zone_bucket: + bucket.set_policy(policy) + assert(bucket.get_policy().decode('ascii') == policy) + +@attr('bucket_sync_disable') +def test_bucket_sync_disable(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) + zonegroup_meta_checkpoint(zonegroup) + + for bucket_name in buckets: + disable_bucket_sync(realm.meta_master_zone(), bucket_name) + + for zone in zonegroup.zones: + check_buckets_sync_status_obj_not_exist(zone, buckets) + + zonegroup_data_checkpoint(zonegroup_conns) + +@attr('bucket_sync_disable') +def test_bucket_sync_enable_right_after_disable(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) + + objnames = ['obj1', 'obj2', 'obj3', 'obj4'] + content = 'asdasd' + + for zone, bucket in zone_bucket: + for objname in objnames: + k = new_key(zone, bucket.name, objname) + k.set_contents_from_string(content) + + zonegroup_meta_checkpoint(zonegroup) + + for bucket_name in buckets: + zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name) + + for bucket_name in buckets: + disable_bucket_sync(realm.meta_master_zone(), bucket_name) + enable_bucket_sync(realm.meta_master_zone(), bucket_name) + + objnames_2 = ['obj5', 'obj6', 'obj7', 'obj8'] + + for zone, bucket in zone_bucket: + for objname in objnames_2: + k = new_key(zone, bucket.name, objname) + k.set_contents_from_string(content) + + for bucket_name in buckets: + zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name) + + zonegroup_data_checkpoint(zonegroup_conns) + +@attr('bucket_sync_disable') +def test_bucket_sync_disable_enable(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) + + objnames = [ 'obj1', 'obj2', 'obj3', 'obj4' ] + content = 'asdasd' + + for zone, bucket in zone_bucket: + for objname in objnames: + k = new_key(zone, bucket.name, objname) + k.set_contents_from_string(content) + + zonegroup_meta_checkpoint(zonegroup) + + for bucket_name in buckets: + zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name) + + for bucket_name in buckets: + disable_bucket_sync(realm.meta_master_zone(), bucket_name) + + zonegroup_meta_checkpoint(zonegroup) + + objnames_2 = [ 'obj5', 'obj6', 'obj7', 'obj8' ] + + for zone, bucket in zone_bucket: + for objname in objnames_2: + k = new_key(zone, bucket.name, objname) + k.set_contents_from_string(content) + + for bucket_name in buckets: + enable_bucket_sync(realm.meta_master_zone(), bucket_name) + + for bucket_name in buckets: + zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name) + + zonegroup_data_checkpoint(zonegroup_conns) + +def test_multipart_object_sync(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) + + _, bucket = zone_bucket[0] + + # initiate a multipart upload + upload = bucket.initiate_multipart_upload('MULTIPART') + mp = boto.s3.multipart.MultiPartUpload(bucket) + mp.key_name = upload.key_name + mp.id = upload.id + part_size = 5 * 1024 * 1024 # 5M min part size + mp.upload_part_from_file(StringIO('a' * part_size), 1) + mp.upload_part_from_file(StringIO('b' * part_size), 2) + mp.upload_part_from_file(StringIO('c' * part_size), 3) + mp.upload_part_from_file(StringIO('d' * part_size), 4) + mp.complete_upload() + + zonegroup_meta_checkpoint(zonegroup) + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + +def test_encrypted_object_sync(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + if len(zonegroup.rw_zones) < 2: + raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.") + + (zone1, zone2) = zonegroup_conns.rw_zones[0:2] + + # create a bucket on the first zone + bucket_name = gen_bucket_name() + log.info('create bucket zone=%s name=%s', zone1.name, bucket_name) + bucket = zone1.conn.create_bucket(bucket_name) + + # upload an object with sse-c encryption + sse_c_headers = { + 'x-amz-server-side-encryption-customer-algorithm': 'AES256', + 'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=', + 'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw==' + } + key = bucket.new_key('testobj-sse-c') + data = 'A'*512 + key.set_contents_from_string(data, headers=sse_c_headers) + + # upload an object with sse-kms encryption + sse_kms_headers = { + 'x-amz-server-side-encryption': 'aws:kms', + # testkey-1 must be present in 'rgw crypt s3 kms encryption keys' (vstart.sh adds this) + 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1', + } + key = bucket.new_key('testobj-sse-kms') + key.set_contents_from_string(data, headers=sse_kms_headers) + + # wait for the bucket metadata and data to sync + zonegroup_meta_checkpoint(zonegroup) + zone_bucket_checkpoint(zone2.zone, zone1.zone, bucket_name) + + # read the encrypted objects from the second zone + bucket2 = get_bucket(zone2, bucket_name) + key = bucket2.get_key('testobj-sse-c', headers=sse_c_headers) + eq(data, key.get_contents_as_string(headers=sse_c_headers, encoding='ascii')) + + key = bucket2.get_key('testobj-sse-kms') + eq(data, key.get_contents_as_string(encoding='ascii')) + +def test_bucket_index_log_trim(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + zone = zonegroup_conns.rw_zones[0] + + # create a test bucket, upload some objects, and wait for sync + def make_test_bucket(): + name = gen_bucket_name() + log.info('create bucket zone=%s name=%s', zone.name, name) + bucket = zone.conn.create_bucket(name) + for objname in ('a', 'b', 'c', 'd'): + k = new_key(zone, name, objname) + k.set_contents_from_string('foo') + zonegroup_meta_checkpoint(zonegroup) + zonegroup_bucket_checkpoint(zonegroup_conns, name) + return bucket + + # create a 'cold' bucket + cold_bucket = make_test_bucket() + + # trim with max-buckets=0 to clear counters for cold bucket. this should + # prevent it from being considered 'active' by the next autotrim + bilog_autotrim(zone.zone, [ + '--rgw-sync-log-trim-max-buckets', '0', + ]) + + # create an 'active' bucket + active_bucket = make_test_bucket() + + # trim with max-buckets=1 min-cold-buckets=0 to trim active bucket only + bilog_autotrim(zone.zone, [ + '--rgw-sync-log-trim-max-buckets', '1', + '--rgw-sync-log-trim-min-cold-buckets', '0', + ]) + + # verify active bucket has empty bilog + active_bilog = bilog_list(zone.zone, active_bucket.name) + assert(len(active_bilog) == 0) + + # verify cold bucket has nonempty bilog + cold_bilog = bilog_list(zone.zone, cold_bucket.name) + assert(len(cold_bilog) > 0) + + # trim with min-cold-buckets=999 to trim all buckets + bilog_autotrim(zone.zone, [ + '--rgw-sync-log-trim-max-buckets', '999', + '--rgw-sync-log-trim-min-cold-buckets', '999', + ]) + + # verify cold bucket has empty bilog + cold_bilog = bilog_list(zone.zone, cold_bucket.name) + assert(len(cold_bilog) == 0) + +def test_bucket_reshard_index_log_trim(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + zone = zonegroup_conns.rw_zones[0] + + # create a test bucket, upload some objects, and wait for sync + def make_test_bucket(): + name = gen_bucket_name() + log.info('create bucket zone=%s name=%s', zone.name, name) + bucket = zone.conn.create_bucket(name) + for objname in ('a', 'b', 'c', 'd'): + k = new_key(zone, name, objname) + k.set_contents_from_string('foo') + zonegroup_meta_checkpoint(zonegroup) + zonegroup_bucket_checkpoint(zonegroup_conns, name) + return bucket + + # create a 'test' bucket + test_bucket = make_test_bucket() + + # checking bucket layout before resharding + json_obj_1 = bucket_layout(zone.zone, test_bucket.name) + assert(len(json_obj_1['layout']['logs']) == 1) + + first_gen = json_obj_1['layout']['current_index']['gen'] + + before_reshard_bilog = bilog_list(zone.zone, test_bucket.name, ['--gen', str(first_gen)]) + assert(len(before_reshard_bilog) == 4) + + # Resharding the bucket + zone.zone.cluster.admin(['bucket', 'reshard', + '--bucket', test_bucket.name, + '--num-shards', '3', + '--yes-i-really-mean-it']) + + # checking bucket layout after 1st resharding + json_obj_2 = bucket_layout(zone.zone, test_bucket.name) + assert(len(json_obj_2['layout']['logs']) == 2) + + second_gen = json_obj_2['layout']['current_index']['gen'] + + after_reshard_bilog = bilog_list(zone.zone, test_bucket.name, ['--gen', str(second_gen)]) + assert(len(after_reshard_bilog) == 0) + + # upload more objects + for objname in ('e', 'f', 'g', 'h'): + k = new_key(zone, test_bucket.name, objname) + k.set_contents_from_string('foo') + zonegroup_bucket_checkpoint(zonegroup_conns, test_bucket.name) + + # Resharding the bucket again + zone.zone.cluster.admin(['bucket', 'reshard', + '--bucket', test_bucket.name, + '--num-shards', '3', + '--yes-i-really-mean-it']) + + # checking bucket layout after 2nd resharding + json_obj_3 = bucket_layout(zone.zone, test_bucket.name) + assert(len(json_obj_3['layout']['logs']) == 3) + + zonegroup_bucket_checkpoint(zonegroup_conns, test_bucket.name) + + bilog_autotrim(zone.zone) + + # checking bucket layout after 1st bilog autotrim + json_obj_4 = bucket_layout(zone.zone, test_bucket.name) + assert(len(json_obj_4['layout']['logs']) == 2) + + bilog_autotrim(zone.zone) + + # checking bucket layout after 2nd bilog autotrim + json_obj_5 = bucket_layout(zone.zone, test_bucket.name) + assert(len(json_obj_5['layout']['logs']) == 1) + + bilog_autotrim(zone.zone) + + # upload more objects + for objname in ('i', 'j', 'k', 'l'): + k = new_key(zone, test_bucket.name, objname) + k.set_contents_from_string('foo') + zonegroup_bucket_checkpoint(zonegroup_conns, test_bucket.name) + + # verify the bucket has non-empty bilog + test_bilog = bilog_list(zone.zone, test_bucket.name) + assert(len(test_bilog) > 0) + +@attr('bucket_reshard') +def test_bucket_reshard_incremental(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + zone = zonegroup_conns.rw_zones[0] + + # create a bucket + bucket = zone.create_bucket(gen_bucket_name()) + log.debug('created bucket=%s', bucket.name) + zonegroup_meta_checkpoint(zonegroup) + + # upload some objects + for objname in ('a', 'b', 'c', 'd'): + k = new_key(zone, bucket.name, objname) + k.set_contents_from_string('foo') + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + # reshard in each zone + for z in zonegroup_conns.rw_zones: + z.zone.cluster.admin(['bucket', 'reshard', + '--bucket', bucket.name, + '--num-shards', '3', + '--yes-i-really-mean-it']) + + # upload more objects + for objname in ('e', 'f', 'g', 'h'): + k = new_key(zone, bucket.name, objname) + k.set_contents_from_string('foo') + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + +@attr('bucket_reshard') +def test_bucket_reshard_full(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + zone = zonegroup_conns.rw_zones[0] + + # create a bucket + bucket = zone.create_bucket(gen_bucket_name()) + log.debug('created bucket=%s', bucket.name) + zonegroup_meta_checkpoint(zonegroup) + + # stop gateways in other zones so we can force the bucket to full sync + for z in zonegroup_conns.rw_zones[1:]: + z.zone.stop() + + # use try-finally to restart gateways even if something fails + try: + # upload some objects + for objname in ('a', 'b', 'c', 'd'): + k = new_key(zone, bucket.name, objname) + k.set_contents_from_string('foo') + + # reshard on first zone + zone.zone.cluster.admin(['bucket', 'reshard', + '--bucket', bucket.name, + '--num-shards', '3', + '--yes-i-really-mean-it']) + + # upload more objects + for objname in ('e', 'f', 'g', 'h'): + k = new_key(zone, bucket.name, objname) + k.set_contents_from_string('foo') + finally: + for z in zonegroup_conns.rw_zones[1:]: + z.zone.start() + + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + +def test_bucket_creation_time(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + zonegroup_meta_checkpoint(zonegroup) + + zone_buckets = [zone.get_connection().get_all_buckets() for zone in zonegroup_conns.rw_zones] + for z1, z2 in combinations(zone_buckets, 2): + for a, b in zip(z1, z2): + eq(a.name, b.name) + eq(a.creation_date, b.creation_date) + +def get_bucket_shard_objects(zone, num_shards): + """ + Get one object for each shard of the bucket index log + """ + cmd = ['bucket', 'shard', 'objects'] + zone.zone_args() + cmd += ['--num-shards', str(num_shards)] + shardobjs_json, ret = zone.cluster.admin(cmd, read_only=True) + assert ret == 0 + shardobjs = json.loads(shardobjs_json) + return shardobjs['objs'] + +def write_most_shards(zone, bucket_name, num_shards): + """ + Write one object to most (but not all) bucket index shards. + """ + objs = get_bucket_shard_objects(zone.zone, num_shards) + random.shuffle(objs) + del objs[-(len(objs)//10):] + for obj in objs: + k = new_key(zone, bucket_name, obj) + k.set_contents_from_string('foo') + +def reshard_bucket(zone, bucket_name, num_shards): + """ + Reshard a bucket + """ + cmd = ['bucket', 'reshard'] + zone.zone_args() + cmd += ['--bucket', bucket_name] + cmd += ['--num-shards', str(num_shards)] + cmd += ['--yes-i-really-mean-it'] + zone.cluster.admin(cmd) + +def get_obj_names(zone, bucket_name, maxobjs): + """ + Get names of objects in a bucket. + """ + cmd = ['bucket', 'list'] + zone.zone_args() + cmd += ['--bucket', bucket_name] + cmd += ['--max-entries', str(maxobjs)] + objs_json, _ = zone.cluster.admin(cmd, read_only=True) + objs = json.loads(objs_json) + return [o['name'] for o in objs] + +def bucket_keys_eq(zone1, zone2, bucket_name): + """ + Ensure that two buckets have the same keys, but get the lists through + radosgw-admin rather than S3 so it can be used when radosgw isn't running. + Only works for buckets of 10,000 objects since the tests calling it don't + need more, and the output from bucket list doesn't have an obvious marker + with which to continue. + """ + keys1 = get_obj_names(zone1, bucket_name, 10000) + keys2 = get_obj_names(zone2, bucket_name, 10000) + for key1, key2 in zip_longest(keys1, keys2): + if key1 is None: + log.critical('key=%s is missing from zone=%s', key1.name, + zone1.name) + assert False + if key2 is None: + log.critical('key=%s is missing from zone=%s', key2.name, + zone2.name) + assert False + +@attr('bucket_reshard') +def test_bucket_sync_run_basic_incremental(): + """ + Create several generations of objects, then run bucket sync + run to ensure they're all processed. + """ + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + primary = zonegroup_conns.rw_zones[0] + + # create a bucket write objects to it and wait for them to sync, ensuring + # we are in incremental. + bucket = primary.create_bucket(gen_bucket_name()) + log.debug('created bucket=%s', bucket.name) + zonegroup_meta_checkpoint(zonegroup) + write_most_shards(primary, bucket.name, 11) + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + try: + # stop gateways in other zones so we can rely on bucket sync run + for secondary in zonegroup_conns.rw_zones[1:]: + secondary.zone.stop() + + # build up multiple generations each with some objects written to + # them. + generations = [17, 19, 23, 29, 31, 37] + for num_shards in generations: + reshard_bucket(primary.zone, bucket.name, num_shards) + write_most_shards(primary, bucket.name, num_shards) + + # bucket sync run on every secondary + for secondary in zonegroup_conns.rw_zones[1:]: + cmd = ['bucket', 'sync', 'run'] + secondary.zone.zone_args() + cmd += ['--bucket', bucket.name, '--source-zone', primary.name] + secondary.zone.cluster.admin(cmd) + + bucket_keys_eq(primary.zone, secondary.zone, bucket.name) + + finally: + # Restart so bucket_checkpoint can actually fetch things from the + # secondaries. Put this in a finally block so they restart even on + # error. + for secondary in zonegroup_conns.rw_zones[1:]: + secondary.zone.start() + + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + +def trash_bucket(zone, bucket_name): + """ + Remove objects through radosgw-admin, zapping bilog to prevent the deletes + from replicating. + """ + objs = get_obj_names(zone, bucket_name, 10000) + # Delete the objects + for obj in objs: + cmd = ['object', 'rm'] + zone.zone_args() + cmd += ['--bucket', bucket_name] + cmd += ['--object', obj] + zone.cluster.admin(cmd) + + # Zap the bilog + cmd = ['bilog', 'trim'] + zone.zone_args() + cmd += ['--bucket', bucket_name] + zone.cluster.admin(cmd) + +@attr('bucket_reshard') +def test_zap_init_bucket_sync_run(): + """ + Create several generations of objects, trash them, then run bucket sync init + and bucket sync run. + """ + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + primary = zonegroup_conns.rw_zones[0] + + bucket = primary.create_bucket(gen_bucket_name()) + log.debug('created bucket=%s', bucket.name) + zonegroup_meta_checkpoint(zonegroup) + + # Write zeroth generation + for obj in range(1, 6): + k = new_key(primary, bucket.name, f'obj{obj * 11}') + k.set_contents_from_string('foo') + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + # Write several more generations + generations = [17, 19, 23, 29, 31, 37] + for num_shards in generations: + reshard_bucket(primary.zone, bucket.name, num_shards) + for obj in range(1, 6): + k = new_key(primary, bucket.name, f'obj{obj * num_shards}') + k.set_contents_from_string('foo') + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + + # Stop gateways, trash bucket, init, sync, and restart for every secondary + for secondary in zonegroup_conns.rw_zones[1:]: + try: + secondary.zone.stop() + + trash_bucket(secondary.zone, bucket.name) + + cmd = ['bucket', 'sync', 'init'] + secondary.zone.zone_args() + cmd += ['--bucket', bucket.name] + cmd += ['--source-zone', primary.name] + secondary.zone.cluster.admin(cmd) + + cmd = ['bucket', 'sync', 'run'] + secondary.zone.zone_args() + cmd += ['--bucket', bucket.name, '--source-zone', primary.name] + secondary.zone.cluster.admin(cmd) + + bucket_keys_eq(primary.zone, secondary.zone, bucket.name) + + finally: + # Do this as a finally so we bring the zone back up even on error. + secondary.zone.start() + + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + +def test_role_sync(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + roles, zone_role = create_role_per_zone(zonegroup_conns) + + zonegroup_meta_checkpoint(zonegroup) + + for source_conn, role in zone_role: + for target_conn in zonegroup_conns.zones: + if source_conn.zone == target_conn.zone: + continue + + check_role_eq(source_conn, target_conn, role) + +@attr('data_sync_init') +def test_bucket_full_sync_after_data_sync_init(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + primary = zonegroup_conns.rw_zones[0] + secondary = zonegroup_conns.rw_zones[1] + + bucket = primary.create_bucket(gen_bucket_name()) + log.debug('created bucket=%s', bucket.name) + zonegroup_meta_checkpoint(zonegroup) + + try: + # stop secondary zone before it starts a bucket full sync + secondary.zone.stop() + + # write some objects that don't sync yet + for obj in range(1, 6): + k = new_key(primary, bucket.name, f'obj{obj * 11}') + k.set_contents_from_string('foo') + + cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args() + cmd += ['--source-zone', primary.name] + secondary.zone.cluster.admin(cmd) + finally: + # Do this as a finally so we bring the zone back up even on error. + secondary.zone.start() + + # expect all objects to replicate via 'bucket full sync' + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + zonegroup_data_checkpoint(zonegroup_conns) + +@attr('data_sync_init') +@attr('bucket_reshard') +def test_resharded_bucket_full_sync_after_data_sync_init(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + primary = zonegroup_conns.rw_zones[0] + secondary = zonegroup_conns.rw_zones[1] + + bucket = primary.create_bucket(gen_bucket_name()) + log.debug('created bucket=%s', bucket.name) + zonegroup_meta_checkpoint(zonegroup) + + try: + # stop secondary zone before it starts a bucket full sync + secondary.zone.stop() + + # Write zeroth generation + for obj in range(1, 6): + k = new_key(primary, bucket.name, f'obj{obj * 11}') + k.set_contents_from_string('foo') + + # Write several more generations + generations = [17, 19, 23, 29, 31, 37] + for num_shards in generations: + reshard_bucket(primary.zone, bucket.name, num_shards) + for obj in range(1, 6): + k = new_key(primary, bucket.name, f'obj{obj * num_shards}') + k.set_contents_from_string('foo') + + cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args() + cmd += ['--source-zone', primary.name] + secondary.zone.cluster.admin(cmd) + finally: + # Do this as a finally so we bring the zone back up even on error. + secondary.zone.start() + + # expect all objects to replicate via 'bucket full sync' + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + zonegroup_data_checkpoint(zonegroup_conns) + +@attr('data_sync_init') +def test_bucket_incremental_sync_after_data_sync_init(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + primary = zonegroup_conns.rw_zones[0] + secondary = zonegroup_conns.rw_zones[1] + + bucket = primary.create_bucket(gen_bucket_name()) + log.debug('created bucket=%s', bucket.name) + zonegroup_meta_checkpoint(zonegroup) + + # upload a dummy object and wait for sync. this forces each zone to finish + # a full sync and switch to incremental + k = new_key(primary, bucket, 'dummy') + k.set_contents_from_string('foo') + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + try: + # stop secondary zone before it syncs the rest + secondary.zone.stop() + + # Write more objects to primary + for obj in range(1, 6): + k = new_key(primary, bucket.name, f'obj{obj * 11}') + k.set_contents_from_string('foo') + + cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args() + cmd += ['--source-zone', primary.name] + secondary.zone.cluster.admin(cmd) + finally: + # Do this as a finally so we bring the zone back up even on error. + secondary.zone.start() + + # expect remaining objects to replicate via 'bucket incremental sync' + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + zonegroup_data_checkpoint(zonegroup_conns) + +@attr('data_sync_init') +@attr('bucket_reshard') +def test_resharded_bucket_incremental_sync_latest_after_data_sync_init(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + primary = zonegroup_conns.rw_zones[0] + secondary = zonegroup_conns.rw_zones[1] + + bucket = primary.create_bucket(gen_bucket_name()) + log.debug('created bucket=%s', bucket.name) + zonegroup_meta_checkpoint(zonegroup) + + # Write zeroth generation to primary + for obj in range(1, 6): + k = new_key(primary, bucket.name, f'obj{obj * 11}') + k.set_contents_from_string('foo') + + # Write several more generations + generations = [17, 19, 23, 29, 31, 37] + for num_shards in generations: + reshard_bucket(primary.zone, bucket.name, num_shards) + for obj in range(1, 6): + k = new_key(primary, bucket.name, f'obj{obj * num_shards}') + k.set_contents_from_string('foo') + + # wait for the secondary to catch up to the latest gen + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + try: + # stop secondary zone before it syncs the rest + secondary.zone.stop() + + # write some more objects to the last gen + for obj in range(1, 6): + k = new_key(primary, bucket.name, f'obj{obj * generations[-1]}') + k.set_contents_from_string('foo') + + cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args() + cmd += ['--source-zone', primary.name] + secondary.zone.cluster.admin(cmd) + finally: + # Do this as a finally so we bring the zone back up even on error. + secondary.zone.start() + + # expect remaining objects in last gen to replicate via 'bucket incremental sync' + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + zonegroup_data_checkpoint(zonegroup_conns) + +@attr('data_sync_init') +@attr('bucket_reshard') +def test_resharded_bucket_incremental_sync_oldest_after_data_sync_init(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + primary = zonegroup_conns.rw_zones[0] + secondary = zonegroup_conns.rw_zones[1] + + bucket = primary.create_bucket(gen_bucket_name()) + log.debug('created bucket=%s', bucket.name) + zonegroup_meta_checkpoint(zonegroup) + + # Write zeroth generation to primary + for obj in range(1, 6): + k = new_key(primary, bucket.name, f'obj{obj * 11}') + k.set_contents_from_string('foo') + + # wait for the secondary to catch up + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + try: + # stop secondary zone before it syncs later generations + secondary.zone.stop() + + # Write several more generations + generations = [17, 19, 23, 29, 31, 37] + for num_shards in generations: + reshard_bucket(primary.zone, bucket.name, num_shards) + for obj in range(1, 6): + k = new_key(primary, bucket.name, f'obj{obj * num_shards}') + k.set_contents_from_string('foo') + + cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args() + cmd += ['--source-zone', primary.name] + secondary.zone.cluster.admin(cmd) + finally: + # Do this as a finally so we bring the zone back up even on error. + secondary.zone.start() + + # expect all generations to replicate via 'bucket incremental sync' + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + zonegroup_data_checkpoint(zonegroup_conns) + +def sync_info(cluster, bucket = None): + cmd = ['sync', 'info'] + if bucket: + cmd += ['--bucket', bucket] + (result_json, retcode) = cluster.admin(cmd) + if retcode != 0: + assert False, 'failed to get sync policy' + + return json.loads(result_json) + +def get_sync_policy(cluster, bucket = None): + cmd = ['sync', 'policy', 'get'] + if bucket: + cmd += ['--bucket', bucket] + (result_json, retcode) = cluster.admin(cmd) + if retcode != 0: + assert False, 'failed to get sync policy' + + return json.loads(result_json) + +def create_sync_policy_group(cluster, group, status = "allowed", bucket = None): + cmd = ['sync', 'group', 'create', '--group-id', group, '--status' , status] + if bucket: + cmd += ['--bucket', bucket] + (result_json, retcode) = cluster.admin(cmd) + if retcode != 0: + assert False, 'failed to create sync policy group id=%s, bucket=%s' % (group, bucket) + return json.loads(result_json) + +def set_sync_policy_group_status(cluster, group, status, bucket = None): + cmd = ['sync', 'group', 'modify', '--group-id', group, '--status' , status] + if bucket: + cmd += ['--bucket', bucket] + (result_json, retcode) = cluster.admin(cmd) + if retcode != 0: + assert False, 'failed to set sync policy group id=%s, bucket=%s' % (group, bucket) + return json.loads(result_json) + +def get_sync_policy_group(cluster, group, bucket = None): + cmd = ['sync', 'group', 'get', '--group-id', group] + if bucket: + cmd += ['--bucket', bucket] + (result_json, retcode) = cluster.admin(cmd) + if retcode != 0: + assert False, 'failed to get sync policy group id=%s, bucket=%s' % (group, bucket) + return json.loads(result_json) + +def remove_sync_policy_group(cluster, group, bucket = None): + cmd = ['sync', 'group', 'remove', '--group-id', group] + if bucket: + cmd += ['--bucket', bucket] + (result_json, retcode) = cluster.admin(cmd) + if retcode != 0: + assert False, 'failed to remove sync policy group id=%s, bucket=%s' % (group, bucket) + return json.loads(result_json) + +def create_sync_group_flow_symmetrical(cluster, group, flow_id, zones, bucket = None): + cmd = ['sync', 'group', 'flow', 'create', '--group-id', group, '--flow-id' , flow_id, '--flow-type', 'symmetrical', '--zones=%s' % zones] + if bucket: + cmd += ['--bucket', bucket] + (result_json, retcode) = cluster.admin(cmd) + if retcode != 0: + assert False, 'failed to create sync group flow symmetrical groupid=%s, flow_id=%s, zones=%s, bucket=%s' % (group, flow_id, zones, bucket) + return json.loads(result_json) + +def create_sync_group_flow_directional(cluster, group, flow_id, src_zones, dest_zones, bucket = None): + cmd = ['sync', 'group', 'flow', 'create', '--group-id', group, '--flow-id' , flow_id, '--flow-type', 'directional', '--source-zone=%s' % src_zones, '--dest-zone=%s' % dest_zones] + if bucket: + cmd += ['--bucket', bucket] + (result_json, retcode) = cluster.admin(cmd) + if retcode != 0: + assert False, 'failed to create sync group flow directional groupid=%s, flow_id=%s, src_zones=%s, dest_zones=%s, bucket=%s' % (group, flow_id, src_zones, dest_zones, bucket) + return json.loads(result_json) + +def remove_sync_group_flow_symmetrical(cluster, group, flow_id, zones = None, bucket = None): + cmd = ['sync', 'group', 'flow', 'remove', '--group-id', group, '--flow-id' , flow_id, '--flow-type', 'symmetrical'] + if zones: + cmd += ['--zones=%s' % zones] + if bucket: + cmd += ['--bucket', bucket] + (result_json, retcode) = cluster.admin(cmd) + if retcode != 0: + assert False, 'failed to remove sync group flow symmetrical groupid=%s, flow_id=%s, zones=%s, bucket=%s' % (group, flow_id, zones, bucket) + return json.loads(result_json) + +def remove_sync_group_flow_directional(cluster, group, flow_id, src_zones, dest_zones, bucket = None): + cmd = ['sync', 'group', 'flow', 'remove', '--group-id', group, '--flow-id' , flow_id, '--flow-type', 'directional', '--source-zone=%s' % src_zones, '--dest-zone=%s' % dest_zones] + if bucket: + cmd += ['--bucket', bucket] + (result_json, retcode) = cluster.admin(cmd) + if retcode != 0: + assert False, 'failed to remove sync group flow directional groupid=%s, flow_id=%s, src_zones=%s, dest_zones=%s, bucket=%s' % (group, flow_id, src_zones, dest_zones, bucket) + return json.loads(result_json) + +def create_sync_group_pipe(cluster, group, pipe_id, src_zones, dest_zones, bucket = None, args = []): + cmd = ['sync', 'group', 'pipe', 'create', '--group-id', group, '--pipe-id' , pipe_id, '--source-zones=%s' % src_zones, '--dest-zones=%s' % dest_zones] + if bucket: + b_args = '--bucket=' + bucket + cmd.append(b_args) + if args: + cmd += args + (result_json, retcode) = cluster.admin(cmd) + if retcode != 0: + assert False, 'failed to create sync group pipe groupid=%s, pipe_id=%s, src_zones=%s, dest_zones=%s, bucket=%s' % (group, pipe_id, src_zones, dest_zones, bucket) + return json.loads(result_json) + +def remove_sync_group_pipe(cluster, group, pipe_id, bucket = None, args = None): + cmd = ['sync', 'group', 'pipe', 'remove', '--group-id', group, '--pipe-id' , pipe_id] + if bucket: + b_args = '--bucket=' + bucket + cmd.append(b_args) + if args: + cmd.append(args) + (result_json, retcode) = cluster.admin(cmd) + if retcode != 0: + assert False, 'failed to remove sync group pipe groupid=%s, pipe_id=%s, src_zones=%s, dest_zones=%s, bucket=%s' % (group, pipe_id, src_zones, dest_zones, bucket) + return json.loads(result_json) + +def create_zone_bucket(zone): + b_name = gen_bucket_name() + log.info('create bucket zone=%s name=%s', zone.name, b_name) + bucket = zone.create_bucket(b_name) + return bucket + +def create_object(zone_conn, bucket, objname, content): + k = new_key(zone_conn, bucket.name, objname) + k.set_contents_from_string(content) + +def create_objects(zone_conn, bucket, obj_arr, content): + for objname in obj_arr: + create_object(zone_conn, bucket, objname, content) + +def check_object_exists(bucket, objname, content = None): + k = bucket.get_key(objname) + assert_not_equal(k, None) + if (content != None): + assert_equal(k.get_contents_as_string(encoding='ascii'), content) + +def check_objects_exist(bucket, obj_arr, content = None): + for objname in obj_arr: + check_object_exists(bucket, objname, content) + +def check_object_not_exists(bucket, objname): + k = bucket.get_key(objname) + assert_equal(k, None) + +def check_objects_not_exist(bucket, obj_arr): + for objname in obj_arr: + check_object_not_exists(bucket, objname) + +@attr('sync_policy') +def test_sync_policy_config_zonegroup(): + """ + test_sync_policy_config_zonegroup: + test configuration of all sync commands + """ + zonegroup = realm.master_zonegroup() + zonegroup_meta_checkpoint(zonegroup) + + zonegroup_conns = ZonegroupConns(zonegroup) + z1, z2 = zonegroup.zones[0:2] + c1, c2 = (z1.cluster, z2.cluster) + + zones = z1.name+","+z2.name + + c1.admin(['sync', 'policy', 'get']) + + # (a) zonegroup level + create_sync_policy_group(c1, "sync-group") + set_sync_policy_group_status(c1, "sync-group", "enabled") + get_sync_policy_group(c1, "sync-group") + + get_sync_policy(c1) + + create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones) + create_sync_group_flow_directional(c1, "sync-group", "sync-flow2", z1.name, z2.name) + + create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones) + get_sync_policy_group(c1, "sync-group") + + zonegroup.period.update(z1, commit=True) + + # (b) bucket level + zc1, zc2 = zonegroup_conns.zones[0:2] + bucket = create_zone_bucket(zc1) + bucket_name = bucket.name + + create_sync_policy_group(c1, "sync-bucket", "allowed", bucket_name) + set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucket_name) + get_sync_policy_group(c1, "sync-bucket", bucket_name) + + get_sync_policy(c1, bucket_name) + + create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow1", zones, bucket_name) + create_sync_group_flow_directional(c1, "sync-bucket", "sync-flow2", z1.name, z2.name, bucket_name) + + create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucket_name) + get_sync_policy_group(c1, "sync-bucket", bucket_name) + + zonegroup_meta_checkpoint(zonegroup) + + remove_sync_group_pipe(c1, "sync-bucket", "sync-pipe", bucket_name) + remove_sync_group_flow_directional(c1, "sync-bucket", "sync-flow2", z1.name, z2.name, bucket_name) + remove_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flow1", zones, bucket_name) + remove_sync_policy_group(c1, "sync-bucket", bucket_name) + + get_sync_policy(c1, bucket_name) + + zonegroup_meta_checkpoint(zonegroup) + + remove_sync_group_pipe(c1, "sync-group", "sync-pipe") + remove_sync_group_flow_directional(c1, "sync-group", "sync-flow2", z1.name, z2.name) + remove_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1") + remove_sync_policy_group(c1, "sync-group") + + get_sync_policy(c1) + + zonegroup.period.update(z1, commit=True) + + return + +@attr('sync_policy') +def test_sync_flow_symmetrical_zonegroup_all(): + """ + test_sync_flow_symmetrical_zonegroup_all: + allows sync from all the zones to all other zones (default case) + """ + + zonegroup = realm.master_zonegroup() + zonegroup_meta_checkpoint(zonegroup) + + zonegroup_conns = ZonegroupConns(zonegroup) + + (zoneA, zoneB) = zonegroup.zones[0:2] + (zcA, zcB) = zonegroup_conns.zones[0:2] + + c1 = zoneA.cluster + + c1.admin(['sync', 'policy', 'get']) + + zones = zoneA.name + ',' + zoneB.name + create_sync_policy_group(c1, "sync-group") + create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones) + create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones) + set_sync_policy_group_status(c1, "sync-group", "enabled") + + zonegroup.period.update(zoneA, commit=True) + get_sync_policy(c1) + + objnames = [ 'obj1', 'obj2' ] + content = 'asdasd' + buckets = [] + + # create bucket & object in all zones + bucketA = create_zone_bucket(zcA) + buckets.append(bucketA) + create_object(zcA, bucketA, objnames[0], content) + + bucketB = create_zone_bucket(zcB) + buckets.append(bucketB) + create_object(zcB, bucketB, objnames[1], content) + + zonegroup_meta_checkpoint(zonegroup) + # 'zonegroup_data_checkpoint' currently fails for the zones not + # allowed to sync. So as a workaround, data checkpoint is done + # for only the ones configured. + zone_data_checkpoint(zoneB, zoneA) + + # verify if objects are synced accross the zone + bucket = get_bucket(zcB, bucketA.name) + check_object_exists(bucket, objnames[0], content) + + bucket = get_bucket(zcA, bucketB.name) + check_object_exists(bucket, objnames[1], content) + + remove_sync_policy_group(c1, "sync-group") + return + +@attr('sync_policy') +def test_sync_flow_symmetrical_zonegroup_select(): + """ + test_sync_flow_symmetrical_zonegroup_select: + allow sync between zoneA & zoneB + verify zoneC doesnt sync the data + """ + + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + if len(zonegroup.zones) < 3: + raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.") + + zonegroup_meta_checkpoint(zonegroup) + + (zoneA, zoneB, zoneC) = zonegroup.zones[0:3] + (zcA, zcB, zcC) = zonegroup_conns.zones[0:3] + + c1 = zoneA.cluster + + # configure sync policy + zones = zoneA.name + ',' + zoneB.name + c1.admin(['sync', 'policy', 'get']) + create_sync_policy_group(c1, "sync-group") + create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow", zones) + create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones) + set_sync_policy_group_status(c1, "sync-group", "enabled") + + zonegroup.period.update(zoneA, commit=True) + get_sync_policy(c1) + + buckets = [] + content = 'asdasd' + + # create bucketA & objects in zoneA + objnamesA = [ 'obj1', 'obj2', 'obj3' ] + bucketA = create_zone_bucket(zcA) + buckets.append(bucketA) + create_objects(zcA, bucketA, objnamesA, content) + + # create bucketB & objects in zoneB + objnamesB = [ 'obj4', 'obj5', 'obj6' ] + bucketB = create_zone_bucket(zcB) + buckets.append(bucketB) + create_objects(zcB, bucketB, objnamesB, content) + + zonegroup_meta_checkpoint(zonegroup) + zone_data_checkpoint(zoneB, zoneA) + zone_data_checkpoint(zoneA, zoneB) + + # verify if objnamesA synced to only zoneB but not zoneC + bucket = get_bucket(zcB, bucketA.name) + check_objects_exist(bucket, objnamesA, content) + + bucket = get_bucket(zcC, bucketA.name) + check_objects_not_exist(bucket, objnamesA) + + # verify if objnamesB synced to only zoneA but not zoneC + bucket = get_bucket(zcA, bucketB.name) + check_objects_exist(bucket, objnamesB, content) + + bucket = get_bucket(zcC, bucketB.name) + check_objects_not_exist(bucket, objnamesB) + + remove_sync_policy_group(c1, "sync-group") + return + +@attr('sync_policy') +def test_sync_flow_directional_zonegroup_select(): + """ + test_sync_flow_directional_zonegroup_select: + allow sync from only zoneA to zoneB + + verify that data doesn't get synced to zoneC and + zoneA shouldn't sync data from zoneB either + """ + + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + if len(zonegroup.zones) < 3: + raise SkipTest("test_sync_flow_symmetrical_zonegroup_select skipped. Requires 3 or more zones in master zonegroup.") + + zonegroup_meta_checkpoint(zonegroup) + + (zoneA, zoneB, zoneC) = zonegroup.zones[0:3] + (zcA, zcB, zcC) = zonegroup_conns.zones[0:3] + + c1 = zoneA.cluster + + # configure sync policy + zones = zoneA.name + ',' + zoneB.name + c1.admin(['sync', 'policy', 'get']) + create_sync_policy_group(c1, "sync-group") + create_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name) + create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name) + set_sync_policy_group_status(c1, "sync-group", "enabled") + + zonegroup.period.update(zoneA, commit=True) + get_sync_policy(c1) + + buckets = [] + content = 'asdasd' + + # create bucketA & objects in zoneA + objnamesA = [ 'obj1', 'obj2', 'obj3' ] + bucketA = create_zone_bucket(zcA) + buckets.append(bucketA) + create_objects(zcA, bucketA, objnamesA, content) + + # create bucketB & objects in zoneB + objnamesB = [ 'obj4', 'obj5', 'obj6' ] + bucketB = create_zone_bucket(zcB) + buckets.append(bucketB) + create_objects(zcB, bucketB, objnamesB, content) + + zonegroup_meta_checkpoint(zonegroup) + zone_data_checkpoint(zoneB, zoneA) + + # verify if objnamesA synced to only zoneB but not zoneC + bucket = get_bucket(zcB, bucketA.name) + check_objects_exist(bucket, objnamesA, content) + + bucket = get_bucket(zcC, bucketA.name) + check_objects_not_exist(bucket, objnamesA) + + # verify if objnamesB are not synced to either zoneA or zoneC + bucket = get_bucket(zcA, bucketB.name) + check_objects_not_exist(bucket, objnamesB) + + bucket = get_bucket(zcC, bucketB.name) + check_objects_not_exist(bucket, objnamesB) + + """ + verify the same at bucketA level + configure another policy at bucketA level with src and dest + zones specified to zoneA and zoneB resp. + + verify zoneA bucketA syncs to zoneB BucketA but not viceversa. + """ + # reconfigure zonegroup pipe & flow + remove_sync_group_pipe(c1, "sync-group", "sync-pipe") + remove_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name) + create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones) + create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones) + + # change state to allowed + set_sync_policy_group_status(c1, "sync-group", "allowed") + + zonegroup.period.update(zoneA, commit=True) + get_sync_policy(c1) + + # configure sync policy for only bucketA and enable it + create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name) + create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name) + args = ['--source-bucket=*', '--dest-bucket=*'] + create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zoneA.name, zoneB.name, bucketA.name, args) + set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name) + + get_sync_policy(c1, bucketA.name) + + zonegroup_meta_checkpoint(zonegroup) + + # create objects in bucketA in zoneA and zoneB + objnamesC = [ 'obj7', 'obj8', 'obj9' ] + objnamesD = [ 'obj10', 'obj11', 'obj12' ] + create_objects(zcA, bucketA, objnamesC, content) + create_objects(zcB, bucketA, objnamesD, content) + + zonegroup_meta_checkpoint(zonegroup) + zone_data_checkpoint(zoneB, zoneA) + + # verify that objnamesC are synced to bucketA in zoneB + bucket = get_bucket(zcB, bucketA.name) + check_objects_exist(bucket, objnamesC, content) + + # verify that objnamesD are not synced to bucketA in zoneA + bucket = get_bucket(zcA, bucketA.name) + check_objects_not_exist(bucket, objnamesD) + + remove_sync_policy_group(c1, "sync-bucket", bucketA.name) + remove_sync_policy_group(c1, "sync-group") + return + +@attr('sync_policy') +def test_sync_single_bucket(): + """ + test_sync_single_bucket: + Allow data sync for only bucketA but not for other buckets via + below 2 methods + + (a) zonegroup: symmetrical flow but configure pipe for only bucketA. + (b) bucket level: configure policy for bucketA + """ + + zonegroup = realm.master_zonegroup() + zonegroup_meta_checkpoint(zonegroup) + + zonegroup_conns = ZonegroupConns(zonegroup) + + (zoneA, zoneB) = zonegroup.zones[0:2] + (zcA, zcB) = zonegroup_conns.zones[0:2] + + c1 = zoneA.cluster + + c1.admin(['sync', 'policy', 'get']) + + zones = zoneA.name + ',' + zoneB.name + get_sync_policy(c1) + + objnames = [ 'obj1', 'obj2', 'obj3' ] + content = 'asdasd' + buckets = [] + + # create bucketA & bucketB in zoneA + bucketA = create_zone_bucket(zcA) + buckets.append(bucketA) + bucketB = create_zone_bucket(zcA) + buckets.append(bucketB) + + zonegroup_meta_checkpoint(zonegroup) + + """ + Method (a): configure pipe for only bucketA + """ + # configure sync policy & pipe for only bucketA + create_sync_policy_group(c1, "sync-group") + create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones) + args = ['--source-bucket=' + bucketA.name, '--dest-bucket=' + bucketA.name] + + create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones, None, args) + set_sync_policy_group_status(c1, "sync-group", "enabled") + get_sync_policy(c1) + zonegroup.period.update(zoneA, commit=True) + + sync_info(c1) + + # create objects in bucketA & bucketB + create_objects(zcA, bucketA, objnames, content) + create_object(zcA, bucketB, objnames, content) + + zonegroup_meta_checkpoint(zonegroup) + zone_data_checkpoint(zoneB, zoneA) + + # verify if bucketA objects are synced + bucket = get_bucket(zcB, bucketA.name) + check_objects_exist(bucket, objnames, content) + + # bucketB objects should not be synced + bucket = get_bucket(zcB, bucketB.name) + check_objects_not_exist(bucket, objnames) + + + """ + Method (b): configure policy at only bucketA level + """ + # reconfigure group pipe + remove_sync_group_pipe(c1, "sync-group", "sync-pipe") + create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones) + + # change state to allowed + set_sync_policy_group_status(c1, "sync-group", "allowed") + + zonegroup.period.update(zoneA, commit=True) + get_sync_policy(c1) + + + # configure sync policy for only bucketA and enable it + create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name) + create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name) + create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucketA.name) + set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name) + + get_sync_policy(c1, bucketA.name) + + # create object in bucketA + create_object(zcA, bucketA, objnames[2], content) + + # create object in bucketA too + create_object(zcA, bucketB, objnames[2], content) + + zonegroup_meta_checkpoint(zonegroup) + zone_data_checkpoint(zoneB, zoneA) + + # verify if bucketA objects are synced + bucket = get_bucket(zcB, bucketA.name) + check_object_exists(bucket, objnames[2], content) + + # bucketB objects should not be synced + bucket = get_bucket(zcB, bucketB.name) + check_object_not_exists(bucket, objnames[2]) + + remove_sync_policy_group(c1, "sync-bucket", bucketA.name) + remove_sync_policy_group(c1, "sync-group") + return + +@attr('sync_policy') +def test_sync_different_buckets(): + """ + test_sync_different_buckets: + sync zoneA bucketA to zoneB bucketB via below methods + + (a) zonegroup: directional flow but configure pipe for zoneA bucketA to zoneB bucketB + (b) bucket: configure another policy at bucketA level with pipe set to + another bucket(bucketB) in target zone. + + sync zoneA bucketA from zoneB bucketB + (c) configure another policy at bucketA level with pipe set from + another bucket(bucketB) in source zone. + + """ + + zonegroup = realm.master_zonegroup() + zonegroup_meta_checkpoint(zonegroup) + + zonegroup_conns = ZonegroupConns(zonegroup) + + (zoneA, zoneB) = zonegroup.zones[0:2] + (zcA, zcB) = zonegroup_conns.zones[0:2] + zones = zoneA.name + ',' + zoneB.name + + c1 = zoneA.cluster + + c1.admin(['sync', 'policy', 'get']) + + objnames = [ 'obj1', 'obj2' ] + objnamesB = [ 'obj3', 'obj4' ] + content = 'asdasd' + buckets = [] + + # create bucketA & bucketB in zoneA + bucketA = create_zone_bucket(zcA) + buckets.append(bucketA) + bucketB = create_zone_bucket(zcA) + buckets.append(bucketB) + + zonegroup_meta_checkpoint(zonegroup) + + """ + Method (a): zonegroup - configure pipe for only bucketA + """ + # configure pipe from zoneA bucketA to zoneB bucketB + create_sync_policy_group(c1, "sync-group") + create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones) + args = ['--source-bucket=' + bucketA.name, '--dest-bucket=' + bucketB.name] + create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name, None, args) + set_sync_policy_group_status(c1, "sync-group", "enabled") + zonegroup.period.update(zoneA, commit=True) + get_sync_policy(c1) + + # create objects in bucketA + create_objects(zcA, bucketA, objnames, content) + + zonegroup_meta_checkpoint(zonegroup) + zone_data_checkpoint(zoneB, zoneA) + + # verify that objects are synced to bucketB in zoneB + # but not to bucketA + bucket = get_bucket(zcB, bucketA.name) + check_objects_not_exist(bucket, objnames) + + bucket = get_bucket(zcB, bucketB.name) + check_objects_exist(bucket, objnames, content) + """ + Method (b): configure policy at only bucketA level with pipe + set to bucketB in target zone + """ + + remove_sync_group_pipe(c1, "sync-group", "sync-pipe") + create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones) + + # change state to allowed + set_sync_policy_group_status(c1, "sync-group", "allowed") + + zonegroup.period.update(zoneA, commit=True) + get_sync_policy(c1) + + # configure sync policy for only bucketA and enable it + create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name) + create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name) + args = ['--source-bucket=*', '--dest-bucket=' + bucketB.name] + create_sync_group_pipe(c1, "sync-bucket", "sync-pipeA", zones, zones, bucketA.name, args) + set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name) + + get_sync_policy(c1, bucketA.name) + + objnamesC = [ 'obj5', 'obj6' ] + + zonegroup_meta_checkpoint(zonegroup) + # create objects in bucketA + create_objects(zcA, bucketA, objnamesC, content) + + zonegroup_meta_checkpoint(zonegroup) + zone_data_checkpoint(zoneB, zoneA) + + """ + # verify that objects are synced to bucketB in zoneB + # but not to bucketA + """ + bucket = get_bucket(zcB, bucketA.name) + check_objects_not_exist(bucket, objnamesC) + + bucket = get_bucket(zcB, bucketB.name) + check_objects_exist(bucket, objnamesC, content) + + remove_sync_policy_group(c1, "sync-bucket", bucketA.name) + zonegroup_meta_checkpoint(zonegroup) + get_sync_policy(c1, bucketA.name) + + """ + Method (c): configure policy at only bucketA level with pipe + set from bucketB in source zone + verify zoneA bucketA syncs from zoneB BucketB but not bucketA + """ + + # configure sync policy for only bucketA and enable it + create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name) + create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name) + args = ['--source-bucket=' + bucketB.name, '--dest-bucket=' + '*'] + create_sync_group_pipe(c1, "sync-bucket", "sync-pipe", zones, zones, bucketA.name, args) + set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name) + + get_sync_policy(c1, bucketA.name) + + # create objects in bucketA & B in ZoneB + objnamesD = [ 'obj7', 'obj8' ] + objnamesE = [ 'obj9', 'obj10' ] + + create_objects(zcB, bucketA, objnamesD, content) + create_objects(zcB, bucketB, objnamesE, content) + + zonegroup_meta_checkpoint(zonegroup) + zone_data_checkpoint(zoneA, zoneB) + """ + # verify that objects from only bucketB are synced to + # bucketA in zoneA + """ + bucket = get_bucket(zcA, bucketA.name) + check_objects_not_exist(bucket, objnamesD) + check_objects_exist(bucket, objnamesE, content) + + remove_sync_policy_group(c1, "sync-bucket", bucketA.name) + remove_sync_policy_group(c1, "sync-group") + return + +@attr('sync_policy') +def test_sync_multiple_buckets_to_single(): + """ + test_sync_multiple_buckets_to_single: + directional flow + (a) pipe: sync zoneA bucketA,bucketB to zoneB bucketB + + (b) configure another policy at bucketA level with pipe configured + to sync from multiple buckets (bucketA & bucketB) + + verify zoneA bucketA & bucketB syncs to zoneB BucketB + """ + + zonegroup = realm.master_zonegroup() + zonegroup_meta_checkpoint(zonegroup) + + zonegroup_conns = ZonegroupConns(zonegroup) + + (zoneA, zoneB) = zonegroup.zones[0:2] + (zcA, zcB) = zonegroup_conns.zones[0:2] + zones = zoneA.name + ',' + zoneB.name + + c1 = zoneA.cluster + + c1.admin(['sync', 'policy', 'get']) + + objnamesA = [ 'obj1', 'obj2' ] + objnamesB = [ 'obj3', 'obj4' ] + content = 'asdasd' + buckets = [] + + # create bucketA & bucketB in zoneA + bucketA = create_zone_bucket(zcA) + buckets.append(bucketA) + bucketB = create_zone_bucket(zcA) + buckets.append(bucketB) + + zonegroup_meta_checkpoint(zonegroup) + + # configure pipe from zoneA bucketA,bucketB to zoneB bucketB + create_sync_policy_group(c1, "sync-group") + create_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name) + source_buckets = [ bucketA.name, bucketB.name ] + for source_bucket in source_buckets: + args = ['--source-bucket=' + source_bucket, '--dest-bucket=' + bucketB.name] + create_sync_group_pipe(c1, "sync-group", "sync-pipe-%s" % source_bucket, zoneA.name, zoneB.name, None, args) + + set_sync_policy_group_status(c1, "sync-group", "enabled") + zonegroup.period.update(zoneA, commit=True) + get_sync_policy(c1) + + # create objects in bucketA & bucketB + create_objects(zcA, bucketA, objnamesA, content) + create_objects(zcA, bucketB, objnamesB, content) + + zonegroup_meta_checkpoint(zonegroup) + zone_data_checkpoint(zoneB, zoneA) + + # verify that both zoneA bucketA & bucketB objects are synced to + # bucketB in zoneB but not to bucketA + bucket = get_bucket(zcB, bucketA.name) + check_objects_not_exist(bucket, objnamesA) + check_objects_not_exist(bucket, objnamesB) + + bucket = get_bucket(zcB, bucketB.name) + check_objects_exist(bucket, objnamesA, content) + check_objects_exist(bucket, objnamesB, content) + + """ + Method (b): configure at bucket level + """ + # reconfigure pipe & flow + for source_bucket in source_buckets: + remove_sync_group_pipe(c1, "sync-group", "sync-pipe-%s" % source_bucket) + remove_sync_group_flow_directional(c1, "sync-group", "sync-flow", zoneA.name, zoneB.name) + create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones) + create_sync_group_pipe(c1, "sync-group", "sync-pipe", zones, zones) + + # change state to allowed + set_sync_policy_group_status(c1, "sync-group", "allowed") + + zonegroup.period.update(zoneA, commit=True) + get_sync_policy(c1) + + objnamesC = [ 'obj5', 'obj6' ] + objnamesD = [ 'obj7', 'obj8' ] + + # configure sync policy for only bucketA and enable it + create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name) + create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name) + source_buckets = [ bucketA.name, bucketB.name ] + for source_bucket in source_buckets: + args = ['--source-bucket=' + source_bucket, '--dest-bucket=' + '*'] + create_sync_group_pipe(c1, "sync-bucket", "sync-pipe-%s" % source_bucket, zoneA.name, zoneB.name, bucketA.name, args) + + set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name) + + get_sync_policy(c1) + + zonegroup_meta_checkpoint(zonegroup) + # create objects in bucketA + create_objects(zcA, bucketA, objnamesC, content) + create_objects(zcA, bucketB, objnamesD, content) + + zonegroup_meta_checkpoint(zonegroup) + zone_data_checkpoint(zoneB, zoneA) + + # verify that both zoneA bucketA & bucketB objects are synced to + # bucketA in zoneB but not to bucketB + bucket = get_bucket(zcB, bucketB.name) + check_objects_not_exist(bucket, objnamesC) + check_objects_not_exist(bucket, objnamesD) + + bucket = get_bucket(zcB, bucketA.name) + check_objects_exist(bucket, objnamesD, content) + check_objects_exist(bucket, objnamesD, content) + + remove_sync_policy_group(c1, "sync-bucket", bucketA.name) + remove_sync_policy_group(c1, "sync-group") + return + +@attr('sync_policy') +def test_sync_single_bucket_to_multiple(): + """ + test_sync_single_bucket_to_multiple: + directional flow + (a) pipe: sync zoneA bucketA to zoneB bucketA & bucketB + + (b) configure another policy at bucketA level with pipe configured + to sync to multiple buckets (bucketA & bucketB) + + verify zoneA bucketA syncs to zoneB bucketA & bucketB + """ + + zonegroup = realm.master_zonegroup() + zonegroup_meta_checkpoint(zonegroup) + + zonegroup_conns = ZonegroupConns(zonegroup) + + (zoneA, zoneB) = zonegroup.zones[0:2] + (zcA, zcB) = zonegroup_conns.zones[0:2] + zones = zoneA.name + ',' + zoneB.name + + c1 = zoneA.cluster + + c1.admin(['sync', 'policy', 'get']) + + objnamesA = [ 'obj1', 'obj2' ] + content = 'asdasd' + buckets = [] + + # create bucketA & bucketB in zoneA + bucketA = create_zone_bucket(zcA) + buckets.append(bucketA) + bucketB = create_zone_bucket(zcA) + buckets.append(bucketB) + + zonegroup_meta_checkpoint(zonegroup) + + # configure pipe from zoneA bucketA to zoneB bucketA, bucketB + create_sync_policy_group(c1, "sync-group") + create_sync_group_flow_symmetrical(c1, "sync-group", "sync-flow1", zones) + + dest_buckets = [ bucketA.name, bucketB.name ] + for dest_bucket in dest_buckets: + args = ['--source-bucket=' + bucketA.name, '--dest-bucket=' + dest_bucket] + create_sync_group_pipe(c1, "sync-group", "sync-pipe-%s" % dest_bucket, zoneA.name, zoneB.name, None, args) + + create_sync_group_pipe(c1, "sync-group", "sync-pipe", zoneA.name, zoneB.name, None, args) + set_sync_policy_group_status(c1, "sync-group", "enabled") + zonegroup.period.update(zoneA, commit=True) + get_sync_policy(c1) + + # create objects in bucketA + create_objects(zcA, bucketA, objnamesA, content) + + zonegroup_meta_checkpoint(zonegroup) + zone_data_checkpoint(zoneB, zoneA) + + # verify that objects from zoneA bucketA are synced to both + # bucketA & bucketB in zoneB + bucket = get_bucket(zcB, bucketA.name) + check_objects_exist(bucket, objnamesA, content) + + bucket = get_bucket(zcB, bucketB.name) + check_objects_exist(bucket, objnamesA, content) + + """ + Method (b): configure at bucket level + """ + remove_sync_group_pipe(c1, "sync-group", "sync-pipe") + create_sync_group_pipe(c1, "sync-group", "sync-pipe", '*', '*') + + # change state to allowed + set_sync_policy_group_status(c1, "sync-group", "allowed") + + zonegroup.period.update(zoneA, commit=True) + get_sync_policy(c1) + + objnamesB = [ 'obj3', 'obj4' ] + + # configure sync policy for only bucketA and enable it + create_sync_policy_group(c1, "sync-bucket", "allowed", bucketA.name) + create_sync_group_flow_symmetrical(c1, "sync-bucket", "sync-flowA", zones, bucketA.name) + dest_buckets = [ bucketA.name, bucketB.name ] + for dest_bucket in dest_buckets: + args = ['--source-bucket=' + '*', '--dest-bucket=' + dest_bucket] + create_sync_group_pipe(c1, "sync-bucket", "sync-pipe-%s" % dest_bucket, zoneA.name, zoneB.name, bucketA.name, args) + + set_sync_policy_group_status(c1, "sync-bucket", "enabled", bucketA.name) + + get_sync_policy(c1) + + zonegroup_meta_checkpoint(zonegroup) + # create objects in bucketA + create_objects(zcA, bucketA, objnamesB, content) + + zonegroup_meta_checkpoint(zonegroup) + zone_data_checkpoint(zoneB, zoneA) + + # verify that objects from zoneA bucketA are synced to both + # bucketA & bucketB in zoneB + bucket = get_bucket(zcB, bucketA.name) + check_objects_exist(bucket, objnamesB, content) + + bucket = get_bucket(zcB, bucketB.name) + check_objects_exist(bucket, objnamesB, content) + + remove_sync_policy_group(c1, "sync-bucket", bucketA.name) + remove_sync_policy_group(c1, "sync-group") + return diff --git a/src/test/rgw/rgw_multi/tests_az.py b/src/test/rgw/rgw_multi/tests_az.py new file mode 100644 index 000000000..13ec832a2 --- /dev/null +++ b/src/test/rgw/rgw_multi/tests_az.py @@ -0,0 +1,597 @@ +import logging + +from nose import SkipTest +from nose.tools import assert_not_equal, assert_equal + +from boto.s3.deletemarker import DeleteMarker + +from .tests import get_realm, \ + ZonegroupConns, \ + zonegroup_meta_checkpoint, \ + zone_meta_checkpoint, \ + zone_bucket_checkpoint, \ + zone_data_checkpoint, \ + zonegroup_bucket_checkpoint, \ + check_bucket_eq, \ + gen_bucket_name, \ + get_user, \ + get_tenant + +from .zone_az import print_connection_info + + +# configure logging for the tests module +log = logging.getLogger(__name__) + + +########################################## +# utility functions for archive zone tests +########################################## + +def check_az_configured(): + """check if at least one archive zone exist""" + realm = get_realm() + zonegroup = realm.master_zonegroup() + + az_zones = zonegroup.zones_by_type.get("archive") + if az_zones is None or len(az_zones) != 1: + raise SkipTest("Requires one archive zone") + + +def is_az_zone(zone_conn): + """check if a specific zone is archive zone""" + if not zone_conn: + return False + return zone_conn.zone.tier_type() == "archive" + + +def init_env(): + """initialize the environment""" + check_az_configured() + + realm = get_realm() + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + zonegroup_meta_checkpoint(zonegroup) + + az_zones = [] + zones = [] + for conn in zonegroup_conns.zones: + if is_az_zone(conn): + zone_meta_checkpoint(conn.zone) + az_zones.append(conn) + elif not conn.zone.is_read_only(): + zones.append(conn) + + assert_not_equal(len(zones), 0) + assert_not_equal(len(az_zones), 0) + return zones, az_zones + + +def zone_full_checkpoint(target_zone, source_zone): + zone_meta_checkpoint(target_zone) + zone_data_checkpoint(target_zone, source_zone) + + +def check_bucket_exists_on_zone(zone, bucket_name): + try: + zone.conn.get_bucket(bucket_name) + except: + return False + return True + + +def check_key_exists(key): + try: + key.get_contents_as_string() + except: + return False + return True + + +def get_versioning_status(bucket): + res = bucket.get_versioning_status() + key = 'Versioning' + if not key in res: + return None + else: + return res[key] + + +def get_versioned_objs(bucket): + b = [] + for b_entry in bucket.list_versions(): + if isinstance(b_entry, DeleteMarker): + continue + d = {} + d['version_id'] = b_entry.version_id + d['size'] = b_entry.size + d['etag'] = b_entry.etag + d['is_latest'] = b_entry.is_latest + b.append({b_entry.key:d}) + return b + + +def get_versioned_entries(bucket): + dm = [] + ver = [] + for b_entry in bucket.list_versions(): + if isinstance(b_entry, DeleteMarker): + d = {} + d['version_id'] = b_entry.version_id + d['is_latest'] = b_entry.is_latest + dm.append({b_entry.name:d}) + else: + d = {} + d['version_id'] = b_entry.version_id + d['size'] = b_entry.size + d['etag'] = b_entry.etag + d['is_latest'] = b_entry.is_latest + ver.append({b_entry.key:d}) + return (dm, ver) + + +def get_number_buckets_by_zone(zone): + return len(zone.conn.get_all_buckets()) + + +def get_bucket_names_by_zone(zone): + return [b.name for b in zone.conn.get_all_buckets()] + + +def get_full_bucket_name(partial_bucket_name, bucket_names_az): + full_bucket_name = None + for bucket_name in bucket_names_az: + if bucket_name.startswith(partial_bucket_name): + full_bucket_name = bucket_name + break + return full_bucket_name + + +#################### +# archive zone tests +#################### + + +def test_az_info(): + """ log information for manual testing """ + return SkipTest("only used in manual testing") + zones, az_zones = init_env() + realm = get_realm() + zonegroup = realm.master_zonegroup() + bucket_name = gen_bucket_name() + # create bucket on the first of the rados zones + bucket = zones[0].create_bucket(bucket_name) + # create objects in the bucket + number_of_objects = 3 + for i in range(number_of_objects): + key = bucket.new_key(str(i)) + key.set_contents_from_string('bar') + print('Zonegroup: ' + zonegroup.name) + print('user: ' + get_user()) + print('tenant: ' + get_tenant()) + print('Master Zone') + print_connection_info(zones[0].conn) + print('Archive Zone') + print_connection_info(az_zones[0].conn) + print('Bucket: ' + bucket_name) + + +def test_az_create_empty_bucket(): + """ test empty bucket replication """ + zones, az_zones = init_env() + bucket_name = gen_bucket_name() + # create bucket on the non archive zone + zones[0].create_bucket(bucket_name) + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # bucket exist on the archive zone + p = check_bucket_exists_on_zone(az_zones[0], bucket_name) + assert_equal(p, True) + + +def test_az_check_empty_bucket_versioning(): + """ test bucket vesioning with empty bucket """ + zones, az_zones = init_env() + bucket_name = gen_bucket_name() + # create bucket on the non archive zone + bucket = zones[0].create_bucket(bucket_name) + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # get bucket on archive zone + bucket_az = az_zones[0].conn.get_bucket(bucket_name) + # check for non bucket versioning + p1 = get_versioning_status(bucket) is None + assert_equal(p1, True) + p2 = get_versioning_status(bucket_az) is None + assert_equal(p2, True) + + +def test_az_object_replication(): + """ test object replication """ + zones, az_zones = init_env() + bucket_name = gen_bucket_name() + # create bucket on the non archive zone + bucket = zones[0].create_bucket(bucket_name) + key = bucket.new_key("foo") + key.set_contents_from_string("bar") + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # check object on archive zone + bucket_az = az_zones[0].conn.get_bucket(bucket_name) + key_az = bucket_az.get_key("foo") + p1 = key_az.get_contents_as_string(encoding='ascii') == "bar" + assert_equal(p1, True) + + +def test_az_object_replication_versioning(): + """ test object replication versioning """ + zones, az_zones = init_env() + bucket_name = gen_bucket_name() + # create object on the non archive zone + bucket = zones[0].create_bucket(bucket_name) + key = bucket.new_key("foo") + key.set_contents_from_string("bar") + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # check object content on archive zone + bucket_az = az_zones[0].conn.get_bucket(bucket_name) + key_az = bucket_az.get_key("foo") + p1 = key_az.get_contents_as_string(encoding='ascii') == "bar" + assert_equal(p1, True) + # grab object versioning and etag + for b_version in bucket.list_versions(): + b_version_id = b_version.version_id + b_version_etag = b_version.etag + for b_az_version in bucket_az.list_versions(): + b_az_version_id = b_az_version.version_id + b_az_version_etag = b_az_version.etag + # check + p2 = b_version_id == 'null' + assert_equal(p2, True) + p3 = b_az_version_id != 'null' + assert_equal(p3, True) + p4 = b_version_etag == b_az_version_etag + assert_equal(p4, True) + + +def test_az_lazy_activation_of_versioned_bucket(): + """ test lazy activation of versioned bucket """ + zones, az_zones = init_env() + bucket_name = gen_bucket_name() + # create object on the non archive zone + bucket = zones[0].create_bucket(bucket_name) + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # get bucket on archive zone + bucket_az = az_zones[0].conn.get_bucket(bucket_name) + # check for non bucket versioning + p1 = get_versioning_status(bucket) is None + assert_equal(p1, True) + p2 = get_versioning_status(bucket_az) is None + assert_equal(p2, True) + # create object on non archive zone + key = bucket.new_key("foo") + key.set_contents_from_string("bar") + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # check lazy versioned buckets + p3 = get_versioning_status(bucket) is None + assert_equal(p3, True) + p4 = get_versioning_status(bucket_az) == 'Enabled' + assert_equal(p4, True) + + +def test_az_archive_zone_double_object_replication_versioning(): + """ test archive zone double object replication versioning """ + zones, az_zones = init_env() + bucket_name = gen_bucket_name() + # create object on the non archive zone + bucket = zones[0].create_bucket(bucket_name) + key = bucket.new_key("foo") + key.set_contents_from_string("bar") + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # get bucket on archive zone + bucket_az = az_zones[0].conn.get_bucket(bucket_name) + # check for non bucket versioning + p1 = get_versioning_status(bucket) is None + assert_equal(p1, True) + p2 = get_versioning_status(bucket_az) == 'Enabled' + assert_equal(p2, True) + # overwrite object on non archive zone + key = bucket.new_key("foo") + key.set_contents_from_string("ouch") + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # check lazy versioned buckets + p3 = get_versioning_status(bucket) is None + assert_equal(p3, True) + p4 = get_versioning_status(bucket_az) == 'Enabled' + assert_equal(p4, True) + # get versioned objects + objs = get_versioned_objs(bucket) + objs_az = get_versioned_objs(bucket_az) + # check version_id, size, and is_latest on non archive zone + p5 = objs[0]['foo']['version_id'] == 'null' + assert_equal(p5, True) + p6 = objs[0]['foo']['size'] == 4 + assert_equal(p6, True) + p7 = objs[0]['foo']['is_latest'] == True + assert_equal(p7, True) + # check version_id, size, is_latest on archive zone + latest_obj_az_etag = None + for obj_az in objs_az: + current_obj_az = obj_az['foo'] + if current_obj_az['is_latest'] == True: + p8 = current_obj_az['size'] == 4 + assert_equal(p8, True) + latest_obj_az_etag = current_obj_az['etag'] + else: + p9 = current_obj_az['size'] == 3 + assert_equal(p9, True) + assert_not_equal(current_obj_az['version_id'], 'null') + # check last versions' etags + p10 = objs[0]['foo']['etag'] == latest_obj_az_etag + assert_equal(p10, True) + + +def test_az_deleted_object_replication(): + """ test zone deleted object replication """ + zones, az_zones = init_env() + bucket_name = gen_bucket_name() + # create object on the non archive zone + bucket = zones[0].create_bucket(bucket_name) + key = bucket.new_key("foo") + key.set_contents_from_string("bar") + p1 = key.get_contents_as_string(encoding='ascii') == "bar" + assert_equal(p1, True) + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # update object on non archive zone + key.set_contents_from_string("soup") + p2 = key.get_contents_as_string(encoding='ascii') == "soup" + assert_equal(p2, True) + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # delete object on non archive zone + key.delete() + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # check object on non archive zone + p3 = check_key_exists(key) == False + assert_equal(p3, True) + # check objects on archive zone + bucket_az = az_zones[0].conn.get_bucket(bucket_name) + key_az = bucket_az.get_key("foo") + p4 = check_key_exists(key_az) == True + assert_equal(p4, True) + p5 = key_az.get_contents_as_string(encoding='ascii') == "soup" + assert_equal(p5, True) + b_ver_az = get_versioned_objs(bucket_az) + p6 = len(b_ver_az) == 2 + assert_equal(p6, True) + + +def test_az_bucket_renaming_on_empty_bucket_deletion(): + """ test bucket renaming on empty bucket deletion """ + zones, az_zones = init_env() + bucket_name = gen_bucket_name() + # grab number of buckets on non archive zone + num_buckets = get_number_buckets_by_zone(zones[0]) + # grab number of buckets on archive zone + num_buckets_az = get_number_buckets_by_zone(az_zones[0]) + # create bucket on non archive zone + bucket = zones[0].create_bucket(bucket_name) + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # delete bucket in non archive zone + zones[0].delete_bucket(bucket_name) + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # check no new buckets on non archive zone + p1 = get_number_buckets_by_zone(zones[0]) == num_buckets + assert_equal(p1, True) + # check non deletion on bucket on archive zone + p2 = get_number_buckets_by_zone(az_zones[0]) == (num_buckets_az + 1) + assert_equal(p2, True) + # check bucket renaming + bucket_names_az = get_bucket_names_by_zone(az_zones[0]) + new_bucket_name = bucket_name + '-deleted-' + p3 = any(bucket_name.startswith(new_bucket_name) for bucket_name in bucket_names_az) + assert_equal(p3, True) + + +def test_az_old_object_version_in_archive_zone(): + """ test old object version in archive zone """ + zones, az_zones = init_env() + bucket_name = gen_bucket_name() + # grab number of buckets on non archive zone + num_buckets = get_number_buckets_by_zone(zones[0]) + # grab number of buckets on archive zone + num_buckets_az = get_number_buckets_by_zone(az_zones[0]) + # create bucket on non archive zone + bucket = zones[0].create_bucket(bucket_name) + # create object on non archive zone + key = bucket.new_key("foo") + key.set_contents_from_string("zero") + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # save object version on archive zone + bucket_az = az_zones[0].conn.get_bucket(bucket_name) + b_ver_az = get_versioned_objs(bucket_az) + obj_az_version_id = b_ver_az[0]['foo']['version_id'] + # update object on non archive zone + key.set_contents_from_string("one") + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # delete object on non archive zone + key.delete() + # delete bucket on non archive zone + zones[0].delete_bucket(bucket_name) + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # check same buckets on non archive zone + p1 = get_number_buckets_by_zone(zones[0]) == num_buckets + assert_equal(p1, True) + # check for new bucket on archive zone + p2 = get_number_buckets_by_zone(az_zones[0]) == (num_buckets_az + 1) + assert_equal(p2, True) + # get new bucket name on archive zone + bucket_names_az = get_bucket_names_by_zone(az_zones[0]) + new_bucket_name_az = get_full_bucket_name(bucket_name + '-deleted-', bucket_names_az) + p3 = new_bucket_name_az is not None + assert_equal(p3, True) + # check number of objects on archive zone + new_bucket_az = az_zones[0].conn.get_bucket(new_bucket_name_az) + new_b_ver_az = get_versioned_objs(new_bucket_az) + p4 = len(new_b_ver_az) == 2 + assert_equal(p4, True) + # check versioned objects on archive zone + new_key_az = new_bucket_az.get_key("foo", version_id=obj_az_version_id) + p5 = new_key_az.get_contents_as_string(encoding='ascii') == "zero" + assert_equal(p5, True) + new_key_latest_az = new_bucket_az.get_key("foo") + p6 = new_key_latest_az.get_contents_as_string(encoding='ascii') == "one" + assert_equal(p6, True) + + +def test_az_force_bucket_renaming_if_same_bucket_name(): + """ test force bucket renaming if same bucket name """ + zones, az_zones = init_env() + bucket_name = gen_bucket_name() + # grab number of buckets on non archive zone + num_buckets = get_number_buckets_by_zone(zones[0]) + # grab number of buckets on archive zone + num_buckets_az = get_number_buckets_by_zone(az_zones[0]) + # create bucket on non archive zone + bucket = zones[0].create_bucket(bucket_name) + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # check same buckets on non archive zone + p1 = get_number_buckets_by_zone(zones[0]) == (num_buckets + 1) + assert_equal(p1, True) + # check for new bucket on archive zone + p2 = get_number_buckets_by_zone(az_zones[0]) == (num_buckets_az + 1) + assert_equal(p2, True) + # delete bucket on non archive zone + zones[0].delete_bucket(bucket_name) + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # check number of buckets on non archive zone + p3 = get_number_buckets_by_zone(zones[0]) == num_buckets + assert_equal(p3, True) + # check number of buckets on archive zone + p4 = get_number_buckets_by_zone(az_zones[0]) == (num_buckets_az + 1) + assert_equal(p4, True) + # get new bucket name on archive zone + bucket_names_az = get_bucket_names_by_zone(az_zones[0]) + new_bucket_name_az = get_full_bucket_name(bucket_name + '-deleted-', bucket_names_az) + p5 = new_bucket_name_az is not None + assert_equal(p5, True) + # create bucket on non archive zone + _ = zones[0].create_bucket(new_bucket_name_az) + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # check number of buckets on non archive zone + p6 = get_number_buckets_by_zone(zones[0]) == (num_buckets + 1) + assert_equal(p6, True) + # check number of buckets on archive zone + p7 = get_number_buckets_by_zone(az_zones[0]) == (num_buckets_az + 2) + assert_equal(p7, True) + + +def test_az_versioning_support_in_zones(): + """ test versioning support on zones """ + zones, az_zones = init_env() + bucket_name = gen_bucket_name() + # create bucket on non archive zone + bucket = zones[0].create_bucket(bucket_name) + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # get bucket on archive zone + bucket_az = az_zones[0].conn.get_bucket(bucket_name) + # check non versioned buckets + p1 = get_versioning_status(bucket) is None + assert_equal(p1, True) + p2 = get_versioning_status(bucket_az) is None + assert_equal(p2, True) + # create object on non archive zone + key = bucket.new_key("foo") + key.set_contents_from_string("zero") + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # check bucket versioning + p3 = get_versioning_status(bucket) is None + assert_equal(p3, True) + p4 = get_versioning_status(bucket_az) == 'Enabled' + assert_equal(p4, True) + # enable bucket versioning on non archive zone + bucket.configure_versioning(True) + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # check bucket versioning + p5 = get_versioning_status(bucket) == 'Enabled' + assert_equal(p5, True) + p6 = get_versioning_status(bucket_az) == 'Enabled' + assert_equal(p6, True) + # delete object on non archive zone + key.delete() + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # check delete-markers and versions on non archive zone + (b_dm, b_ver) = get_versioned_entries(bucket) + p7 = len(b_dm) == 1 + assert_equal(p7, True) + p8 = len(b_ver) == 1 + assert_equal(p8, True) + # check delete-markers and versions on archive zone + (b_dm_az, b_ver_az) = get_versioned_entries(bucket_az) + p9 = len(b_dm_az) == 1 + assert_equal(p9, True) + p10 = len(b_ver_az) == 1 + assert_equal(p10, True) + # delete delete-marker on non archive zone + dm_version_id = b_dm[0]['foo']['version_id'] + bucket.delete_key("foo", version_id=dm_version_id) + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # check delete-markers and versions on non archive zone + (b_dm, b_ver) = get_versioned_entries(bucket) + p11 = len(b_dm) == 0 + assert_equal(p11, True) + p12 = len(b_ver) == 1 + assert_equal(p12, True) + # check delete-markers and versions on archive zone + (b_dm_az, b_ver_az) = get_versioned_entries(bucket_az) + p13 = len(b_dm_az) == 1 + assert_equal(p13, True) + p14 = len(b_ver_az) == 1 + assert_equal(p14, True) + # delete delete-marker on archive zone + dm_az_version_id = b_dm_az[0]['foo']['version_id'] + bucket_az.delete_key("foo", version_id=dm_az_version_id) + # sync + zone_full_checkpoint(az_zones[0].zone, zones[0].zone) + # check delete-markers and versions on non archive zone + (b_dm, b_ver) = get_versioned_entries(bucket) + p15 = len(b_dm) == 0 + assert_equal(p15, True) + p16 = len(b_ver) == 1 + assert_equal(p16, True) + # check delete-markers and versions on archive zone + (b_dm_az, b_ver_az) = get_versioned_entries(bucket_az) + p17 = len(b_dm_az) == 0 + assert_equal(p17, True) + p17 = len(b_ver_az) == 1 + assert_equal(p17, True) + # check body in zones + obj_version_id = b_ver[0]['foo']['version_id'] + key = bucket.get_key("foo", version_id=obj_version_id) + p18 = key.get_contents_as_string(encoding='ascii') == "zero" + assert_equal(p18, True) + obj_az_version_id = b_ver_az[0]['foo']['version_id'] + key_az = bucket_az.get_key("foo", version_id=obj_az_version_id) + p19 = key_az.get_contents_as_string(encoding='ascii') == "zero" + assert_equal(p19, True) diff --git a/src/test/rgw/rgw_multi/tests_es.py b/src/test/rgw/rgw_multi/tests_es.py new file mode 100644 index 000000000..08c11718b --- /dev/null +++ b/src/test/rgw/rgw_multi/tests_es.py @@ -0,0 +1,276 @@ +import json +import logging + +import boto +import boto.s3.connection + +import datetime +import dateutil + +from itertools import zip_longest # type: ignore + +from nose.tools import eq_ as eq + +from .multisite import * +from .tests import * +from .zone_es import * + +log = logging.getLogger(__name__) + + +def check_es_configured(): + realm = get_realm() + zonegroup = realm.master_zonegroup() + + es_zones = zonegroup.zones_by_type.get("elasticsearch") + if not es_zones: + raise SkipTest("Requires at least one ES zone") + +def is_es_zone(zone_conn): + if not zone_conn: + return False + + return zone_conn.zone.tier_type() == "elasticsearch" + +def verify_search(bucket_name, src_keys, result_keys, f): + check_keys = [] + for k in src_keys: + if bucket_name: + if bucket_name != k.bucket.name: + continue + if f(k): + check_keys.append(k) + check_keys.sort(key = lambda l: (l.bucket.name, l.name, l.version_id)) + + log.debug('check keys:' + dump_json(check_keys)) + log.debug('result keys:' + dump_json(result_keys)) + + for k1, k2 in zip_longest(check_keys, result_keys): + assert k1 + assert k2 + check_object_eq(k1, k2) + +def do_check_mdsearch(conn, bucket, src_keys, req_str, src_filter): + if bucket: + bucket_name = bucket.name + else: + bucket_name = '' + req = MDSearch(conn, bucket_name, req_str) + result_keys = req.search(sort_key = lambda k: (k.bucket.name, k.name, k.version_id)) + verify_search(bucket_name, src_keys, result_keys, src_filter) + +def init_env(create_obj, num_keys = 5, buckets_per_zone = 1, bucket_init_cb = None): + check_es_configured() + + realm = get_realm() + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns, buckets_per_zone = buckets_per_zone) + + if bucket_init_cb: + for zone_conn, bucket in zone_bucket: + bucket_init_cb(zone_conn, bucket) + + src_keys = [] + + owner = None + + obj_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6)) + + # don't wait for meta sync just yet + for zone, bucket in zone_bucket: + for count in range(num_keys): + objname = obj_prefix + str(count) + k = new_key(zone, bucket.name, objname) + # k.set_contents_from_string(content + 'x' * count) + if not create_obj: + continue + + create_obj(k, count) + + if not owner: + for list_key in bucket.list_versions(): + owner = list_key.owner + break + + k = bucket.get_key(k.name, version_id = k.version_id) + k.owner = owner # owner is not set when doing get_key() + + src_keys.append(k) + + zonegroup_meta_checkpoint(zonegroup) + + sources = [] + targets = [] + for target_conn in zonegroup_conns.zones: + if not is_es_zone(target_conn): + sources.append(target_conn) + continue + + targets.append(target_conn) + + buckets = [] + # make sure all targets are synced + for source_conn, bucket in zone_bucket: + buckets.append(bucket) + for target_conn in targets: + zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name) + + return targets, sources, buckets, src_keys + +def test_es_object_search(): + min_size = 10 + content = 'a' * min_size + + def create_obj(k, i): + k.set_contents_from_string(content + 'x' * i) + + targets, _, buckets, src_keys = init_env(create_obj, num_keys = 5, buckets_per_zone = 2) + + for target_conn in targets: + + # bucket checks + for bucket in buckets: + # check name + do_check_mdsearch(target_conn.conn, None, src_keys , 'bucket == ' + bucket.name, lambda k: k.bucket.name == bucket.name) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'bucket == ' + bucket.name, lambda k: k.bucket.name == bucket.name) + + # check on all buckets + for key in src_keys: + # limiting to checking specific key name, otherwise could get results from + # other runs / tests + do_check_mdsearch(target_conn.conn, None, src_keys , 'name == ' + key.name, lambda k: k.name == key.name) + + # check on specific bucket + for bucket in buckets: + for key in src_keys: + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name < ' + key.name, lambda k: k.name < key.name) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name <= ' + key.name, lambda k: k.name <= key.name) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name == ' + key.name, lambda k: k.name == key.name) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name >= ' + key.name, lambda k: k.name >= key.name) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name > ' + key.name, lambda k: k.name > key.name) + + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'name == ' + src_keys[0].name + ' or name >= ' + src_keys[2].name, + lambda k: k.name == src_keys[0].name or k.name >= src_keys[2].name) + + # check etag + for key in src_keys: + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'etag < ' + key.etag[1:-1], lambda k: k.etag < key.etag) + for key in src_keys: + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'etag == ' + key.etag[1:-1], lambda k: k.etag == key.etag) + for key in src_keys: + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'etag > ' + key.etag[1:-1], lambda k: k.etag > key.etag) + + # check size + for key in src_keys: + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'size < ' + str(key.size), lambda k: k.size < key.size) + for key in src_keys: + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'size <= ' + str(key.size), lambda k: k.size <= key.size) + for key in src_keys: + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'size == ' + str(key.size), lambda k: k.size == key.size) + for key in src_keys: + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'size >= ' + str(key.size), lambda k: k.size >= key.size) + for key in src_keys: + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'size > ' + str(key.size), lambda k: k.size > key.size) + +def date_from_str(s): + return dateutil.parser.parse(s) + +def test_es_object_search_custom(): + min_size = 10 + content = 'a' * min_size + + def bucket_init(zone_conn, bucket): + req = MDSearchConfig(zone_conn.conn, bucket.name) + req.set_config('x-amz-meta-foo-str; string, x-amz-meta-foo-int; int, x-amz-meta-foo-date; date') + + def create_obj(k, i): + date = datetime.datetime.now() + datetime.timedelta(seconds=1) * i + date_str = date.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z' + k.set_contents_from_string(content + 'x' * i, headers = { 'X-Amz-Meta-Foo-Str': str(i * 5), + 'X-Amz-Meta-Foo-Int': str(i * 5), + 'X-Amz-Meta-Foo-Date': date_str}) + + targets, _, buckets, src_keys = init_env(create_obj, num_keys = 5, buckets_per_zone = 1, bucket_init_cb = bucket_init) + + + for target_conn in targets: + + # bucket checks + for bucket in buckets: + str_vals = [] + for key in src_keys: + # check string values + val = key.get_metadata('foo-str') + str_vals.append(val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str < ' + val, lambda k: k.get_metadata('foo-str') < val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str <= ' + val, lambda k: k.get_metadata('foo-str') <= val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str == ' + val, lambda k: k.get_metadata('foo-str') == val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str >= ' + val, lambda k: k.get_metadata('foo-str') >= val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str > ' + val, lambda k: k.get_metadata('foo-str') > val) + + # check int values + sval = key.get_metadata('foo-int') + val = int(sval) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-int < ' + sval, lambda k: int(k.get_metadata('foo-int')) < val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-int <= ' + sval, lambda k: int(k.get_metadata('foo-int')) <= val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-int == ' + sval, lambda k: int(k.get_metadata('foo-int')) == val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-int >= ' + sval, lambda k: int(k.get_metadata('foo-int')) >= val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-int > ' + sval, lambda k: int(k.get_metadata('foo-int')) > val) + + # check int values + sval = key.get_metadata('foo-date') + val = date_from_str(sval) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-date < ' + sval, lambda k: date_from_str(k.get_metadata('foo-date')) < val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-date <= ' + sval, lambda k: date_from_str(k.get_metadata('foo-date')) <= val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-date == ' + sval, lambda k: date_from_str(k.get_metadata('foo-date')) == val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-date >= ' + sval, lambda k: date_from_str(k.get_metadata('foo-date')) >= val) + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-date > ' + sval, lambda k: date_from_str(k.get_metadata('foo-date')) > val) + + # 'or' query + for i in range(len(src_keys) // 2): + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str <= ' + str_vals[i] + ' or x-amz-meta-foo-str >= ' + str_vals[-i], + lambda k: k.get_metadata('foo-str') <= str_vals[i] or k.get_metadata('foo-str') >= str_vals[-i] ) + + # 'and' query + for i in range(len(src_keys) // 2): + do_check_mdsearch(target_conn.conn, bucket, src_keys , 'x-amz-meta-foo-str >= ' + str_vals[i] + ' and x-amz-meta-foo-str <= ' + str_vals[i + 1], + lambda k: k.get_metadata('foo-str') >= str_vals[i] and k.get_metadata('foo-str') <= str_vals[i + 1] ) + # more complicated query + for i in range(len(src_keys) // 2): + do_check_mdsearch(target_conn.conn, None, src_keys , 'bucket == ' + bucket.name + ' and x-amz-meta-foo-str >= ' + str_vals[i] + + ' and (x-amz-meta-foo-str <= ' + str_vals[i + 1] + ')', + lambda k: k.bucket.name == bucket.name and (k.get_metadata('foo-str') >= str_vals[i] and + k.get_metadata('foo-str') <= str_vals[i + 1]) ) + +def test_es_bucket_conf(): + min_size = 0 + + def bucket_init(zone_conn, bucket): + req = MDSearchConfig(zone_conn.conn, bucket.name) + req.set_config('x-amz-meta-foo-str; string, x-amz-meta-foo-int; int, x-amz-meta-foo-date; date') + + targets, sources, buckets, _ = init_env(None, num_keys = 5, buckets_per_zone = 1, bucket_init_cb = bucket_init) + + for source_conn in sources: + for bucket in buckets: + req = MDSearchConfig(source_conn.conn, bucket.name) + conf = req.get_config() + + d = {} + + for entry in conf: + d[entry['Key']] = entry['Type'] + + eq(len(d), 3) + eq(d['x-amz-meta-foo-str'], 'str') + eq(d['x-amz-meta-foo-int'], 'int') + eq(d['x-amz-meta-foo-date'], 'date') + + req.del_config() + + conf = req.get_config() + + eq(len(conf), 0) + + break # no need to iterate over all zones diff --git a/src/test/rgw/rgw_multi/tools.py b/src/test/rgw/rgw_multi/tools.py new file mode 100644 index 000000000..dd7f91ade --- /dev/null +++ b/src/test/rgw/rgw_multi/tools.py @@ -0,0 +1,97 @@ +import json +import boto + +def append_attr_value(d, attr, attrv): + if attrv and len(str(attrv)) > 0: + d[attr] = attrv + +def append_attr(d, k, attr): + try: + attrv = getattr(k, attr) + except: + return + append_attr_value(d, attr, attrv) + +def get_attrs(k, attrs): + d = {} + for a in attrs: + append_attr(d, k, a) + + return d + +def append_query_arg(s, n, v): + if not v: + return s + nv = '{n}={v}'.format(n=n, v=v) + if not s: + return nv + return '{s}&{nv}'.format(s=s, nv=nv) + +class KeyJSONEncoder(boto.s3.key.Key): + @staticmethod + def default(k, versioned=False): + attrs = ['bucket', 'name', 'size', 'last_modified', 'metadata', 'cache_control', + 'content_type', 'content_disposition', 'content_language', + 'owner', 'storage_class', 'md5', 'version_id', 'encrypted', + 'delete_marker', 'expiry_date', 'VersionedEpoch', 'RgwxTag'] + d = get_attrs(k, attrs) + d['etag'] = k.etag[1:-1] + if versioned: + d['is_latest'] = k.is_latest + return d + +class DeleteMarkerJSONEncoder(boto.s3.key.Key): + @staticmethod + def default(k): + attrs = ['name', 'version_id', 'last_modified', 'owner'] + d = get_attrs(k, attrs) + d['delete_marker'] = True + d['is_latest'] = k.is_latest + return d + +class UserJSONEncoder(boto.s3.user.User): + @staticmethod + def default(k): + attrs = ['id', 'display_name'] + return get_attrs(k, attrs) + +class BucketJSONEncoder(boto.s3.bucket.Bucket): + @staticmethod + def default(k): + attrs = ['name', 'creation_date'] + return get_attrs(k, attrs) + +class BotoJSONEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, boto.s3.key.Key): + return KeyJSONEncoder.default(obj) + if isinstance(obj, boto.s3.deletemarker.DeleteMarker): + return DeleteMarkerJSONEncoder.default(obj) + if isinstance(obj, boto.s3.user.User): + return UserJSONEncoder.default(obj) + if isinstance(obj, boto.s3.prefix.Prefix): + return (lambda x: {'prefix': x.name})(obj) + if isinstance(obj, boto.s3.bucket.Bucket): + return BucketJSONEncoder.default(obj) + return json.JSONEncoder.default(self, obj) + + +def dump_json(o, cls=BotoJSONEncoder): + return json.dumps(o, cls=cls, indent=4) + +def assert_raises(excClass, callableObj, *args, **kwargs): + """ + Like unittest.TestCase.assertRaises, but returns the exception. + """ + try: + callableObj(*args, **kwargs) + except excClass as e: + return e + else: + if hasattr(excClass, '__name__'): + excName = excClass.__name__ + else: + excName = str(excClass) + raise AssertionError("%s not raised" % excName) + + diff --git a/src/test/rgw/rgw_multi/zone_az.py b/src/test/rgw/rgw_multi/zone_az.py new file mode 100644 index 000000000..f9cd43574 --- /dev/null +++ b/src/test/rgw/rgw_multi/zone_az.py @@ -0,0 +1,42 @@ +import logging + +from .multisite import Zone + + +log = logging.getLogger('rgw_multi.tests') + + +class AZone(Zone): # pylint: disable=too-many-ancestors + """ archive zone class """ + def __init__(self, name, zonegroup=None, cluster=None, data=None, zone_id=None, gateways=None): + super(AZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways) + + def is_read_only(self): + return False + + def tier_type(self): + return "archive" + + def create(self, cluster, args=None, **kwargs): + if args is None: + args = '' + args += ['--tier-type', self.tier_type()] + return self.json_command(cluster, 'create', args) + + def has_buckets(self): + return False + + def has_roles(self): + return True + +class AZoneConfig: + """ archive zone configuration """ + def __init__(self, cfg, section): + pass + + +def print_connection_info(conn): + """print info of connection""" + print("Host: " + conn.host+':'+str(conn.port)) + print("AWS Secret Key: " + conn.aws_secret_access_key) + print("AWS Access Key: " + conn.aws_access_key_id) diff --git a/src/test/rgw/rgw_multi/zone_cloud.py b/src/test/rgw/rgw_multi/zone_cloud.py new file mode 100644 index 000000000..dd5640cf2 --- /dev/null +++ b/src/test/rgw/rgw_multi/zone_cloud.py @@ -0,0 +1,326 @@ +import json +import requests.compat +import logging + +import boto +import boto.s3.connection + +import dateutil.parser +import datetime + +import re + +from nose.tools import eq_ as eq +from itertools import zip_longest # type: ignore +from urllib.parse import urlparse + +from .multisite import * +from .tools import * + +log = logging.getLogger(__name__) + +def get_key_ver(k): + if not k.version_id: + return 'null' + return k.version_id + +def unquote(s): + if s[0] == '"' and s[-1] == '"': + return s[1:-1] + return s + +def check_object_eq(k1, k2, check_extra = True): + assert k1 + assert k2 + log.debug('comparing key name=%s', k1.name) + eq(k1.name, k2.name) + eq(k1.metadata, k2.metadata) + # eq(k1.cache_control, k2.cache_control) + eq(k1.content_type, k2.content_type) + eq(k1.content_encoding, k2.content_encoding) + eq(k1.content_disposition, k2.content_disposition) + eq(k1.content_language, k2.content_language) + + eq(unquote(k1.etag), unquote(k2.etag)) + + mtime1 = dateutil.parser.parse(k1.last_modified) + mtime2 = dateutil.parser.parse(k2.last_modified) + log.debug('k1.last_modified=%s k2.last_modified=%s', k1.last_modified, k2.last_modified) + assert abs((mtime1 - mtime2).total_seconds()) < 1 # handle different time resolution + # if check_extra: + # eq(k1.owner.id, k2.owner.id) + # eq(k1.owner.display_name, k2.owner.display_name) + # eq(k1.storage_class, k2.storage_class) + eq(k1.size, k2.size) + eq(get_key_ver(k1), get_key_ver(k2)) + # eq(k1.encrypted, k2.encrypted) + +def make_request(conn, method, bucket, key, query_args, headers): + result = conn.make_request(method, bucket=bucket, key=key, query_args=query_args, headers=headers) + if result.status // 100 != 2: + raise boto.exception.S3ResponseError(result.status, result.reason, result.read()) + return result + +class CloudKey: + def __init__(self, zone_bucket, k): + self.zone_bucket = zone_bucket + + # we need two keys: when listing buckets, we get keys that only contain partial data + # but we need to have the full data so that we could use all the meta-rgwx- headers + # that are needed in order to create a correct representation of the object + self.key = k + self.rgwx_key = k # assuming k has all the meta info on, if not then we'll update it in update() + self.update() + + def update(self): + k = self.key + rk = self.rgwx_key + + self.size = rk.size + orig_name = rk.metadata.get('rgwx-source-key') + if not orig_name: + self.rgwx_key = self.zone_bucket.bucket.get_key(k.name, version_id = k.version_id) + rk = self.rgwx_key + orig_name = rk.metadata.get('rgwx-source-key') + + self.name = orig_name + self.version_id = rk.metadata.get('rgwx-source-version-id') + + ve = rk.metadata.get('rgwx-versioned-epoch') + if ve: + self.versioned_epoch = int(ve) + else: + self.versioned_epoch = 0 + + mt = rk.metadata.get('rgwx-source-mtime') + if mt: + self.last_modified = datetime.datetime.utcfromtimestamp(float(mt)).strftime('%a, %d %b %Y %H:%M:%S GMT') + else: + self.last_modified = k.last_modified + + et = rk.metadata.get('rgwx-source-etag') + if rk.etag.find('-') >= 0 or et.find('-') >= 0: + # in this case we will use the source etag as it was uploaded via multipart upload + # in one of the zones, so there's no way to make sure etags are calculated the same + # way. In the other case we'd just want to keep the etag that was generated in the + # regular upload mechanism, which should be consistent in both ends + self.etag = et + else: + self.etag = rk.etag + + if k.etag[0] == '"' and self.etag[0] != '"': # inconsistent etag quoting when listing bucket vs object get + self.etag = '"' + self.etag + '"' + + new_meta = {} + for meta_key, meta_val in k.metadata.items(): + if not meta_key.startswith('rgwx-'): + new_meta[meta_key] = meta_val + + self.metadata = new_meta + + self.cache_control = k.cache_control + self.content_type = k.content_type + self.content_encoding = k.content_encoding + self.content_disposition = k.content_disposition + self.content_language = k.content_language + + + def get_contents_as_string(self, encoding=None): + r = self.key.get_contents_as_string(encoding=encoding) + + # the previous call changed the status of the source object, as it loaded + # its metadata + + self.rgwx_key = self.key + self.update() + + return r + + +class CloudZoneBucket: + def __init__(self, zone_conn, target_path, name): + self.zone_conn = zone_conn + self.name = name + self.cloud_conn = zone_conn.zone.cloud_conn + + target_path = target_path[:] + if target_path[-1] != '/': + target_path += '/' + target_path = target_path.replace('${bucket}', name) + + tp = target_path.split('/', 1) + + if len(tp) == 1: + self.target_bucket = target_path + self.target_prefix = '' + else: + self.target_bucket = tp[0] + self.target_prefix = tp[1] + + log.debug('target_path=%s target_bucket=%s target_prefix=%s', target_path, self.target_bucket, self.target_prefix) + self.bucket = self.cloud_conn.get_bucket(self.target_bucket) + + def get_all_versions(self): + l = [] + + for k in self.bucket.get_all_keys(prefix=self.target_prefix): + new_key = CloudKey(self, k) + + log.debug('appending o=[\'%s\', \'%s\', \'%d\']', new_key.name, new_key.version_id, new_key.versioned_epoch) + l.append(new_key) + + + sort_key = lambda k: (k.name, -k.versioned_epoch) + l.sort(key = sort_key) + + for new_key in l: + yield new_key + + def get_key(self, name, version_id=None): + return CloudKey(self, self.bucket.get_key(name, version_id=version_id)) + + +def parse_endpoint(endpoint): + o = urlparse(endpoint) + + netloc = o.netloc.split(':') + + host = netloc[0] + + if len(netloc) > 1: + port = int(netloc[1]) + else: + port = o.port + + is_secure = False + + if o.scheme == 'https': + is_secure = True + + if not port: + if is_secure: + port = 443 + else: + port = 80 + + return host, port, is_secure + + +class CloudZone(Zone): + def __init__(self, name, cloud_endpoint, credentials, source_bucket, target_path, + zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None): + self.cloud_endpoint = cloud_endpoint + self.credentials = credentials + self.source_bucket = source_bucket + self.target_path = target_path + + self.target_path = self.target_path.replace('${zone}', name) + # self.target_path = self.target_path.replace('${zone_id}', zone_id) + self.target_path = self.target_path.replace('${zonegroup}', zonegroup.name) + self.target_path = self.target_path.replace('${zonegroup_id}', zonegroup.id) + + log.debug('target_path=%s', self.target_path) + + host, port, is_secure = parse_endpoint(cloud_endpoint) + + self.cloud_conn = boto.connect_s3( + aws_access_key_id = credentials.access_key, + aws_secret_access_key = credentials.secret, + host = host, + port = port, + is_secure = is_secure, + calling_format = boto.s3.connection.OrdinaryCallingFormat()) + super(CloudZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways) + + + def is_read_only(self): + return True + + def tier_type(self): + return "cloud" + + def create(self, cluster, args = None, check_retcode = True): + """ create the object with the given arguments """ + + if args is None: + args = '' + + tier_config = ','.join([ 'connection.endpoint=' + self.cloud_endpoint, + 'connection.access_key=' + self.credentials.access_key, + 'connection.secret=' + self.credentials.secret, + 'target_path=' + re.escape(self.target_path)]) + + args += [ '--tier-type', self.tier_type(), '--tier-config', tier_config ] + + return self.json_command(cluster, 'create', args, check_retcode=check_retcode) + + def has_buckets(self): + return False + + def has_roles(self): + return False + + class Conn(ZoneConn): + def __init__(self, zone, credentials): + super(CloudZone.Conn, self).__init__(zone, credentials) + + def get_bucket(self, bucket_name): + return CloudZoneBucket(self, self.zone.target_path, bucket_name) + + def create_bucket(self, name): + # should not be here, a bug in the test suite + log.critical('Conn.create_bucket() should not be called in cloud zone') + assert False + + def check_bucket_eq(self, zone_conn, bucket_name): + assert(zone_conn.zone.tier_type() == "rados") + + log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, self.name) + b1 = self.get_bucket(bucket_name) + b2 = zone_conn.get_bucket(bucket_name) + + log.debug('bucket1 objects:') + for o in b1.get_all_versions(): + log.debug('o=%s', o.name) + log.debug('bucket2 objects:') + for o in b2.get_all_versions(): + log.debug('o=%s', o.name) + + for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()): + if k1 is None: + log.critical('key=%s is missing from zone=%s', k2.name, self.name) + assert False + if k2 is None: + log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name) + assert False + + check_object_eq(k1, k2) + + + log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name) + + return True + + def create_role(self, path, rolename, policy_document, tag_list): + assert False + + def get_conn(self, credentials): + return self.Conn(self, credentials) + + +class CloudZoneConfig: + def __init__(self, cfg, section): + self.endpoint = cfg.get(section, 'endpoint') + access_key = cfg.get(section, 'access_key') + secret = cfg.get(section, 'secret') + self.credentials = Credentials(access_key, secret) + try: + self.target_path = cfg.get(section, 'target_path') + except: + self.target_path = 'rgw-${zonegroup_id}/${bucket}' + + try: + self.source_bucket = cfg.get(section, 'source_bucket') + except: + self.source_bucket = '*' + diff --git a/src/test/rgw/rgw_multi/zone_es.py b/src/test/rgw/rgw_multi/zone_es.py new file mode 100644 index 000000000..e98b3fdd8 --- /dev/null +++ b/src/test/rgw/rgw_multi/zone_es.py @@ -0,0 +1,256 @@ +import json +import requests.compat +import logging + +import boto +import boto.s3.connection + +import dateutil.parser + +from nose.tools import eq_ as eq +from itertools import zip_longest # type: ignore + +from .multisite import * +from .tools import * + +log = logging.getLogger(__name__) + +def get_key_ver(k): + if not k.version_id: + return 'null' + return k.version_id + +def check_object_eq(k1, k2, check_extra = True): + assert k1 + assert k2 + log.debug('comparing key name=%s', k1.name) + eq(k1.name, k2.name) + eq(k1.metadata, k2.metadata) + # eq(k1.cache_control, k2.cache_control) + eq(k1.content_type, k2.content_type) + # eq(k1.content_encoding, k2.content_encoding) + # eq(k1.content_disposition, k2.content_disposition) + # eq(k1.content_language, k2.content_language) + eq(k1.etag, k2.etag) + mtime1 = dateutil.parser.parse(k1.last_modified) + mtime2 = dateutil.parser.parse(k2.last_modified) + assert abs((mtime1 - mtime2).total_seconds()) < 1 # handle different time resolution + if check_extra: + eq(k1.owner.id, k2.owner.id) + eq(k1.owner.display_name, k2.owner.display_name) + # eq(k1.storage_class, k2.storage_class) + eq(k1.size, k2.size) + eq(get_key_ver(k1), get_key_ver(k2)) + # eq(k1.encrypted, k2.encrypted) + +def make_request(conn, method, bucket, key, query_args, headers): + result = conn.make_request(method, bucket=bucket, key=key, query_args=query_args, headers=headers) + if result.status // 100 != 2: + raise boto.exception.S3ResponseError(result.status, result.reason, result.read()) + return result + + +class MDSearch: + def __init__(self, conn, bucket_name, query, query_args = None, marker = None): + self.conn = conn + self.bucket_name = bucket_name or '' + if bucket_name: + self.bucket = boto.s3.bucket.Bucket(name=bucket_name) + else: + self.bucket = None + self.query = query + self.query_args = query_args + self.max_keys = None + self.marker = marker + + def raw_search(self): + q = self.query or '' + query_args = append_query_arg(self.query_args, 'query', requests.compat.quote_plus(q)) + if self.max_keys is not None: + query_args = append_query_arg(query_args, 'max-keys', self.max_keys) + if self.marker: + query_args = append_query_arg(query_args, 'marker', self.marker) + + query_args = append_query_arg(query_args, 'format', 'json') + + headers = {} + + result = make_request(self.conn, "GET", bucket=self.bucket_name, key='', query_args=query_args, headers=headers) + + l = [] + + result_dict = json.loads(result.read()) + + for entry in result_dict['Objects']: + bucket = self.conn.get_bucket(entry['Bucket'], validate = False) + k = boto.s3.key.Key(bucket, entry['Key']) + + k.version_id = entry['Instance'] + k.etag = entry['ETag'] + k.owner = boto.s3.user.User(id=entry['Owner']['ID'], display_name=entry['Owner']['DisplayName']) + k.last_modified = entry['LastModified'] + k.size = entry['Size'] + k.content_type = entry['ContentType'] + k.versioned_epoch = entry['VersionedEpoch'] + + k.metadata = {} + for e in entry['CustomMetadata']: + k.metadata[e['Name']] = str(e['Value']) # int values will return as int, cast to string for compatibility with object meta response + + l.append(k) + + return result_dict, l + + def search(self, drain = True, sort = True, sort_key = None): + l = [] + + is_done = False + + while not is_done: + result, result_keys = self.raw_search() + + l = l + result_keys + + is_done = not (drain and (result['IsTruncated'] == "true")) + marker = result['Marker'] + + if sort: + if not sort_key: + sort_key = lambda k: (k.name, -k.versioned_epoch) + l.sort(key = sort_key) + + return l + + +class MDSearchConfig: + def __init__(self, conn, bucket_name): + self.conn = conn + self.bucket_name = bucket_name or '' + if bucket_name: + self.bucket = boto.s3.bucket.Bucket(name=bucket_name) + else: + self.bucket = None + + def send_request(self, conf, method): + query_args = 'mdsearch' + headers = None + if conf: + headers = { 'X-Amz-Meta-Search': conf } + + query_args = append_query_arg(query_args, 'format', 'json') + + return make_request(self.conn, method, bucket=self.bucket_name, key='', query_args=query_args, headers=headers) + + def get_config(self): + result = self.send_request(None, 'GET') + return json.loads(result.read()) + + def set_config(self, conf): + self.send_request(conf, 'POST') + + def del_config(self): + self.send_request(None, 'DELETE') + + +class ESZoneBucket: + def __init__(self, zone_conn, name, conn): + self.zone_conn = zone_conn + self.name = name + self.conn = conn + + self.bucket = boto.s3.bucket.Bucket(name=name) + + def get_all_versions(self): + + marker = None + is_done = False + + req = MDSearch(self.conn, self.name, 'bucket == ' + self.name, marker=marker) + + for k in req.search(): + yield k + + + + +class ESZone(Zone): + def __init__(self, name, es_endpoint, zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None): + self.es_endpoint = es_endpoint + super(ESZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways) + + def is_read_only(self): + return True + + def tier_type(self): + return "elasticsearch" + + def create(self, cluster, args = None, check_retcode = True): + """ create the object with the given arguments """ + + if args is None: + args = '' + + tier_config = ','.join([ 'endpoint=' + self.es_endpoint, 'explicit_custom_meta=false' ]) + + args += [ '--tier-type', self.tier_type(), '--tier-config', tier_config ] + + return self.json_command(cluster, 'create', args, check_retcode=check_retcode) + + def has_buckets(self): + return False + + def has_roles(self): + return False + + class Conn(ZoneConn): + def __init__(self, zone, credentials): + super(ESZone.Conn, self).__init__(zone, credentials) + + def get_bucket(self, bucket_name): + return ESZoneBucket(self, bucket_name, self.conn) + + def create_bucket(self, name): + # should not be here, a bug in the test suite + log.critical('Conn.create_bucket() should not be called in ES zone') + assert False + + def check_bucket_eq(self, zone_conn, bucket_name): + assert(zone_conn.zone.tier_type() == "rados") + + log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, self.name) + b1 = self.get_bucket(bucket_name) + b2 = zone_conn.get_bucket(bucket_name) + + log.debug('bucket1 objects:') + for o in b1.get_all_versions(): + log.debug('o=%s', o.name) + log.debug('bucket2 objects:') + for o in b2.get_all_versions(): + log.debug('o=%s', o.name) + + for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()): + if k1 is None: + log.critical('key=%s is missing from zone=%s', k2.name, self.name) + assert False + if k2 is None: + log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name) + assert False + + check_object_eq(k1, k2) + + + log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name) + + return True + + def create_role(self, path, rolename, policy_document, tag_list): + assert False + + def get_conn(self, credentials): + return self.Conn(self, credentials) + + +class ESZoneConfig: + def __init__(self, cfg, section): + self.endpoint = cfg.get(section, 'endpoint') + diff --git a/src/test/rgw/rgw_multi/zone_rados.py b/src/test/rgw/rgw_multi/zone_rados.py new file mode 100644 index 000000000..ac4edd004 --- /dev/null +++ b/src/test/rgw/rgw_multi/zone_rados.py @@ -0,0 +1,134 @@ +import logging +from boto.s3.deletemarker import DeleteMarker + +from itertools import zip_longest # type: ignore + +from nose.tools import eq_ as eq + +from .multisite import * + +log = logging.getLogger(__name__) + +def check_object_eq(k1, k2, check_extra = True): + assert k1 + assert k2 + log.debug('comparing key name=%s', k1.name) + eq(k1.name, k2.name) + eq(k1.version_id, k2.version_id) + eq(k1.is_latest, k2.is_latest) + eq(k1.last_modified, k2.last_modified) + if isinstance(k1, DeleteMarker): + assert isinstance(k2, DeleteMarker) + return + + eq(k1.get_contents_as_string(), k2.get_contents_as_string()) + eq(k1.metadata, k2.metadata) + eq(k1.cache_control, k2.cache_control) + eq(k1.content_type, k2.content_type) + eq(k1.content_encoding, k2.content_encoding) + eq(k1.content_disposition, k2.content_disposition) + eq(k1.content_language, k2.content_language) + eq(k1.etag, k2.etag) + if check_extra: + eq(k1.owner.id, k2.owner.id) + eq(k1.owner.display_name, k2.owner.display_name) + eq(k1.storage_class, k2.storage_class) + eq(k1.size, k2.size) + eq(k1.encrypted, k2.encrypted) + +class RadosZone(Zone): + def __init__(self, name, zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None): + super(RadosZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways) + + def tier_type(self): + return "rados" + + + class Conn(ZoneConn): + def __init__(self, zone, credentials): + super(RadosZone.Conn, self).__init__(zone, credentials) + + def get_bucket(self, name): + return self.conn.get_bucket(name) + + def create_bucket(self, name): + return self.conn.create_bucket(name) + + def delete_bucket(self, name): + return self.conn.delete_bucket(name) + + def check_bucket_eq(self, zone_conn, bucket_name): + log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name) + b1 = self.get_bucket(bucket_name) + b2 = zone_conn.get_bucket(bucket_name) + + b1_versions = b1.list_versions() + log.debug('bucket1 objects:') + for o in b1_versions: + log.debug('o=%s', o.name) + + b2_versions = b2.list_versions() + log.debug('bucket2 objects:') + for o in b2_versions: + log.debug('o=%s', o.name) + + for k1, k2 in zip_longest(b1_versions, b2_versions): + if k1 is None: + log.critical('key=%s is missing from zone=%s', k2.name, self.name) + assert False + if k2 is None: + log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name) + assert False + + check_object_eq(k1, k2) + + if isinstance(k1, DeleteMarker): + # verify that HEAD sees a delete marker + assert b1.get_key(k1.name) is None + assert b2.get_key(k2.name) is None + else: + # now get the keys through a HEAD operation, verify that the available data is the same + k1_head = b1.get_key(k1.name, version_id=k1.version_id) + k2_head = b2.get_key(k2.name, version_id=k2.version_id) + check_object_eq(k1_head, k2_head, False) + + if k1.version_id: + # compare the olh to make sure they agree about the current version + k1_olh = b1.get_key(k1.name) + k2_olh = b2.get_key(k2.name) + # if there's a delete marker, HEAD will return None + if k1_olh or k2_olh: + check_object_eq(k1_olh, k2_olh, False) + + log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name) + + return True + + def get_role(self, role_name): + return self.iam_conn.get_role(role_name) + + def check_role_eq(self, zone_conn, role_name): + log.info('comparing role=%s zones={%s, %s}', role_name, self.name, zone_conn.name) + r1 = self.get_role(role_name) + r2 = zone_conn.get_role(role_name) + + assert r1 + assert r2 + log.debug('comparing role name=%s', r1['get_role_response']['get_role_result']['role']['role_name']) + eq(r1['get_role_response']['get_role_result']['role']['role_name'], r2['get_role_response']['get_role_result']['role']['role_name']) + eq(r1['get_role_response']['get_role_result']['role']['role_id'], r2['get_role_response']['get_role_result']['role']['role_id']) + eq(r1['get_role_response']['get_role_result']['role']['path'], r2['get_role_response']['get_role_result']['role']['path']) + eq(r1['get_role_response']['get_role_result']['role']['arn'], r2['get_role_response']['get_role_result']['role']['arn']) + eq(r1['get_role_response']['get_role_result']['role']['max_session_duration'], r2['get_role_response']['get_role_result']['role']['max_session_duration']) + eq(r1['get_role_response']['get_role_result']['role']['assume_role_policy_document'], r2['get_role_response']['get_role_result']['role']['assume_role_policy_document']) + + log.info('success, role identical: role=%s zones={%s, %s}', role_name, self.name, zone_conn.name) + + return True + + def create_role(self, path, rolename, policy_document, tag_list): + return self.iam_conn.create_role(rolename, policy_document, path) + + def get_conn(self, credentials): + return self.Conn(self, credentials) + |