diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/key_value_store | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/key_value_store')
-rw-r--r-- | src/key_value_store/CMakeLists.txt | 7 | ||||
-rw-r--r-- | src/key_value_store/cls_kvs.cc | 693 | ||||
-rw-r--r-- | src/key_value_store/key_value_structure.h | 146 | ||||
-rw-r--r-- | src/key_value_store/kv_flat_btree_async.cc | 2339 | ||||
-rw-r--r-- | src/key_value_store/kv_flat_btree_async.h | 897 | ||||
-rw-r--r-- | src/key_value_store/kvs_arg_types.h | 144 |
6 files changed, 4226 insertions, 0 deletions
diff --git a/src/key_value_store/CMakeLists.txt b/src/key_value_store/CMakeLists.txt new file mode 100644 index 000000000..0b17ede1d --- /dev/null +++ b/src/key_value_store/CMakeLists.txt @@ -0,0 +1,7 @@ +set(kvs_srcs cls_kvs.cc) +add_library(cls_kvs SHARED ${kvs_srcs}) +set_target_properties(cls_kvs PROPERTIES + VERSION "1.0.0" + SOVERSION "1" + INSTALL_RPATH "") +install(TARGETS cls_kvs DESTINATION ${CMAKE_INSTALL_LIBDIR}/rados-classes) diff --git a/src/key_value_store/cls_kvs.cc b/src/key_value_store/cls_kvs.cc new file mode 100644 index 000000000..a246e5f2a --- /dev/null +++ b/src/key_value_store/cls_kvs.cc @@ -0,0 +1,693 @@ +/* + * OSD classes for the key value store + * + * Created on: Aug 10, 2012 + * Author: Eleanor Cawthon + */ + +#include "include/compat.h" +#include "objclass/objclass.h" +#include <errno.h> +#include "key_value_store/kvs_arg_types.h" +#include "include/types.h" +#include <iostream> +#include <climits> + +using std::string; +using std::map; +using std::set; + +/** + * finds the index_data where a key belongs. + * + * @param key: the key to search for + * @param idata: the index_data for the first index value such that idata.key + * is greater than key. + * @param next_idata: the index_data for the next index entry after idata + * @pre: key is not encoded + * @post: idata contains complete information + * stored + */ +static int get_idata_from_key(cls_method_context_t hctx, const string &key, + index_data &idata, index_data &next_idata) { + bufferlist raw_val; + int r = 0; + std::map<std::string, bufferlist> kvmap; + + bool more; + + r = cls_cxx_map_get_vals(hctx, key_data(key).encoded(), "", 2, &kvmap, &more); + if (r < 0) { + CLS_LOG(20, "error reading index for range %s: %d", key.c_str(), r); + return r; + } + + r = cls_cxx_map_get_val(hctx, key_data(key).encoded(), &raw_val); + if (r == 0){ + CLS_LOG(20, "%s is already in the index: %d", key.c_str(), r); + auto b = raw_val.cbegin(); + idata.decode(b); + if (!kvmap.empty()) { + auto b = kvmap.begin()->second.cbegin(); + next_idata.decode(b); + } + return r; + } else if (r == -ENOENT || r == -ENODATA) { + auto b = kvmap.begin()->second.cbegin(); + idata.decode(b); + if (idata.kdata.prefix != "1") { + auto nb = (++kvmap.begin())->second.cbegin(); + next_idata.decode(nb); + } + r = 0; + } else if (r < 0) { + CLS_LOG(20, "error reading index for duplicates %s: %d", key.c_str(), r); + return r; + } + + CLS_LOG(20, "idata is %s", idata.str().c_str()); + return r; +} + + +static int get_idata_from_key_op(cls_method_context_t hctx, + bufferlist *in, bufferlist *out) { + CLS_LOG(20, "get_idata_from_key_op"); + idata_from_key_args op; + auto it = in->cbegin(); + try { + decode(op, it); + } catch (buffer::error& err) { + CLS_LOG(20, "error decoding idata_from_key_args."); + return -EINVAL; + } + int r = get_idata_from_key(hctx, op.key, op.idata, op.next_idata); + if (r < 0) { + return r; + } else { + encode(op, *out); + return 0; + } +} + +/** + * finds the object in the index with the lowest key value that is greater + * than idata.key. If idata.key is the max key, returns -EOVERFLOW. If + * idata has a prefix and has timed out, cleans up. + * + * @param idata: idata for the object to search for. + * @param out_data: the idata for the next object. + * + * @pre: idata must contain a key. + * @post: out_data contains complete information + */ +static int get_next_idata(cls_method_context_t hctx, const index_data &idata, + index_data &out_data) { + int r = 0; + std::map<std::string, bufferlist> kvs; + bool more; + r = cls_cxx_map_get_vals(hctx, idata.kdata.encoded(), "", 1, &kvs, &more); + if (r < 0){ + CLS_LOG(20, "getting kvs failed with error %d", r); + return r; + } + + if (!kvs.empty()) { + out_data.kdata.parse(kvs.begin()->first); + auto b = kvs.begin()->second.cbegin(); + out_data.decode(b); + } else { + r = -EOVERFLOW; + } + + return r; +} + +static int get_next_idata_op(cls_method_context_t hctx, + bufferlist *in, bufferlist *out) { + CLS_LOG(20, "get_next_idata_op"); + idata_from_idata_args op; + auto it = in->cbegin(); + try { + decode(op, it); + } catch (buffer::error& err) { + return -EINVAL; + } + int r = get_next_idata(hctx, op.idata, op.next_idata); + if (r < 0) { + return r; + } else { + op.encode(*out); + return 0; + } +} + +/** + * finds the object in the index with the highest key value that is less + * than idata.key. If idata.key is the lowest key, returns -ERANGE If + * idata has a prefix and has timed out, cleans up. + * + * @param idata: idata for the object to search for. + * @param out_data: the idata for the next object. + * + * @pre: idata must contain a key. + * @ost: out_data contains complete information + */ +static int get_prev_idata(cls_method_context_t hctx, const index_data &idata, + index_data &out_data) { + int r = 0; + std::map<std::string, bufferlist> kvs; + bool more; + r = cls_cxx_map_get_vals(hctx, "", "", LONG_MAX, &kvs, &more); + if (r < 0){ + CLS_LOG(20, "getting kvs failed with error %d", r); + return r; + } + + std::map<std::string, bufferlist>::iterator it = + kvs.lower_bound(idata.kdata.encoded()); + if (it->first != idata.kdata.encoded()) { + CLS_LOG(20, "object %s not found in the index (expected %s, found %s)", + idata.str().c_str(), idata.kdata.encoded().c_str(), + it->first.c_str()); + return -ENODATA; + } + if (it == kvs.begin()) { + //it is the first object, there is no previous. + return -ERANGE; + } else { + --it; + } + out_data.kdata.parse(it->first); + auto b = it->second.cbegin(); + out_data.decode(b); + + return 0; +} + +static int get_prev_idata_op(cls_method_context_t hctx, + bufferlist *in, bufferlist *out) { + CLS_LOG(20, "get_next_idata_op"); + idata_from_idata_args op; + auto it = in->cbegin(); + try { + decode(op, it); + } catch (buffer::error& err) { + return -EINVAL; + } + int r = get_prev_idata(hctx, op.idata, op.next_idata); + if (r < 0) { + return r; + } else { + op.encode(*out); + return 0; + } +} + +/** + * Read all of the index entries where any keys in the map go + */ +static int read_many(cls_method_context_t hctx, const set<string> &keys, + map<string, bufferlist> * out) { + int r = 0; + bool more; + CLS_ERR("reading from a map of size %d, first key encoded is %s", + (int)keys.size(), key_data(*keys.begin()).encoded().c_str()); + r = cls_cxx_map_get_vals(hctx, key_data(*keys.begin()).encoded().c_str(), + "", LONG_MAX, out, &more); + if (r < 0) { + CLS_ERR("getting omap vals failed with error %d", r); + } + + CLS_ERR("got map of size %d ", (int)out->size()); + if (out->size() > 1) { + out->erase(out->upper_bound(key_data(*keys.rbegin()).encoded().c_str()), + out->end()); + } + CLS_ERR("returning map of size %d", (int)out->size()); + return r; +} + +static int read_many_op(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + CLS_LOG(20, "read_many_op"); + set<string> op; + map<string, bufferlist> outmap; + auto it = in->cbegin(); + try { + decode(op, it); + } catch (buffer::error & err) { + return -EINVAL; + } + int r = read_many(hctx, op, &outmap); + if (r < 0) { + return r; + } else { + encode(outmap, *out); + return 0; + } +} + +/** + * Checks the unwritable xattr. If it is "1" (i.e., it is unwritable), returns + * -EACCES. otherwise, returns 0. + */ +static int check_writable(cls_method_context_t hctx) { + bufferlist bl; + int r = cls_cxx_getxattr(hctx, "unwritable", &bl); + if (r < 0) { + CLS_LOG(20, "error reading xattr %s: %d", "unwritable", r); + return r; + } + if (string(bl.c_str(), bl.length()) == "1") { + return -EACCES; + } else{ + return 0; + } +} + +static int check_writable_op(cls_method_context_t hctx, + bufferlist *in, bufferlist *out) { + CLS_LOG(20, "check_writable_op"); + return check_writable(hctx); +} + +/** + * returns -EKEYREJECTED if size is outside of bound, according to comparator. + * + * @bound: the limit to test + * @comparator: should be CEPH_OSD_CMPXATTR_OP_[EQ|GT|LT] + */ +static int assert_size_in_bound(cls_method_context_t hctx, int bound, + int comparator) { + //determine size + bufferlist size_bl; + int r = cls_cxx_getxattr(hctx, "size", &size_bl); + if (r < 0) { + CLS_LOG(20, "error reading xattr %s: %d", "size", r); + return r; + } + + int size = atoi(string(size_bl.c_str(), size_bl.length()).c_str()); + CLS_LOG(20, "size is %d, bound is %d", size, bound); + + //compare size to comparator + switch (comparator) { + case CEPH_OSD_CMPXATTR_OP_EQ: + if (size != bound) { + return -EKEYREJECTED; + } + break; + case CEPH_OSD_CMPXATTR_OP_LT: + if (size >= bound) { + return -EKEYREJECTED; + } + break; + case CEPH_OSD_CMPXATTR_OP_GT: + if (size <= bound) { + return -EKEYREJECTED; + } + break; + default: + CLS_LOG(20, "invalid argument passed to assert_size_in_bound: %d", + comparator); + return -EINVAL; + } + return 0; +} + +static int assert_size_in_bound_op(cls_method_context_t hctx, + bufferlist *in, bufferlist *out) { + CLS_LOG(20, "assert_size_in_bound_op"); + assert_size_args op; + auto it = in->cbegin(); + try { + decode(op, it); + } catch (buffer::error& err) { + return -EINVAL; + } + return assert_size_in_bound(hctx, op.bound, op.comparator); +} + +/** + * Attempts to insert omap into this object's omap. + * + * @return: + * if unwritable, returns -EACCES. + * if size > bound and key doesn't already exist in the omap, returns -EBALANCE. + * if exclusive is true, returns -EEXIST if any keys already exist. + * + * @post: object has omap entries inserted, and size xattr is updated + */ +static int omap_insert(cls_method_context_t hctx, + const map<string, bufferlist> &omap, int bound, bool exclusive) { + + uint64_t size; + time_t time; + int r = cls_cxx_stat(hctx, &size, &time); + if (r < 0) { + return r; + } + CLS_LOG(20, "inserting %s", omap.begin()->first.c_str()); + r = check_writable(hctx); + if (r < 0) { + CLS_LOG(20, "omap_insert: this object is unwritable: %d", r); + return r; + } + + int assert_bound = bound; + + //if this is an exclusive insert, make sure the key doesn't already exist. + for (map<string, bufferlist>::const_iterator it = omap.begin(); + it != omap.end(); ++it) { + bufferlist bl; + r = cls_cxx_map_get_val(hctx, it->first, &bl); + if (r == 0 && string(bl.c_str(), bl.length()) != ""){ + if (exclusive) { + CLS_LOG(20, "error: this is an exclusive insert and %s exists.", + it->first.c_str()); + return -EEXIST; + } + assert_bound++; + CLS_LOG(20, "increased assert_bound to %d", assert_bound); + } else if (r != -ENODATA && r != -ENOENT) { + CLS_LOG(20, "error reading omap val for %s: %d", it->first.c_str(), r); + return r; + } + } + + bufferlist old_size; + r = cls_cxx_getxattr(hctx, "size", &old_size); + if (r < 0) { + CLS_LOG(20, "error reading xattr %s: %d", "size", r); + return r; + } + + int old_size_int = atoi(string(old_size.c_str(), old_size.length()).c_str()); + + CLS_LOG(20, "asserting size is less than %d (bound is %d)", assert_bound, bound); + if (old_size_int >= assert_bound) { + return -EKEYREJECTED; + } + + int new_size_int = old_size_int + omap.size() - (assert_bound - bound); + CLS_LOG(20, "old size is %d, new size is %d", old_size_int, new_size_int); + bufferlist new_size; + std::stringstream s; + s << new_size_int; + new_size.append(s.str()); + + r = cls_cxx_map_set_vals(hctx, &omap); + if (r < 0) { + CLS_LOG(20, "error setting omap: %d", r); + return r; + } + + r = cls_cxx_setxattr(hctx, "size", &new_size); + if (r < 0) { + CLS_LOG(20, "error setting xattr %s: %d", "size", r); + return r; + } + CLS_LOG(20, "successfully inserted %s", omap.begin()->first.c_str()); + return 0; +} + +static int omap_insert_op(cls_method_context_t hctx, + bufferlist *in, bufferlist *out) { + CLS_LOG(20, "omap_insert"); + omap_set_args op; + auto it = in->cbegin(); + try { + decode(op, it); + } catch (buffer::error& err) { + return -EINVAL; + } + return omap_insert(hctx, op.omap, op.bound, op.exclusive); +} + +static int create_with_omap(cls_method_context_t hctx, + const map<string, bufferlist> &omap) { + CLS_LOG(20, "creating with omap: %s", omap.begin()->first.c_str()); + //first make sure the object is writable + int r = cls_cxx_create(hctx, true); + if (r < 0) { + CLS_LOG(20, "omap create: creating failed: %d", r); + return r; + } + + int new_size_int = omap.size(); + CLS_LOG(20, "omap insert: new size is %d", new_size_int); + bufferlist new_size; + std::stringstream s; + s << new_size_int; + new_size.append(s.str()); + + r = cls_cxx_map_set_vals(hctx, &omap); + if (r < 0) { + CLS_LOG(20, "omap create: error setting omap: %d", r); + return r; + } + + r = cls_cxx_setxattr(hctx, "size", &new_size); + if (r < 0) { + CLS_LOG(20, "omap create: error setting xattr %s: %d", "size", r); + return r; + } + + bufferlist u; + u.append("0"); + r = cls_cxx_setxattr(hctx, "unwritable", &u); + if (r < 0) { + CLS_LOG(20, "omap create: error setting xattr %s: %d", "unwritable", r); + return r; + } + + CLS_LOG(20, "successfully created %s", omap.begin()->first.c_str()); + return 0; +} + +static int create_with_omap_op(cls_method_context_t hctx, + bufferlist *in, bufferlist *out) { + CLS_LOG(20, "omap_insert"); + map<string, bufferlist> omap; + auto it = in->cbegin(); + try { + decode(omap, it); + } catch (buffer::error& err) { + return -EINVAL; + } + return create_with_omap(hctx, omap); +} + +/** + * Attempts to remove omap from this object's omap. + * + * @return: + * if unwritable, returns -EACCES. + * if size < bound and key doesn't already exist in the omap, returns -EBALANCE. + * if any of the keys are not in this object, returns -ENODATA. + * + * @post: object has omap entries removed, and size xattr is updated + */ +static int omap_remove(cls_method_context_t hctx, + const std::set<string> &omap, int bound) { + int r; + uint64_t size; + time_t time; + r = cls_cxx_stat(hctx, &size, &time); + if (r < 0) { + return r; + } + + //first make sure the object is writable + r = check_writable(hctx); + if (r < 0) { + return r; + } + + //check for existance of the key first + for (set<string>::const_iterator it = omap.begin(); + it != omap.end(); ++it) { + bufferlist bl; + r = cls_cxx_map_get_val(hctx, *it, &bl); + if (r == -ENOENT || r == -ENODATA + || string(bl.c_str(), bl.length()) == ""){ + return -ENODATA; + } else if (r < 0) { + CLS_LOG(20, "error reading omap val for %s: %d", it->c_str(), r); + return r; + } + } + + //fail if removing from an object with only bound entries. + bufferlist old_size; + r = cls_cxx_getxattr(hctx, "size", &old_size); + if (r < 0) { + CLS_LOG(20, "error reading xattr %s: %d", "size", r); + return r; + } + int old_size_int = atoi(string(old_size.c_str(), old_size.length()).c_str()); + + CLS_LOG(20, "asserting size is greater than %d", bound); + if (old_size_int <= bound) { + return -EKEYREJECTED; + } + + int new_size_int = old_size_int - omap.size(); + CLS_LOG(20, "old size is %d, new size is %d", old_size_int, new_size_int); + bufferlist new_size; + std::stringstream s; + s << new_size_int; + new_size.append(s.str()); + + r = cls_cxx_setxattr(hctx, "size", &new_size); + if (r < 0) { + CLS_LOG(20, "error setting xattr %s: %d", "unwritable", r); + return r; + } + + for (std::set<string>::const_iterator it = omap.begin(); + it != omap.end(); ++it) { + r = cls_cxx_map_remove_key(hctx, *it); + if (r < 0) { + CLS_LOG(20, "error removing omap: %d", r); + return r; + } + } + return 0; +} + +static int omap_remove_op(cls_method_context_t hctx, + bufferlist *in, bufferlist *out) { + CLS_LOG(20, "omap_remove"); + omap_rm_args op; + auto it = in->cbegin(); + try { + decode(op, it); + } catch (buffer::error& err) { + return -EINVAL; + } + return omap_remove(hctx, op.omap, op.bound); +} + +/** + * checks to see if this object needs to be split or rebalanced. if so, reads + * information about it. + * + * @post: if assert_size_in_bound(hctx, bound, comparator) succeeds, + * odata contains the size, omap, and unwritable attributes for this object. + * Otherwise, odata contains the size and unwritable attribute. + */ +static int maybe_read_for_balance(cls_method_context_t hctx, + object_data &odata, int bound, int comparator) { + CLS_LOG(20, "rebalance reading"); + //if unwritable, return + int r = check_writable(hctx); + if (r < 0) { + odata.unwritable = true; + CLS_LOG(20, "rebalance read: error getting xattr %s: %d", "unwritable", r); + return r; + } else { + odata.unwritable = false; + } + + //get the size attribute + bufferlist size; + r = cls_cxx_getxattr(hctx, "size", &size); + if (r < 0) { + CLS_LOG(20, "rebalance read: error getting xattr %s: %d", "size", r); + return r; + } + odata.size = atoi(string(size.c_str(), size.length()).c_str()); + + //check if it needs to be balanced + r = assert_size_in_bound(hctx, bound, comparator); + if (r < 0) { + CLS_LOG(20, "rebalance read: error on asserting size: %d", r); + return -EBALANCE; + } + + //if the assert succeeded, it needs to be balanced + bool more; + r = cls_cxx_map_get_vals(hctx, "", "", LONG_MAX, &odata.omap, &more); + if (r < 0){ + CLS_LOG(20, "rebalance read: getting kvs failed with error %d", r); + return r; + } + + CLS_LOG(20, "rebalance read: size xattr is %llu, omap size is %llu", + (unsigned long long)odata.size, + (unsigned long long)odata.omap.size()); + return 0; +} + +static int maybe_read_for_balance_op(cls_method_context_t hctx, + bufferlist *in, bufferlist *out) { + CLS_LOG(20, "maybe_read_for_balance"); + rebalance_args op; + auto it = in->cbegin(); + try { + decode(op, it); + } catch (buffer::error& err) { + return -EINVAL; + } + int r = maybe_read_for_balance(hctx, op.odata, op.bound, op.comparator); + if (r < 0) { + return r; + } else { + op.encode(*out); + return 0; + } +} + + +CLS_INIT(kvs) +{ + CLS_LOG(20, "Loaded assert condition class!"); + + cls_handle_t h_class; + cls_method_handle_t h_get_idata_from_key; + cls_method_handle_t h_get_next_idata; + cls_method_handle_t h_get_prev_idata; + cls_method_handle_t h_read_many; + cls_method_handle_t h_check_writable; + cls_method_handle_t h_assert_size_in_bound; + cls_method_handle_t h_omap_insert; + cls_method_handle_t h_create_with_omap; + cls_method_handle_t h_omap_remove; + cls_method_handle_t h_maybe_read_for_balance; + + cls_register("kvs", &h_class); + cls_register_cxx_method(h_class, "get_idata_from_key", + CLS_METHOD_RD, + get_idata_from_key_op, &h_get_idata_from_key); + cls_register_cxx_method(h_class, "get_next_idata", + CLS_METHOD_RD, + get_next_idata_op, &h_get_next_idata); + cls_register_cxx_method(h_class, "get_prev_idata", + CLS_METHOD_RD, + get_prev_idata_op, &h_get_prev_idata); + cls_register_cxx_method(h_class, "read_many", + CLS_METHOD_RD, + read_many_op, &h_read_many); + cls_register_cxx_method(h_class, "check_writable", + CLS_METHOD_RD | CLS_METHOD_WR, + check_writable_op, &h_check_writable); + cls_register_cxx_method(h_class, "assert_size_in_bound", + CLS_METHOD_WR, + assert_size_in_bound_op, &h_assert_size_in_bound); + cls_register_cxx_method(h_class, "omap_insert", + CLS_METHOD_WR, + omap_insert_op, &h_omap_insert); + cls_register_cxx_method(h_class, "create_with_omap", + CLS_METHOD_WR, + create_with_omap_op, &h_create_with_omap); + cls_register_cxx_method(h_class, "omap_remove", + CLS_METHOD_WR, + omap_remove_op, &h_omap_remove); + cls_register_cxx_method(h_class, "maybe_read_for_balance", + CLS_METHOD_RD, + maybe_read_for_balance_op, &h_maybe_read_for_balance); + + return; +} diff --git a/src/key_value_store/key_value_structure.h b/src/key_value_store/key_value_structure.h new file mode 100644 index 000000000..e0408e583 --- /dev/null +++ b/src/key_value_store/key_value_structure.h @@ -0,0 +1,146 @@ +/* + * Interface for key-value store using librados + * + * September 2, 2012 + * Eleanor Cawthon + * eleanor.cawthon@inktank.com + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#ifndef KEY_VALUE_STRUCTURE_HPP_ +#define KEY_VALUE_STRUCTURE_HPP_ + +#include "include/rados/librados.hpp" +#include "include/utime.h" +#include <vector> + +using ceph::bufferlist; + +class KeyValueStructure; + +/**An injection_t is a function that is called before every + * ObjectWriteOperation to test concurrency issues. For example, + * one injection_t might cause the client to have a greater chance of dying + * mid-split/merge. + */ +typedef int (KeyValueStructure::*injection_t)(); + +/** + * Passed to aio methods to be called when the operation completes + */ +typedef void (*callback)(int * err, void *arg); + +class KeyValueStructure{ +public: + std::map<char, int> opmap; + + //these are injection methods. By default, nothing is called at each + //interruption point. + /** + * returns 0 + */ + virtual int nothing() = 0; + /** + * 10% chance of waiting wait_ms seconds + */ + virtual int wait() = 0; + /** + * 10% chance of killing the client. + */ + virtual int suicide() = 0; + + ////////////////DESTRUCTOR///////////////// + virtual ~KeyValueStructure() {} + + ////////////////UPDATERS/////////////////// + + /** + * set up the KeyValueStructure (i.e., initialize rados/io_ctx, etc.) + */ + virtual int setup(int argc, const char** argv) = 0; + + /** + * set the method that gets called before each ObjectWriteOperation. + * If waite_time is set and the method passed involves waiting, it will wait + * for that many milliseconds. + */ + virtual void set_inject(injection_t inject, int wait_time) = 0; + + /** + * if update_on_existing is false, returns an error if + * key already exists in the structure + */ + virtual int set(const std::string &key, const bufferlist &val, + bool update_on_existing) = 0; + + /** + * efficiently insert the contents of in_map into the structure + */ + virtual int set_many(const std::map<std::string, bufferlist> &in_map) = 0; + + /** + * removes the key-value for key. returns an error if key does not exist + */ + virtual int remove(const std::string &key) = 0; + + /** + * removes all keys and values + */ + virtual int remove_all() = 0; + + + /** + * launches a thread to get the value of key. When complete, calls cb(cb_args) + */ + virtual void aio_get(const std::string &key, bufferlist *val, callback cb, + void *cb_args, int * err) = 0; + + /** + * launches a thread to set key to val. When complete, calls cb(cb_args) + */ + virtual void aio_set(const std::string &key, const bufferlist &val, bool exclusive, + callback cb, void * cb_args, int * err) = 0; + + /** + * launches a thread to remove key. When complete, calls cb(cb_args) + */ + virtual void aio_remove(const std::string &key, callback cb, void *cb_args, + int * err) = 0; + + ////////////////READERS//////////////////// + /** + * gets the val associated with key. + * + * @param key the key to get + * @param val the value is stored in this + * @return error code + */ + virtual int get(const std::string &key, bufferlist *val) = 0; + + /** + * stores all keys in keys. set should put them in order by key. + */ + virtual int get_all_keys(std::set<std::string> *keys) = 0; + + /** + * stores all keys and values in kv_map. map should put them in order by key. + */ + virtual int get_all_keys_and_values(std::map<std::string,bufferlist> *kv_map) = 0; + + /** + * True if the structure meets its own requirements for consistency. + */ + virtual bool is_consistent() = 0; + + /** + * prints a string representation of the structure + */ + virtual std::string str() = 0; +}; + + +#endif /* KEY_VALUE_STRUCTURE_HPP_ */ diff --git a/src/key_value_store/kv_flat_btree_async.cc b/src/key_value_store/kv_flat_btree_async.cc new file mode 100644 index 000000000..19c388cf0 --- /dev/null +++ b/src/key_value_store/kv_flat_btree_async.cc @@ -0,0 +1,2339 @@ +/* + * Key-value store using librados + * + * September 2, 2012 + * Eleanor Cawthon + * eleanor.cawthon@inktank.com + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include "include/compat.h" +#include "key_value_store/key_value_structure.h" +#include "key_value_store/kv_flat_btree_async.h" +#include "key_value_store/kvs_arg_types.h" +#include "include/rados/librados.hpp" +#include "common/ceph_context.h" +#include "common/Clock.h" +#include "include/types.h" + +#include <errno.h> +#include <string> +#include <iostream> +#include <cassert> +#include <climits> +#include <cmath> +#include <sstream> +#include <stdlib.h> +#include <iterator> + +using ceph::bufferlist; +using namespace std; + +bool index_data::is_timed_out(utime_t now, utime_t timeout) const { + return prefix != "" && now - ts > timeout; +} + +void IndexCache::clear() { + k2itmap.clear(); + t2kmap.clear(); +} + +void IndexCache::push(const string &key, const index_data &idata) { + if (cache_size == 0) { + return; + } + index_data old_idata; + std::map<key_data, std::pair<index_data, utime_t> >::iterator old_it = + k2itmap.lower_bound(key_data(key)); + if (old_it != k2itmap.end()) { + t2kmap.erase(old_it->second.second); + k2itmap.erase(old_it); + } + std::map<key_data, std::pair<index_data, utime_t> >::iterator new_it = + k2itmap.find(idata.kdata); + if (new_it != k2itmap.end()) { + utime_t old_time = new_it->second.second; + t2kmap.erase(old_time); + } + utime_t time = ceph_clock_now(); + k2itmap[idata.kdata] = std::make_pair(idata, time); + t2kmap[time] = idata.kdata; + if ((int)k2itmap.size() > cache_size) { + pop(); + } + +} + +void IndexCache::push(const index_data &idata) { + if (cache_size == 0) { + return; + } + if (k2itmap.count(idata.kdata) > 0) { + utime_t old_time = k2itmap[idata.kdata].second; + t2kmap.erase(old_time); + k2itmap.erase(idata.kdata); + } + utime_t time = ceph_clock_now(); + k2itmap[idata.kdata] = std::make_pair(idata, time); + t2kmap[time] = idata.kdata; + if ((int)k2itmap.size() > cache_size) { + pop(); + } +} + +void IndexCache::pop() { + if (cache_size == 0) { + return; + } + std::map<utime_t, key_data>::iterator it = t2kmap.begin(); + utime_t time = it->first; + key_data kdata = it->second; + k2itmap.erase(kdata); + t2kmap.erase(time); +} + +void IndexCache::erase(key_data kdata) { + if (cache_size == 0) { + return; + } + if (k2itmap.count(kdata) > 0) { + utime_t c = k2itmap[kdata].second; + k2itmap.erase(kdata); + t2kmap.erase(c); + } +} + +int IndexCache::get(const string &key, index_data *idata) const { + if (cache_size == 0) { + return -ENODATA; + } + if ((int)k2itmap.size() == 0) { + return -ENODATA; + } + std::map<key_data, std::pair<index_data, utime_t> >::const_iterator it = + k2itmap.lower_bound(key_data(key)); + if (it == k2itmap.end() || !(it->second.first.min_kdata < key_data(key))) { + return -ENODATA; + } else { + *idata = it->second.first; + } + return 0; +} + +int IndexCache::get(const string &key, index_data *idata, + index_data *next_idata) const { + if (cache_size == 0) { + return -ENODATA; + } + std::map<key_data, std::pair<index_data, utime_t> >::const_iterator it = + k2itmap.lower_bound(key_data(key)); + if (it == k2itmap.end() || ++it == k2itmap.end()) { + return -ENODATA; + } else { + --it; + if (!(it->second.first.min_kdata < key_data(key))){ + //stale, should be reread. + return -ENODATA; + } else { + *idata = it->second.first; + ++it; + if (it != k2itmap.end()) { + *next_idata = it->second.first; + } + } + } + return 0; +} + +int KvFlatBtreeAsync::nothing() { + return 0; +} + +int KvFlatBtreeAsync::wait() { + if (rand() % 10 == 0) { + usleep(wait_ms); + } + return 0; +} + +int KvFlatBtreeAsync::suicide() { + if (rand() % 10 == 0) { + if (verbose) cout << client_name << " is suiciding" << std::endl; + return 1; + } + return 0; +} + +int KvFlatBtreeAsync::next(const index_data &idata, index_data * out_data) +{ + if (verbose) cout << "\t\t" << client_name << "-next: finding next of " + << idata.str() + << std::endl; + int err = 0; + librados::ObjectReadOperation oro; + std::map<std::string, bufferlist> kvs; + oro.omap_get_vals2(idata.kdata.encoded(),1,&kvs, nullptr, &err); + err = io_ctx.operate(index_name, &oro, NULL); + if (err < 0){ + if (verbose) cout << "\t\t\t" << client_name + << "-next: getting index failed with error " + << err << std::endl; + return err; + } + if (!kvs.empty()) { + out_data->kdata.parse(kvs.begin()->first); + auto b = kvs.begin()->second.cbegin(); + out_data->decode(b); + if (idata.is_timed_out(ceph_clock_now(), timeout)) { + if (verbose) cout << client_name << " THINKS THE OTHER CLIENT DIED." + << std::endl; + //the client died after deleting the object. clean up. + cleanup(idata, err); + } + } else { + err = -EOVERFLOW; + } + return err; +} + +int KvFlatBtreeAsync::prev(const index_data &idata, index_data * out_data) +{ + if (verbose) cout << "\t\t" << client_name << "-prev: finding prev of " + << idata.str() << std::endl; + int err = 0; + bufferlist inbl; + idata_from_idata_args in_args; + in_args.idata = idata; + in_args.encode(inbl); + bufferlist outbl; + err = io_ctx.exec(index_name,"kvs", "get_prev_idata", inbl, outbl); + if (err < 0){ + if (verbose) cout << "\t\t\t" << client_name + << "-prev: getting index failed with error " + << err << std::endl; + if (idata.is_timed_out(ceph_clock_now(), timeout)) { + if (verbose) cout << client_name << " THINKS THE OTHER CLIENT DIED." + << std::endl; + //the client died after deleting the object. clean up. + err = cleanup(idata, err); + if (err == -ESUICIDE) { + return err; + } else { + err = 0; + } + } + return err; + } + auto it = outbl.cbegin(); + in_args.decode(it); + *out_data = in_args.next_idata; + if (verbose) cout << "\t\t" << client_name << "-prev: prev is " + << out_data->str() + << std::endl; + return err; +} + +int KvFlatBtreeAsync::read_index(const string &key, index_data * idata, + index_data * next_idata, bool force_update) { + int err = 0; + if (!force_update) { + if (verbose) cout << "\t" << client_name + << "-read_index: getting index_data for " << key + << " from cache" << std::endl; + icache_lock.lock(); + if (next_idata != NULL) { + err = icache.get(key, idata, next_idata); + } else { + err = icache.get(key, idata); + } + icache_lock.unlock(); + + if (err == 0) { + //if (verbose) cout << "CACHE SUCCESS" << std::endl; + return err; + } else { + if (verbose) cout << "NOT IN CACHE" << std::endl; + } + } + + if (verbose) cout << "\t" << client_name + << "-read_index: getting index_data for " << key + << " from object" << std::endl; + librados::ObjectReadOperation oro; + bufferlist raw_val; + std::set<std::string> key_set; + key_set.insert(key_data(key).encoded()); + std::map<std::string, bufferlist> kvmap; + std::map<std::string, bufferlist> dupmap; + oro.omap_get_vals_by_keys(key_set, &dupmap, &err); + oro.omap_get_vals2(key_data(key).encoded(), + (cache_size / cache_refresh >= 2? cache_size / cache_refresh: 2), + &kvmap, nullptr, &err); + err = io_ctx.operate(index_name, &oro, NULL); + utime_t mytime = ceph_clock_now(); + if (err < 0){ + cerr << "\t" << client_name + << "-read_index: getting keys failed with " + << err << std::endl; + ceph_abort_msg(client_name + "-read_index: reading index failed"); + return err; + } + kvmap.insert(dupmap.begin(), dupmap.end()); + for (map<string, bufferlist>::iterator it = ++kvmap.begin(); + it != kvmap.end(); + ++it) { + bufferlist bl = it->second; + auto blit = bl.cbegin(); + index_data this_idata; + this_idata.decode(blit); + if (this_idata.is_timed_out(mytime, timeout)) { + if (verbose) cout << client_name + << " THINKS THE OTHER CLIENT DIED. (mytime is " + << mytime.sec() << "." << mytime.usec() << ", idata.ts is " + << this_idata.ts.sec() << "." << this_idata.ts.usec() + << ", it has been " << (mytime - this_idata.ts).sec() + << '.' << (mytime - this_idata.ts).usec() + << ", timeout is " << timeout << ")" << std::endl; + //the client died after deleting the object. clean up. + if (cleanup(this_idata, -EPREFIX) == -ESUICIDE) { + return -ESUICIDE; + } + return read_index(key, idata, next_idata, force_update); + } + std::scoped_lock l{icache_lock}; + icache.push(this_idata); + } + auto b = kvmap.begin()->second.cbegin(); + idata->decode(b); + idata->kdata.parse(kvmap.begin()->first); + if (verbose) cout << "\t" << client_name << "-read_index: kvmap_size is " + << kvmap.size() + << ", idata is " << idata->str() << std::endl; + + ceph_assert(idata->obj != ""); + icache_lock.lock(); + icache.push(key, *idata); + icache_lock.unlock(); + + if (next_idata != NULL && idata->kdata.prefix != "1") { + next_idata->kdata.parse((++kvmap.begin())->first); + auto nb = (++kvmap.begin())->second.cbegin(); + next_idata->decode(nb); + std::scoped_lock l{icache_lock}; + icache.push(*next_idata); + } + return err; +} + +int KvFlatBtreeAsync::split(const index_data &idata) { + int err = 0; + opmap['l']++; + + if (idata.prefix != "") { + return -EPREFIX; + } + + rebalance_args args; + args.bound = 2 * k - 1; + args.comparator = CEPH_OSD_CMPXATTR_OP_GT; + err = read_object(idata.obj, &args); + args.odata.max_kdata = idata.kdata; + if (err < 0) { + if (verbose) cout << "\t\t" << client_name << "-split: read object " + << args.odata.name + << " got " << err << std::endl; + return err; + } + + if (verbose) cout << "\t\t" << client_name << "-split: splitting " + << idata.obj + << ", which has size " << args.odata.size + << " and actual size " << args.odata.omap.size() << std::endl; + + ///////preparations that happen outside the critical section + //for prefix index + vector<object_data> to_create; + vector<object_data> to_delete; + to_delete.push_back(object_data(idata.min_kdata, + args.odata.max_kdata, args.odata.name, args.odata.version)); + + //for lower half object + std::map<std::string, bufferlist>::const_iterator it = args.odata.omap.begin(); + client_index_lock.lock(); + to_create.push_back(object_data(to_string(client_name, client_index++))); + client_index_lock.unlock(); + for (int i = 0; i < k; i++) { + to_create[0].omap.insert(*it); + ++it; + } + to_create[0].min_kdata = idata.min_kdata; + to_create[0].max_kdata = key_data(to_create[0].omap.rbegin()->first); + + //for upper half object + client_index_lock.lock(); + to_create.push_back(object_data(to_create[0].max_kdata, + args.odata.max_kdata, + to_string(client_name, client_index++))); + client_index_lock.unlock(); + to_create[1].omap.insert( + ++args.odata.omap.find(to_create[0].omap.rbegin()->first), + args.odata.omap.end()); + + //setting up operations + librados::ObjectWriteOperation owos[6]; + vector<std::pair<std::pair<int, string>, librados::ObjectWriteOperation*> > ops; + index_data out_data; + set_up_prefix_index(to_create, to_delete, &owos[0], &out_data, &err); + ops.push_back(std::make_pair( + std::pair<int, string>(ADD_PREFIX, index_name), + &owos[0])); + for (int i = 1; i < 6; i++) { + ops.push_back(std::make_pair(std::make_pair(0,""), &owos[i])); + } + set_up_ops(to_create, to_delete, &ops, out_data, &err); + + /////BEGIN CRITICAL SECTION///// + //put prefix on index entry for idata.val + err = perform_ops("\t\t" + client_name + "-split:", out_data, &ops); + if (err < 0) { + return err; + } + if (verbose) cout << "\t\t" << client_name << "-split: done splitting." + << std::endl; + /////END CRITICAL SECTION///// + icache_lock.lock(); + for (vector<delete_data>::iterator it = out_data.to_delete.begin(); + it != out_data.to_delete.end(); ++it) { + icache.erase(it->max); + } + for (vector<create_data>::iterator it = out_data.to_create.begin(); + it != out_data.to_create.end(); ++it) { + icache.push(index_data(*it)); + } + icache_lock.unlock(); + return err; +} + +int KvFlatBtreeAsync::rebalance(const index_data &idata1, + const index_data &next_idata){ + opmap['m']++; + int err = 0; + + if (idata1.prefix != "") { + return -EPREFIX; + } + + rebalance_args args1; + args1.bound = k + 1; + args1.comparator = CEPH_OSD_CMPXATTR_OP_LT; + index_data idata2 = next_idata; + + rebalance_args args2; + args2.bound = k + 1; + args2.comparator = CEPH_OSD_CMPXATTR_OP_LT; + + if (idata1.kdata.prefix == "1") { + //this is the highest key in the index, so it doesn't have a next. + + //read the index for the previous entry + err = prev(idata1, &idata2); + if (err == -ERANGE) { + if (verbose) cout << "\t\t" << client_name + << "-rebalance: this is the only node, " + << "so aborting" << std::endl; + return -EUCLEAN; + } else if (err < 0) { + return err; + } + + //read the first object + err = read_object(idata1.obj, &args2); + if (err < 0) { + if (verbose) cout << "reading " << idata1.obj << " failed with " << err + << std::endl; + if (err == -ENOENT) { + return -ECANCELED; + } + return err; + } + args2.odata.min_kdata = idata1.min_kdata; + args2.odata.max_kdata = idata1.kdata; + + //read the second object + args1.bound = 2 * k + 1; + err = read_object(idata2.obj, &args1); + if (err < 0) { + if (verbose) cout << "reading " << idata1.obj << " failed with " << err + << std::endl; + return err; + } + args1.odata.min_kdata = idata2.min_kdata; + args1.odata.max_kdata = idata2.kdata; + + if (verbose) cout << "\t\t" << client_name << "-rebalance: read " + << idata2.obj + << ". size: " << args1.odata.size << " version: " + << args1.odata.version + << std::endl; + } else { + assert (next_idata.obj != ""); + //there is a next key, so get it. + err = read_object(idata1.obj, &args1); + if (err < 0) { + if (verbose) cout << "reading " << idata1.obj << " failed with " << err + << std::endl; + return err; + } + args1.odata.min_kdata = idata1.min_kdata; + args1.odata.max_kdata = idata1.kdata; + + args2.bound = 2 * k + 1; + err = read_object(idata2.obj, &args2); + if (err < 0) { + if (verbose) cout << "reading " << idata1.obj << " failed with " << err + << std::endl; + if (err == -ENOENT) { + return -ECANCELED; + } + return err; + } + args2.odata.min_kdata = idata2.min_kdata; + args2.odata.max_kdata = idata2.kdata; + + if (verbose) cout << "\t\t" << client_name << "-rebalance: read " + << idata2.obj + << ". size: " << args2.odata.size << " version: " + << args2.odata.version + << std::endl; + } + + if (verbose) cout << "\t\t" << client_name << "-rebalance: o1 is " + << args1.odata.max_kdata.encoded() << "," + << args1.odata.name << " with size " << args1.odata.size + << " , o2 is " << args2.odata.max_kdata.encoded() + << "," << args2.odata.name << " with size " << args2.odata.size + << std::endl; + + //calculations + if ((int)args1.odata.size > k && (int)args1.odata.size <= 2*k + && (int)args2.odata.size > k + && (int)args2.odata.size <= 2*k) { + //nothing to do + if (verbose) cout << "\t\t" << client_name + << "-rebalance: both sizes in range, so" + << " aborting " << std::endl; + return -EBALANCE; + } else if (idata1.prefix != "" || idata2.prefix != "") { + return -EPREFIX; + } + + //this is the high object. it gets created regardless of rebalance or merge. + client_index_lock.lock(); + string o2w = to_string(client_name, client_index++); + client_index_lock.unlock(); + index_data idata; + vector<object_data> to_create; + vector<object_data> to_delete; + librados::ObjectWriteOperation create[2];//possibly only 1 will be used + librados::ObjectWriteOperation other_ops[6]; + vector<std::pair<std::pair<int, string>, librados::ObjectWriteOperation*> > ops; + ops.push_back(std::make_pair( + std::pair<int, string>(ADD_PREFIX, index_name), + &other_ops[0])); + + if ((int)args1.odata.size + (int)args2.odata.size <= 2*k) { + //merge + if (verbose) cout << "\t\t" << client_name << "-rebalance: merging " + << args1.odata.name + << " and " << args2.odata.name << " to get " << o2w + << std::endl; + std::map<string, bufferlist> write2_map; + write2_map.insert(args1.odata.omap.begin(), args1.odata.omap.end()); + write2_map.insert(args2.odata.omap.begin(), args2.odata.omap.end()); + to_create.push_back(object_data(args1.odata.min_kdata, + args2.odata.max_kdata, o2w, write2_map)); + ops.push_back(std::make_pair( + std::pair<int, std::string>(MAKE_OBJECT, o2w), + &create[0])); + ceph_assert((int)write2_map.size() <= 2*k); + } else { + //rebalance + if (verbose) cout << "\t\t" << client_name << "-rebalance: rebalancing " + << args1.odata.name + << " and " << args2.odata.name << std::endl; + std::map<std::string, bufferlist> write1_map; + std::map<std::string, bufferlist> write2_map; + std::map<std::string, bufferlist>::iterator it; + client_index_lock.lock(); + string o1w = to_string(client_name, client_index++); + client_index_lock.unlock(); + int target_size_1 = ceil(((int)args1.odata.size + (int)args2.odata.size) + / 2.0); + if (args1.odata.max_kdata != idata1.kdata) { + //this should be true if idata1 is the high object + target_size_1 = floor(((int)args1.odata.size + (int)args2.odata.size) + / 2.0); + } + for (it = args1.odata.omap.begin(); + it != args1.odata.omap.end() && (int)write1_map.size() + < target_size_1; + ++it) { + write1_map.insert(*it); + } + if (it != args1.odata.omap.end()){ + //write1_map is full, so put the rest in write2_map + write2_map.insert(it, args1.odata.omap.end()); + write2_map.insert(args2.odata.omap.begin(), args2.odata.omap.end()); + } else { + //args1.odata.omap was small, and write2_map still needs more + std::map<std::string, bufferlist>::iterator it2; + for(it2 = args2.odata.omap.begin(); + (it2 != args2.odata.omap.end()) && ((int)write1_map.size() + < target_size_1); + ++it2) { + write1_map.insert(*it2); + } + write2_map.insert(it2, args2.odata.omap.end()); + } + if (verbose) cout << "\t\t" << client_name + << "-rebalance: write1_map has size " + << write1_map.size() << ", write2_map.size() is " << write2_map.size() + << std::endl; + //at this point, write1_map and write2_map should have the correct pairs + to_create.push_back(object_data(args1.odata.min_kdata, + key_data(write1_map.rbegin()->first), + o1w,write1_map)); + to_create.push_back(object_data( key_data(write1_map.rbegin()->first), + args2.odata.max_kdata, o2w, write2_map)); + ops.push_back(std::make_pair( + std::pair<int, std::string>(MAKE_OBJECT, o1w), + &create[0])); + ops.push_back(std::make_pair( + std::pair<int, std::string>(MAKE_OBJECT, o2w), + &create[1])); + } + + to_delete.push_back(object_data(args1.odata.min_kdata, + args1.odata.max_kdata, args1.odata.name, args1.odata.version)); + to_delete.push_back(object_data(args2.odata.min_kdata, + args2.odata.max_kdata, args2.odata.name, args2.odata.version)); + for (int i = 1; i < 6; i++) { + ops.push_back(std::make_pair(std::make_pair(0,""), &other_ops[i])); + } + + index_data out_data; + set_up_prefix_index(to_create, to_delete, &other_ops[0], &out_data, &err); + set_up_ops(to_create, to_delete, &ops, out_data, &err); + + //at this point, all operations should be completely set up. + /////BEGIN CRITICAL SECTION///// + err = perform_ops("\t\t" + client_name + "-rebalance:", out_data, &ops); + if (err < 0) { + return err; + } + icache_lock.lock(); + for (vector<delete_data>::iterator it = out_data.to_delete.begin(); + it != out_data.to_delete.end(); ++it) { + icache.erase(it->max); + } + for (vector<create_data>::iterator it = out_data.to_create.begin(); + it != out_data.to_create.end(); ++it) { + icache.push(index_data(*it)); + } + icache_lock.unlock(); + if (verbose) cout << "\t\t" << client_name << "-rebalance: done rebalancing." + << std::endl; + /////END CRITICAL SECTION///// + return err; +} + +int KvFlatBtreeAsync::read_object(const string &obj, object_data * odata) { + librados::ObjectReadOperation get_obj; + librados::AioCompletion * obj_aioc = rados.aio_create_completion(); + int err; + bufferlist unw_bl; + odata->name = obj; + get_obj.omap_get_vals2("", LONG_MAX, &odata->omap, nullptr, &err); + get_obj.getxattr("unwritable", &unw_bl, &err); + io_ctx.aio_operate(obj, obj_aioc, &get_obj, NULL); + obj_aioc->wait_for_complete(); + err = obj_aioc->get_return_value(); + if (err < 0){ + //possibly -ENOENT, meaning someone else deleted it. + obj_aioc->release(); + return err; + } + odata->unwritable = string(unw_bl.c_str(), unw_bl.length()) == "1"; + odata->version = obj_aioc->get_version64(); + odata->size = odata->omap.size(); + obj_aioc->release(); + return 0; +} + +int KvFlatBtreeAsync::read_object(const string &obj, rebalance_args * args) { + bufferlist inbl; + args->encode(inbl); + bufferlist outbl; + int err; + librados::AioCompletion * a = rados.aio_create_completion(); + io_ctx.aio_exec(obj, a, "kvs", "maybe_read_for_balance", inbl, &outbl); + a->wait_for_complete(); + err = a->get_return_value(); + if (err < 0) { + if (verbose) cout << "\t\t" << client_name + << "-read_object: reading failed with " + << err << std::endl; + a->release(); + return err; + } + auto it = outbl.cbegin(); + args->decode(it); + args->odata.name = obj; + args->odata.version = a->get_version64(); + a->release(); + return err; +} + +void KvFlatBtreeAsync::set_up_prefix_index( + const vector<object_data> &to_create, + const vector<object_data> &to_delete, + librados::ObjectWriteOperation * owo, + index_data * idata, + int * err) { + std::map<std::string, std::pair<bufferlist, int> > assertions; + std::map<string, bufferlist> to_insert; + idata->prefix = "1"; + idata->ts = ceph_clock_now(); + for(vector<object_data>::const_iterator it = to_create.begin(); + it != to_create.end(); + ++it) { + create_data c(it->min_kdata, it->max_kdata, it->name); + idata->to_create.push_back(c); + } + for(vector<object_data>::const_iterator it = to_delete.begin(); + it != to_delete.end(); + ++it) { + delete_data d(it->min_kdata, it->max_kdata, it->name, it->version); + idata->to_delete.push_back(d); + } + for(vector<object_data>::const_iterator it = to_delete.begin(); + it != to_delete.end(); + ++it) { + idata->obj = it->name; + idata->min_kdata = it->min_kdata; + idata->kdata = it->max_kdata; + bufferlist insert; + idata->encode(insert); + to_insert[it->max_kdata.encoded()] = insert; + index_data this_entry; + this_entry.min_kdata = idata->min_kdata; + this_entry.kdata = idata->kdata; + this_entry.obj = idata->obj; + assertions[it->max_kdata.encoded()] = std::pair<bufferlist, int> + (to_bl(this_entry), CEPH_OSD_CMPXATTR_OP_EQ); + if (verbose) cout << "\t\t\t" << client_name + << "-setup_prefix: will assert " + << this_entry.str() << std::endl; + } + ceph_assert(*err == 0); + owo->omap_cmp(assertions, err); + if (to_create.size() <= 2) { + owo->omap_set(to_insert); + } +} + +//some args can be null if there are no corresponding entries in p +void KvFlatBtreeAsync::set_up_ops( + const vector<object_data> &create_vector, + const vector<object_data> &delete_vector, + vector<std::pair<std::pair<int, string>, librados::ObjectWriteOperation*> > * ops, + const index_data &idata, + int * err) { + vector<std::pair<std::pair<int, string>, + librados::ObjectWriteOperation* > >::iterator it; + + //skip the prefixing part + for(it = ops->begin(); it->first.first == ADD_PREFIX; ++it) {} + std::map<string, bufferlist> to_insert; + std::set<string> to_remove; + std::map<string, std::pair<bufferlist, int> > assertions; + if (create_vector.size() > 0) { + for (int i = 0; i < (int)idata.to_delete.size(); ++i) { + it->first = std::pair<int, string>(UNWRITE_OBJECT, idata.to_delete[i].obj); + set_up_unwrite_object(delete_vector[i].version, it->second); + ++it; + } + } + for (int i = 0; i < (int)idata.to_create.size(); ++i) { + index_data this_entry(idata.to_create[i].max, idata.to_create[i].min, + idata.to_create[i].obj); + to_insert[idata.to_create[i].max.encoded()] = to_bl(this_entry); + if (idata.to_create.size() <= 2) { + it->first = std::pair<int, string>(MAKE_OBJECT, idata.to_create[i].obj); + } else { + it->first = std::pair<int, string>(AIO_MAKE_OBJECT, idata.to_create[i].obj); + } + set_up_make_object(create_vector[i].omap, it->second); + ++it; + } + for (int i = 0; i < (int)idata.to_delete.size(); ++i) { + index_data this_entry = idata; + this_entry.obj = idata.to_delete[i].obj; + this_entry.min_kdata = idata.to_delete[i].min; + this_entry.kdata = idata.to_delete[i].max; + if (verbose) cout << "\t\t\t" << client_name << "-setup_ops: will assert " + << this_entry.str() << std::endl; + assertions[idata.to_delete[i].max.encoded()] = std::pair<bufferlist, int>( + to_bl(this_entry), CEPH_OSD_CMPXATTR_OP_EQ); + to_remove.insert(idata.to_delete[i].max.encoded()); + it->first = std::pair<int, string>(REMOVE_OBJECT, idata.to_delete[i].obj); + set_up_delete_object(it->second); + ++it; + } + if ((int)idata.to_create.size() <= 2) { + it->second->omap_cmp(assertions, err); + } + it->second->omap_rm_keys(to_remove); + it->second->omap_set(to_insert); + + + it->first = std::pair<int, string>(REMOVE_PREFIX, index_name); +} + +void KvFlatBtreeAsync::set_up_make_object( + const std::map<std::string, bufferlist> &to_set, + librados::ObjectWriteOperation *owo) { + bufferlist inbl; + encode(to_set, inbl); + owo->exec("kvs", "create_with_omap", inbl); +} + +void KvFlatBtreeAsync::set_up_unwrite_object( + const int &ver, librados::ObjectWriteOperation *owo) { + if (ver > 0) { + owo->assert_version(ver); + } + owo->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ, to_bl("0")); + owo->setxattr("unwritable", to_bl("1")); +} + +void KvFlatBtreeAsync::set_up_restore_object( + librados::ObjectWriteOperation *owo) { + owo->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ, to_bl("1")); + owo->setxattr("unwritable", to_bl("0")); +} + +void KvFlatBtreeAsync::set_up_delete_object( + librados::ObjectWriteOperation *owo) { + owo->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ, to_bl("1")); + owo->remove(); +} + +int KvFlatBtreeAsync::perform_ops(const string &debug_prefix, + const index_data &idata, + vector<std::pair<std::pair<int, string>, librados::ObjectWriteOperation*> > *ops) { + int err = 0; + vector<librados::AioCompletion*> aiocs(idata.to_create.size()); + int count = 0; + for (vector<std::pair<std::pair<int, string>, + librados::ObjectWriteOperation*> >::iterator it = ops->begin(); + it != ops->end(); ++it) { + if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) { + return -ESUICIDE; + } + switch (it->first.first) { + case ADD_PREFIX://prefixing + if (verbose) cout << debug_prefix << " adding prefix" << std::endl; + err = io_ctx.operate(index_name, it->second); + if (err < 0) { + if (verbose) cout << debug_prefix << " prefixing the index failed with " + << err << std::endl; + return -EPREFIX; + } + if (verbose) cout << debug_prefix << " prefix added." << std::endl; + break; + case UNWRITE_OBJECT://marking + if (verbose) cout << debug_prefix << " marking " << it->first.second + << std::endl; + err = io_ctx.operate(it->first.second, it->second); + if (err < 0) { + //most likely because it changed, in which case it will be -ERANGE + if (verbose) cout << debug_prefix << " marking " << it->first.second + << "failed with code" << err << std::endl; + if (it->first.second == (*idata.to_delete.begin()).max.encoded()) { + if (cleanup(idata, -EFIRSTOBJ) == -ESUICIDE) { + return -ESUICIDE; + } + } else { + if (cleanup(idata, -ERANGE) == -ESUICIDE) { + return -ESUICIDE; + } + } + return err; + } + if (verbose) cout << debug_prefix << " marked " << it->first.second + << std::endl; + break; + case MAKE_OBJECT://creating + if (verbose) cout << debug_prefix << " creating " << it->first.second + << std::endl; + err = io_ctx.operate(it->first.second, it->second); + if (err < 0) { + //this can happen if someone else was cleaning up after us. + if (verbose) cout << debug_prefix << " creating " << it->first.second + << " failed" + << " with code " << err << std::endl; + if (err == -EEXIST) { + //someone thinks we died, so die + if (verbose) cout << client_name << " is suiciding!" << std::endl; + return -ESUICIDE; + } else { + ceph_abort(); + } + return err; + } + if (verbose || idata.to_create.size() > 2) { + cout << debug_prefix << " created object " << it->first.second + << std::endl; + } + break; + case AIO_MAKE_OBJECT: + cout << debug_prefix << " launching asynchronous create " + << it->first.second << std::endl; + aiocs[count] = rados.aio_create_completion(); + io_ctx.aio_operate(it->first.second, aiocs[count], it->second); + count++; + if ((int)idata.to_create.size() == count) { + cout << "starting aiowrite waiting loop" << std::endl; + for (count -= 1; count >= 0; count--) { + aiocs[count]->wait_for_complete(); + err = aiocs[count]->get_return_value(); + if (err < 0) { + //this can happen if someone else was cleaning up after us. + cerr << debug_prefix << " a create failed" + << " with code " << err << std::endl; + if (err == -EEXIST) { + //someone thinks we died, so die + cerr << client_name << " is suiciding!" << std::endl; + return -ESUICIDE; + } else { + ceph_abort(); + } + return err; + } + if (verbose || idata.to_create.size() > 2) { + cout << debug_prefix << " completed aio " << aiocs.size() - count + << "/" << aiocs.size() << std::endl; + } + } + } + break; + case REMOVE_OBJECT://deleting + if (verbose) cout << debug_prefix << " deleting " << it->first.second + << std::endl; + err = io_ctx.operate(it->first.second, it->second); + if (err < 0) { + //if someone else called cleanup on this prefix first + if (verbose) cout << debug_prefix << " deleting " << it->first.second + << "failed with code" << err << std::endl; + } + if (verbose) cout << debug_prefix << " deleted " << it->first.second + << std::endl; + break; + case REMOVE_PREFIX://rewriting index + if (verbose) cout << debug_prefix << " updating index " << std::endl; + err = io_ctx.operate(index_name, it->second); + if (err < 0) { + if (verbose) cout << debug_prefix + << " rewriting the index failed with code " << err + << ". someone else must have thought we died, so dying" << std::endl; + return -ETIMEDOUT; + } + if (verbose) cout << debug_prefix << " updated index." << std::endl; + break; + case RESTORE_OBJECT: + if (verbose) cout << debug_prefix << " restoring " << it->first.second + << std::endl; + err = io_ctx.operate(it->first.second, it->second); + if (err < 0) { + if (verbose) cout << debug_prefix << "restoring " << it->first.second + << " failed" + << " with " << err << std::endl; + return err; + } + if (verbose) cout << debug_prefix << " restored " << it->first.second + << std::endl; + break; + default: + if (verbose) cout << debug_prefix << " performing unknown op on " + << it->first.second + << std::endl; + err = io_ctx.operate(index_name, it->second); + if (err < 0) { + if (verbose) cout << debug_prefix << " unknown op on " + << it->first.second + << " failed with " << err << std::endl; + return err; + } + if (verbose) cout << debug_prefix << " unknown op on " + << it->first.second + << " succeeded." << std::endl; + break; + } + } + + return err; +} + +int KvFlatBtreeAsync::cleanup(const index_data &idata, const int &error) { + if (verbose) cout << "\t\t" << client_name << ": cleaning up after " + << idata.str() + << std::endl; + int err = 0; + ceph_assert(idata.prefix != ""); + std::map<std::string,bufferlist> new_index; + std::map<std::string, std::pair<bufferlist, int> > assertions; + switch (error) { + case -EFIRSTOBJ: { + //this happens if the split or rebalance failed to mark the first object, + //meaning only the index needs to be changed. + //restore objects that had been marked unwritable. + for(vector<delete_data >::const_iterator it = + idata.to_delete.begin(); + it != idata.to_delete.end(); ++it) { + index_data this_entry; + this_entry.obj = (*it).obj; + this_entry.min_kdata = it->min; + this_entry.kdata = it->max; + new_index[it->max.encoded()] = to_bl(this_entry); + this_entry = idata; + this_entry.obj = it->obj; + this_entry.min_kdata = it->min; + this_entry.kdata = it->max; + if (verbose) cout << "\t\t\t" << client_name + << "-cleanup: will assert index contains " + << this_entry.str() << std::endl; + assertions[it->max.encoded()] = + std::pair<bufferlist, int>(to_bl(this_entry), + CEPH_OSD_CMPXATTR_OP_EQ); + } + + //update the index + librados::ObjectWriteOperation update_index; + update_index.omap_cmp(assertions, &err); + update_index.omap_set(new_index); + if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updating index" + << std::endl; + if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) { + return -ESUICIDE; + } + err = io_ctx.operate(index_name, &update_index); + if (err < 0) { + if (verbose) cout << "\t\t\t" << client_name + << "-cleanup: rewriting failed with " + << err << ". returning -ECANCELED" << std::endl; + return -ECANCELED; + } + if (verbose) cout << "\t\t\t" << client_name + << "-cleanup: updated index. cleanup done." + << std::endl; + break; + } + case -ERANGE: { + //this happens if a split or rebalance fails to mark an object. It is a + //special case of rolling back that does not have to deal with new objects. + + //restore objects that had been marked unwritable. + vector<delete_data >::const_iterator it; + for(it = idata.to_delete.begin(); + it != idata.to_delete.end(); ++it) { + index_data this_entry; + this_entry.obj = (*it).obj; + this_entry.min_kdata = it->min; + this_entry.kdata = it->max; + new_index[it->max.encoded()] = to_bl(this_entry); + this_entry = idata; + this_entry.obj = it->obj; + this_entry.min_kdata = it->min; + this_entry.kdata = it->max; + if (verbose) cout << "\t\t\t" << client_name + << "-cleanup: will assert index contains " + << this_entry.str() << std::endl; + assertions[it->max.encoded()] = + std::pair<bufferlist, int>(to_bl(this_entry), + CEPH_OSD_CMPXATTR_OP_EQ); + } + it = idata.to_delete.begin(); + librados::ObjectWriteOperation restore; + set_up_restore_object(&restore); + if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) { + return -ESUICIDE; + } + if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restoring " + << it->obj + << std::endl; + err = io_ctx.operate(it->obj, &restore); + if (err < 0) { + //i.e., -ECANCELED because the object was already restored by someone + //else + if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restoring " + << it->obj + << " failed with " << err << std::endl; + } else { + if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restored " + << it->obj + << std::endl; + } + + //update the index + librados::ObjectWriteOperation update_index; + update_index.omap_cmp(assertions, &err); + update_index.omap_set(new_index); + if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updating index" + << std::endl; + if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) { + return -ESUICIDE; + } + err = io_ctx.operate(index_name, &update_index); + if (err < 0) { + if (verbose) cout << "\t\t\t" << client_name + << "-cleanup: rewriting failed with " + << err << ". returning -ECANCELED" << std::endl; + return -ECANCELED; + } + if (verbose) cout << "\t\t\t" << client_name + << "-cleanup: updated index. cleanup done." + << std::endl; + break; + } + case -ENOENT: { + if (verbose) cout << "\t\t" << client_name << "-cleanup: rolling forward" + << std::endl; + //all changes were created except for updating the index and possibly + //deleting the objects. roll forward. + vector<std::pair<std::pair<int, string>, librados::ObjectWriteOperation*> > ops; + vector<librados::ObjectWriteOperation> owos(idata.to_delete.size() + 1); + for (int i = 0; i <= (int)idata.to_delete.size(); ++i) { + ops.push_back(std::make_pair(std::pair<int, std::string>(0, ""), &owos[i])); + } + set_up_ops(vector<object_data>(), + vector<object_data>(), &ops, idata, &err); + err = perform_ops("\t\t" + client_name + "-cleanup:", idata, &ops); + if (err < 0) { + if (err == -ESUICIDE) { + return -ESUICIDE; + } + if (verbose) cout << "\t\t\t" << client_name + << "-cleanup: rewriting failed with " + << err << ". returning -ECANCELED" << std::endl; + return -ECANCELED; + } + if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updated index" + << std::endl; + break; + } + default: { + //roll back all changes. + if (verbose) cout << "\t\t" << client_name << "-cleanup: rolling back" + << std::endl; + std::map<std::string,bufferlist> new_index; + std::set<std::string> to_remove; + std::map<std::string, std::pair<bufferlist, int> > assertions; + + //mark the objects to be created. if someone else already has, die. + for(vector<create_data >::const_reverse_iterator it = + idata.to_create.rbegin(); + it != idata.to_create.rend(); ++it) { + librados::ObjectWriteOperation rm; + set_up_unwrite_object(0, &rm); + if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) + { + return -ESUICIDE; + } + if (verbose) cout << "\t\t\t" << client_name << "-cleanup: marking " + << it->obj + << std::endl; + err = io_ctx.operate(it->obj, &rm); + if (err < 0) { + if (verbose) cout << "\t\t\t" << client_name << "-cleanup: marking " + << it->obj + << " failed with " << err << std::endl; + } else { + if (verbose) cout << "\t\t\t" << client_name << "-cleanup: marked " + << it->obj + << std::endl; + } + } + + //restore objects that had been marked unwritable. + for(vector<delete_data >::const_iterator it = + idata.to_delete.begin(); + it != idata.to_delete.end(); ++it) { + index_data this_entry; + this_entry.obj = (*it).obj; + this_entry.min_kdata = it->min; + this_entry.kdata = it->max; + new_index[it->max.encoded()] = to_bl(this_entry); + this_entry = idata; + this_entry.obj = it->obj; + this_entry.min_kdata = it->min; + this_entry.kdata = it->max; + if (verbose) cout << "\t\t\t" << client_name + << "-cleanup: will assert index contains " + << this_entry.str() << std::endl; + assertions[it->max.encoded()] = + std::pair<bufferlist, int>(to_bl(this_entry), + CEPH_OSD_CMPXATTR_OP_EQ); + librados::ObjectWriteOperation restore; + set_up_restore_object(&restore); + if (verbose) cout << "\t\t\t" << client_name + << "-cleanup: will assert index contains " + << this_entry.str() << std::endl; + if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) + { + return -ESUICIDE; + } + if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restoring " + << it->obj + << std::endl; + err = io_ctx.operate(it->obj, &restore); + if (err == -ENOENT) { + //it had gotten far enough to be rolled forward - unmark the objects + //and roll forward. + if (verbose) cout << "\t\t\t" << client_name + << "-cleanup: roll forward instead" + << std::endl; + for(vector<create_data >::const_iterator cit = + idata.to_create.begin(); + cit != idata.to_create.end(); ++cit) { + librados::ObjectWriteOperation res; + set_up_restore_object(&res); + if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() + == 1 ) { + return -ECANCELED; + } + if (verbose) cout << "\t\t\t" << client_name + << "-cleanup: restoring " << cit->obj + << std::endl; + err = io_ctx.operate(cit->obj, &res); + if (err < 0) { + if (verbose) cout << "\t\t\t" << client_name + << "-cleanup: restoring " + << cit->obj << " failed with " << err << std::endl; + } + if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restored " + << cit->obj + << std::endl; + } + return cleanup(idata, -ENOENT); + } else if (err < 0) { + //i.e., -ECANCELED because the object was already restored by someone + //else + if (verbose) cout << "\t\t\t" << client_name + << "-cleanup: restoring " << it->obj + << " failed with " << err << std::endl; + } else { + if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restored " + << it->obj + << std::endl; + } + } + + //remove the new objects + for(vector<create_data >::const_reverse_iterator it = + idata.to_create.rbegin(); + it != idata.to_create.rend(); ++it) { + to_remove.insert(it->max.encoded()); + librados::ObjectWriteOperation rm; + rm.remove(); + if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) + { + return -ESUICIDE; + } + if (verbose) cout << "\t\t\t" << client_name << "-cleanup: removing " + << it->obj + << std::endl; + err = io_ctx.operate(it->obj, &rm); + if (err < 0) { + if (verbose) cout << "\t\t\t" << client_name + << "-cleanup: failed to remove " + << it->obj << std::endl; + } else { + if (verbose) cout << "\t\t\t" << client_name << "-cleanup: removed " + << it->obj + << std::endl; + } + } + + //update the index + librados::ObjectWriteOperation update_index; + update_index.omap_cmp(assertions, &err); + update_index.omap_rm_keys(to_remove); + update_index.omap_set(new_index); + if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updating index" + << std::endl; + if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) { + return -ESUICIDE; + } + err = io_ctx.operate(index_name, &update_index); + if (err < 0) { + if (verbose) cout << "\t\t\t" << client_name + << "-cleanup: rewriting failed with " + << err << ". returning -ECANCELED" << std::endl; + return -ECANCELED; + } + if (verbose) cout << "\t\t\t" << client_name + << "-cleanup: updated index. cleanup done." + << std::endl; + break; + } + } + return err; +} + +string KvFlatBtreeAsync::to_string(string s, int i) { + stringstream ret; + ret << s << i; + return ret.str(); +} + +string KvFlatBtreeAsync::get_name() { + return rados_id; +} + +void KvFlatBtreeAsync::set_inject(injection_t inject, int wait_time) { + interrupt = inject; + wait_ms = wait_time; +} + +int KvFlatBtreeAsync::setup(int argc, const char** argv) { + int r = rados.init(rados_id.c_str()); + if (r < 0) { + cerr << "error during init" << r << std::endl; + return r; + } + r = rados.conf_parse_argv(argc, argv); + if (r < 0) { + cerr << "error during parsing args" << r << std::endl; + return r; + } + r = rados.conf_parse_env(NULL); + if (r < 0) { + cerr << "error during parsing env" << r << std::endl; + return r; + } + r = rados.conf_read_file(NULL); + if (r < 0) { + cerr << "error during read file: " << r << std::endl; + return r; + } + r = rados.connect(); + if (r < 0) { + cerr << "error during connect: " << r << std::endl; + return r; + } + r = rados.ioctx_create(pool_name.c_str(), io_ctx); + if (r < 0) { + cerr << "error creating io ctx: " << r << std::endl; + rados.shutdown(); + return r; + } + + librados::ObjectWriteOperation make_index; + make_index.create(true); + std::map<std::string,bufferlist> index_map; + index_data idata; + idata.obj = client_name; + idata.min_kdata.raw_key = ""; + idata.kdata = key_data(""); + index_map["1"] = to_bl(idata); + make_index.omap_set(index_map); + r = io_ctx.operate(index_name, &make_index); + if (r < 0) { + if (verbose) cout << client_name << ": Making the index failed with code " + << r + << std::endl; + return 0; + } + if (verbose) cout << client_name << ": created index object" << std::endl; + + librados::ObjectWriteOperation make_max_obj; + make_max_obj.create(true); + make_max_obj.setxattr("unwritable", to_bl("0")); + make_max_obj.setxattr("size", to_bl("0")); + r = io_ctx.operate(client_name, &make_max_obj); + if (r < 0) { + if (verbose) cout << client_name << ": Setting xattr failed with code " + << r + << std::endl; + } + + return 0; +} + +int KvFlatBtreeAsync::set(const string &key, const bufferlist &val, + bool update_on_existing) { + if (verbose) cout << client_name << " is " + << (update_on_existing? "updating " : "setting ") + << key << std::endl; + int err = 0; + utime_t mytime; + index_data idata(key); + + if (verbose) cout << "\t" << client_name << ": finding oid" << std::endl; + err = read_index(key, &idata, NULL, false); + if (err < 0) { + if (verbose) cout << "\t" << client_name + << ": getting oid failed with code " + << err << std::endl; + return err; + } + if (verbose) cout << "\t" << client_name << ": index data is " << idata.str() + << ", object is " << idata.obj << std::endl; + + err = set_op(key, val, update_on_existing, idata); + + if (verbose) cout << "\t" << client_name << ": finished set with " << err + << std::endl; + return err; +} + +int KvFlatBtreeAsync::set_op(const string &key, const bufferlist &val, + bool update_on_existing, index_data &idata) { + //write + + bufferlist inbl; + omap_set_args args; + args.bound = 2 * k; + args.exclusive = !update_on_existing; + args.omap[key] = val; + args.encode(inbl); + + librados::ObjectWriteOperation owo; + owo.exec("kvs", "omap_insert", inbl); + if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) { + if (verbose) cout << client_name << " IS SUICIDING!" << std::endl; + return -ESUICIDE; + } + if (verbose) cout << "\t" << client_name << ": inserting " << key + << " into object " + << idata.obj << std::endl; + int err = io_ctx.operate(idata.obj, &owo); + if (err < 0) { + switch (err) { + case -EEXIST: { + //the key already exists and this is an exclusive insert. + cerr << "\t" << client_name << ": writing key failed with " + << err << std::endl; + return err; + } + case -EKEYREJECTED: { + //the object needs to be split. + do { + if (verbose) cout << "\t" << client_name << ": running split on " + << idata.obj + << std::endl; + err = read_index(key, &idata, NULL, true); + if (err < 0) { + if (verbose) cout << "\t" << client_name + << ": getting oid failed with code " + << err << std::endl; + return err; + } + err = split(idata); + if (err < 0 && err != -ENOENT && err != -EBALANCE) { + if (verbose) cerr << "\t" << client_name << ": split failed with " + << err << std::endl; + int ret = handle_set_rm_errors(err, idata.obj, key, &idata, NULL); + switch (ret) { + case -ESUICIDE: + if (verbose) cout << client_name << " IS SUICIDING!" << std::endl; + return ret; + case 1: + return set_op(key, val, update_on_existing, idata); + case 2: + return err; + } + } + } while (err < 0 && err != -EBALANCE && err != -ENOENT); + err = read_index(key, &idata, NULL, true); + if (err < 0) { + if (verbose) cout << "\t" << client_name + << ": getting oid failed with code " + << err << std::endl; + return err; + } + return set_op(key, val, update_on_existing, idata); + } + default: + if (verbose) cerr << "\t" << client_name << ": writing obj failed with " + << err << std::endl; + if (err == -ENOENT || err == -EACCES) { + if (err == -ENOENT) { + if (verbose) cout << "CACHE FAILURE" << std::endl; + } + err = read_index(key, &idata, NULL, true); + if (err < 0) { + if (verbose) cout << "\t" << client_name + << ": getting oid failed with code " + << err << std::endl; + return err; + } + if (verbose) cout << "\t" << client_name << ": index data is " + << idata.str() + << ", object is " << idata.obj << std::endl; + return set_op(key, val, update_on_existing, idata); + } else { + return err; + } + } + } + return 0; +} + +int KvFlatBtreeAsync::remove(const string &key) { + if (verbose) cout << client_name << ": removing " << key << std::endl; + int err = 0; + string obj; + utime_t mytime; + index_data idata; + index_data next_idata; + + if (verbose) cout << "\t" << client_name << ": finding oid" << std::endl; + err = read_index(key, &idata, &next_idata, false); + if (err < 0) { + if (verbose) cout << "getting oid failed with code " << err << std::endl; + return err; + } + obj = idata.obj; + if (verbose) cout << "\t" << client_name << ": idata is " << idata.str() + << ", next_idata is " << next_idata.str() + << ", obj is " << obj << std::endl; + + err = remove_op(key, idata, next_idata); + + if (verbose) cout << "\t" << client_name << ": finished remove with " << err + << " and exiting" << std::endl; + return err; +} + +int KvFlatBtreeAsync::remove_op(const string &key, index_data &idata, + index_data &next_idata) { + //write + bufferlist inbl; + omap_rm_args args; + args.bound = k; + args.omap.insert(key); + args.encode(inbl); + + librados::ObjectWriteOperation owo; + owo.exec("kvs", "omap_remove", inbl); + if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) { + if (verbose) cout << client_name << " IS SUICIDING!" << std::endl; + return -ESUICIDE; + } + if (verbose) cout << "\t" << client_name << ": removing " << key << " from " + << idata.obj + << std::endl; + int err = io_ctx.operate(idata.obj, &owo); + if (err < 0) { + if (verbose) cout << "\t" << client_name << ": writing obj failed with " + << err << std::endl; + switch (err) { + case -ENODATA: { + //the key does not exist in the object + return err; + } + case -EKEYREJECTED: { + //the object needs to be split. + do { + if (verbose) cerr << "\t" << client_name << ": running rebalance on " + << idata.obj << std::endl; + err = read_index(key, &idata, &next_idata, true); + if (err < 0) { + if (verbose) cout << "\t" << client_name + << ": getting oid failed with code " + << err << std::endl; + return err; + } + err = rebalance(idata, next_idata); + if (err < 0 && err != -ENOENT && err != -EBALANCE) { + if (verbose) cerr << "\t" << client_name << ": rebalance returned " + << err << std::endl; + int ret = handle_set_rm_errors(err, idata.obj, key, &idata, + &next_idata); + switch (ret) { + case -ESUICIDE: + if (verbose) cout << client_name << " IS SUICIDING!" << std::endl; + return err; + case 1: + return remove_op(key, idata, next_idata); + case 2: + return err; + break; + case -EUCLEAN: + //this is the only node, so it's ok to go below k. + librados::ObjectWriteOperation owo; + bufferlist inbl; + omap_rm_args args; + args.bound = 0; + args.omap.insert(key); + args.encode(inbl); + owo.exec("kvs", "omap_remove", inbl); + if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() + == 1 ) { + if (verbose) cout << client_name << " IS SUICIDING!" + << std::endl; + return -ESUICIDE; + } + if (verbose) cout << "\t" << client_name << ": removing " << key + << " from " + << idata.obj + << std::endl; + int err = io_ctx.operate(idata.obj, &owo); + if (err == 0) { + return 0; + } + } + } + } while (err < 0 && err != -EBALANCE && err != -ENOENT); + err = read_index(key, &idata, &next_idata, true); + if (err < 0) { + if (verbose) cout << "\t" << client_name + << ": getting oid failed with code " + << err << std::endl; + return err; + } + return remove(key); + } + default: + if (err == -ENOENT || err == -EACCES) { + err = read_index(key, &idata, &next_idata, true); + if (err < 0) { + if (verbose) cout << "\t" << client_name + << ": getting oid failed with code " + << err << std::endl; + return err; + } + if (verbose) cout << "\t" << client_name << ": index data is " + << idata.str() + << ", object is " << idata.obj << std::endl; + //idea: we read the time every time we read the index anyway - store it. + return remove_op(key, idata, next_idata); + } else { + return err; + } + } + } + return 0; +} + +int KvFlatBtreeAsync::handle_set_rm_errors(int &err, string obj, + string key, + index_data * idata, index_data * next_idata) { + if (err == -ESUICIDE) { + return err; + } else if (err == -ECANCELED //if an object was unwritable or index changed + || err == -EPREFIX //if there is currently a prefix + || err == -ETIMEDOUT// if the index changes during the op - i.e. cleanup + || err == -EACCES) //possible if we were acting on old index data + { + err = read_index(key, idata, next_idata, true); + if (err < 0) { + return err; + } + if (verbose) cout << "\t" << client_name << ": prefix is " << idata->str() + << std::endl; + if (idata->obj != obj) { + //someone else has split or cleaned up or something. start over. + return 1;//meaning repeat + } + } else if (err != -ETIMEDOUT && err != -ERANGE && err != -EACCES + && err != -EUCLEAN){ + if (verbose) cout << "\t" << client_name + << ": split encountered an unexpected error: " << err + << std::endl; + return 2; + } + return err; +} + +int KvFlatBtreeAsync::get(const string &key, bufferlist *val) { + opmap['g']++; + if (verbose) cout << client_name << ": getting " << key << std::endl; + int err = 0; + index_data idata; + utime_t mytime; + + if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) { + return -ESUICIDE; + } + err = read_index(key, &idata, NULL, false); + mytime = ceph_clock_now(); + if (err < 0) { + if (verbose) cout << "getting oid failed with code " << err << std::endl; + return err; + } + + err = get_op(key, val, idata); + + if (verbose) cout << client_name << ": got " << key << " with " << err + << std::endl; + + return err; +} + +int KvFlatBtreeAsync::get_op(const string &key, bufferlist *val, + index_data &idata) { + int err = 0; + std::set<std::string> key_set; + key_set.insert(key); + std::map<std::string,bufferlist> omap; + librados::ObjectReadOperation read; + read.omap_get_vals_by_keys(key_set, &omap, &err); + err = io_ctx.operate(idata.obj, &read, NULL); + if (err < 0) { + if (err == -ENOENT) { + err = read_index(key, &idata, NULL, true); + if (err < 0) { + if (verbose) cout << "\t" << client_name + << ": getting oid failed with code " + << err << std::endl; + return err; + } + if (verbose) cout << "\t" << client_name << ": index data is " + << idata.str() + << ", object is " << idata.obj << std::endl; + return get_op(key, val, idata); + } else { + if (verbose) cout << client_name + << ": get encountered an unexpected error: " << err + << std::endl; + return err; + } + } + + *val = omap[key]; + return err; +} + +void *KvFlatBtreeAsync::pset(void *ptr) { + struct aio_set_args *args = (struct aio_set_args *)ptr; + *args->err = + args->kvba->KvFlatBtreeAsync::set((string)args->key, + (bufferlist)args->val, (bool)args->exc); + args->cb(args->err, args->cb_args); + delete args; + return NULL; +} + +void KvFlatBtreeAsync::aio_set(const string &key, const bufferlist &val, + bool exclusive, callback cb, void * cb_args, int * err) { + aio_set_args *args = new aio_set_args(); + args->kvba = this; + args->key = key; + args->val = val; + args->exc = exclusive; + args->cb = cb; + args->cb_args = cb_args; + args->err = err; + pthread_t t; + int r = pthread_create(&t, NULL, pset, (void*)args); + if (r < 0) { + *args->err = r; + return; + } + pthread_detach(t); +} + +void *KvFlatBtreeAsync::prm(void *ptr) { + struct aio_rm_args *args = (struct aio_rm_args *)ptr; + *args->err = + args->kvba->KvFlatBtreeAsync::remove((string)args->key); + args->cb(args->err, args->cb_args); + delete args; + return NULL; +} + +void KvFlatBtreeAsync::aio_remove(const string &key, + callback cb, void * cb_args, int * err) { + aio_rm_args * args = new aio_rm_args(); + args->kvba = this; + args->key = key; + args->cb = cb; + args->cb_args = cb_args; + args->err = err; + pthread_t t; + int r = pthread_create(&t, NULL, prm, (void*)args); + if (r < 0) { + *args->err = r; + return; + } + pthread_detach(t); +} + +void *KvFlatBtreeAsync::pget(void *ptr) { + struct aio_get_args *args = (struct aio_get_args *)ptr; + *args->err = + args->kvba->KvFlatBtreeAsync::get((string)args->key, + (bufferlist *)args->val); + args->cb(args->err, args->cb_args); + delete args; + return NULL; +} + +void KvFlatBtreeAsync::aio_get(const string &key, bufferlist *val, + callback cb, void * cb_args, int * err) { + aio_get_args * args = new aio_get_args(); + args->kvba = this; + args->key = key; + args->val = val; + args->cb = cb; + args->cb_args = cb_args; + args->err = err; + pthread_t t; + int r = pthread_create(&t, NULL, pget, (void*)args); + if (r < 0) { + *args->err = r; + return; + } + pthread_detach(t); +} + +int KvFlatBtreeAsync::set_many(const std::map<string, bufferlist> &in_map) { + int err = 0; + bufferlist inbl; + bufferlist outbl; + std::set<string> keys; + + std::map<string, bufferlist> big_map; + for (map<string, bufferlist>::const_iterator it = in_map.begin(); + it != in_map.end(); ++it) { + keys.insert(it->first); + big_map.insert(*it); + } + + if (verbose) cout << "created key set and big_map" << std::endl; + + encode(keys, inbl); + librados::AioCompletion * aioc = rados.aio_create_completion(); + io_ctx.aio_exec(index_name, aioc, "kvs", "read_many", inbl, &outbl); + aioc->wait_for_complete(); + err = aioc->get_return_value(); + aioc->release(); + if (err < 0) { + cerr << "getting index failed with " << err << std::endl; + return err; + } + + std::map<string, bufferlist> imap;//read from the index + auto blit = outbl.cbegin(); + decode(imap, blit); + + if (verbose) cout << "finished reading index for objects. there are " + << imap.size() << " entries that need to be changed. " << std::endl; + + + vector<object_data> to_delete; + + vector<object_data> to_create; + + if (verbose) cout << "setting up to_delete and to_create vectors from index " + << "map" << std::endl; + //set up to_delete from index map + for (map<string, bufferlist>::iterator it = imap.begin(); it != imap.end(); + ++it){ + index_data idata; + blit = it->second.begin(); + idata.decode(blit); + to_delete.push_back(object_data(idata.min_kdata, idata.kdata, idata.obj)); + err = read_object(idata.obj, &to_delete[to_delete.size() - 1]); + if (err < 0) { + if (verbose) cout << "reading " << idata.obj << " failed with " << err + << std::endl; + return set_many(in_map); + } + + big_map.insert(to_delete[to_delete.size() - 1].omap.begin(), + to_delete[to_delete.size() - 1].omap.end()); + } + + to_create.push_back(object_data( + to_string(client_name, client_index++))); + to_create[0].min_kdata = to_delete[0].min_kdata; + + for(map<string, bufferlist>::iterator it = big_map.begin(); + it != big_map.end(); ++it) { + if (to_create[to_create.size() - 1].omap.size() == 1.5 * k) { + to_create[to_create.size() - 1].max_kdata = + key_data(to_create[to_create.size() - 1] + .omap.rbegin()->first); + + to_create.push_back(object_data( + to_string(client_name, client_index++))); + to_create[to_create.size() - 1].min_kdata = + to_create[to_create.size() - 2].max_kdata; + } + + to_create[to_create.size() - 1].omap.insert(*it); + } + to_create[to_create.size() - 1].max_kdata = + to_delete[to_delete.size() - 1].max_kdata; + + vector<librados::ObjectWriteOperation> owos(2 + 2 * to_delete.size() + + to_create.size()); + vector<std::pair<std::pair<int, string>, librados::ObjectWriteOperation*> > ops; + + + index_data idata; + set_up_prefix_index(to_create, to_delete, &owos[0], &idata, &err); + + if (verbose) cout << "finished making to_create and to_delete. " + << std::endl; + + ops.push_back(std::make_pair( + std::pair<int, string>(ADD_PREFIX, index_name), + &owos[0])); + for (int i = 1; i < 2 + 2 * (int)to_delete.size() + (int)to_create.size(); + i++) { + ops.push_back(std::make_pair(std::make_pair(0,""), &owos[i])); + } + + set_up_ops(to_create, to_delete, &ops, idata, &err); + + cout << "finished setting up ops. Starting critical section..." << std::endl; + + /////BEGIN CRITICAL SECTION///// + //put prefix on index entry for idata.val + err = perform_ops("\t\t" + client_name + "-set_many:", idata, &ops); + if (err < 0) { + return set_many(in_map); + } + if (verbose) cout << "\t\t" << client_name << "-split: done splitting." + << std::endl; + /////END CRITICAL SECTION///// + std::scoped_lock l{icache_lock}; + for (vector<delete_data>::iterator it = idata.to_delete.begin(); + it != idata.to_delete.end(); ++it) { + icache.erase(it->max); + } + for (vector<create_data>::iterator it = idata.to_create.begin(); + it != idata.to_create.end(); ++it) { + icache.push(index_data(*it)); + } + return err; +} + +int KvFlatBtreeAsync::remove_all() { + if (verbose) cout << client_name << ": removing all" << std::endl; + int err = 0; + librados::ObjectReadOperation oro; + librados::AioCompletion * oro_aioc = rados.aio_create_completion(); + std::map<std::string, bufferlist> index_set; + oro.omap_get_vals2("",LONG_MAX,&index_set, nullptr, &err); + err = io_ctx.aio_operate(index_name, oro_aioc, &oro, NULL); + if (err < 0){ + if (err == -ENOENT) { + return 0; + } + if (verbose) cout << "getting keys failed with error " << err << std::endl; + return err; + } + oro_aioc->wait_for_complete(); + oro_aioc->release(); + + librados::ObjectWriteOperation rm_index; + librados::AioCompletion * rm_index_aioc = rados.aio_create_completion(); + std::map<std::string,bufferlist> new_index; + new_index["1"] = index_set["1"]; + rm_index.omap_clear(); + rm_index.omap_set(new_index); + io_ctx.aio_operate(index_name, rm_index_aioc, &rm_index); + err = rm_index_aioc->get_return_value(); + rm_index_aioc->release(); + if (err < 0) { + if (verbose) cout << "rm index aioc failed with " << err + << std::endl; + return err; + } + + if (!index_set.empty()) { + for (std::map<std::string,bufferlist>::iterator it = index_set.begin(); + it != index_set.end(); ++it){ + librados::ObjectWriteOperation sub; + if (it->first == "1") { + sub.omap_clear(); + } else { + sub.remove(); + } + index_data idata; + auto b = it->second.cbegin(); + idata.decode(b); + io_ctx.operate(idata.obj, &sub); + } + } + + icache.clear(); + + return 0; +} + +int KvFlatBtreeAsync::get_all_keys(std::set<std::string> *keys) { + if (verbose) cout << client_name << ": getting all keys" << std::endl; + int err = 0; + librados::ObjectReadOperation oro; + std::map<std::string,bufferlist> index_set; + oro.omap_get_vals2("",LONG_MAX,&index_set, nullptr, &err); + io_ctx.operate(index_name, &oro, NULL); + if (err < 0){ + if (verbose) cout << "getting keys failed with error " << err << std::endl; + return err; + } + for (std::map<std::string,bufferlist>::iterator it = index_set.begin(); + it != index_set.end(); ++it){ + librados::ObjectReadOperation sub; + std::set<std::string> ret; + sub.omap_get_keys2("",LONG_MAX,&ret, nullptr, &err); + index_data idata; + auto b = it->second.cbegin(); + idata.decode(b); + io_ctx.operate(idata.obj, &sub, NULL); + keys->insert(ret.begin(), ret.end()); + } + return err; +} + +int KvFlatBtreeAsync::get_all_keys_and_values( + std::map<std::string,bufferlist> *kv_map) { + if (verbose) cout << client_name << ": getting all keys and values" + << std::endl; + int err = 0; + librados::ObjectReadOperation first_read; + std::set<std::string> index_set; + first_read.omap_get_keys2("",LONG_MAX,&index_set, nullptr, &err); + io_ctx.operate(index_name, &first_read, NULL); + if (err < 0){ + if (verbose) cout << "getting keys failed with error " << err << std::endl; + return err; + } + for (std::set<std::string>::iterator it = index_set.begin(); + it != index_set.end(); ++it){ + librados::ObjectReadOperation sub; + std::map<std::string, bufferlist> ret; + sub.omap_get_vals2("",LONG_MAX,&ret, nullptr, &err); + io_ctx.operate(*it, &sub, NULL); + kv_map->insert(ret.begin(), ret.end()); + } + return err; +} + +bool KvFlatBtreeAsync::is_consistent() { + int err; + bool ret = true; + if (verbose) cout << client_name << ": checking consistency" << std::endl; + std::map<std::string,bufferlist> index; + std::map<std::string, std::set<std::string> > sub_objs; + librados::ObjectReadOperation oro; + oro.omap_get_vals2("",LONG_MAX,&index, nullptr, &err); + io_ctx.operate(index_name, &oro, NULL); + if (err < 0){ + //probably because the index doesn't exist - this might be ok. + for (librados::NObjectIterator oit = io_ctx.nobjects_begin(); + oit != io_ctx.nobjects_end(); ++oit) { + //if this executes, there are floating objects. + cerr << "Not consistent! found floating object " << oit->get_oid() + << std::endl; + ret = false; + } + return ret; + } + + std::map<std::string, string> parsed_index; + std::set<std::string> onames; + std::set<std::string> special_names; + for (map<std::string,bufferlist>::iterator it = index.begin(); + it != index.end(); ++it) { + if (it->first != "") { + index_data idata; + auto b = it->second.cbegin(); + idata.decode(b); + if (idata.prefix != "") { + for(vector<delete_data>::iterator dit = idata.to_delete.begin(); + dit != idata.to_delete.end(); ++dit) { + librados::ObjectReadOperation oro; + librados::AioCompletion * aioc = rados.aio_create_completion(); + bufferlist un; + oro.getxattr("unwritable", &un, &err); + io_ctx.aio_operate(dit->obj, aioc, &oro, NULL); + aioc->wait_for_complete(); + err = aioc->get_return_value(); + if (ceph_clock_now() - idata.ts > timeout) { + if (err < 0) { + aioc->release(); + if (err == -ENOENT) { + continue; + } else { + cerr << "Not consistent! reading object " << dit->obj + << "returned " << err << std::endl; + ret = false; + break; + } + } + if (atoi(string(un.c_str(), un.length()).c_str()) != 1 && + aioc->get_version64() != dit->version) { + cerr << "Not consistent! object " << dit->obj << " has been " + << " modified since the client died was not cleaned up." + << std::endl; + ret = false; + } + } + special_names.insert(dit->obj); + aioc->release(); + } + for(vector<create_data >::iterator cit = idata.to_create.begin(); + cit != idata.to_create.end(); ++cit) { + special_names.insert(cit->obj); + } + } + parsed_index.insert(std::make_pair(it->first, idata.obj)); + onames.insert(idata.obj); + } + } + + //make sure that an object exists iff it either is the index + //or is listed in the index + for (librados::NObjectIterator oit = io_ctx.nobjects_begin(); + oit != io_ctx.nobjects_end(); ++oit) { + string name = oit->get_oid(); + if (name != index_name && onames.count(name) == 0 + && special_names.count(name) == 0) { + cerr << "Not consistent! found floating object " << name << std::endl; + ret = false; + } + } + + //check objects + string prev = ""; + for (std::map<std::string, string>::iterator it = parsed_index.begin(); + it != parsed_index.end(); + ++it) { + librados::ObjectReadOperation read; + read.omap_get_keys2("", LONG_MAX, &sub_objs[it->second], nullptr, &err); + err = io_ctx.operate(it->second, &read, NULL); + int size_int = (int)sub_objs[it->second].size(); + + //check that size is in the right range + if (it->first != "1" && special_names.count(it->second) == 0 && + err != -ENOENT && (size_int > 2*k|| size_int < k) + && parsed_index.size() > 1) { + cerr << "Not consistent! Object " << *it << " has size " << size_int + << ", which is outside the acceptable range." << std::endl; + ret = false; + } + + //check that all keys belong in that object + for(std::set<std::string>::iterator subit = sub_objs[it->second].begin(); + subit != sub_objs[it->second].end(); ++subit) { + if ((it->first != "1" + && *subit > it->first.substr(1,it->first.length())) + || *subit <= prev) { + cerr << "Not consistent! key " << *subit << " does not belong in " + << *it << std::endl; + cerr << "not last element, i.e. " << it->first << " not equal to 1? " + << (it->first != "1") << std::endl + << "greater than " << it->first.substr(1,it->first.length()) + <<"? " << (*subit > it->first.substr(1,it->first.length())) + << std::endl + << "less than or equal to " << prev << "? " + << (*subit <= prev) << std::endl; + ret = false; + } + } + + prev = it->first.substr(1,it->first.length()); + } + + if (!ret) { + if (verbose) cout << "failed consistency test - see error log" + << std::endl; + cerr << str(); + } else { + if (verbose) cout << "passed consistency test" << std::endl; + } + return ret; +} + +string KvFlatBtreeAsync::str() { + stringstream ret; + ret << "Top-level map:" << std::endl; + int err = 0; + std::set<std::string> keys; + std::map<std::string,bufferlist> index; + librados::ObjectReadOperation oro; + librados::AioCompletion * top_aioc = rados.aio_create_completion(); + oro.omap_get_vals2("",LONG_MAX,&index, nullptr, &err); + io_ctx.aio_operate(index_name, top_aioc, &oro, NULL); + top_aioc->wait_for_complete(); + err = top_aioc->get_return_value(); + top_aioc->release(); + if (err < 0 && err != -5){ + if (verbose) cout << "getting keys failed with error " << err << std::endl; + return ret.str(); + } + if(index.empty()) { + ret << "There are no objects!" << std::endl; + return ret.str(); + } + + for (map<std::string,bufferlist>::iterator it = index.begin(); + it != index.end(); ++it) { + keys.insert(string(it->second.c_str(), it->second.length()) + .substr(1,it->second.length())); + } + + vector<std::string> all_names; + vector<int> all_sizes(index.size()); + vector<int> all_versions(index.size()); + vector<bufferlist> all_unwrit(index.size()); + vector<map<std::string,bufferlist> > all_maps(keys.size()); + vector<map<std::string,bufferlist>::iterator> its(keys.size()); + unsigned done = 0; + vector<bool> dones(keys.size()); + ret << std::endl << string(150,'-') << std::endl; + + for (map<std::string,bufferlist>::iterator it = index.begin(); + it != index.end(); ++it){ + index_data idata; + auto b = it->second.cbegin(); + idata.decode(b); + string s = idata.str(); + ret << "|" << string((148 - + ((*it).first.length()+s.length()+3))/2,' '); + ret << (*it).first; + ret << " | "; + ret << string(idata.str()); + ret << string((148 - + ((*it).first.length()+s.length()+3))/2,' '); + ret << "|\t"; + all_names.push_back(idata.obj); + ret << std::endl << string(150,'-') << std::endl; + } + + int indexer = 0; + + //get the object names and sizes + for(vector<std::string>::iterator it = all_names.begin(); it + != all_names.end(); + ++it) { + librados::ObjectReadOperation oro; + librados::AioCompletion *aioc = rados.aio_create_completion(); + oro.omap_get_vals2("", LONG_MAX, &all_maps[indexer], nullptr, &err); + oro.getxattr("unwritable", &all_unwrit[indexer], &err); + io_ctx.aio_operate(*it, aioc, &oro, NULL); + aioc->wait_for_complete(); + if (aioc->get_return_value() < 0) { + ret << "reading" << *it << "failed: " << err << std::endl; + //return ret.str(); + } + all_sizes[indexer] = all_maps[indexer].size(); + all_versions[indexer] = aioc->get_version64(); + indexer++; + aioc->release(); + } + + ret << "///////////////////OBJECT NAMES////////////////" << std::endl; + //HEADERS + ret << std::endl; + for (int i = 0; i < indexer; i++) { + ret << "---------------------------\t"; + } + ret << std::endl; + for (int i = 0; i < indexer; i++) { + ret << "|" << string((25 - + (string("Bucket: ").length() + all_names[i].length()))/2, ' '); + ret << "Bucket: " << all_names[i]; + ret << string((25 - + (string("Bucket: ").length() + all_names[i].length()))/2, ' ') << "|\t"; + } + ret << std::endl; + for (int i = 0; i < indexer; i++) { + its[i] = all_maps[i].begin(); + ret << "|" << string((25 - (string("size: ").length() + + to_string("",all_sizes[i]).length()))/2, ' '); + ret << "size: " << all_sizes[i]; + ret << string((25 - (string("size: ").length() + + to_string("",all_sizes[i]).length()))/2, ' ') << "|\t"; + } + ret << std::endl; + for (int i = 0; i < indexer; i++) { + its[i] = all_maps[i].begin(); + ret << "|" << string((25 - (string("version: ").length() + + to_string("",all_versions[i]).length()))/2, ' '); + ret << "version: " << all_versions[i]; + ret << string((25 - (string("version: ").length() + + to_string("",all_versions[i]).length()))/2, ' ') << "|\t"; + } + ret << std::endl; + for (int i = 0; i < indexer; i++) { + its[i] = all_maps[i].begin(); + ret << "|" << string((25 - (string("unwritable? ").length() + + 1))/2, ' '); + ret << "unwritable? " << string(all_unwrit[i].c_str(), + all_unwrit[i].length()); + ret << string((25 - (string("unwritable? ").length() + + 1))/2, ' ') << "|\t"; + } + ret << std::endl; + for (int i = 0; i < indexer; i++) { + ret << "---------------------------\t"; + } + ret << std::endl; + ret << "///////////////////THE ACTUAL BLOCKS////////////////" << std::endl; + + + ret << std::endl; + for (int i = 0; i < indexer; i++) { + ret << "---------------------------\t"; + } + ret << std::endl; + //each time through this part is two lines + while(done < keys.size()) { + for(int i = 0; i < indexer; i++) { + if(dones[i]){ + ret << " \t"; + } else { + if (its[i] == all_maps[i].end()){ + done++; + dones[i] = true; + ret << " \t"; + } else { + ret << "|" << string((25 - + ((*its[i]).first.length()+its[i]->second.length()+3))/2,' '); + ret << (*its[i]).first; + ret << " | "; + ret << string(its[i]->second.c_str(), its[i]->second.length()); + ret << string((25 - + ((*its[i]).first.length()+its[i]->second.length()+3))/2,' '); + ret << "|\t"; + ++(its[i]); + } + + } + } + ret << std::endl; + for (int i = 0; i < indexer; i++) { + if(dones[i]){ + ret << " \t"; + } else { + ret << "---------------------------\t"; + } + } + ret << std::endl; + + } + return ret.str(); +} diff --git a/src/key_value_store/kv_flat_btree_async.h b/src/key_value_store/kv_flat_btree_async.h new file mode 100644 index 000000000..7ba0ada6f --- /dev/null +++ b/src/key_value_store/kv_flat_btree_async.h @@ -0,0 +1,897 @@ +/* + * Uses a two-level B-tree to store a set of key-value pairs. + * + * September 2, 2012 + * Eleanor Cawthon + * eleanor.cawthon@inktank.com + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#ifndef KVFLATBTREEASYNC_H_ +#define KVFLATBTREEASYNC_H_ + +#define ESUICIDE 134 +#define EPREFIX 136 +#define EFIRSTOBJ 138 + +#include "key_value_store/key_value_structure.h" +#include "include/utime.h" +#include "include/types.h" +#include "include/encoding.h" +#include "common/ceph_mutex.h" +#include "common/Clock.h" +#include "common/Formatter.h" +#include "global/global_context.h" +#include "include/rados/librados.hpp" +#include <cfloat> +#include <queue> +#include <sstream> +#include <stdarg.h> + +using ceph::bufferlist; + + +enum { + ADD_PREFIX = 1, + MAKE_OBJECT = 2, + UNWRITE_OBJECT = 3, + RESTORE_OBJECT = 4, + REMOVE_OBJECT = 5, + REMOVE_PREFIX = 6, + AIO_MAKE_OBJECT = 7 +}; + +struct rebalance_args; + +/** + * stores information about a key in the index. + * + * prefix is "0" unless key is "", in which case it is "1". This ensures that + * the object with key "" will always be the highest key in the index. + */ +struct key_data { + std::string raw_key; + std::string prefix; + + key_data() + {} + + /** + * @pre: key is a raw key (does not contain a prefix) + */ + key_data(std::string key) + : raw_key(key) + { + raw_key == "" ? prefix = "1" : prefix = "0"; + } + + bool operator==(key_data k) const { + return ((raw_key == k.raw_key) && (prefix == k.prefix)); + } + + bool operator!=(key_data k) const { + return ((raw_key != k.raw_key) || (prefix != k.prefix)); + } + + bool operator<(key_data k) const { + return this->encoded() < k.encoded(); + } + + bool operator>(key_data k) const { + return this->encoded() > k.encoded(); + } + + /** + * parses the prefix from encoded and stores the data in this. + * + * @pre: encoded has a prefix + */ + void parse(std::string encoded) { + prefix = encoded[0]; + raw_key = encoded.substr(1,encoded.length()); + } + + /** + * returns a string containing the encoded (prefixed) key + */ + std::string encoded() const { + return prefix + raw_key; + } + + void encode(bufferlist &bl) const { + ENCODE_START(1,1,bl); + encode(raw_key, bl); + encode(prefix, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &p) { + DECODE_START(1, p); + decode(raw_key, p); + decode(prefix, p); + DECODE_FINISH(p); + } +}; +WRITE_CLASS_ENCODER(key_data) + + +/** + * Stores information read from a librados object. + */ +struct object_data { + key_data min_kdata; //the max key from the previous index entry + key_data max_kdata; //the max key, from the index + std::string name; //the object's name + std::map<std::string, bufferlist> omap; // the omap of the object + bool unwritable; // an xattr that, if false, means an op is in + // progress and other clients should not write to it. + uint64_t version; //the version at time of read + uint64_t size; //the number of elements in the omap + + object_data() + : unwritable(false), + version(0), + size(0) + {} + + object_data(std::string the_name) + : name(the_name), + unwritable(false), + version(0), + size(0) + {} + + object_data(key_data min, key_data kdat, std::string the_name) + : min_kdata(min), + max_kdata(kdat), + name(the_name), + unwritable(false), + version(0), + size(0) + {} + + object_data(key_data min, key_data kdat, std::string the_name, + std::map<std::string, bufferlist> the_omap) + : min_kdata(min), + max_kdata(kdat), + name(the_name), + omap(the_omap), + unwritable(false), + version(0), + size(0) + {} + + object_data(key_data min, key_data kdat, std::string the_name, int the_version) + : min_kdata(min), + max_kdata(kdat), + name(the_name), + unwritable(false), + version(the_version), + size(0) + {} + + void encode(bufferlist &bl) const { + ENCODE_START(1,1,bl); + encode(min_kdata, bl); + encode(max_kdata, bl); + encode(name, bl); + encode(omap, bl); + encode(unwritable, bl); + encode(version, bl); + encode(size, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &p) { + DECODE_START(1, p); + decode(min_kdata, p); + decode(max_kdata, p); + decode(name, p); + decode(omap, p); + decode(unwritable, p); + decode(version, p); + decode(size, p); + DECODE_FINISH(p); + } +}; +WRITE_CLASS_ENCODER(object_data) + +/** + * information about objects to be created by a split or merge - stored in the + * index_data. + */ +struct create_data { + key_data min; + key_data max; + std::string obj; + + create_data() + {} + + create_data(key_data n, key_data x, std::string o) + : min(n), + max(x), + obj(o) + {} + + create_data(object_data o) + : min(o.min_kdata), + max(o.max_kdata), + obj(o.name) + {} + + create_data & operator=(const create_data &c) { + min = c.min; + max = c.max; + obj = c.obj; + return *this; + } + + void encode(bufferlist &bl) const { + ENCODE_START(1,1,bl); + encode(min, bl); + encode(max, bl); + encode(obj, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &p) { + DECODE_START(1, p); + decode(min, p); + decode(max, p); + decode(obj, p); + DECODE_FINISH(p); + } +}; +WRITE_CLASS_ENCODER(create_data) + +/** + * information about objects to be deleted by a split or merge - stored in the + * index_data. + */ +struct delete_data { + key_data min; + key_data max; + std::string obj; + uint64_t version; + + delete_data() + : version(0) + {} + + delete_data(key_data n, key_data x, std::string o, uint64_t v) + : min(n), + max(x), + obj(o), + version(v) + {} + + delete_data & operator=(const delete_data &d) { + min = d.min; + max = d.max; + obj = d.obj; + version = d.version; + return *this; + } + + + void encode(bufferlist &bl) const { + ENCODE_START(1,1,bl); + encode(min, bl); + encode(max, bl); + encode(obj, bl); + encode(version, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &p) { + DECODE_START(1, p); + decode(min, p); + decode(max, p); + decode(obj, p); + decode(version, p); + DECODE_FINISH(p); + } +}; +WRITE_CLASS_ENCODER(delete_data) + +/** + * The index object is a key value map that stores + * the highest key stored in an object as keys, and an index_data + * as the corresponding value. The index_data contains the encoded + * high and low keys (where keys in this object are > min_kdata and + * <= kdata), the name of the librados object where keys containing + * that range of keys are located, and information about split and + * merge operations that may need to be cleaned up if a client dies. + */ +struct index_data { + //the encoded key corresponding to the object + key_data kdata; + + //"1" if there is a prefix (because a split or merge is + //in progress), otherwise "" + std::string prefix; + + //the kdata of the previous index entry + key_data min_kdata; + + utime_t ts; //time that a split/merge started + + //objects to be created + std::vector<create_data > to_create; + + //objects to be deleted + std::vector<delete_data > to_delete; + + //the name of the object where the key range is located. + std::string obj; + + index_data() + {} + + index_data(std::string raw_key) + : kdata(raw_key) + {} + + index_data(key_data max, key_data min, std::string o) + : kdata(max), + min_kdata(min), + obj(o) + {} + + index_data(create_data c) + : kdata(c.max), + min_kdata(c.min), + obj(c.obj) + {} + + bool operator<(const index_data &other) const { + return (kdata.encoded() < other.kdata.encoded()); + } + + //true if there is a prefix and now - ts > timeout. + bool is_timed_out(utime_t now, utime_t timeout) const; + + void encode(bufferlist &bl) const { + ENCODE_START(1,1,bl); + encode(prefix, bl); + encode(min_kdata, bl); + encode(kdata, bl); + encode(ts, bl); + encode(to_create, bl); + encode(to_delete, bl); + encode(obj, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &p) { + DECODE_START(1, p); + decode(prefix, p); + decode(min_kdata, p); + decode(kdata, p); + decode(ts, p); + decode(to_create, p); + decode(to_delete, p); + decode(obj, p); + DECODE_FINISH(p); + } + + /* + * Prints a string representation of the information, in the following format: + * (min_kdata/ + * kdata, + * prefix + * ts + * elements of to_create, organized into (high key| obj name) + * ; + * elements of to_delete, organized into (high key| obj name | version number) + * : + * val) + */ + std::string str() const { + std::stringstream strm; + strm << '(' << min_kdata.encoded() << "/" << kdata.encoded() << ',' + << prefix; + if (prefix == "1") { + strm << ts.sec() << '.' << ts.usec(); + for(std::vector<create_data>::const_iterator it = to_create.begin(); + it != to_create.end(); ++it) { + strm << '(' << it->min.encoded() << '/' << it->max.encoded() << '|' + << it->obj << ')'; + } + strm << ';'; + for(std::vector<delete_data >::const_iterator it = to_delete.begin(); + it != to_delete.end(); ++it) { + strm << '(' << it->min.encoded() << '/' << it->max.encoded() << '|' + << it->obj << '|' + << it->version << ')'; + } + strm << ':'; + } + strm << obj << ')'; + return strm.str(); + } +}; +WRITE_CLASS_ENCODER(index_data) + +/** + * Structure to store information read from the index for reuse. + */ +class IndexCache { +protected: + std::map<key_data, std::pair<index_data, utime_t> > k2itmap; + std::map<utime_t, key_data> t2kmap; + int cache_size; + +public: + IndexCache(int n) + : cache_size(n) + {} + /** + * Inserts idata into the cache and removes whatever key mapped to before. + * If the cache is full, pops the oldest entry. + */ + void push(const std::string &key, const index_data &idata); + + /** + * Inserts idata into the cache. If idata.kdata is already in the cache, + * replaces the old one. Pops the oldest entry if the cache is full. + */ + void push(const index_data &idata); + + /** + * Removes the oldest entry from the cache + */ + void pop(); + + /** + * Removes the value associated with kdata from both maps + */ + void erase(key_data kdata); + + /** + * gets the idata where key belongs. If none, returns -ENODATA. + */ + int get(const std::string &key, index_data *idata) const; + + /** + * Gets the idata where key goes and the one after it. If there are not + * valid entries for both of them, returns -ENODATA. + */ + int get(const std::string &key, index_data *idata, index_data * next_idata) const; + void clear(); +}; + +class KvFlatBtreeAsync; + + +/** + * These are used internally to translate aio operations into useful thread + * arguments. + */ +struct aio_set_args { + KvFlatBtreeAsync * kvba; + std::string key; + bufferlist val; + bool exc; + callback cb; + void * cb_args; + int * err; +}; + +struct aio_rm_args { + KvFlatBtreeAsync * kvba; + std::string key; + callback cb; + void * cb_args; + int * err; +}; + +struct aio_get_args { + KvFlatBtreeAsync * kvba; + std::string key; + bufferlist * val; + bool exc; + callback cb; + void * cb_args; + int * err; +}; + +class KvFlatBtreeAsync : public KeyValueStructure { +protected: + + //don't change these once operations start being called - they are not + //protected with mutexes! + int k; + std::string index_name; + librados::IoCtx io_ctx; + std::string rados_id; + std::string client_name; + librados::Rados rados; + std::string pool_name; + injection_t interrupt; + int wait_ms; + utime_t timeout; //declare a client dead if it goes this long without + //finishing a split/merge + int cache_size; + double cache_refresh; //read cache_size / cache_refresh entries each time the + //index is read + bool verbose;//if true, display lots of debug output + + //shared variables protected with mutexes + ceph::mutex client_index_lock = ceph::make_mutex("client_index_lock"); + int client_index; //names of new objects are client_name.client_index + ceph::mutex icache_lock = ceph::make_mutex("icache_lock"); + IndexCache icache; + friend struct index_data; + + /** + * finds the object in the index with the lowest key value that is greater + * than idata.kdata. If idata.kdata is the max key, returns -EOVERFLOW. If + * idata has a prefix and has timed out, cleans up. + * + * @param idata: idata for the object to search for. + * @param out_data: the idata for the next object. + * + * @pre: idata must contain a key_data. + * @post: out_data contains complete information + */ + int next(const index_data &idata, index_data * out_data); + + /** + * finds the object in the index with the lowest key value that is greater + * than idata.kdata. If idata.kdata is the lowest key, returns -ERANGE If + * idata has a prefix and has timed out, cleans up. + * + * @param idata: idata for the object to search for. + * @param out_data: the idata for the next object. + * + * @pre: idata must contain a key_data. + * @post: out_data contains complete information + */ + int prev(const index_data &idata, index_data * out_data); + + /** + * finds the index_data where a key belongs, from cache if possible. If it + * reads the index object, it will read the first cache_size entries after + * key and put them in the cache. + * + * @param key: the key to search for + * @param idata: the index_data for the first index value such that idata.key + * is greater than key. + * @param next_idata: if not NULL, this will be set to the idata after idata + * @param force_update: if false, will try to read from cache first. + * + * @pre: key is not encoded + * @post: idata contains complete information + * stored + */ + int read_index(const std::string &key, index_data * idata, + index_data * next_idata, bool force_update); + + /** + * Reads obj and generates information about it. Iff the object has >= 2k + * entries, reads the whole omap and then splits it. + * + * @param idata: index data for the object being split + * @pre: idata contains a key and an obj + * @post: idata.obj has been split and icache has been updated + * @return -EBALANCE if obj does not need to be split, 0 if split successful, + * error from read_object or perform_ops if there is one. + */ + int split(const index_data &idata); + + /** + * reads o1 and the next object after o1 and, if necessary, rebalances them. + * if hk1 is the highest key in the index, calls rebalance on the next highest + * key. + * + * @param idata: index data for the object being rebalanced + * @param next_idata: index data for the next object. If blank, will read. + * @pre: idata contains a key and an obj + * @post: idata.obj has been rebalanced and icache has been updated + * @return -EBALANCE if no change needed, -ENOENT if o1 does not exist, + * -ECANCELED if second object does not exist, otherwise, error from + * perform_ops + */ + int rebalance(const index_data &idata1, const index_data &next_idata); + + /** + * performs an ObjectReadOperation to populate odata + * + * @post: odata has all information about obj except for key (which is "") + */ + int read_object(const std::string &obj, object_data * odata); + + /** + * performs a maybe_read_for_balance ObjectOperation so the omap is only + * read if the object is out of bounds. + */ + int read_object(const std::string &obj, rebalance_args * args); + + /** + * sets up owo to change the index in preparation for a split/merge. + * + * @param to_create: vector of object_data to be created. + * @param to_delete: vector of object_data to be deleted. + * @param owo: the ObjectWriteOperation to set up + * @param idata: will be populated by index data for this op. + * @param err: error code reference to pass to omap_cmp + * @pre: entries in to_create and to_delete must have keys and names. + */ + void set_up_prefix_index( + const std::vector<object_data> &to_create, + const std::vector<object_data> &to_delete, + librados::ObjectWriteOperation * owo, + index_data * idata, + int * err); + + /** + * sets up all make, mark, restore, and delete ops, as well as the remove + * prefix op, based on idata. + * + * @param create_vector: vector of data about the objects to be created. + * @pre: entries in create_data must have names and omaps and be in idata + * order + * @param delete_vector: vector of data about the objects to be deleted + * @pre: entries in to_delete must have versions and be in idata order + * @param ops: the owos to set up. the pair is a pair of op identifiers + * and names of objects - set_up_ops fills these in. + * @pre: ops must be the correct size and the ObjectWriteOperation pointers + * must be valid. + * @param idata: the idata with information about how to set up the ops + * @pre: idata has valid to_create and to_delete + * @param err: the int to get the error value for omap_cmp + */ + void set_up_ops( + const std::vector<object_data> &create_vector, + const std::vector<object_data> &delete_vector, + std::vector<std::pair<std::pair<int, std::string>, librados::ObjectWriteOperation*> > * ops, + const index_data &idata, + int * err); + + /** + * sets up owo to exclusive create, set omap to to_set, and set + * unwritable to "0" + */ + void set_up_make_object( + const std::map<std::string, bufferlist> &to_set, + librados::ObjectWriteOperation *owo); + + /** + * sets up owo to assert object version and that object version is + * writable, + * then mark it unwritable. + * + * @param ver: if this is 0, no version is asserted. + */ + void set_up_unwrite_object( + const int &ver, librados::ObjectWriteOperation *owo); + + /** + * sets up owo to assert that an object is unwritable and then mark it + * writable + */ + void set_up_restore_object( + librados::ObjectWriteOperation *owo); + + /** + * sets up owo to assert that the object is unwritable and then remove it + */ + void set_up_delete_object( + librados::ObjectWriteOperation *owo); + + /** + * perform the operations in ops and handles errors. + * + * @param debug_prefix: what to print at the beginning of debug output + * @param idata: the idata for the object being operated on, to be + * passed to cleanup if necessary + * @param ops: this contains an int identifying the type of op, + * a string that is the name of the object to operate on, and a pointer + * to the ObjectWriteOperation to use. All of this must be complete. + * @post: all operations are performed and most errors are handled + * (e.g., cleans up if an assertion fails). If an unknown error is found, + * returns it. + */ + int perform_ops( const std::string &debug_prefix, + const index_data &idata, + std::vector<std::pair<std::pair<int, std::string>, librados::ObjectWriteOperation*> > * ops); + + /** + * Called when a client discovers that another client has died during a + * split or a merge. cleans up after that client. + * + * @param idata: the index data parsed from the index entry left by the dead + * client. + * @param error: the error that caused the client to realize the other client + * died (should be -ENOENT or -ETIMEDOUT) + * @post: rolls forward if -ENOENT, otherwise rolls back. + */ + int cleanup(const index_data &idata, const int &error); + + /** + * does the ObjectWriteOperation and splits, reads the index, and/or retries + * until success. + */ + int set_op(const std::string &key, const bufferlist &val, + bool update_on_existing, index_data &idata); + + /** + * does the ObjectWriteOperation and merges, reads the index, and/or retries + * until success. + */ + int remove_op(const std::string &key, index_data &idata, index_data &next_idata); + + /** + * does the ObjectWriteOperation and reads the index and/or retries + * until success. + */ + int get_op(const std::string &key, bufferlist * val, index_data &idata); + + /** + * does the ObjectWriteOperation and splits, reads the index, and/or retries + * until success. + */ + int handle_set_rm_errors(int &err, std::string key, std::string obj, + index_data * idata, index_data * next_idata); + + /** + * called by aio_set, aio_remove, and aio_get, respectively. + */ + static void* pset(void *ptr); + static void* prm(void *ptr); + static void* pget(void *ptr); +public: + + + //interruption methods, for correctness testing + /** + * returns 0 + */ + int nothing() override; + /** + * 10% chance of waiting wait_ms seconds + */ + int wait() override; + /** + * 10% chance of killing the client. + */ + int suicide() override; + +KvFlatBtreeAsync(int k_val, std::string name, int cache, double cache_r, + bool verb) + : k(k_val), + index_name("index_object"), + rados_id(name), + client_name(std::string(name).append(".")), + pool_name("rbd"), + interrupt(&KeyValueStructure::nothing), + wait_ms(0), + timeout(100000,0), + cache_size(cache), + cache_refresh(cache_r), + verbose(verb), + client_index(0), + icache(cache) + {} + + /** + * creates a string with an int at the end. + * + * @param s: the string on the left + * @param i: the int to be appended to the string + * @return the string + */ + static std::string to_string(std::string s, int i); + + /** + * returns in encoded + */ + static bufferlist to_bl(const std::string &in) { + bufferlist bl; + bl.append(in); + return bl; + } + + /** + * returns idata encoded; + */ + static bufferlist to_bl(const index_data &idata) { + bufferlist bl; + idata.encode(bl); + return bl; + } + + /** + * returns the rados_id of this KvFlatBtreeAsync + */ + std::string get_name(); + + /** + * sets this kvba to call inject before every ObjectWriteOperation. + * If inject is wait and wait_time is set, wait will have a 10% chance of + * sleeping for waite_time milliseconds. + */ + void set_inject(injection_t inject, int wait_time) override; + + /** + * sets up the rados and io_ctx of this KvFlatBtreeAsync. If the don't already + * exist, creates the index and max object. + */ + int setup(int argc, const char** argv) override; + + int set(const std::string &key, const bufferlist &val, + bool update_on_existing) override; + + int remove(const std::string &key) override; + + /** + * returns true if all of the following are true: + * + * all objects are accounted for in the index or a prefix + * (i.e., no floating objects) + * all objects have k <= size <= 2k + * all keys in an object are within the specified predicted by the index + * + * if any of those fails, states that the problem(s) are, and prints str(). + * + * @pre: no operations are in progress + */ + bool is_consistent() override; + + /** + * returns an ASCII representation of the index and sub objects, showing + * stats about each object and all omaps. Don't use if you have more than + * about 10 objects. + */ + std::string str() override; + + int get(const std::string &key, bufferlist *val) override; + + //async versions of these methods + void aio_get(const std::string &key, bufferlist *val, callback cb, + void *cb_args, int * err) override; + void aio_set(const std::string &key, const bufferlist &val, bool exclusive, + callback cb, void * cb_args, int * err) override; + void aio_remove(const std::string &key, callback cb, void *cb_args, int * err) override; + + //these methods that deal with multiple keys at once are efficient, but make + //no guarantees about atomicity! + + /** + * Removes all objects and resets the store as if setup had just run. Makes no + * attempt to do this safely - make sure this is the only operation running + * when it is called! + */ + int remove_all() override; + + /** + * This does not add prefixes to the index and therefore DOES NOT guarantee + * consistency! It is ONLY safe if there is only one instance at a time. + * It follows the same general logic as a rebalance, but + * with all objects that contain any of the keys in in_map. It is O(n), where + * n is the number of librados objects it has to change. Higher object sizes + * (i.e., k values) also decrease the efficiency of this method because it + * copies all of the entries in each object it modifies. Writing new objects + * is done in parallel. + * + * This is efficient if: + * * other clients are very unlikely to be modifying any of the objects while + * this operation is in progress + * * The entries in in_map are close together + * * It is especially efficient for initially entering lots of entries into + * an empty structure. + * + * It is very inefficient compared to setting one key and/or will starve if: + * * other clients are modifying the objects it tries to modify + * * The keys are distributed across the range of keys in the store + * * there is a small number of keys compared to k + */ + int set_many(const std::map<std::string, bufferlist> &in_map) override; + + int get_all_keys(std::set<std::string> *keys) override; + int get_all_keys_and_values(std::map<std::string,bufferlist> *kv_map) override; + +}; + +#endif /* KVFLATBTREEASYNC_H_ */ diff --git a/src/key_value_store/kvs_arg_types.h b/src/key_value_store/kvs_arg_types.h new file mode 100644 index 000000000..5bcc4b131 --- /dev/null +++ b/src/key_value_store/kvs_arg_types.h @@ -0,0 +1,144 @@ +/* + * Argument types used by cls_kvs.cc + * + * Created on: Aug 10, 2012 + * Author: eleanor + */ + +#ifndef CLS_KVS_H_ +#define CLS_KVS_H_ + +#define EBALANCE 137 + +#include "include/encoding.h" +#include "key_value_store/kv_flat_btree_async.h" + +using ceph::bufferlist; + +struct assert_size_args { + uint64_t bound; //the size to compare to - should be k or 2k + uint64_t comparator; //should be CEPH_OSD_CMPXATTR_OP_EQ, + //CEPH_OSD_CMPXATTR_OP_LT, or + //CEPH_OSD_CMPXATTR_OP_GT + + void encode(bufferlist &bl) const { + ENCODE_START(1,1,bl); + encode(bound, bl); + encode(comparator, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &p) { + DECODE_START(1, p); + decode(bound, p); + decode(comparator, p); + DECODE_FINISH(p); + } +}; +WRITE_CLASS_ENCODER(assert_size_args) + +struct idata_from_key_args { + std::string key; + index_data idata; + index_data next_idata; + + void encode(bufferlist &bl) const { + ENCODE_START(1,1,bl); + encode(key, bl); + encode(idata, bl); + encode(next_idata, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &p) { + DECODE_START(1, p); + decode(key, p); + decode(idata, p); + decode(next_idata, p); + DECODE_FINISH(p); + } +}; +WRITE_CLASS_ENCODER(idata_from_key_args) + +struct idata_from_idata_args { + index_data idata; + index_data next_idata; + + void encode(bufferlist &bl) const { + ENCODE_START(1,1,bl); + encode(idata, bl); + encode(next_idata, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &p) { + DECODE_START(1, p); + decode(idata, p); + decode(next_idata, p); + DECODE_FINISH(p); + } +}; +WRITE_CLASS_ENCODER(idata_from_idata_args) + +struct omap_set_args { + std::map<std::string, bufferlist> omap; + uint64_t bound; + bool exclusive; + + void encode(bufferlist &bl) const { + ENCODE_START(1,1,bl); + encode(omap, bl); + encode(bound, bl); + encode(exclusive, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &p) { + DECODE_START(1, p); + decode(omap, p); + decode(bound, p); + decode(exclusive, p); + DECODE_FINISH(p); + } +}; +WRITE_CLASS_ENCODER(omap_set_args) + +struct omap_rm_args { + std::set<std::string> omap; + uint64_t bound; + + void encode(bufferlist &bl) const { + ENCODE_START(1,1,bl); + encode(omap, bl); + encode(bound, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &p) { + DECODE_START(1, p); + decode(omap, p); + decode(bound, p); + DECODE_FINISH(p); + } +}; +WRITE_CLASS_ENCODER(omap_rm_args) + +struct rebalance_args { + object_data odata; + uint64_t bound; + uint64_t comparator; + + void encode(bufferlist &bl) const { + ENCODE_START(1,1,bl); + encode(odata, bl); + encode(bound, bl); + encode(comparator, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &p) { + DECODE_START(1, p); + decode(odata,p); + decode(bound, p); + decode(comparator, p); + DECODE_FINISH(p); + } +}; +WRITE_CLASS_ENCODER(rebalance_args) + + +#endif /* CLS_KVS_H_ */ |