// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include "include/int_types.h" #include "include/buffer.h" #include "include/encoding.h" #include "common/errno.h" #include "objclass/objclass.h" #include "cls/journal/cls_journal_types.h" #include #include #include #include CLS_VER(1, 0) CLS_NAME(journal) namespace { static const uint64_t MAX_KEYS_READ = 64; static const std::string HEADER_KEY_ORDER = "order"; static const std::string HEADER_KEY_SPLAY_WIDTH = "splay_width"; static const std::string HEADER_KEY_POOL_ID = "pool_id"; static const std::string HEADER_KEY_MINIMUM_SET = "minimum_set"; static const std::string HEADER_KEY_ACTIVE_SET = "active_set"; static const std::string HEADER_KEY_NEXT_TAG_TID = "next_tag_tid"; static const std::string HEADER_KEY_NEXT_TAG_CLASS = "next_tag_class"; static const std::string HEADER_KEY_CLIENT_PREFIX = "client_"; static const std::string HEADER_KEY_TAG_PREFIX = "tag_"; std::string to_hex(uint64_t value) { std::ostringstream oss; oss << std::setw(16) << std::setfill('0') << std::hex << value; return oss.str(); } std::string key_from_client_id(const std::string &client_id) { return HEADER_KEY_CLIENT_PREFIX + client_id; } std::string key_from_tag_tid(uint64_t tag_tid) { return HEADER_KEY_TAG_PREFIX + to_hex(tag_tid); } uint64_t tag_tid_from_key(const std::string &key) { std::istringstream iss(key); uint64_t id; iss.ignore(HEADER_KEY_TAG_PREFIX.size()) >> std::hex >> id; return id; } template int read_key(cls_method_context_t hctx, const string &key, T *t, bool ignore_enoent = false) { bufferlist bl; int r = cls_cxx_map_get_val(hctx, key, &bl); if (r == -ENOENT) { if (ignore_enoent) { r = 0; } return r; } else if (r < 0) { CLS_ERR("failed to get omap key: %s", key.c_str()); return r; } try { auto iter = bl.cbegin(); decode(*t, iter); } catch (const buffer::error &err) { CLS_ERR("failed to decode input parameters: %s", err.what()); return -EINVAL; } return 0; } template int write_key(cls_method_context_t hctx, const string &key, const T &t) { bufferlist bl; encode(t, bl); int r = cls_cxx_map_set_val(hctx, key, &bl); if (r < 0) { CLS_ERR("failed to set omap key: %s", key.c_str()); return r; } return 0; } int remove_key(cls_method_context_t hctx, const string &key) { int r = cls_cxx_map_remove_key(hctx, key); if (r < 0 && r != -ENOENT) { CLS_ERR("failed to remove key: %s", key.c_str()); return r; } return 0; } int expire_tags(cls_method_context_t hctx, const std::string *skip_client_id) { std::string skip_client_key; if (skip_client_id != nullptr) { skip_client_key = key_from_client_id(*skip_client_id); } uint64_t minimum_tag_tid = std::numeric_limits::max(); std::string last_read = ""; bool more; do { std::map vals; int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX, MAX_KEYS_READ, &vals, &more); if (r < 0 && r != -ENOENT) { CLS_ERR("failed to retrieve registered clients: %s", cpp_strerror(r).c_str()); return r; } for (auto &val : vals) { // if we are removing a client, skip its commit positions if (val.first == skip_client_key) { continue; } cls::journal::Client client; auto iter = val.second.cbegin(); try { decode(client, iter); } catch (const buffer::error &err) { CLS_ERR("error decoding registered client: %s", val.first.c_str()); return -EIO; } if (client.state == cls::journal::CLIENT_STATE_DISCONNECTED) { // don't allow a disconnected client to prevent pruning continue; } else if (client.commit_position.object_positions.empty()) { // cannot prune if one or more clients has an empty commit history return 0; } for (auto object_position : client.commit_position.object_positions) { minimum_tag_tid = std::min(minimum_tag_tid, object_position.tag_tid); } } if (!vals.empty()) { last_read = vals.rbegin()->first; } } while (more); // cannot expire tags if a client hasn't committed yet if (minimum_tag_tid == std::numeric_limits::max()) { return 0; } // compute the minimum in-use tag for each class std::map minimum_tag_class_to_tids; typedef enum { TAG_PASS_CALCULATE_MINIMUMS, TAG_PASS_SCRUB, TAG_PASS_DONE } TagPass; int tag_pass = TAG_PASS_CALCULATE_MINIMUMS; last_read = HEADER_KEY_TAG_PREFIX; do { std::map vals; int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX, MAX_KEYS_READ, &vals, &more); if (r < 0 && r != -ENOENT) { CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str()); return r; } for (auto &val : vals) { cls::journal::Tag tag; auto iter = val.second.cbegin(); try { decode(tag, iter); } catch (const buffer::error &err) { CLS_ERR("error decoding tag: %s", val.first.c_str()); return -EIO; } if (tag.tid != tag_tid_from_key(val.first)) { CLS_ERR("tag tid mismatched: %s", val.first.c_str()); return -EINVAL; } if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) { minimum_tag_class_to_tids[tag.tag_class] = tag.tid; } else if (tag_pass == TAG_PASS_SCRUB && tag.tid < minimum_tag_class_to_tids[tag.tag_class]) { r = remove_key(hctx, val.first); if (r < 0) { return r; } } if (tag.tid >= minimum_tag_tid) { // no need to check for tag classes beyond this point vals.clear(); more = false; break; } } if (tag_pass != TAG_PASS_DONE && !more) { last_read = HEADER_KEY_TAG_PREFIX; ++tag_pass; } else if (!vals.empty()) { last_read = vals.rbegin()->first; } } while (tag_pass != TAG_PASS_DONE); return 0; } int get_client_list_range(cls_method_context_t hctx, std::set *clients, std::string start_after, uint64_t max_return) { std::string last_read; if (!start_after.empty()) { last_read = key_from_client_id(start_after); } std::map vals; bool more; int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX, max_return, &vals, &more); if (r < 0) { CLS_ERR("failed to retrieve omap values: %s", cpp_strerror(r).c_str()); return r; } for (std::map::iterator it = vals.begin(); it != vals.end(); ++it) { try { auto iter = it->second.cbegin(); cls::journal::Client client; decode(client, iter); clients->insert(client); } catch (const buffer::error &err) { CLS_ERR("could not decode client '%s': %s", it->first.c_str(), err.what()); return -EIO; } } return 0; } int find_min_commit_position(cls_method_context_t hctx, cls::journal::ObjectSetPosition *minset) { int r; bool valid = false; std::string start_after = ""; uint64_t tag_tid = 0, entry_tid = 0; while (true) { std::set batch; r = get_client_list_range(hctx, &batch, start_after, cls::journal::JOURNAL_MAX_RETURN); if ((r < 0) || batch.empty()) { break; } start_after = batch.rbegin()->id; // update the (minimum) commit position from this batch of clients for(std::set::iterator it = batch.begin(); it != batch.end(); ++it) { cls::journal::ObjectSetPosition object_set_position = (*it).commit_position; if (object_set_position.object_positions.empty()) { *minset = cls::journal::ObjectSetPosition(); break; } cls::journal::ObjectPosition first = object_set_position.object_positions.front(); // least tag_tid (or least entry_tid for matching tag_tid) if (!valid || (tag_tid > first.tag_tid) || ((tag_tid == first.tag_tid) && (entry_tid > first.entry_tid))) { tag_tid = first.tag_tid; entry_tid = first.entry_tid; *minset = cls::journal::ObjectSetPosition(object_set_position); valid = true; } } // got the last batch, we're done if (batch.size() < cls::journal::JOURNAL_MAX_RETURN) { break; } } return r; } } // anonymous namespace /** * Input: * @param order (uint8_t) - bits to shift to compute the object max size * @param splay width (uint8_t) - number of active journal objects * * Output: * @returns 0 on success, negative error code on failure */ int journal_create(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { uint8_t order; uint8_t splay_width; int64_t pool_id; try { auto iter = in->cbegin(); decode(order, iter); decode(splay_width, iter); decode(pool_id, iter); } catch (const buffer::error &err) { CLS_ERR("failed to decode input parameters: %s", err.what()); return -EINVAL; } bufferlist stored_orderbl; int r = cls_cxx_map_get_val(hctx, HEADER_KEY_ORDER, &stored_orderbl); if (r >= 0) { CLS_ERR("journal already exists"); return -EEXIST; } else if (r != -ENOENT) { return r; } r = write_key(hctx, HEADER_KEY_ORDER, order); if (r < 0) { return r; } r = write_key(hctx, HEADER_KEY_SPLAY_WIDTH, splay_width); if (r < 0) { return r; } r = write_key(hctx, HEADER_KEY_POOL_ID, pool_id); if (r < 0) { return r; } uint64_t object_set = 0; r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set); if (r < 0) { return r; } r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set); if (r < 0) { return r; } uint64_t tag_id = 0; r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_id); if (r < 0) { return r; } r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_id); if (r < 0) { return r; } return 0; } /** * Input: * none * * Output: * order (uint8_t) * @returns 0 on success, negative error code on failure */ int journal_get_order(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { uint8_t order; int r = read_key(hctx, HEADER_KEY_ORDER, &order); if (r < 0) { return r; } encode(order, *out); return 0; } /** * Input: * none * * Output: * splay_width (uint8_t) * @returns 0 on success, negative error code on failure */ int journal_get_splay_width(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { uint8_t splay_width; int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width); if (r < 0) { return r; } encode(splay_width, *out); return 0; } /** * Input: * none * * Output: * pool_id (int64_t) * @returns 0 on success, negative error code on failure */ int journal_get_pool_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { int64_t pool_id = 0; int r = read_key(hctx, HEADER_KEY_POOL_ID, &pool_id); if (r < 0) { return r; } encode(pool_id, *out); return 0; } /** * Input: * none * * Output: * object set (uint64_t) * @returns 0 on success, negative error code on failure */ int journal_get_minimum_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { uint64_t minimum_set; int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &minimum_set); if (r < 0) { return r; } encode(minimum_set, *out); return 0; } /** * Input: * @param object set (uint64_t) * * Output: * @returns 0 on success, negative error code on failure */ int journal_set_minimum_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { uint64_t object_set; try { auto iter = in->cbegin(); decode(object_set, iter); } catch (const buffer::error &err) { CLS_ERR("failed to decode input parameters: %s", err.what()); return -EINVAL; } uint64_t current_active_set; int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, ¤t_active_set); if (r < 0) { return r; } if (current_active_set < object_set) { CLS_LOG(10, "active object set earlier than minimum: %" PRIu64 " < %" PRIu64, current_active_set, object_set); return -EINVAL; } uint64_t current_minimum_set; r = read_key(hctx, HEADER_KEY_MINIMUM_SET, ¤t_minimum_set); if (r < 0) { return r; } if (object_set == current_minimum_set) { return 0; } else if (object_set < current_minimum_set) { CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64, object_set, current_minimum_set); return -ESTALE; } r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set); if (r < 0) { return r; } return 0; } /** * Input: * none * * Output: * object set (uint64_t) * @returns 0 on success, negative error code on failure */ int journal_get_active_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { uint64_t active_set; int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &active_set); if (r < 0) { return r; } encode(active_set, *out); return 0; } /** * Input: * @param object set (uint64_t) * * Output: * @returns 0 on success, negative error code on failure */ int journal_set_active_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { uint64_t object_set; try { auto iter = in->cbegin(); decode(object_set, iter); } catch (const buffer::error &err) { CLS_ERR("failed to decode input parameters: %s", err.what()); return -EINVAL; } uint64_t current_minimum_set; int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, ¤t_minimum_set); if (r < 0) { return r; } if (current_minimum_set > object_set) { CLS_ERR("minimum object set later than active: %" PRIu64 " > %" PRIu64, current_minimum_set, object_set); return -EINVAL; } uint64_t current_active_set; r = read_key(hctx, HEADER_KEY_ACTIVE_SET, ¤t_active_set); if (r < 0) { return r; } if (object_set == current_active_set) { return 0; } else if (object_set < current_active_set) { CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64, object_set, current_active_set); return -ESTALE; } r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set); if (r < 0) { return r; } return 0; } /** * Input: * @param id (string) - unique client id * * Output: * cls::journal::Client * @returns 0 on success, negative error code on failure */ int journal_get_client(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { std::string id; try { auto iter = in->cbegin(); decode(id, iter); } catch (const buffer::error &err) { CLS_ERR("failed to decode input parameters: %s", err.what()); return -EINVAL; } std::string key(key_from_client_id(id)); cls::journal::Client client; int r = read_key(hctx, key, &client); if (r < 0) { return r; } encode(client, *out); return 0; } /** * Input: * @param id (string) - unique client id * @param data (bufferlist) - opaque data associated to client * * Output: * @returns 0 on success, negative error code on failure */ int journal_client_register(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { std::string id; bufferlist data; try { auto iter = in->cbegin(); decode(id, iter); decode(data, iter); } catch (const buffer::error &err) { CLS_ERR("failed to decode input parameters: %s", err.what()); return -EINVAL; } uint8_t order; int r = read_key(hctx, HEADER_KEY_ORDER, &order); if (r < 0) { return r; } std::string key(key_from_client_id(id)); bufferlist stored_clientbl; r = cls_cxx_map_get_val(hctx, key, &stored_clientbl); if (r >= 0) { CLS_ERR("duplicate client id: %s", id.c_str()); return -EEXIST; } else if (r != -ENOENT) { return r; } cls::journal::ObjectSetPosition minset; r = find_min_commit_position(hctx, &minset); if (r < 0) return r; cls::journal::Client client(id, data, minset); r = write_key(hctx, key, client); if (r < 0) { return r; } return 0; } /** * Input: * @param id (string) - unique client id * @param data (bufferlist) - opaque data associated to client * * Output: * @returns 0 on success, negative error code on failure */ int journal_client_update_data(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { std::string id; bufferlist data; try { auto iter = in->cbegin(); decode(id, iter); decode(data, iter); } catch (const buffer::error &err) { CLS_ERR("failed to decode input parameters: %s", err.what()); return -EINVAL; } std::string key(key_from_client_id(id)); cls::journal::Client client; int r = read_key(hctx, key, &client); if (r < 0) { return r; } client.data = data; r = write_key(hctx, key, client); if (r < 0) { return r; } return 0; } /** * Input: * @param id (string) - unique client id * @param state (uint8_t) - client state * * Output: * @returns 0 on success, negative error code on failure */ int journal_client_update_state(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { std::string id; cls::journal::ClientState state; bufferlist data; try { auto iter = in->cbegin(); decode(id, iter); uint8_t state_raw; decode(state_raw, iter); state = static_cast(state_raw); } catch (const buffer::error &err) { CLS_ERR("failed to decode input parameters: %s", err.what()); return -EINVAL; } std::string key(key_from_client_id(id)); cls::journal::Client client; int r = read_key(hctx, key, &client); if (r < 0) { return r; } client.state = state; r = write_key(hctx, key, client); if (r < 0) { return r; } return 0; } /** * Input: * @param id (string) - unique client id * * Output: * @returns 0 on success, negative error code on failure */ int journal_client_unregister(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { std::string id; try { auto iter = in->cbegin(); decode(id, iter); } catch (const buffer::error &err) { CLS_ERR("failed to decode input parameters: %s", err.what()); return -EINVAL; } std::string key(key_from_client_id(id)); bufferlist bl; int r = cls_cxx_map_get_val(hctx, key, &bl); if (r < 0) { CLS_ERR("client is not registered: %s", id.c_str()); return r; } r = cls_cxx_map_remove_key(hctx, key); if (r < 0) { CLS_ERR("failed to remove omap key: %s", key.c_str()); return r; } // prune expired tags r = expire_tags(hctx, &id); if (r < 0) { return r; } return 0; } /** * Input: * @param client_id (uint64_t) - unique client id * @param commit_position (ObjectSetPosition) * * Output: * @returns 0 on success, negative error code on failure */ int journal_client_commit(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { std::string id; cls::journal::ObjectSetPosition commit_position; try { auto iter = in->cbegin(); decode(id, iter); decode(commit_position, iter); } catch (const buffer::error &err) { CLS_ERR("failed to decode input parameters: %s", err.what()); return -EINVAL; } uint8_t splay_width; int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width); if (r < 0) { return r; } if (commit_position.object_positions.size() > splay_width) { CLS_ERR("too many object positions"); return -EINVAL; } std::string key(key_from_client_id(id)); cls::journal::Client client; r = read_key(hctx, key, &client); if (r < 0) { return r; } if (client.commit_position == commit_position) { return 0; } client.commit_position = commit_position; r = write_key(hctx, key, client); if (r < 0) { return r; } return 0; } /** * Input: * @param start_after (string) * @param max_return (uint64_t) * * Output: * clients (set) - collection of registered clients * @returns 0 on success, negative error code on failure */ int journal_client_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { std::string start_after; uint64_t max_return; try { auto iter = in->cbegin(); decode(start_after, iter); decode(max_return, iter); } catch (const buffer::error &err) { CLS_ERR("failed to decode input parameters: %s", err.what()); return -EINVAL; } std::set clients; int r = get_client_list_range(hctx, &clients, start_after, max_return); if (r < 0) return r; encode(clients, *out); return 0; } /** * Input: * none * * Output: * @returns 0 on success, negative error code on failure */ int journal_get_next_tag_tid(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { uint64_t tag_tid; int r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &tag_tid); if (r < 0) { return r; } encode(tag_tid, *out); return 0; } /** * Input: * @param tag_tid (uint64_t) * * Output: * cls::journal::Tag * @returns 0 on success, negative error code on failure */ int journal_get_tag(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { uint64_t tag_tid; try { auto iter = in->cbegin(); decode(tag_tid, iter); } catch (const buffer::error &err) { CLS_ERR("failed to decode input parameters: %s", err.what()); return -EINVAL; } std::string key(key_from_tag_tid(tag_tid)); cls::journal::Tag tag; int r = read_key(hctx, key, &tag); if (r < 0) { return r; } encode(tag, *out); return 0; } /** * Input: * @param tag_tid (uint64_t) * @param tag_class (uint64_t) * @param data (bufferlist) * * Output: * @returns 0 on success, negative error code on failure */ int journal_tag_create(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { uint64_t tag_tid; uint64_t tag_class; bufferlist data; try { auto iter = in->cbegin(); decode(tag_tid, iter); decode(tag_class, iter); decode(data, iter); } catch (const buffer::error &err) { CLS_ERR("failed to decode input parameters: %s", err.what()); return -EINVAL; } std::string key(key_from_tag_tid(tag_tid)); bufferlist stored_tag_bl; int r = cls_cxx_map_get_val(hctx, key, &stored_tag_bl); if (r >= 0) { CLS_ERR("duplicate tag id: %" PRIu64, tag_tid); return -EEXIST; } else if (r != -ENOENT) { return r; } // verify tag tid ordering uint64_t next_tag_tid; r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &next_tag_tid); if (r < 0) { return r; } if (tag_tid != next_tag_tid) { CLS_LOG(5, "out-of-order tag sequence: %" PRIu64, tag_tid); return -ESTALE; } uint64_t next_tag_class; r = read_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, &next_tag_class); if (r < 0) { return r; } if (tag_class == cls::journal::Tag::TAG_CLASS_NEW) { // allocate a new tag class tag_class = next_tag_class; r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_class + 1); if (r < 0) { return r; } } else { // verify tag class range if (tag_class >= next_tag_class) { CLS_ERR("out-of-sequence tag class: %" PRIu64, tag_class); return -EINVAL; } } // prune expired tags r = expire_tags(hctx, nullptr); if (r < 0) { return r; } // update tag tid sequence r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_tid + 1); if (r < 0) { return r; } // write tag structure cls::journal::Tag tag(tag_tid, tag_class, data); key = key_from_tag_tid(tag_tid); r = write_key(hctx, key, tag); if (r < 0) { return r; } return 0; } /** * Input: * @param start_after_tag_tid (uint64_t) - first tag tid * @param max_return (uint64_t) - max tags to return * @param client_id (std::string) - client id filter * @param tag_class (boost::optional - optional tag class filter * * Output: * std::set - collection of tags * @returns 0 on success, negative error code on failure */ int journal_tag_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { uint64_t start_after_tag_tid; uint64_t max_return; std::string client_id; boost::optional tag_class(0); // handle compiler false positive about use-before-init tag_class = boost::none; try { auto iter = in->cbegin(); decode(start_after_tag_tid, iter); decode(max_return, iter); decode(client_id, iter); decode(tag_class, iter); } catch (const buffer::error &err) { CLS_ERR("failed to decode input parameters: %s", err.what()); return -EINVAL; } // calculate the minimum tag within client's commit position uint64_t minimum_tag_tid = std::numeric_limits::max(); cls::journal::Client client; int r = read_key(hctx, key_from_client_id(client_id), &client); if (r < 0) { return r; } for (auto object_position : client.commit_position.object_positions) { minimum_tag_tid = std::min(minimum_tag_tid, object_position.tag_tid); } // compute minimum tags in use per-class std::set tags; std::map minimum_tag_class_to_tids; typedef enum { TAG_PASS_CALCULATE_MINIMUMS, TAG_PASS_LIST, TAG_PASS_DONE } TagPass; int tag_pass = (minimum_tag_tid == std::numeric_limits::max() ? TAG_PASS_LIST : TAG_PASS_CALCULATE_MINIMUMS); std::string last_read = HEADER_KEY_TAG_PREFIX; do { std::map vals; bool more; r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX, MAX_KEYS_READ, &vals, &more); if (r < 0 && r != -ENOENT) { CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str()); return r; } for (auto &val : vals) { cls::journal::Tag tag; auto iter = val.second.cbegin(); try { decode(tag, iter); } catch (const buffer::error &err) { CLS_ERR("error decoding tag: %s", val.first.c_str()); return -EIO; } if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) { minimum_tag_class_to_tids[tag.tag_class] = tag.tid; // completed calculation of tag class minimums if (tag.tid >= minimum_tag_tid) { vals.clear(); more = false; break; } } else if (tag_pass == TAG_PASS_LIST) { if (start_after_tag_tid != 0 && tag.tid <= start_after_tag_tid) { continue; } if (tag.tid >= minimum_tag_class_to_tids[tag.tag_class] && (!tag_class || *tag_class == tag.tag_class)) { tags.insert(tag); } if (tags.size() >= max_return) { tag_pass = TAG_PASS_DONE; } } } if (tag_pass != TAG_PASS_DONE && !more) { last_read = HEADER_KEY_TAG_PREFIX; ++tag_pass; } else if (!vals.empty()) { last_read = vals.rbegin()->first; } } while (tag_pass != TAG_PASS_DONE); encode(tags, *out); return 0; } /** * Input: * @param soft_max_size (uint64_t) * * Output: * @returns 0 if object size less than max, negative error code otherwise */ int journal_object_guard_append(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { uint64_t soft_max_size; try { auto iter = in->cbegin(); decode(soft_max_size, iter); } catch (const buffer::error &err) { CLS_ERR("failed to decode input parameters: %s", err.what()); return -EINVAL; } uint64_t size; time_t mtime; int r = cls_cxx_stat(hctx, &size, &mtime); if (r == -ENOENT) { return 0; } else if (r < 0) { CLS_ERR("failed to stat object: %s", cpp_strerror(r).c_str()); return r; } if (size >= soft_max_size) { CLS_LOG(5, "journal object full: %" PRIu64 " >= %" PRIu64, size, soft_max_size); return -EOVERFLOW; } return 0; } CLS_INIT(journal) { CLS_LOG(20, "Loaded journal class!"); cls_handle_t h_class; cls_method_handle_t h_journal_create; cls_method_handle_t h_journal_get_order; cls_method_handle_t h_journal_get_splay_width; cls_method_handle_t h_journal_get_pool_id; cls_method_handle_t h_journal_get_minimum_set; cls_method_handle_t h_journal_set_minimum_set; cls_method_handle_t h_journal_get_active_set; cls_method_handle_t h_journal_set_active_set; cls_method_handle_t h_journal_get_client; cls_method_handle_t h_journal_client_register; cls_method_handle_t h_journal_client_update_data; cls_method_handle_t h_journal_client_update_state; cls_method_handle_t h_journal_client_unregister; cls_method_handle_t h_journal_client_commit; cls_method_handle_t h_journal_client_list; cls_method_handle_t h_journal_get_next_tag_tid; cls_method_handle_t h_journal_get_tag; cls_method_handle_t h_journal_tag_create; cls_method_handle_t h_journal_tag_list; cls_method_handle_t h_journal_object_guard_append; cls_register("journal", &h_class); /// methods for journal.$journal_id objects cls_register_cxx_method(h_class, "create", CLS_METHOD_RD | CLS_METHOD_WR, journal_create, &h_journal_create); cls_register_cxx_method(h_class, "get_order", CLS_METHOD_RD, journal_get_order, &h_journal_get_order); cls_register_cxx_method(h_class, "get_splay_width", CLS_METHOD_RD, journal_get_splay_width, &h_journal_get_splay_width); cls_register_cxx_method(h_class, "get_pool_id", CLS_METHOD_RD, journal_get_pool_id, &h_journal_get_pool_id); cls_register_cxx_method(h_class, "get_minimum_set", CLS_METHOD_RD, journal_get_minimum_set, &h_journal_get_minimum_set); cls_register_cxx_method(h_class, "set_minimum_set", CLS_METHOD_RD | CLS_METHOD_WR, journal_set_minimum_set, &h_journal_set_minimum_set); cls_register_cxx_method(h_class, "get_active_set", CLS_METHOD_RD, journal_get_active_set, &h_journal_get_active_set); cls_register_cxx_method(h_class, "set_active_set", CLS_METHOD_RD | CLS_METHOD_WR, journal_set_active_set, &h_journal_set_active_set); cls_register_cxx_method(h_class, "get_client", CLS_METHOD_RD, journal_get_client, &h_journal_get_client); cls_register_cxx_method(h_class, "client_register", CLS_METHOD_RD | CLS_METHOD_WR, journal_client_register, &h_journal_client_register); cls_register_cxx_method(h_class, "client_update_data", CLS_METHOD_RD | CLS_METHOD_WR, journal_client_update_data, &h_journal_client_update_data); cls_register_cxx_method(h_class, "client_update_state", CLS_METHOD_RD | CLS_METHOD_WR, journal_client_update_state, &h_journal_client_update_state); cls_register_cxx_method(h_class, "client_unregister", CLS_METHOD_RD | CLS_METHOD_WR, journal_client_unregister, &h_journal_client_unregister); cls_register_cxx_method(h_class, "client_commit", CLS_METHOD_RD | CLS_METHOD_WR, journal_client_commit, &h_journal_client_commit); cls_register_cxx_method(h_class, "client_list", CLS_METHOD_RD, journal_client_list, &h_journal_client_list); cls_register_cxx_method(h_class, "get_next_tag_tid", CLS_METHOD_RD, journal_get_next_tag_tid, &h_journal_get_next_tag_tid); cls_register_cxx_method(h_class, "get_tag", CLS_METHOD_RD, journal_get_tag, &h_journal_get_tag); cls_register_cxx_method(h_class, "tag_create", CLS_METHOD_RD | CLS_METHOD_WR, journal_tag_create, &h_journal_tag_create); cls_register_cxx_method(h_class, "tag_list", CLS_METHOD_RD, journal_tag_list, &h_journal_tag_list); /// methods for journal_data.$journal_id.$object_id objects cls_register_cxx_method(h_class, "guard_append", CLS_METHOD_RD | CLS_METHOD_WR, journal_object_guard_append, &h_journal_object_guard_append); }