diff options
Diffstat (limited to 'src/test/pybind/test_rados.py')
-rw-r--r-- | src/test/pybind/test_rados.py | 1114 |
1 files changed, 1114 insertions, 0 deletions
diff --git a/src/test/pybind/test_rados.py b/src/test/pybind/test_rados.py new file mode 100644 index 00000000..c4ed1d24 --- /dev/null +++ b/src/test/pybind/test_rados.py @@ -0,0 +1,1114 @@ +from __future__ import print_function +from nose import SkipTest +from nose.tools import eq_ as eq, ok_ as ok, assert_raises +from rados import (Rados, Error, RadosStateError, Object, ObjectExists, + ObjectNotFound, ObjectBusy, requires, opt, + LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx, + LIBRADOS_SNAP_HEAD, LIBRADOS_OPERATION_BALANCE_READS, LIBRADOS_OPERATION_SKIPRWLOCKS, MonitorLog) +import time +import threading +import json +import errno +import os +import re +import sys + +# Are we running Python 2.x +_python2 = sys.version_info[0] < 3 + +def test_rados_init_error(): + assert_raises(Error, Rados, conffile='', rados_id='admin', + name='client.admin') + assert_raises(Error, Rados, conffile='', name='invalid') + assert_raises(Error, Rados, conffile='', name='bad.invalid') + +def test_rados_init(): + with Rados(conffile='', rados_id='admin'): + pass + with Rados(conffile='', name='client.admin'): + pass + with Rados(conffile='', name='client.admin'): + pass + with Rados(conffile='', name='client.admin'): + pass + +def test_ioctx_context_manager(): + with Rados(conffile='', rados_id='admin') as conn: + with conn.open_ioctx('rbd') as ioctx: + pass + +def test_parse_argv(): + args = ['osd', 'pool', 'delete', 'foobar', 'foobar', '--yes-i-really-really-mean-it'] + r = Rados() + eq(args, r.conf_parse_argv(args)) + +def test_parse_argv_empty_str(): + args = [''] + r = Rados() + eq(args, r.conf_parse_argv(args)) + +class TestRequires(object): + @requires(('foo', str), ('bar', int), ('baz', int)) + def _method_plain(self, foo, bar, baz): + ok(isinstance(foo, str)) + ok(isinstance(bar, int)) + ok(isinstance(baz, int)) + return (foo, bar, baz) + + def test_method_plain(self): + assert_raises(TypeError, self._method_plain, 42, 42, 42) + assert_raises(TypeError, self._method_plain, '42', '42', '42') + assert_raises(TypeError, self._method_plain, foo='42', bar='42', baz='42') + eq(self._method_plain('42', 42, 42), ('42', 42, 42)) + eq(self._method_plain(foo='42', bar=42, baz=42), ('42', 42, 42)) + + @requires(('opt_foo', opt(str)), ('opt_bar', opt(int)), ('baz', int)) + def _method_with_opt_arg(self, foo, bar, baz): + ok(isinstance(foo, str) or foo is None) + ok(isinstance(bar, int) or bar is None) + ok(isinstance(baz, int)) + return (foo, bar, baz) + + def test_method_with_opt_args(self): + assert_raises(TypeError, self._method_with_opt_arg, 42, 42, 42) + assert_raises(TypeError, self._method_with_opt_arg, '42', '42', 42) + assert_raises(TypeError, self._method_with_opt_arg, None, None, None) + eq(self._method_with_opt_arg(None, 42, 42), (None, 42, 42)) + eq(self._method_with_opt_arg('42', None, 42), ('42', None, 42)) + eq(self._method_with_opt_arg(None, None, 42), (None, None, 42)) + + +class TestRadosStateError(object): + def _requires_configuring(self, rados): + assert_raises(RadosStateError, rados.connect) + + def _requires_configuring_or_connected(self, rados): + assert_raises(RadosStateError, rados.conf_read_file) + assert_raises(RadosStateError, rados.conf_parse_argv, None) + assert_raises(RadosStateError, rados.conf_parse_env) + assert_raises(RadosStateError, rados.conf_get, 'opt') + assert_raises(RadosStateError, rados.conf_set, 'opt', 'val') + assert_raises(RadosStateError, rados.ping_monitor, 0) + + def _requires_connected(self, rados): + assert_raises(RadosStateError, rados.pool_exists, 'foo') + assert_raises(RadosStateError, rados.pool_lookup, 'foo') + assert_raises(RadosStateError, rados.pool_reverse_lookup, 0) + assert_raises(RadosStateError, rados.create_pool, 'foo') + assert_raises(RadosStateError, rados.get_pool_base_tier, 0) + assert_raises(RadosStateError, rados.delete_pool, 'foo') + assert_raises(RadosStateError, rados.list_pools) + assert_raises(RadosStateError, rados.get_fsid) + assert_raises(RadosStateError, rados.open_ioctx, 'foo') + assert_raises(RadosStateError, rados.mon_command, '', b'') + assert_raises(RadosStateError, rados.osd_command, 0, '', b'') + assert_raises(RadosStateError, rados.pg_command, '', '', b'') + assert_raises(RadosStateError, rados.wait_for_latest_osdmap) + assert_raises(RadosStateError, rados.blacklist_add, '127.0.0.1/123', 0) + + def test_configuring(self): + rados = Rados(conffile='') + eq('configuring', rados.state) + self._requires_connected(rados) + + def test_connected(self): + rados = Rados(conffile='') + with rados: + eq('connected', rados.state) + self._requires_configuring(rados) + + def test_shutdown(self): + rados = Rados(conffile='') + with rados: + pass + eq('shutdown', rados.state) + self._requires_configuring(rados) + self._requires_configuring_or_connected(rados) + self._requires_connected(rados) + + +class TestRados(object): + + def setUp(self): + self.rados = Rados(conffile='') + self.rados.conf_parse_env('FOO_DOES_NOT_EXIST_BLAHBLAH') + self.rados.conf_parse_env() + self.rados.connect() + + # Assume any pre-existing pools are the cluster's defaults + self.default_pools = self.rados.list_pools() + + def tearDown(self): + self.rados.shutdown() + + def test_ping_monitor(self): + assert_raises(ObjectNotFound, self.rados.ping_monitor, 'not_exists_monitor') + cmd = {'prefix': 'mon dump', 'format':'json'} + ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'') + for mon in json.loads(buf.decode('utf8'))['mons']: + while True: + output = self.rados.ping_monitor(mon['name']) + if output is None: + continue + buf = json.loads(output) + if buf.get('health'): + break + + def test_create(self): + self.rados.create_pool('foo') + self.rados.delete_pool('foo') + + def test_create_utf8(self): + if _python2: + # Use encoded bytestring + poolname = b"\351\273\204" + else: + poolname = "\u9ec4" + self.rados.create_pool(poolname) + assert self.rados.pool_exists(u"\u9ec4") + self.rados.delete_pool(poolname) + + def test_pool_lookup_utf8(self): + if _python2: + poolname = u'\u9ec4' + else: + poolname = '\u9ec4' + self.rados.create_pool(poolname) + try: + poolid = self.rados.pool_lookup(poolname) + eq(poolname, self.rados.pool_reverse_lookup(poolid)) + finally: + self.rados.delete_pool(poolname) + + def test_eexist(self): + self.rados.create_pool('foo') + assert_raises(ObjectExists, self.rados.create_pool, 'foo') + self.rados.delete_pool('foo') + + def list_non_default_pools(self): + pools = self.rados.list_pools() + for p in self.default_pools: + pools.remove(p) + return set(pools) + + def test_list_pools(self): + eq(set(), self.list_non_default_pools()) + self.rados.create_pool('foo') + eq(set(['foo']), self.list_non_default_pools()) + self.rados.create_pool('bar') + eq(set(['foo', 'bar']), self.list_non_default_pools()) + self.rados.create_pool('baz') + eq(set(['foo', 'bar', 'baz']), self.list_non_default_pools()) + self.rados.delete_pool('foo') + eq(set(['bar', 'baz']), self.list_non_default_pools()) + self.rados.delete_pool('baz') + eq(set(['bar']), self.list_non_default_pools()) + self.rados.delete_pool('bar') + eq(set(), self.list_non_default_pools()) + self.rados.create_pool('a' * 500) + eq(set(['a' * 500]), self.list_non_default_pools()) + self.rados.delete_pool('a' * 500) + + def test_get_pool_base_tier(self): + self.rados.create_pool('foo') + try: + self.rados.create_pool('foo-cache') + try: + pool_id = self.rados.pool_lookup('foo') + tier_pool_id = self.rados.pool_lookup('foo-cache') + + cmd = {"prefix":"osd tier add", "pool":"foo", "tierpool":"foo-cache", "force_nonempty":""} + ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30) + eq(ret, 0) + + try: + cmd = {"prefix":"osd tier cache-mode", "pool":"foo-cache", "tierpool":"foo-cache", "mode":"readonly", "yes_i_really_mean_it": True} + ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30) + eq(ret, 0) + + eq(self.rados.wait_for_latest_osdmap(), 0) + + eq(pool_id, self.rados.get_pool_base_tier(pool_id)) + eq(pool_id, self.rados.get_pool_base_tier(tier_pool_id)) + finally: + cmd = {"prefix":"osd tier remove", "pool":"foo", "tierpool":"foo-cache"} + ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30) + eq(ret, 0) + finally: + self.rados.delete_pool('foo-cache') + finally: + self.rados.delete_pool('foo') + + def test_get_fsid(self): + fsid = self.rados.get_fsid() + assert re.match('[0-9a-f\-]{36}', fsid, re.I) + + def test_blacklist_add(self): + self.rados.blacklist_add("1.2.3.4/123", 1) + + def test_get_cluster_stats(self): + stats = self.rados.get_cluster_stats() + assert stats['kb'] > 0 + assert stats['kb_avail'] > 0 + assert stats['kb_used'] > 0 + assert stats['num_objects'] >= 0 + + def test_monitor_log(self): + lock = threading.Condition() + def cb(arg, line, who, sec, nsec, seq, level, msg): + # NOTE(sileht): the old pyrados API was received the pointer as int + # instead of the value of arg + eq(arg, "arg") + with lock: + lock.notify() + return 0 + + # NOTE(sileht): force don't save the monitor into local var + # to ensure all references are correctly tracked into the lib + MonitorLog(self.rados, "debug", cb, "arg") + with lock: + lock.wait() + MonitorLog(self.rados, "debug", None, None) + eq(None, self.rados.monitor_callback) + +class TestIoctx(object): + + def setUp(self): + self.rados = Rados(conffile='') + self.rados.connect() + self.rados.create_pool('test_pool') + assert self.rados.pool_exists('test_pool') + self.ioctx = self.rados.open_ioctx('test_pool') + + def tearDown(self): + cmd = {"prefix":"osd unset", "key":"noup"} + self.rados.mon_command(json.dumps(cmd), b'') + self.ioctx.close() + self.rados.delete_pool('test_pool') + self.rados.shutdown() + + def test_get_last_version(self): + version = self.ioctx.get_last_version() + assert version >= 0 + + def test_get_stats(self): + stats = self.ioctx.get_stats() + eq(stats, {'num_objects_unfound': 0, + 'num_objects_missing_on_primary': 0, + 'num_object_clones': 0, + 'num_objects': 0, + 'num_object_copies': 0, + 'num_bytes': 0, + 'num_rd_kb': 0, + 'num_wr_kb': 0, + 'num_kb': 0, + 'num_wr': 0, + 'num_objects_degraded': 0, + 'num_rd': 0}) + + def test_write(self): + self.ioctx.write('abc', b'abc') + eq(self.ioctx.read('abc'), b'abc') + + def test_write_full(self): + self.ioctx.write('abc', b'abc') + eq(self.ioctx.read('abc'), b'abc') + self.ioctx.write_full('abc', b'd') + eq(self.ioctx.read('abc'), b'd') + + def test_append(self): + self.ioctx.write('abc', b'a') + self.ioctx.append('abc', b'b') + self.ioctx.append('abc', b'c') + eq(self.ioctx.read('abc'), b'abc') + + def test_write_zeros(self): + self.ioctx.write('abc', b'a\0b\0c') + eq(self.ioctx.read('abc'), b'a\0b\0c') + + def test_trunc(self): + self.ioctx.write('abc', b'abc') + self.ioctx.trunc('abc', 2) + eq(self.ioctx.read('abc'), b'ab') + size = self.ioctx.stat('abc')[0] + eq(size, 2) + + def test_list_objects_empty(self): + eq(list(self.ioctx.list_objects()), []) + + def test_list_objects(self): + self.ioctx.write('a', b'') + self.ioctx.write('b', b'foo') + self.ioctx.write_full('c', b'bar') + self.ioctx.append('d', b'jazz') + object_names = [obj.key for obj in self.ioctx.list_objects()] + eq(sorted(object_names), ['a', 'b', 'c', 'd']) + + def test_list_ns_objects(self): + self.ioctx.write('a', b'') + self.ioctx.write('b', b'foo') + self.ioctx.write_full('c', b'bar') + self.ioctx.append('d', b'jazz') + self.ioctx.set_namespace("ns1") + self.ioctx.write('ns1-a', b'') + self.ioctx.write('ns1-b', b'foo') + self.ioctx.write_full('ns1-c', b'bar') + self.ioctx.append('ns1-d', b'jazz') + self.ioctx.append('d', b'jazz') + self.ioctx.set_namespace(LIBRADOS_ALL_NSPACES) + object_names = [(obj.nspace, obj.key) for obj in self.ioctx.list_objects()] + eq(sorted(object_names), [('', 'a'), ('','b'), ('','c'), ('','d'),\ + ('ns1', 'd'), ('ns1', 'ns1-a'), ('ns1', 'ns1-b'),\ + ('ns1', 'ns1-c'), ('ns1', 'ns1-d')]) + + def test_xattrs(self): + xattrs = dict(a=b'1', b=b'2', c=b'3', d=b'a\0b', e=b'\0', f='') + self.ioctx.write('abc', b'') + for key, value in xattrs.items(): + self.ioctx.set_xattr('abc', key, value) + eq(self.ioctx.get_xattr('abc', key), value) + stored_xattrs = {} + for key, value in self.ioctx.get_xattrs('abc'): + stored_xattrs[key] = value + eq(stored_xattrs, xattrs) + + def test_obj_xattrs(self): + xattrs = dict(a=b'1', b=b'2', c=b'3', d=b'a\0b', e=b'\0', f='') + self.ioctx.write('abc', b'') + obj = list(self.ioctx.list_objects())[0] + for key, value in xattrs.items(): + obj.set_xattr(key, value) + eq(obj.get_xattr(key), value) + stored_xattrs = {} + for key, value in obj.get_xattrs(): + stored_xattrs[key] = value + eq(stored_xattrs, xattrs) + + def test_create_snap(self): + assert_raises(ObjectNotFound, self.ioctx.remove_snap, 'foo') + self.ioctx.create_snap('foo') + self.ioctx.remove_snap('foo') + + def test_list_snaps_empty(self): + eq(list(self.ioctx.list_snaps()), []) + + def test_list_snaps(self): + snaps = ['snap1', 'snap2', 'snap3'] + for snap in snaps: + self.ioctx.create_snap(snap) + listed_snaps = [snap.name for snap in self.ioctx.list_snaps()] + eq(snaps, listed_snaps) + + def test_lookup_snap(self): + self.ioctx.create_snap('foo') + snap = self.ioctx.lookup_snap('foo') + eq(snap.name, 'foo') + + def test_snap_timestamp(self): + self.ioctx.create_snap('foo') + snap = self.ioctx.lookup_snap('foo') + snap.get_timestamp() + + def test_remove_snap(self): + self.ioctx.create_snap('foo') + (snap,) = self.ioctx.list_snaps() + eq(snap.name, 'foo') + self.ioctx.remove_snap('foo') + eq(list(self.ioctx.list_snaps()), []) + + def test_snap_rollback(self): + self.ioctx.write("insnap", b"contents1") + self.ioctx.create_snap("snap1") + self.ioctx.remove_object("insnap") + self.ioctx.snap_rollback("insnap", "snap1") + eq(self.ioctx.read("insnap"), b"contents1") + self.ioctx.remove_snap("snap1") + self.ioctx.remove_object("insnap") + + def test_snap_read(self): + self.ioctx.write("insnap", b"contents1") + self.ioctx.create_snap("snap1") + self.ioctx.remove_object("insnap") + snap = self.ioctx.lookup_snap("snap1") + self.ioctx.set_read(snap.snap_id) + eq(self.ioctx.read("insnap"), b"contents1") + self.ioctx.set_read(LIBRADOS_SNAP_HEAD) + self.ioctx.write("inhead", b"contents2") + eq(self.ioctx.read("inhead"), b"contents2") + self.ioctx.remove_snap("snap1") + self.ioctx.remove_object("inhead") + + def test_set_omap(self): + keys = ("1", "2", "3", "4") + values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04") + with WriteOpCtx() as write_op: + self.ioctx.set_omap(write_op, keys, values) + write_op.set_flags(LIBRADOS_OPERATION_SKIPRWLOCKS) + self.ioctx.operate_write_op(write_op, "hw") + with ReadOpCtx() as read_op: + iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 4) + eq(ret, 0) + self.ioctx.operate_read_op(read_op, "hw") + next(iter) + eq(list(iter), [("2", b"bbb"), ("3", b"ccc"), ("4", b"\x04\x04\x04\x04")]) + with ReadOpCtx() as read_op: + iter, ret = self.ioctx.get_omap_vals(read_op, "2", "", 4) + eq(ret, 0) + self.ioctx.operate_read_op(read_op, "hw") + eq(("3", b"ccc"), next(iter)) + eq(list(iter), [("4", b"\x04\x04\x04\x04")]) + with ReadOpCtx() as read_op: + iter, ret = self.ioctx.get_omap_vals(read_op, "", "2", 4) + eq(ret, 0) + read_op.set_flags(LIBRADOS_OPERATION_BALANCE_READS) + self.ioctx.operate_read_op(read_op, "hw") + eq(list(iter), [("2", b"bbb")]) + + def test_set_omap_aio(self): + lock = threading.Condition() + count = [0] + def cb(blah): + with lock: + count[0] += 1 + lock.notify() + return 0 + + keys = ("1", "2", "3", "4") + values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04") + with WriteOpCtx() as write_op: + self.ioctx.set_omap(write_op, keys, values) + comp = self.ioctx.operate_aio_write_op(write_op, "hw", cb, cb) + comp.wait_for_complete() + comp.wait_for_safe() + with lock: + while count[0] < 2: + lock.wait() + eq(comp.get_return_value(), 0) + + with ReadOpCtx() as read_op: + iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 4) + eq(ret, 0) + comp = self.ioctx.operate_aio_read_op(read_op, "hw", cb, cb) + comp.wait_for_complete() + comp.wait_for_safe() + with lock: + while count[0] < 4: + lock.wait() + eq(comp.get_return_value(), 0) + next(iter) + eq(list(iter), [("2", b"bbb"), ("3", b"ccc"), ("4", b"\x04\x04\x04\x04")]) + + def test_write_ops(self): + with WriteOpCtx() as write_op: + write_op.new(0) + self.ioctx.operate_write_op(write_op, "write_ops") + eq(self.ioctx.read('write_ops'), b'') + write_op.write_full(b'1') + write_op.append(b'2') + self.ioctx.operate_write_op(write_op, "write_ops") + eq(self.ioctx.read('write_ops'), b'12') + write_op.write_full(b'12345') + write_op.write(b'x', 2) + self.ioctx.operate_write_op(write_op, "write_ops") + eq(self.ioctx.read('write_ops'), b'12x45') + write_op.write_full(b'12345') + write_op.zero(2, 2) + self.ioctx.operate_write_op(write_op, "write_ops") + eq(self.ioctx.read('write_ops'), b'12\x00\x005') + write_op.write_full(b'12345') + write_op.truncate(2) + self.ioctx.operate_write_op(write_op, "write_ops") + eq(self.ioctx.read('write_ops'), b'12') + write_op.remove() + self.ioctx.operate_write_op(write_op, "write_ops") + with assert_raises(ObjectNotFound): + self.ioctx.read('write_ops') + + def test_get_omap_vals_by_keys(self): + keys = ("1", "2", "3", "4") + values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04") + with WriteOpCtx() as write_op: + self.ioctx.set_omap(write_op, keys, values) + self.ioctx.operate_write_op(write_op, "hw") + with ReadOpCtx() as read_op: + iter, ret = self.ioctx.get_omap_vals_by_keys(read_op,("3","4",)) + eq(ret, 0) + self.ioctx.operate_read_op(read_op, "hw") + eq(list(iter), [("3", b"ccc"), ("4", b"\x04\x04\x04\x04")]) + with ReadOpCtx() as read_op: + iter, ret = self.ioctx.get_omap_vals_by_keys(read_op,("3","4",)) + eq(ret, 0) + with assert_raises(ObjectNotFound): + self.ioctx.operate_read_op(read_op, "no_such") + + def test_get_omap_keys(self): + keys = ("1", "2", "3") + values = (b"aaa", b"bbb", b"ccc") + with WriteOpCtx() as write_op: + self.ioctx.set_omap(write_op, keys, values) + self.ioctx.operate_write_op(write_op, "hw") + with ReadOpCtx() as read_op: + iter, ret = self.ioctx.get_omap_keys(read_op,"",2) + eq(ret, 0) + self.ioctx.operate_read_op(read_op, "hw") + eq(list(iter), [("1", None), ("2", None)]) + with ReadOpCtx() as read_op: + iter, ret = self.ioctx.get_omap_keys(read_op,"",2) + eq(ret, 0) + with assert_raises(ObjectNotFound): + self.ioctx.operate_read_op(read_op, "no_such") + + def test_clear_omap(self): + keys = ("1", "2", "3") + values = (b"aaa", b"bbb", b"ccc") + with WriteOpCtx() as write_op: + self.ioctx.set_omap(write_op, keys, values) + self.ioctx.operate_write_op(write_op, "hw") + with WriteOpCtx() as write_op_1: + self.ioctx.clear_omap(write_op_1) + self.ioctx.operate_write_op(write_op_1, "hw") + with ReadOpCtx() as read_op: + iter, ret = self.ioctx.get_omap_vals_by_keys(read_op,("1",)) + eq(ret, 0) + self.ioctx.operate_read_op(read_op, "hw") + eq(list(iter), []) + + def test_locator(self): + self.ioctx.set_locator_key("bar") + self.ioctx.write('foo', b'contents1') + objects = [i for i in self.ioctx.list_objects()] + eq(len(objects), 1) + eq(self.ioctx.get_locator_key(), "bar") + self.ioctx.set_locator_key("") + objects[0].seek(0) + objects[0].write(b"contents2") + eq(self.ioctx.get_locator_key(), "") + self.ioctx.set_locator_key("bar") + contents = self.ioctx.read("foo") + eq(contents, b"contents2") + eq(self.ioctx.get_locator_key(), "bar") + objects[0].remove() + objects = [i for i in self.ioctx.list_objects()] + eq(objects, []) + self.ioctx.set_locator_key("") + + def test_aio_write(self): + lock = threading.Condition() + count = [0] + def cb(blah): + with lock: + count[0] += 1 + lock.notify() + return 0 + comp = self.ioctx.aio_write("foo", b"bar", 0, cb, cb) + comp.wait_for_complete() + comp.wait_for_safe() + with lock: + while count[0] < 2: + lock.wait() + eq(comp.get_return_value(), 0) + contents = self.ioctx.read("foo") + eq(contents, b"bar") + [i.remove() for i in self.ioctx.list_objects()] + + def test_aio_write_no_comp_ref(self): + lock = threading.Condition() + count = [0] + def cb(blah): + with lock: + count[0] += 1 + lock.notify() + return 0 + # NOTE(sileht): force don't save the comp into local var + # to ensure all references are correctly tracked into the lib + self.ioctx.aio_write("foo", b"bar", 0, cb, cb) + with lock: + while count[0] < 2: + lock.wait() + contents = self.ioctx.read("foo") + eq(contents, b"bar") + [i.remove() for i in self.ioctx.list_objects()] + + def test_aio_append(self): + lock = threading.Condition() + count = [0] + def cb(blah): + with lock: + count[0] += 1 + lock.notify() + return 0 + comp = self.ioctx.aio_write("foo", b"bar", 0, cb, cb) + comp2 = self.ioctx.aio_append("foo", b"baz", cb, cb) + comp.wait_for_complete() + contents = self.ioctx.read("foo") + eq(contents, b"barbaz") + with lock: + while count[0] < 4: + lock.wait() + eq(comp.get_return_value(), 0) + eq(comp2.get_return_value(), 0) + [i.remove() for i in self.ioctx.list_objects()] + + def test_aio_write_full(self): + lock = threading.Condition() + count = [0] + def cb(blah): + with lock: + count[0] += 1 + lock.notify() + return 0 + self.ioctx.aio_write("foo", b"barbaz", 0, cb, cb) + comp = self.ioctx.aio_write_full("foo", b"bar", cb, cb) + comp.wait_for_complete() + comp.wait_for_safe() + with lock: + while count[0] < 2: + lock.wait() + eq(comp.get_return_value(), 0) + contents = self.ioctx.read("foo") + eq(contents, b"bar") + [i.remove() for i in self.ioctx.list_objects()] + + def test_aio_stat(self): + lock = threading.Condition() + count = [0] + def cb(_, size, mtime): + with lock: + count[0] += 1 + lock.notify() + + comp = self.ioctx.aio_stat("foo", cb) + comp.wait_for_complete() + with lock: + while count[0] < 1: + lock.wait() + eq(comp.get_return_value(), -2) + + self.ioctx.write("foo", b"bar") + + comp = self.ioctx.aio_stat("foo", cb) + comp.wait_for_complete() + with lock: + while count[0] < 2: + lock.wait() + eq(comp.get_return_value(), 0) + + [i.remove() for i in self.ioctx.list_objects()] + + def _take_down_acting_set(self, pool, objectname): + # find acting_set for pool:objectname and take it down; used to + # verify that async reads don't complete while acting set is missing + cmd = { + "prefix":"osd map", + "pool":pool, + "object":objectname, + "format":"json", + } + r, jsonout, _ = self.rados.mon_command(json.dumps(cmd), b'') + objmap = json.loads(jsonout.decode("utf-8")) + acting_set = objmap['acting'] + cmd = {"prefix":"osd set", "key":"noup"} + r, _, _ = self.rados.mon_command(json.dumps(cmd), b'') + eq(r, 0) + cmd = {"prefix":"osd down", "ids":[str(i) for i in acting_set]} + r, _, _ = self.rados.mon_command(json.dumps(cmd), b'') + eq(r, 0) + + # wait for OSDs to acknowledge the down + eq(self.rados.wait_for_latest_osdmap(), 0) + + def _let_osds_back_up(self): + cmd = {"prefix":"osd unset", "key":"noup"} + r, _, _ = self.rados.mon_command(json.dumps(cmd), b'') + eq(r, 0) + + def test_aio_read(self): + # this is a list so that the local cb() can modify it + retval = [None] + lock = threading.Condition() + def cb(_, buf): + with lock: + retval[0] = buf + lock.notify() + payload = b"bar\000frob" + self.ioctx.write("foo", payload) + + # test1: use wait_for_complete() and wait for cb by + # watching retval[0] + self._take_down_acting_set('test_pool', 'foo') + comp = self.ioctx.aio_read("foo", len(payload), 0, cb) + eq(False, comp.is_complete()) + time.sleep(3) + eq(False, comp.is_complete()) + with lock: + eq(None, retval[0]) + self._let_osds_back_up() + comp.wait_for_complete() + loops = 0 + with lock: + while retval[0] is None and loops <= 10: + lock.wait(timeout=5) + loops += 1 + assert(loops <= 10) + + eq(retval[0], payload) + eq(sys.getrefcount(comp), 2) + + # test2: use wait_for_complete_and_cb(), verify retval[0] is + # set by the time we regain control + + retval[0] = None + self._take_down_acting_set('test_pool', 'foo') + comp = self.ioctx.aio_read("foo", len(payload), 0, cb) + eq(False, comp.is_complete()) + time.sleep(3) + eq(False, comp.is_complete()) + with lock: + eq(None, retval[0]) + self._let_osds_back_up() + + comp.wait_for_complete_and_cb() + assert(retval[0] is not None) + eq(retval[0], payload) + eq(sys.getrefcount(comp), 2) + + # test3: error case, use wait_for_complete_and_cb(), verify retval[0] is + # set by the time we regain control + + retval[0] = 1 + self._take_down_acting_set('test_pool', 'bar') + comp = self.ioctx.aio_read("bar", len(payload), 0, cb) + eq(False, comp.is_complete()) + time.sleep(3) + eq(False, comp.is_complete()) + with lock: + eq(1, retval[0]) + self._let_osds_back_up() + + comp.wait_for_complete_and_cb() + eq(None, retval[0]) + assert(comp.get_return_value() < 0) + eq(sys.getrefcount(comp), 2) + + [i.remove() for i in self.ioctx.list_objects()] + + def test_lock(self): + self.ioctx.lock_exclusive("foo", "lock", "locker", "desc_lock", + 10000, 0) + assert_raises(ObjectExists, + self.ioctx.lock_exclusive, + "foo", "lock", "locker", "desc_lock", 10000, 0) + self.ioctx.unlock("foo", "lock", "locker") + assert_raises(ObjectNotFound, self.ioctx.unlock, "foo", "lock", "locker") + + self.ioctx.lock_shared("foo", "lock", "locker1", "tag", "desc_lock", + 10000, 0) + self.ioctx.lock_shared("foo", "lock", "locker2", "tag", "desc_lock", + 10000, 0) + assert_raises(ObjectBusy, + self.ioctx.lock_exclusive, + "foo", "lock", "locker3", "desc_lock", 10000, 0) + self.ioctx.unlock("foo", "lock", "locker1") + self.ioctx.unlock("foo", "lock", "locker2") + assert_raises(ObjectNotFound, self.ioctx.unlock, "foo", "lock", "locker1") + assert_raises(ObjectNotFound, self.ioctx.unlock, "foo", "lock", "locker2") + + def test_execute(self): + self.ioctx.write("foo", b"") # ensure object exists + + ret, buf = self.ioctx.execute("foo", "hello", "say_hello", b"") + eq(buf, b"Hello, world!") + + ret, buf = self.ioctx.execute("foo", "hello", "say_hello", b"nose") + eq(buf, b"Hello, nose!") + + def test_aio_execute(self): + count = [0] + retval = [None] + lock = threading.Condition() + def cb(_, buf): + with lock: + if retval[0] is None: + retval[0] = buf + count[0] += 1 + lock.notify() + self.ioctx.write("foo", b"") # ensure object exists + + comp = self.ioctx.aio_execute("foo", "hello", "say_hello", b"", 32, cb, cb) + comp.wait_for_complete() + with lock: + while count[0] < 2: + lock.wait() + eq(comp.get_return_value(), 13) + eq(retval[0], b"Hello, world!") + + retval[0] = None + comp = self.ioctx.aio_execute("foo", "hello", "say_hello", b"nose", 32, cb, cb) + comp.wait_for_complete() + with lock: + while count[0] < 4: + lock.wait() + eq(comp.get_return_value(), 12) + eq(retval[0], b"Hello, nose!") + + [i.remove() for i in self.ioctx.list_objects()] + + def test_applications(self): + cmd = {"prefix":"osd dump", "format":"json"} + ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'') + eq(ret, 0) + assert len(buf) > 0 + release = json.loads(buf.decode("utf-8")).get("require_osd_release", + None) + if not release or release[0] < 'l': + raise SkipTest + + eq([], self.ioctx.application_list()) + + self.ioctx.application_enable("app1") + assert_raises(Error, self.ioctx.application_enable, "app2") + self.ioctx.application_enable("app2", True) + + assert_raises(Error, self.ioctx.application_metadata_list, "dne") + eq([], self.ioctx.application_metadata_list("app1")) + + assert_raises(Error, self.ioctx.application_metadata_set, "dne", "key", + "key") + self.ioctx.application_metadata_set("app1", "key1", "val1") + self.ioctx.application_metadata_set("app1", "key2", "val2") + self.ioctx.application_metadata_set("app2", "key1", "val1") + + eq([("key1", "val1"), ("key2", "val2")], + self.ioctx.application_metadata_list("app1")) + + self.ioctx.application_metadata_remove("app1", "key1") + eq([("key2", "val2")], self.ioctx.application_metadata_list("app1")) + + def test_service_daemon(self): + name = "pid-" + str(os.getpid()) + metadata = {'version': '3.14', 'memory': '42'} + self.rados.service_daemon_register("laundry", name, metadata) + status = {'result': 'unknown', 'test': 'running'} + self.rados.service_daemon_update(status) + + def test_alignment(self): + eq(self.ioctx.alignment(), None) + + +class TestIoctxEc(object): + + def setUp(self): + self.rados = Rados(conffile='') + self.rados.connect() + self.pool = 'test-ec' + self.profile = 'testprofile-%s' % self.pool + cmd = {"prefix": "osd erasure-code-profile set", + "name": self.profile, "profile": ["k=2", "m=1", "crush-failure-domain=osd"]} + ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'', timeout=30) + eq(ret, 0, msg=out) + # create ec pool with profile created above + cmd = {'prefix': 'osd pool create', 'pg_num': 8, 'pgp_num': 8, + 'pool': self.pool, 'pool_type': 'erasure', + 'erasure_code_profile': self.profile} + ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'', timeout=30) + eq(ret, 0, msg=out) + assert self.rados.pool_exists(self.pool) + self.ioctx = self.rados.open_ioctx(self.pool) + + def tearDown(self): + cmd = {"prefix": "osd unset", "key": "noup"} + self.rados.mon_command(json.dumps(cmd), b'') + self.ioctx.close() + self.rados.delete_pool(self.pool) + self.rados.shutdown() + + def test_alignment(self): + eq(self.ioctx.alignment(), 8192) + + +class TestIoctx2(object): + + def setUp(self): + self.rados = Rados(conffile='') + self.rados.connect() + self.rados.create_pool('test_pool') + assert self.rados.pool_exists('test_pool') + pool_id = self.rados.pool_lookup('test_pool') + assert pool_id > 0 + self.ioctx2 = self.rados.open_ioctx2(pool_id) + + def tearDown(self): + cmd = {"prefix": "osd unset", "key": "noup"} + self.rados.mon_command(json.dumps(cmd), b'') + self.ioctx2.close() + self.rados.delete_pool('test_pool') + self.rados.shutdown() + + def test_get_last_version(self): + version = self.ioctx2.get_last_version() + assert version >= 0 + + def test_get_stats(self): + stats = self.ioctx2.get_stats() + eq(stats, {'num_objects_unfound': 0, + 'num_objects_missing_on_primary': 0, + 'num_object_clones': 0, + 'num_objects': 0, + 'num_object_copies': 0, + 'num_bytes': 0, + 'num_rd_kb': 0, + 'num_wr_kb': 0, + 'num_kb': 0, + 'num_wr': 0, + 'num_objects_degraded': 0, + 'num_rd': 0}) + + +class TestObject(object): + + def setUp(self): + self.rados = Rados(conffile='') + self.rados.connect() + self.rados.create_pool('test_pool') + assert self.rados.pool_exists('test_pool') + self.ioctx = self.rados.open_ioctx('test_pool') + self.ioctx.write('foo', b'bar') + self.object = Object(self.ioctx, 'foo') + + def tearDown(self): + self.ioctx.close() + self.ioctx = None + self.rados.delete_pool('test_pool') + self.rados.shutdown() + self.rados = None + + def test_read(self): + eq(self.object.read(3), b'bar') + eq(self.object.read(100), b'') + + def test_seek(self): + self.object.write(b'blah') + self.object.seek(0) + eq(self.object.read(4), b'blah') + self.object.seek(1) + eq(self.object.read(3), b'lah') + + def test_write(self): + self.object.write(b'barbaz') + self.object.seek(0) + eq(self.object.read(3), b'bar') + eq(self.object.read(3), b'baz') + +class TestIoCtxSelfManagedSnaps(object): + def setUp(self): + self.rados = Rados(conffile='') + self.rados.connect() + self.rados.create_pool('test_pool') + assert self.rados.pool_exists('test_pool') + self.ioctx = self.rados.open_ioctx('test_pool') + + def tearDown(self): + cmd = {"prefix":"osd unset", "key":"noup"} + self.rados.mon_command(json.dumps(cmd), b'') + self.ioctx.close() + self.rados.delete_pool('test_pool') + self.rados.shutdown() + + def test(self): + # cannot mix-and-match pool and self-managed snapshot mode + self.ioctx.set_self_managed_snap_write([]) + self.ioctx.write('abc', b'abc') + snap_id_1 = self.ioctx.create_self_managed_snap() + self.ioctx.set_self_managed_snap_write([snap_id_1]) + + self.ioctx.write('abc', b'def') + snap_id_2 = self.ioctx.create_self_managed_snap() + self.ioctx.set_self_managed_snap_write([snap_id_1, snap_id_2]) + + self.ioctx.write('abc', b'ghi') + + self.ioctx.rollback_self_managed_snap('abc', snap_id_1) + eq(self.ioctx.read('abc'), b'abc') + + self.ioctx.rollback_self_managed_snap('abc', snap_id_2) + eq(self.ioctx.read('abc'), b'def') + + self.ioctx.remove_self_managed_snap(snap_id_1) + self.ioctx.remove_self_managed_snap(snap_id_2) + +class TestCommand(object): + + def setUp(self): + self.rados = Rados(conffile='') + self.rados.connect() + + def tearDown(self): + self.rados.shutdown() + + def test_monmap_dump(self): + + # check for success and some plain output with epoch in it + cmd = {"prefix":"mon dump"} + ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30) + eq(ret, 0) + assert len(buf) > 0 + assert(b'epoch' in buf) + + # JSON, and grab current epoch + cmd['format'] = 'json' + ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30) + eq(ret, 0) + assert len(buf) > 0 + d = json.loads(buf.decode("utf-8")) + assert('epoch' in d) + epoch = d['epoch'] + + # assume epoch + 1000 does not exist; test for ENOENT + cmd['epoch'] = epoch + 1000 + ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30) + eq(ret, -errno.ENOENT) + eq(len(buf), 0) + del cmd['epoch'] + + # send to specific target by name + target = d['mons'][0]['name'] + print(target) + ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30, + target=target) + eq(ret, 0) + assert len(buf) > 0 + d = json.loads(buf.decode("utf-8")) + assert('epoch' in d) + + # and by rank + target = d['mons'][0]['rank'] + print(target) + ret, buf, errs = self.rados.mon_command(json.dumps(cmd), b'', timeout=30, + target=target) + eq(ret, 0) + assert len(buf) > 0 + d = json.loads(buf.decode("utf-8")) + assert('epoch' in d) + + def test_osd_bench(self): + cmd = dict(prefix='bench', size=4096, count=8192) + ret, buf, err = self.rados.osd_command(0, json.dumps(cmd), b'', + timeout=30) + eq(ret, 0) + assert len(buf) > 0 + out = json.loads(buf.decode('utf-8')) + eq(out['blocksize'], cmd['size']) + eq(out['bytes_written'], cmd['count']) + + def test_ceph_osd_pool_create_utf8(self): + if _python2: + # Use encoded bytestring + poolname = b"\351\273\205" + else: + poolname = "\u9ec5" + + cmd = {"prefix": "osd pool create", "pg_num": 16, "pool": poolname} + ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'') + eq(ret, 0) + assert len(out) > 0 + eq(u"pool '\u9ec5' created", out) |