diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/cls/journal | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/cls/journal')
-rw-r--r-- | src/cls/journal/cls_journal.cc | 1314 | ||||
-rw-r--r-- | src/cls/journal/cls_journal_client.cc | 507 | ||||
-rw-r--r-- | src/cls/journal/cls_journal_client.h | 109 | ||||
-rw-r--r-- | src/cls/journal/cls_journal_types.cc | 199 | ||||
-rw-r--r-- | src/cls/journal/cls_journal_types.h | 157 |
5 files changed, 2286 insertions, 0 deletions
diff --git a/src/cls/journal/cls_journal.cc b/src/cls/journal/cls_journal.cc new file mode 100644 index 000000000..1479e1de6 --- /dev/null +++ b/src/cls/journal/cls_journal.cc @@ -0,0 +1,1314 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/int_types.h" +#include "include/buffer.h" +#include "include/encoding.h" +#include "common/errno.h" +#include "objclass/objclass.h" +#include "cls/journal/cls_journal_types.h" +#include <errno.h> +#include <map> +#include <string> +#include <sstream> + +CLS_VER(1, 0) +CLS_NAME(journal) + +using std::string; + +using ceph::bufferlist; +using ceph::decode; +using ceph::encode; + +namespace { + +static const uint64_t MAX_KEYS_READ = 64; + +static const std::string HEADER_KEY_ORDER = "order"; +static const std::string HEADER_KEY_SPLAY_WIDTH = "splay_width"; +static const std::string HEADER_KEY_POOL_ID = "pool_id"; +static const std::string HEADER_KEY_MINIMUM_SET = "minimum_set"; +static const std::string HEADER_KEY_ACTIVE_SET = "active_set"; +static const std::string HEADER_KEY_NEXT_TAG_TID = "next_tag_tid"; +static const std::string HEADER_KEY_NEXT_TAG_CLASS = "next_tag_class"; +static const std::string HEADER_KEY_CLIENT_PREFIX = "client_"; +static const std::string HEADER_KEY_TAG_PREFIX = "tag_"; + +std::string to_hex(uint64_t value) { + std::ostringstream oss; + oss << std::setw(16) << std::setfill('0') << std::hex << value; + return oss.str(); +} + +std::string key_from_client_id(const std::string &client_id) { + return HEADER_KEY_CLIENT_PREFIX + client_id; +} + +std::string key_from_tag_tid(uint64_t tag_tid) { + return HEADER_KEY_TAG_PREFIX + to_hex(tag_tid); +} + +uint64_t tag_tid_from_key(const std::string &key) { + std::istringstream iss(key); + uint64_t id; + iss.ignore(HEADER_KEY_TAG_PREFIX.size()) >> std::hex >> id; + return id; +} + +template <typename T> +int read_key(cls_method_context_t hctx, const string &key, T *t, + bool ignore_enoent = false) { + bufferlist bl; + int r = cls_cxx_map_get_val(hctx, key, &bl); + if (r == -ENOENT) { + if (ignore_enoent) { + r = 0; + } + return r; + } else if (r < 0) { + CLS_ERR("failed to get omap key: %s", key.c_str()); + return r; + } + + try { + auto iter = bl.cbegin(); + decode(*t, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("failed to decode input parameters: %s", err.what()); + return -EINVAL; + } + return 0; +} + +template <typename T> +int write_key(cls_method_context_t hctx, const string &key, const T &t) { + bufferlist bl; + encode(t, bl); + + int r = cls_cxx_map_set_val(hctx, key, &bl); + if (r < 0) { + CLS_ERR("failed to set omap key: %s", key.c_str()); + return r; + } + return 0; +} + +int remove_key(cls_method_context_t hctx, const string &key) { + int r = cls_cxx_map_remove_key(hctx, key); + if (r < 0 && r != -ENOENT) { + CLS_ERR("failed to remove key: %s", key.c_str()); + return r; + } + return 0; +} + +int expire_tags(cls_method_context_t hctx, const std::string *skip_client_id) { + + std::string skip_client_key; + if (skip_client_id != nullptr) { + skip_client_key = key_from_client_id(*skip_client_id); + } + + uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max(); + std::string last_read = ""; + bool more; + do { + std::map<std::string, bufferlist> vals; + int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX, + MAX_KEYS_READ, &vals, &more); + if (r < 0 && r != -ENOENT) { + CLS_ERR("failed to retrieve registered clients: %s", + cpp_strerror(r).c_str()); + return r; + } + + for (auto &val : vals) { + // if we are removing a client, skip its commit positions + if (val.first == skip_client_key) { + continue; + } + + cls::journal::Client client; + auto iter = val.second.cbegin(); + try { + decode(client, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("error decoding registered client: %s", + val.first.c_str()); + return -EIO; + } + + if (client.state == cls::journal::CLIENT_STATE_DISCONNECTED) { + // don't allow a disconnected client to prevent pruning + continue; + } else if (client.commit_position.object_positions.empty()) { + // cannot prune if one or more clients has an empty commit history + return 0; + } + + for (auto object_position : client.commit_position.object_positions) { + minimum_tag_tid = std::min(minimum_tag_tid, object_position.tag_tid); + } + } + if (!vals.empty()) { + last_read = vals.rbegin()->first; + } + } while (more); + + // cannot expire tags if a client hasn't committed yet + if (minimum_tag_tid == std::numeric_limits<uint64_t>::max()) { + return 0; + } + + // compute the minimum in-use tag for each class + std::map<uint64_t, uint64_t> minimum_tag_class_to_tids; + typedef enum { TAG_PASS_CALCULATE_MINIMUMS, + TAG_PASS_SCRUB, + TAG_PASS_DONE } TagPass; + int tag_pass = TAG_PASS_CALCULATE_MINIMUMS; + last_read = HEADER_KEY_TAG_PREFIX; + do { + std::map<std::string, bufferlist> vals; + int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX, + MAX_KEYS_READ, &vals, &more); + if (r < 0 && r != -ENOENT) { + CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str()); + return r; + } + + for (auto &val : vals) { + cls::journal::Tag tag; + auto iter = val.second.cbegin(); + try { + decode(tag, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("error decoding tag: %s", val.first.c_str()); + return -EIO; + } + + if (tag.tid != tag_tid_from_key(val.first)) { + CLS_ERR("tag tid mismatched: %s", val.first.c_str()); + return -EINVAL; + } + + if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) { + minimum_tag_class_to_tids[tag.tag_class] = tag.tid; + } else if (tag_pass == TAG_PASS_SCRUB && + tag.tid < minimum_tag_class_to_tids[tag.tag_class]) { + r = remove_key(hctx, val.first); + if (r < 0) { + return r; + } + } + + if (tag.tid >= minimum_tag_tid) { + // no need to check for tag classes beyond this point + vals.clear(); + more = false; + break; + } + } + + if (tag_pass != TAG_PASS_DONE && !more) { + last_read = HEADER_KEY_TAG_PREFIX; + ++tag_pass; + } else if (!vals.empty()) { + last_read = vals.rbegin()->first; + } + } while (tag_pass != TAG_PASS_DONE); + return 0; +} + +int get_client_list_range(cls_method_context_t hctx, + std::set<cls::journal::Client> *clients, + std::string start_after, uint64_t max_return) { + std::string last_read; + if (!start_after.empty()) { + last_read = key_from_client_id(start_after); + } + + std::map<std::string, bufferlist> vals; + bool more; + int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX, + max_return, &vals, &more); + if (r < 0) { + CLS_ERR("failed to retrieve omap values: %s", cpp_strerror(r).c_str()); + return r; + } + + for (std::map<std::string, bufferlist>::iterator it = vals.begin(); + it != vals.end(); ++it) { + try { + auto iter = it->second.cbegin(); + + cls::journal::Client client; + decode(client, iter); + clients->insert(client); + } catch (const ceph::buffer::error &err) { + CLS_ERR("could not decode client '%s': %s", it->first.c_str(), + err.what()); + return -EIO; + } + } + + return 0; +} + +int find_min_commit_position(cls_method_context_t hctx, + cls::journal::ObjectSetPosition *minset) { + int r; + bool valid = false; + std::string start_after = ""; + uint64_t tag_tid = 0, entry_tid = 0; + + while (true) { + std::set<cls::journal::Client> batch; + + r = get_client_list_range(hctx, &batch, start_after, cls::journal::JOURNAL_MAX_RETURN); + if ((r < 0) || batch.empty()) { + break; + } + + start_after = batch.rbegin()->id; + // update the (minimum) commit position from this batch of clients + for (const auto &client : batch) { + if (client.state == cls::journal::CLIENT_STATE_DISCONNECTED) { + continue; + } + const auto &object_set_position = client.commit_position; + if (object_set_position.object_positions.empty()) { + *minset = cls::journal::ObjectSetPosition(); + break; + } + cls::journal::ObjectPosition first = object_set_position.object_positions.front(); + + // least tag_tid (or least entry_tid for matching tag_tid) + if (!valid || (tag_tid > first.tag_tid) || ((tag_tid == first.tag_tid) && (entry_tid > first.entry_tid))) { + tag_tid = first.tag_tid; + entry_tid = first.entry_tid; + *minset = cls::journal::ObjectSetPosition(object_set_position); + valid = true; + } + } + + // got the last batch, we're done + if (batch.size() < cls::journal::JOURNAL_MAX_RETURN) { + break; + } + } + + return r; +} + +} // anonymous namespace + +/** + * Input: + * @param order (uint8_t) - bits to shift to compute the object max size + * @param splay width (uint8_t) - number of active journal objects + * + * Output: + * @returns 0 on success, negative error code on failure + */ +int journal_create(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { + uint8_t order; + uint8_t splay_width; + int64_t pool_id; + try { + auto iter = in->cbegin(); + decode(order, iter); + decode(splay_width, iter); + decode(pool_id, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("failed to decode input parameters: %s", err.what()); + return -EINVAL; + } + + bufferlist stored_orderbl; + int r = cls_cxx_map_get_val(hctx, HEADER_KEY_ORDER, &stored_orderbl); + if (r >= 0) { + CLS_ERR("journal already exists"); + return -EEXIST; + } else if (r != -ENOENT) { + return r; + } + + r = write_key(hctx, HEADER_KEY_ORDER, order); + if (r < 0) { + return r; + } + + r = write_key(hctx, HEADER_KEY_SPLAY_WIDTH, splay_width); + if (r < 0) { + return r; + } + + r = write_key(hctx, HEADER_KEY_POOL_ID, pool_id); + if (r < 0) { + return r; + } + + uint64_t object_set = 0; + r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set); + if (r < 0) { + return r; + } + + r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set); + if (r < 0) { + return r; + } + + uint64_t tag_id = 0; + r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_id); + if (r < 0) { + return r; + } + + r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_id); + if (r < 0) { + return r; + } + return 0; +} + +/** + * Input: + * none + * + * Output: + * order (uint8_t) + * @returns 0 on success, negative error code on failure + */ +int journal_get_order(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + uint8_t order; + int r = read_key(hctx, HEADER_KEY_ORDER, &order); + if (r < 0) { + return r; + } + + encode(order, *out); + return 0; +} + +/** + * Input: + * none + * + * Output: + * splay_width (uint8_t) + * @returns 0 on success, negative error code on failure + */ +int journal_get_splay_width(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + uint8_t splay_width; + int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width); + if (r < 0) { + return r; + } + + encode(splay_width, *out); + return 0; +} + +/** + * Input: + * none + * + * Output: + * pool_id (int64_t) + * @returns 0 on success, negative error code on failure + */ +int journal_get_pool_id(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + int64_t pool_id = 0; + int r = read_key(hctx, HEADER_KEY_POOL_ID, &pool_id); + if (r < 0) { + return r; + } + + encode(pool_id, *out); + return 0; +} + +/** + * Input: + * none + * + * Output: + * object set (uint64_t) + * @returns 0 on success, negative error code on failure + */ +int journal_get_minimum_set(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + uint64_t minimum_set; + int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &minimum_set); + if (r < 0) { + return r; + } + + encode(minimum_set, *out); + return 0; +} + +/** + * Input: + * @param object set (uint64_t) + * + * Output: + * @returns 0 on success, negative error code on failure + */ +int journal_set_minimum_set(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + uint64_t object_set; + try { + auto iter = in->cbegin(); + decode(object_set, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("failed to decode input parameters: %s", err.what()); + return -EINVAL; + } + + uint64_t current_active_set; + int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, ¤t_active_set); + if (r < 0) { + return r; + } + + if (current_active_set < object_set) { + CLS_LOG(10, "active object set earlier than minimum: %" PRIu64 + " < %" PRIu64, current_active_set, object_set); + return -EINVAL; + } + + uint64_t current_minimum_set; + r = read_key(hctx, HEADER_KEY_MINIMUM_SET, ¤t_minimum_set); + if (r < 0) { + return r; + } + + if (object_set == current_minimum_set) { + return 0; + } else if (object_set < current_minimum_set) { + CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64, + object_set, current_minimum_set); + return -ESTALE; + } + + r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set); + if (r < 0) { + return r; + } + return 0; +} + +/** + * Input: + * none + * + * Output: + * object set (uint64_t) + * @returns 0 on success, negative error code on failure + */ +int journal_get_active_set(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + uint64_t active_set; + int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &active_set); + if (r < 0) { + return r; + } + + encode(active_set, *out); + return 0; +} + +/** + * Input: + * @param object set (uint64_t) + * + * Output: + * @returns 0 on success, negative error code on failure + */ +int journal_set_active_set(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + uint64_t object_set; + try { + auto iter = in->cbegin(); + decode(object_set, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("failed to decode input parameters: %s", err.what()); + return -EINVAL; + } + + uint64_t current_minimum_set; + int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, ¤t_minimum_set); + if (r < 0) { + return r; + } + + if (current_minimum_set > object_set) { + CLS_ERR("minimum object set later than active: %" PRIu64 + " > %" PRIu64, current_minimum_set, object_set); + return -EINVAL; + } + + uint64_t current_active_set; + r = read_key(hctx, HEADER_KEY_ACTIVE_SET, ¤t_active_set); + if (r < 0) { + return r; + } + + if (object_set == current_active_set) { + return 0; + } else if (object_set < current_active_set) { + CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64, + object_set, current_active_set); + return -ESTALE; + } + + r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set); + if (r < 0) { + return r; + } + return 0; +} + +/** + * Input: + * @param id (string) - unique client id + * + * Output: + * cls::journal::Client + * @returns 0 on success, negative error code on failure + */ +int journal_get_client(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + std::string id; + try { + auto iter = in->cbegin(); + decode(id, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("failed to decode input parameters: %s", err.what()); + return -EINVAL; + } + + std::string key(key_from_client_id(id)); + cls::journal::Client client; + int r = read_key(hctx, key, &client); + if (r < 0) { + return r; + } + + encode(client, *out); + return 0; +} + +/** + * Input: + * @param id (string) - unique client id + * @param data (bufferlist) - opaque data associated to client + * + * Output: + * @returns 0 on success, negative error code on failure + */ +int journal_client_register(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + std::string id; + bufferlist data; + try { + auto iter = in->cbegin(); + decode(id, iter); + decode(data, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("failed to decode input parameters: %s", err.what()); + return -EINVAL; + } + + uint8_t order; + int r = read_key(hctx, HEADER_KEY_ORDER, &order); + if (r < 0) { + return r; + } + + std::string key(key_from_client_id(id)); + bufferlist stored_clientbl; + r = cls_cxx_map_get_val(hctx, key, &stored_clientbl); + if (r >= 0) { + CLS_ERR("duplicate client id: %s", id.c_str()); + return -EEXIST; + } else if (r != -ENOENT) { + return r; + } + + cls::journal::ObjectSetPosition minset; + r = find_min_commit_position(hctx, &minset); + if (r < 0) + return r; + + cls::journal::Client client(id, data, minset); + r = write_key(hctx, key, client); + if (r < 0) { + return r; + } + return 0; +} + +/** + * Input: + * @param id (string) - unique client id + * @param data (bufferlist) - opaque data associated to client + * + * Output: + * @returns 0 on success, negative error code on failure + */ +int journal_client_update_data(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + std::string id; + bufferlist data; + try { + auto iter = in->cbegin(); + decode(id, iter); + decode(data, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("failed to decode input parameters: %s", err.what()); + return -EINVAL; + } + + std::string key(key_from_client_id(id)); + cls::journal::Client client; + int r = read_key(hctx, key, &client); + if (r < 0) { + return r; + } + + client.data = data; + r = write_key(hctx, key, client); + if (r < 0) { + return r; + } + return 0; +} + +/** + * Input: + * @param id (string) - unique client id + * @param state (uint8_t) - client state + * + * Output: + * @returns 0 on success, negative error code on failure + */ +int journal_client_update_state(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + std::string id; + cls::journal::ClientState state; + bufferlist data; + try { + auto iter = in->cbegin(); + decode(id, iter); + uint8_t state_raw; + decode(state_raw, iter); + state = static_cast<cls::journal::ClientState>(state_raw); + } catch (const ceph::buffer::error &err) { + CLS_ERR("failed to decode input parameters: %s", err.what()); + return -EINVAL; + } + + std::string key(key_from_client_id(id)); + cls::journal::Client client; + int r = read_key(hctx, key, &client); + if (r < 0) { + return r; + } + + client.state = state; + r = write_key(hctx, key, client); + if (r < 0) { + return r; + } + return 0; +} + +/** + * Input: + * @param id (string) - unique client id + * + * Output: + * @returns 0 on success, negative error code on failure + */ +int journal_client_unregister(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + std::string id; + try { + auto iter = in->cbegin(); + decode(id, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("failed to decode input parameters: %s", err.what()); + return -EINVAL; + } + + std::string key(key_from_client_id(id)); + bufferlist bl; + int r = cls_cxx_map_get_val(hctx, key, &bl); + if (r < 0) { + CLS_ERR("client is not registered: %s", id.c_str()); + return r; + } + + r = cls_cxx_map_remove_key(hctx, key); + if (r < 0) { + CLS_ERR("failed to remove omap key: %s", key.c_str()); + return r; + } + + // prune expired tags + r = expire_tags(hctx, &id); + if (r < 0) { + return r; + } + return 0; +} + +/** + * Input: + * @param client_id (uint64_t) - unique client id + * @param commit_position (ObjectSetPosition) + * + * Output: + * @returns 0 on success, negative error code on failure + */ +int journal_client_commit(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + std::string id; + cls::journal::ObjectSetPosition commit_position; + try { + auto iter = in->cbegin(); + decode(id, iter); + decode(commit_position, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("failed to decode input parameters: %s", err.what()); + return -EINVAL; + } + + uint8_t splay_width; + int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width); + if (r < 0) { + return r; + } + if (commit_position.object_positions.size() > splay_width) { + CLS_ERR("too many object positions"); + return -EINVAL; + } + + std::string key(key_from_client_id(id)); + cls::journal::Client client; + r = read_key(hctx, key, &client); + if (r < 0) { + return r; + } + + if (client.commit_position == commit_position) { + return 0; + } + + client.commit_position = commit_position; + r = write_key(hctx, key, client); + if (r < 0) { + return r; + } + return 0; +} + +/** + * Input: + * @param start_after (string) + * @param max_return (uint64_t) + * + * Output: + * clients (set<cls::journal::Client>) - collection of registered clients + * @returns 0 on success, negative error code on failure + */ +int journal_client_list(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + std::string start_after; + uint64_t max_return; + try { + auto iter = in->cbegin(); + decode(start_after, iter); + decode(max_return, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("failed to decode input parameters: %s", err.what()); + return -EINVAL; + } + + std::set<cls::journal::Client> clients; + int r = get_client_list_range(hctx, &clients, start_after, max_return); + if (r < 0) + return r; + + encode(clients, *out); + return 0; +} + +/** + * Input: + * none + * + * Output: + * @returns 0 on success, negative error code on failure + */ +int journal_get_next_tag_tid(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + uint64_t tag_tid; + int r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &tag_tid); + if (r < 0) { + return r; + } + + encode(tag_tid, *out); + return 0; +} + +/** + * Input: + * @param tag_tid (uint64_t) + * + * Output: + * cls::journal::Tag + * @returns 0 on success, negative error code on failure + */ +int journal_get_tag(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + uint64_t tag_tid; + try { + auto iter = in->cbegin(); + decode(tag_tid, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("failed to decode input parameters: %s", err.what()); + return -EINVAL; + } + + std::string key(key_from_tag_tid(tag_tid)); + cls::journal::Tag tag; + int r = read_key(hctx, key, &tag); + if (r < 0) { + return r; + } + + encode(tag, *out); + return 0; +} + +/** + * Input: + * @param tag_tid (uint64_t) + * @param tag_class (uint64_t) + * @param data (bufferlist) + * + * Output: + * @returns 0 on success, negative error code on failure + */ +int journal_tag_create(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + uint64_t tag_tid; + uint64_t tag_class; + bufferlist data; + try { + auto iter = in->cbegin(); + decode(tag_tid, iter); + decode(tag_class, iter); + decode(data, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("failed to decode input parameters: %s", err.what()); + return -EINVAL; + } + + std::string key(key_from_tag_tid(tag_tid)); + bufferlist stored_tag_bl; + int r = cls_cxx_map_get_val(hctx, key, &stored_tag_bl); + if (r >= 0) { + CLS_ERR("duplicate tag id: %" PRIu64, tag_tid); + return -EEXIST; + } else if (r != -ENOENT) { + return r; + } + + // verify tag tid ordering + uint64_t next_tag_tid; + r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &next_tag_tid); + if (r < 0) { + return r; + } + if (tag_tid != next_tag_tid) { + CLS_LOG(5, "out-of-order tag sequence: %" PRIu64, tag_tid); + return -ESTALE; + } + + uint64_t next_tag_class; + r = read_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, &next_tag_class); + if (r < 0) { + return r; + } + + if (tag_class == cls::journal::Tag::TAG_CLASS_NEW) { + // allocate a new tag class + tag_class = next_tag_class; + r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_class + 1); + if (r < 0) { + return r; + } + } else { + // verify tag class range + if (tag_class >= next_tag_class) { + CLS_ERR("out-of-sequence tag class: %" PRIu64, tag_class); + return -EINVAL; + } + } + + // prune expired tags + r = expire_tags(hctx, nullptr); + if (r < 0) { + return r; + } + + // update tag tid sequence + r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_tid + 1); + if (r < 0) { + return r; + } + + // write tag structure + cls::journal::Tag tag(tag_tid, tag_class, data); + key = key_from_tag_tid(tag_tid); + r = write_key(hctx, key, tag); + if (r < 0) { + return r; + } + return 0; +} + +/** + * Input: + * @param start_after_tag_tid (uint64_t) - first tag tid + * @param max_return (uint64_t) - max tags to return + * @param client_id (std::string) - client id filter + * @param tag_class (boost::optional<uint64_t> - optional tag class filter + * + * Output: + * std::set<cls::journal::Tag> - collection of tags + * @returns 0 on success, negative error code on failure + */ +int journal_tag_list(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + uint64_t start_after_tag_tid; + uint64_t max_return; + std::string client_id; + boost::optional<uint64_t> tag_class(0); + + // handle compiler false positive about use-before-init + tag_class = boost::none; + try { + auto iter = in->cbegin(); + decode(start_after_tag_tid, iter); + decode(max_return, iter); + decode(client_id, iter); + decode(tag_class, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("failed to decode input parameters: %s", err.what()); + return -EINVAL; + } + + // calculate the minimum tag within client's commit position + uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max(); + cls::journal::Client client; + int r = read_key(hctx, key_from_client_id(client_id), &client); + if (r < 0) { + return r; + } + + for (auto object_position : client.commit_position.object_positions) { + minimum_tag_tid = std::min(minimum_tag_tid, object_position.tag_tid); + } + + // compute minimum tags in use per-class + std::set<cls::journal::Tag> tags; + std::map<uint64_t, uint64_t> minimum_tag_class_to_tids; + typedef enum { TAG_PASS_CALCULATE_MINIMUMS, + TAG_PASS_LIST, + TAG_PASS_DONE } TagPass; + int tag_pass = (minimum_tag_tid == std::numeric_limits<uint64_t>::max() ? + TAG_PASS_LIST : TAG_PASS_CALCULATE_MINIMUMS); + std::string last_read = HEADER_KEY_TAG_PREFIX; + do { + std::map<std::string, bufferlist> vals; + bool more; + r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX, + MAX_KEYS_READ, &vals, &more); + if (r < 0 && r != -ENOENT) { + CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str()); + return r; + } + + for (auto &val : vals) { + cls::journal::Tag tag; + auto iter = val.second.cbegin(); + try { + decode(tag, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("error decoding tag: %s", val.first.c_str()); + return -EIO; + } + + if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) { + minimum_tag_class_to_tids[tag.tag_class] = tag.tid; + + // completed calculation of tag class minimums + if (tag.tid >= minimum_tag_tid) { + vals.clear(); + more = false; + break; + } + } else if (tag_pass == TAG_PASS_LIST) { + if (start_after_tag_tid != 0 && tag.tid <= start_after_tag_tid) { + continue; + } + + if (tag.tid >= minimum_tag_class_to_tids[tag.tag_class] && + (!tag_class || *tag_class == tag.tag_class)) { + tags.insert(tag); + } + if (tags.size() >= max_return) { + tag_pass = TAG_PASS_DONE; + } + } + } + + if (tag_pass != TAG_PASS_DONE && !more) { + last_read = HEADER_KEY_TAG_PREFIX; + ++tag_pass; + } else if (!vals.empty()) { + last_read = vals.rbegin()->first; + } + } while (tag_pass != TAG_PASS_DONE); + + encode(tags, *out); + return 0; +} + +/** + * Input: + * @param soft_max_size (uint64_t) + * + * Output: + * @returns 0 if object size less than max, negative error code otherwise + */ +int journal_object_guard_append(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + uint64_t soft_max_size; + try { + auto iter = in->cbegin(); + decode(soft_max_size, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("failed to decode input parameters: %s", err.what()); + return -EINVAL; + } + + uint64_t size; + time_t mtime; + int r = cls_cxx_stat(hctx, &size, &mtime); + if (r == -ENOENT) { + return 0; + } else if (r < 0) { + CLS_ERR("failed to stat object: %s", cpp_strerror(r).c_str()); + return r; + } + + if (size >= soft_max_size) { + CLS_LOG(5, "journal object full: %" PRIu64 " >= %" PRIu64, + size, soft_max_size); + return -EOVERFLOW; + } + return 0; +} + +/** + * Input: + * @param soft_max_size (uint64_t) + * @param data (bufferlist) data to append + * + * Output: + * @returns 0 on success, negative error code on failure + * @returns -EOVERFLOW if object size is equal or more than soft_max_size + */ +int journal_object_append(cls_method_context_t hctx, bufferlist *in, + bufferlist *out) { + uint64_t soft_max_size; + bufferlist data; + try { + auto iter = in->cbegin(); + decode(soft_max_size, iter); + decode(data, iter); + } catch (const ceph::buffer::error &err) { + CLS_ERR("failed to decode input parameters: %s", err.what()); + return -EINVAL; + } + + uint64_t size = 0; + int r = cls_cxx_stat(hctx, &size, nullptr); + if (r < 0 && r != -ENOENT) { + CLS_ERR("append: failed to stat object: %s", cpp_strerror(r).c_str()); + return r; + } + + if (size >= soft_max_size) { + CLS_LOG(5, "journal object full: %" PRIu64 " >= %" PRIu64, + size, soft_max_size); + return -EOVERFLOW; + } + + auto offset = size; + r = cls_cxx_write2(hctx, offset, data.length(), &data, + CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); + if (r < 0) { + CLS_ERR("append: error when writing: %s", cpp_strerror(r).c_str()); + return r; + } + + if (cls_get_min_compatible_client(hctx) < ceph_release_t::octopus) { + return 0; + } + + auto min_alloc_size = cls_get_osd_min_alloc_size(hctx); + if (min_alloc_size == 0) { + min_alloc_size = 8; + } + + auto stripe_width = cls_get_pool_stripe_width(hctx); + if (stripe_width > 0) { + min_alloc_size = round_up_to(min_alloc_size, stripe_width); + } + + CLS_LOG(20, "pad to %" PRIu64, min_alloc_size); + + auto end = offset + data.length(); + auto new_end = round_up_to(end, min_alloc_size); + if (new_end == end) { + return 0; + } + + r = cls_cxx_truncate(hctx, new_end); + if (r < 0) { + CLS_ERR("append: error when truncating: %s", cpp_strerror(r).c_str()); + return r; + } + + return 0; +} + +CLS_INIT(journal) +{ + CLS_LOG(20, "Loaded journal class!"); + + cls_handle_t h_class; + cls_method_handle_t h_journal_create; + cls_method_handle_t h_journal_get_order; + cls_method_handle_t h_journal_get_splay_width; + cls_method_handle_t h_journal_get_pool_id; + cls_method_handle_t h_journal_get_minimum_set; + cls_method_handle_t h_journal_set_minimum_set; + cls_method_handle_t h_journal_get_active_set; + cls_method_handle_t h_journal_set_active_set; + cls_method_handle_t h_journal_get_client; + cls_method_handle_t h_journal_client_register; + cls_method_handle_t h_journal_client_update_data; + cls_method_handle_t h_journal_client_update_state; + cls_method_handle_t h_journal_client_unregister; + cls_method_handle_t h_journal_client_commit; + cls_method_handle_t h_journal_client_list; + cls_method_handle_t h_journal_get_next_tag_tid; + cls_method_handle_t h_journal_get_tag; + cls_method_handle_t h_journal_tag_create; + cls_method_handle_t h_journal_tag_list; + cls_method_handle_t h_journal_object_guard_append; + cls_method_handle_t h_journal_object_append; + + cls_register("journal", &h_class); + + /// methods for journal.$journal_id objects + cls_register_cxx_method(h_class, "create", + CLS_METHOD_RD | CLS_METHOD_WR, + journal_create, &h_journal_create); + cls_register_cxx_method(h_class, "get_order", + CLS_METHOD_RD, + journal_get_order, &h_journal_get_order); + cls_register_cxx_method(h_class, "get_splay_width", + CLS_METHOD_RD, + journal_get_splay_width, &h_journal_get_splay_width); + cls_register_cxx_method(h_class, "get_pool_id", + CLS_METHOD_RD, + journal_get_pool_id, &h_journal_get_pool_id); + cls_register_cxx_method(h_class, "get_minimum_set", + CLS_METHOD_RD, + journal_get_minimum_set, + &h_journal_get_minimum_set); + cls_register_cxx_method(h_class, "set_minimum_set", + CLS_METHOD_RD | CLS_METHOD_WR, + journal_set_minimum_set, + &h_journal_set_minimum_set); + cls_register_cxx_method(h_class, "get_active_set", + CLS_METHOD_RD, + journal_get_active_set, + &h_journal_get_active_set); + cls_register_cxx_method(h_class, "set_active_set", + CLS_METHOD_RD | CLS_METHOD_WR, + journal_set_active_set, + &h_journal_set_active_set); + + cls_register_cxx_method(h_class, "get_client", + CLS_METHOD_RD, + journal_get_client, &h_journal_get_client); + cls_register_cxx_method(h_class, "client_register", + CLS_METHOD_RD | CLS_METHOD_WR, + journal_client_register, &h_journal_client_register); + cls_register_cxx_method(h_class, "client_update_data", + CLS_METHOD_RD | CLS_METHOD_WR, + journal_client_update_data, + &h_journal_client_update_data); + cls_register_cxx_method(h_class, "client_update_state", + CLS_METHOD_RD | CLS_METHOD_WR, + journal_client_update_state, + &h_journal_client_update_state); + cls_register_cxx_method(h_class, "client_unregister", + CLS_METHOD_RD | CLS_METHOD_WR, + journal_client_unregister, + &h_journal_client_unregister); + cls_register_cxx_method(h_class, "client_commit", + CLS_METHOD_RD | CLS_METHOD_WR, + journal_client_commit, &h_journal_client_commit); + cls_register_cxx_method(h_class, "client_list", + CLS_METHOD_RD, + journal_client_list, &h_journal_client_list); + + cls_register_cxx_method(h_class, "get_next_tag_tid", + CLS_METHOD_RD, + journal_get_next_tag_tid, + &h_journal_get_next_tag_tid); + cls_register_cxx_method(h_class, "get_tag", + CLS_METHOD_RD, + journal_get_tag, &h_journal_get_tag); + cls_register_cxx_method(h_class, "tag_create", + CLS_METHOD_RD | CLS_METHOD_WR, + journal_tag_create, &h_journal_tag_create); + cls_register_cxx_method(h_class, "tag_list", + CLS_METHOD_RD, + journal_tag_list, &h_journal_tag_list); + + /// methods for journal_data.$journal_id.$object_id objects + cls_register_cxx_method(h_class, "guard_append", + CLS_METHOD_RD | CLS_METHOD_WR, + journal_object_guard_append, + &h_journal_object_guard_append); + cls_register_cxx_method(h_class, "append", CLS_METHOD_RD | CLS_METHOD_WR, + journal_object_append, &h_journal_object_append); +} diff --git a/src/cls/journal/cls_journal_client.cc b/src/cls/journal/cls_journal_client.cc new file mode 100644 index 000000000..88f7ddb1f --- /dev/null +++ b/src/cls/journal/cls_journal_client.cc @@ -0,0 +1,507 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "cls/journal/cls_journal_client.h" +#include "include/rados/librados.hpp" +#include "include/buffer.h" +#include "include/Context.h" +#include "common/Cond.h" +#include <errno.h> + +namespace cls { +namespace journal { +namespace client { +using ceph::encode; +using ceph::decode; + +namespace { + +struct C_AioExec : public Context { + librados::IoCtx &ioctx; + std::string oid; + + C_AioExec(librados::IoCtx &_ioctx, const std::string &_oid) + : ioctx(_ioctx), oid(_oid) { + } + + static void rados_callback(rados_completion_t c, void *arg) { + Context *ctx = reinterpret_cast<Context *>(arg); + ctx->complete(rados_aio_get_return_value(c)); + } +}; + +struct C_ClientList : public C_AioExec { + std::set<cls::journal::Client> *clients; + Context *on_finish; + bufferlist outbl; + + C_ClientList(librados::IoCtx &_ioctx, const std::string &_oid, + std::set<cls::journal::Client> *_clients, + Context *_on_finish) + : C_AioExec(_ioctx, _oid), clients(_clients), on_finish(_on_finish) {} + + void send(const std::string &start_after) { + bufferlist inbl; + encode(start_after, inbl); + encode(JOURNAL_MAX_RETURN, inbl); + + librados::ObjectReadOperation op; + op.exec("journal", "client_list", inbl); + + outbl.clear(); + librados::AioCompletion *rados_completion = + librados::Rados::aio_create_completion(this, rados_callback); + int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl); + ceph_assert(r == 0); + rados_completion->release(); + } + + void complete(int r) override { + if (r < 0) { + finish(r); + return; + } + + try { + auto iter = outbl.cbegin(); + std::set<cls::journal::Client> partial_clients; + decode(partial_clients, iter); + + std::string start_after; + if (!partial_clients.empty()) { + start_after = partial_clients.rbegin()->id; + clients->insert(partial_clients.begin(), partial_clients.end()); + } + + if (partial_clients.size() < JOURNAL_MAX_RETURN) { + finish(0); + } else { + send(start_after); + } + } catch (const buffer::error &err) { + finish(-EBADMSG); + } + } + + void finish(int r) override { + on_finish->complete(r); + delete this; + } +}; + +struct C_ImmutableMetadata : public C_AioExec { + uint8_t *order; + uint8_t *splay_width; + int64_t *pool_id; + Context *on_finish; + bufferlist outbl; + + C_ImmutableMetadata(librados::IoCtx &_ioctx, const std::string &_oid, + uint8_t *_order, uint8_t *_splay_width, + int64_t *_pool_id, Context *_on_finish) + : C_AioExec(_ioctx, _oid), order(_order), splay_width(_splay_width), + pool_id(_pool_id), on_finish(_on_finish) { + } + + void send() { + librados::ObjectReadOperation op; + bufferlist inbl; + op.exec("journal", "get_order", inbl); + op.exec("journal", "get_splay_width", inbl); + op.exec("journal", "get_pool_id", inbl); + + librados::AioCompletion *rados_completion = + librados::Rados::aio_create_completion(this, rados_callback); + int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl); + ceph_assert(r == 0); + rados_completion->release(); + } + + void finish(int r) override { + if (r == 0) { + try { + auto iter = outbl.cbegin(); + decode(*order, iter); + decode(*splay_width, iter); + decode(*pool_id, iter); + } catch (const buffer::error &err) { + r = -EBADMSG; + } + } + on_finish->complete(r); + } +}; + +struct C_MutableMetadata : public C_AioExec { + uint64_t *minimum_set; + uint64_t *active_set; + C_ClientList *client_list; + bufferlist outbl; + + C_MutableMetadata(librados::IoCtx &_ioctx, const std::string &_oid, + uint64_t *_minimum_set, uint64_t *_active_set, + C_ClientList *_client_list) + : C_AioExec(_ioctx, _oid), minimum_set(_minimum_set), + active_set(_active_set), client_list(_client_list) {} + + void send() { + librados::ObjectReadOperation op; + bufferlist inbl; + op.exec("journal", "get_minimum_set", inbl); + op.exec("journal", "get_active_set", inbl); + + librados::AioCompletion *rados_completion = + librados::Rados::aio_create_completion(this, rados_callback); + int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl); + ceph_assert(r == 0); + rados_completion->release(); + } + + void finish(int r) override { + if (r == 0) { + try { + auto iter = outbl.cbegin(); + decode(*minimum_set, iter); + decode(*active_set, iter); + client_list->send(""); + } catch (const buffer::error &err) { + r = -EBADMSG; + } + } + if (r < 0) { + client_list->complete(r); + } + } +}; + + +} // anonymous namespace + +void create(librados::ObjectWriteOperation *op, + uint8_t order, uint8_t splay, int64_t pool_id) { + bufferlist bl; + encode(order, bl); + encode(splay, bl); + encode(pool_id, bl); + + op->exec("journal", "create", bl); +} + +int create(librados::IoCtx &ioctx, const std::string &oid, uint8_t order, + uint8_t splay, int64_t pool_id) { + librados::ObjectWriteOperation op; + create(&op, order, splay, pool_id); + + int r = ioctx.operate(oid, &op); + if (r < 0) { + return r; + } + return 0; +} + +void get_immutable_metadata(librados::IoCtx &ioctx, const std::string &oid, + uint8_t *order, uint8_t *splay_width, + int64_t *pool_id, Context *on_finish) { + C_ImmutableMetadata *metadata = new C_ImmutableMetadata(ioctx, oid, order, + splay_width, pool_id, + on_finish); + metadata->send(); +} + +void get_mutable_metadata(librados::IoCtx &ioctx, const std::string &oid, + uint64_t *minimum_set, uint64_t *active_set, + std::set<cls::journal::Client> *clients, + Context *on_finish) { + C_ClientList *client_list = new C_ClientList(ioctx, oid, clients, on_finish); + C_MutableMetadata *metadata = new C_MutableMetadata( + ioctx, oid, minimum_set, active_set, client_list); + metadata->send(); +} + +void set_minimum_set(librados::ObjectWriteOperation *op, uint64_t object_set) { + bufferlist bl; + encode(object_set, bl); + op->exec("journal", "set_minimum_set", bl); +} + +void set_active_set(librados::ObjectWriteOperation *op, uint64_t object_set) { + bufferlist bl; + encode(object_set, bl); + op->exec("journal", "set_active_set", bl); +} + +int get_client(librados::IoCtx &ioctx, const std::string &oid, + const std::string &id, cls::journal::Client *client) { + librados::ObjectReadOperation op; + get_client_start(&op, id); + + bufferlist out_bl; + int r = ioctx.operate(oid, &op, &out_bl); + if (r < 0) { + return r; + } + + auto iter = out_bl.cbegin(); + r = get_client_finish(&iter, client); + if (r < 0) { + return r; + } + return 0; +} + +void get_client_start(librados::ObjectReadOperation *op, + const std::string &id) { + bufferlist bl; + encode(id, bl); + op->exec("journal", "get_client", bl); +} + +int get_client_finish(bufferlist::const_iterator *iter, + cls::journal::Client *client) { + try { + decode(*client, *iter); + } catch (const buffer::error &err) { + return -EBADMSG; + } + return 0; +} + +int client_register(librados::IoCtx &ioctx, const std::string &oid, + const std::string &id, const bufferlist &data) { + librados::ObjectWriteOperation op; + client_register(&op, id, data); + return ioctx.operate(oid, &op); +} + +void client_register(librados::ObjectWriteOperation *op, + const std::string &id, const bufferlist &data) { + bufferlist bl; + encode(id, bl); + encode(data, bl); + op->exec("journal", "client_register", bl); +} + +int client_update_data(librados::IoCtx &ioctx, const std::string &oid, + const std::string &id, const bufferlist &data) { + librados::ObjectWriteOperation op; + client_update_data(&op, id, data); + return ioctx.operate(oid, &op); +} + +void client_update_data(librados::ObjectWriteOperation *op, + const std::string &id, const bufferlist &data) { + bufferlist bl; + encode(id, bl); + encode(data, bl); + op->exec("journal", "client_update_data", bl); +} + +int client_update_state(librados::IoCtx &ioctx, const std::string &oid, + const std::string &id, cls::journal::ClientState state) { + librados::ObjectWriteOperation op; + client_update_state(&op, id, state); + return ioctx.operate(oid, &op); +} + +void client_update_state(librados::ObjectWriteOperation *op, + const std::string &id, + cls::journal::ClientState state) { + bufferlist bl; + encode(id, bl); + encode(static_cast<uint8_t>(state), bl); + op->exec("journal", "client_update_state", bl); +} + +int client_unregister(librados::IoCtx &ioctx, const std::string &oid, + const std::string &id) { + librados::ObjectWriteOperation op; + client_unregister(&op, id); + return ioctx.operate(oid, &op); +} + +void client_unregister(librados::ObjectWriteOperation *op, + const std::string &id) { + + bufferlist bl; + encode(id, bl); + op->exec("journal", "client_unregister", bl); +} + +void client_commit(librados::ObjectWriteOperation *op, const std::string &id, + const cls::journal::ObjectSetPosition &commit_position) { + bufferlist bl; + encode(id, bl); + encode(commit_position, bl); + op->exec("journal", "client_commit", bl); +} + +int client_list(librados::IoCtx &ioctx, const std::string &oid, + std::set<cls::journal::Client> *clients) { + C_SaferCond cond; + client_list(ioctx, oid, clients, &cond); + return cond.wait(); +} + +void client_list(librados::IoCtx &ioctx, const std::string &oid, + std::set<cls::journal::Client> *clients, Context *on_finish) { + C_ClientList *client_list = new C_ClientList(ioctx, oid, clients, on_finish); + client_list->send(""); +} + +int get_next_tag_tid(librados::IoCtx &ioctx, const std::string &oid, + uint64_t *tag_tid) { + librados::ObjectReadOperation op; + get_next_tag_tid_start(&op); + + bufferlist out_bl; + int r = ioctx.operate(oid, &op, &out_bl); + if (r < 0) { + return r; + } + + auto iter = out_bl.cbegin(); + r = get_next_tag_tid_finish(&iter, tag_tid); + if (r < 0) { + return r; + } + return 0; +} + +void get_next_tag_tid_start(librados::ObjectReadOperation *op) { + bufferlist bl; + op->exec("journal", "get_next_tag_tid", bl); +} + +int get_next_tag_tid_finish(bufferlist::const_iterator *iter, + uint64_t *tag_tid) { + try { + decode(*tag_tid, *iter); + } catch (const buffer::error &err) { + return -EBADMSG; + } + return 0; +} + +int get_tag(librados::IoCtx &ioctx, const std::string &oid, + uint64_t tag_tid, cls::journal::Tag *tag) { + librados::ObjectReadOperation op; + get_tag_start(&op, tag_tid); + + bufferlist out_bl; + int r = ioctx.operate(oid, &op, &out_bl); + if (r < 0) { + return r; + } + + auto iter = out_bl.cbegin(); + r = get_tag_finish(&iter, tag); + if (r < 0) { + return r; + } + return 0; +} + +void get_tag_start(librados::ObjectReadOperation *op, + uint64_t tag_tid) { + bufferlist bl; + encode(tag_tid, bl); + op->exec("journal", "get_tag", bl); +} + +int get_tag_finish(bufferlist::const_iterator *iter, cls::journal::Tag *tag) { + try { + decode(*tag, *iter); + } catch (const buffer::error &err) { + return -EBADMSG; + } + return 0; +} + +int tag_create(librados::IoCtx &ioctx, const std::string &oid, + uint64_t tag_tid, uint64_t tag_class, + const bufferlist &data) { + librados::ObjectWriteOperation op; + tag_create(&op, tag_tid, tag_class, data); + return ioctx.operate(oid, &op); +} + +void tag_create(librados::ObjectWriteOperation *op, uint64_t tag_tid, + uint64_t tag_class, const bufferlist &data) { + bufferlist bl; + encode(tag_tid, bl); + encode(tag_class, bl); + encode(data, bl); + op->exec("journal", "tag_create", bl); +} + +int tag_list(librados::IoCtx &ioctx, const std::string &oid, + const std::string &client_id, boost::optional<uint64_t> tag_class, + std::set<cls::journal::Tag> *tags) { + tags->clear(); + uint64_t start_after_tag_tid = 0; + while (true) { + librados::ObjectReadOperation op; + tag_list_start(&op, start_after_tag_tid, JOURNAL_MAX_RETURN, client_id, + tag_class); + + bufferlist out_bl; + int r = ioctx.operate(oid, &op, &out_bl); + if (r < 0) { + return r; + } + + auto iter = out_bl.cbegin(); + std::set<cls::journal::Tag> decode_tags; + r = tag_list_finish(&iter, &decode_tags); + if (r < 0) { + return r; + } + + tags->insert(decode_tags.begin(), decode_tags.end()); + if (decode_tags.size() < JOURNAL_MAX_RETURN) { + break; + } + } + return 0; +} + +void tag_list_start(librados::ObjectReadOperation *op, + uint64_t start_after_tag_tid, uint64_t max_return, + const std::string &client_id, + boost::optional<uint64_t> tag_class) { + bufferlist bl; + encode(start_after_tag_tid, bl); + encode(max_return, bl); + encode(client_id, bl); + encode(tag_class, bl); + op->exec("journal", "tag_list", bl); +} + +int tag_list_finish(bufferlist::const_iterator *iter, + std::set<cls::journal::Tag> *tags) { + try { + decode(*tags, *iter); + } catch (const buffer::error &err) { + return -EBADMSG; + } + return 0; +} + +void guard_append(librados::ObjectWriteOperation *op, uint64_t soft_max_size) { + bufferlist bl; + encode(soft_max_size, bl); + op->exec("journal", "guard_append", bl); +} + +void append(librados::ObjectWriteOperation *op, uint64_t soft_max_size, + bufferlist &data) { + bufferlist bl; + encode(soft_max_size, bl); + encode(data, bl); + + op->exec("journal", "append", bl); +} + +} // namespace client +} // namespace journal +} // namespace cls diff --git a/src/cls/journal/cls_journal_client.h b/src/cls/journal/cls_journal_client.h new file mode 100644 index 000000000..f8ad9db51 --- /dev/null +++ b/src/cls/journal/cls_journal_client.h @@ -0,0 +1,109 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_CLS_JOURNAL_CLIENT_H +#define CEPH_CLS_JOURNAL_CLIENT_H + +#include "include/rados/librados_fwd.hpp" +#include "cls/journal/cls_journal_types.h" +#include <set> +#include <boost/optional.hpp> + +class Context; + +namespace cls { +namespace journal { +namespace client { + +void create(librados::ObjectWriteOperation *op, + uint8_t order, uint8_t splay, int64_t pool_id); +int create(librados::IoCtx &ioctx, const std::string &oid, uint8_t order, + uint8_t splay, int64_t pool_id); + +void get_immutable_metadata(librados::IoCtx &ioctx, const std::string &oid, + uint8_t *order, uint8_t *splay_width, + int64_t *pool_id, Context *on_finish); +void get_mutable_metadata(librados::IoCtx &ioctx, const std::string &oid, + uint64_t *minimum_set, uint64_t *active_set, + std::set<cls::journal::Client> *clients, + Context *on_finish); + +void set_minimum_set(librados::ObjectWriteOperation *op, uint64_t object_set); +void set_active_set(librados::ObjectWriteOperation *op, uint64_t object_set); + +// journal client helpers +int get_client(librados::IoCtx &ioctx, const std::string &oid, + const std::string &id, cls::journal::Client *client); +void get_client_start(librados::ObjectReadOperation *op, + const std::string &id); +int get_client_finish(bufferlist::const_iterator *iter, + cls::journal::Client *client); + +int client_register(librados::IoCtx &ioctx, const std::string &oid, + const std::string &id, const bufferlist &data); +void client_register(librados::ObjectWriteOperation *op, + const std::string &id, const bufferlist &data); + +int client_update_data(librados::IoCtx &ioctx, const std::string &oid, + const std::string &id, const bufferlist &data); +void client_update_data(librados::ObjectWriteOperation *op, + const std::string &id, const bufferlist &data); +int client_update_state(librados::IoCtx &ioctx, const std::string &oid, + const std::string &id, cls::journal::ClientState state); +void client_update_state(librados::ObjectWriteOperation *op, + const std::string &id, + cls::journal::ClientState state); + +int client_unregister(librados::IoCtx &ioctx, const std::string &oid, + const std::string &id); +void client_unregister(librados::ObjectWriteOperation *op, + const std::string &id); + +void client_commit(librados::ObjectWriteOperation *op, const std::string &id, + const cls::journal::ObjectSetPosition &commit_position); + +int client_list(librados::IoCtx &ioctx, const std::string &oid, + std::set<cls::journal::Client> *clients); +void client_list(librados::IoCtx &ioctx, const std::string &oid, + std::set<cls::journal::Client> *clients, Context *on_finish); + +// journal tag helpers +int get_next_tag_tid(librados::IoCtx &ioctx, const std::string &oid, + uint64_t *tag_tid); +void get_next_tag_tid_start(librados::ObjectReadOperation *op); +int get_next_tag_tid_finish(bufferlist::const_iterator *iter, + uint64_t *tag_tid); + +int get_tag(librados::IoCtx &ioctx, const std::string &oid, + uint64_t tag_tid, cls::journal::Tag *tag); +void get_tag_start(librados::ObjectReadOperation *op, + uint64_t tag_tid); +int get_tag_finish(bufferlist::const_iterator *iter, cls::journal::Tag *tag); + +int tag_create(librados::IoCtx &ioctx, const std::string &oid, + uint64_t tag_tid, uint64_t tag_class, + const bufferlist &data); +void tag_create(librados::ObjectWriteOperation *op, + uint64_t tag_tid, uint64_t tag_class, + const bufferlist &data); + +int tag_list(librados::IoCtx &ioctx, const std::string &oid, + const std::string &client_id, boost::optional<uint64_t> tag_class, + std::set<cls::journal::Tag> *tags); +void tag_list_start(librados::ObjectReadOperation *op, + uint64_t start_after_tag_tid, uint64_t max_return, + const std::string &client_id, + boost::optional<uint64_t> tag_class); +int tag_list_finish(bufferlist::const_iterator *iter, + std::set<cls::journal::Tag> *tags); + +// journal entry helpers +void guard_append(librados::ObjectWriteOperation *op, uint64_t soft_max_size); +void append(librados::ObjectWriteOperation *op, uint64_t soft_max_size, + bufferlist &data); + +} // namespace client +} // namespace journal +} // namespace cls + +#endif // CEPH_CLS_JOURNAL_CLIENT_H diff --git a/src/cls/journal/cls_journal_types.cc b/src/cls/journal/cls_journal_types.cc new file mode 100644 index 000000000..6e9dfde87 --- /dev/null +++ b/src/cls/journal/cls_journal_types.cc @@ -0,0 +1,199 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "cls/journal/cls_journal_types.h" +#include "include/stringify.h" +#include "common/Formatter.h" + +using ceph::bufferlist; +using ceph::Formatter; + +namespace cls { +namespace journal { + +void ObjectPosition::encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(object_number, bl); + encode(tag_tid, bl); + encode(entry_tid, bl); + ENCODE_FINISH(bl); +} + +void ObjectPosition::decode(bufferlist::const_iterator& iter) { + DECODE_START(1, iter); + decode(object_number, iter); + decode(tag_tid, iter); + decode(entry_tid, iter); + DECODE_FINISH(iter); +} + +void ObjectPosition::dump(Formatter *f) const { + f->dump_unsigned("object_number", object_number); + f->dump_unsigned("tag_tid", tag_tid); + f->dump_unsigned("entry_tid", entry_tid); +} + +void ObjectPosition::generate_test_instances(std::list<ObjectPosition *> &o) { + o.push_back(new ObjectPosition()); + o.push_back(new ObjectPosition(1, 2, 3)); +} + +void ObjectSetPosition::encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(object_positions, bl); + ENCODE_FINISH(bl); +} + +void ObjectSetPosition::decode(bufferlist::const_iterator& iter) { + DECODE_START(1, iter); + decode(object_positions, iter); + DECODE_FINISH(iter); +} + +void ObjectSetPosition::dump(Formatter *f) const { + f->open_array_section("object_positions"); + for (auto &pos : object_positions) { + f->open_object_section("object_position"); + pos.dump(f); + f->close_section(); + } + f->close_section(); +} + +void ObjectSetPosition::generate_test_instances( + std::list<ObjectSetPosition *> &o) { + o.push_back(new ObjectSetPosition()); + o.push_back(new ObjectSetPosition({{0, 1, 120}, {121, 2, 121}})); +} + +void Client::encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(id, bl); + encode(data, bl); + encode(commit_position, bl); + encode(static_cast<uint8_t>(state), bl); + ENCODE_FINISH(bl); +} + +void Client::decode(bufferlist::const_iterator& iter) { + DECODE_START(1, iter); + decode(id, iter); + decode(data, iter); + decode(commit_position, iter); + + uint8_t state_raw; + decode(state_raw, iter); + state = static_cast<ClientState>(state_raw); + DECODE_FINISH(iter); +} + +void Client::dump(Formatter *f) const { + f->dump_string("id", id); + + std::stringstream data_ss; + data.hexdump(data_ss); + f->dump_string("data", data_ss.str()); + + f->open_object_section("commit_position"); + commit_position.dump(f); + f->close_section(); + + f->dump_string("state", stringify(state)); +} + +void Client::generate_test_instances(std::list<Client *> &o) { + bufferlist data; + data.append(std::string(128, '1')); + + o.push_back(new Client()); + o.push_back(new Client("id", data)); + o.push_back(new Client("id", data, {{{1, 2, 120}, {2, 3, 121}}})); +} + +void Tag::encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(tid, bl); + encode(tag_class, bl); + encode(data, bl); + ENCODE_FINISH(bl); +} + +void Tag::decode(bufferlist::const_iterator& iter) { + DECODE_START(1, iter); + decode(tid, iter); + decode(tag_class, iter); + decode(data, iter); + DECODE_FINISH(iter); +} + +void Tag::dump(Formatter *f) const { + f->dump_unsigned("tid", tid); + f->dump_unsigned("tag_class", tag_class); + + std::stringstream data_ss; + data.hexdump(data_ss); + f->dump_string("data", data_ss.str()); +} + +void Tag::generate_test_instances(std::list<Tag *> &o) { + o.push_back(new Tag()); + + bufferlist data; + data.append(std::string(128, '1')); + o.push_back(new Tag(123, 234, data)); +} + +std::ostream &operator<<(std::ostream &os, const ClientState &state) { + switch (state) { + case CLIENT_STATE_CONNECTED: + os << "connected"; + break; + case CLIENT_STATE_DISCONNECTED: + os << "disconnected"; + break; + default: + os << "unknown (" << static_cast<uint32_t>(state) << ")"; + break; + } + return os; +} + +std::ostream &operator<<(std::ostream &os, + const ObjectPosition &object_position) { + os << "[" + << "object_number=" << object_position.object_number << ", " + << "tag_tid=" << object_position.tag_tid << ", " + << "entry_tid=" << object_position.entry_tid << "]"; + return os; +} + +std::ostream &operator<<(std::ostream &os, + const ObjectSetPosition &object_set_position) { + os << "[positions=["; + std::string delim; + for (auto &object_position : object_set_position.object_positions) { + os << delim << object_position; + delim = ", "; + } + os << "]]"; + return os; +} + +std::ostream &operator<<(std::ostream &os, const Client &client) { + os << "[id=" << client.id << ", " + << "commit_position=" << client.commit_position << ", " + << "state=" << client.state << "]"; + return os; +} + +std::ostream &operator<<(std::ostream &os, const Tag &tag) { + os << "[tid=" << tag.tid << ", " + << "tag_class=" << tag.tag_class << ", " + << "data="; + tag.data.hexdump(os); + os << "]"; + return os; +} + +} // namespace journal +} // namespace cls diff --git a/src/cls/journal/cls_journal_types.h b/src/cls/journal/cls_journal_types.h new file mode 100644 index 000000000..f82d30c7e --- /dev/null +++ b/src/cls/journal/cls_journal_types.h @@ -0,0 +1,157 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_CLS_JOURNAL_TYPES_H +#define CEPH_CLS_JOURNAL_TYPES_H + +#include "include/int_types.h" +#include "include/buffer_fwd.h" +#include "include/encoding.h" +#include <iosfwd> +#include <list> +#include <string> + +namespace ceph { +class Formatter; +} + +namespace cls { +namespace journal { + +static const uint64_t JOURNAL_MAX_RETURN = 256; + +struct ObjectPosition { + uint64_t object_number; + uint64_t tag_tid; + uint64_t entry_tid; + + ObjectPosition() : object_number(0), tag_tid(0), entry_tid(0) {} + ObjectPosition(uint64_t _object_number, uint64_t _tag_tid, + uint64_t _entry_tid) + : object_number(_object_number), tag_tid(_tag_tid), entry_tid(_entry_tid) {} + + inline bool operator==(const ObjectPosition& rhs) const { + return (object_number == rhs.object_number && + tag_tid == rhs.tag_tid && + entry_tid == rhs.entry_tid); + } + inline bool operator!=(const ObjectPosition& rhs) const { + return !(*this == rhs); + } + + void encode(ceph::buffer::list& bl) const; + void decode(ceph::buffer::list::const_iterator& iter); + void dump(ceph::Formatter *f) const; + + inline bool operator<(const ObjectPosition &rhs) const { + if (object_number != rhs.object_number) { + return object_number < rhs.object_number; + } else if (tag_tid != rhs.tag_tid) { + return tag_tid < rhs.tag_tid; + } + return entry_tid < rhs.entry_tid; + } + + static void generate_test_instances(std::list<ObjectPosition *> &o); +}; + +typedef std::list<ObjectPosition> ObjectPositions; + +struct ObjectSetPosition { + // stored in most-recent -> least recent committed entry order + ObjectPositions object_positions; + + ObjectSetPosition() {} + ObjectSetPosition(const ObjectPositions &_object_positions) + : object_positions(_object_positions) {} + + void encode(ceph::buffer::list& bl) const; + void decode(ceph::buffer::list::const_iterator& iter); + void dump(ceph::Formatter *f) const; + + inline bool operator==(const ObjectSetPosition &rhs) const { + return (object_positions == rhs.object_positions); + } + + static void generate_test_instances(std::list<ObjectSetPosition *> &o); +}; + +enum ClientState { + CLIENT_STATE_CONNECTED = 0, + CLIENT_STATE_DISCONNECTED = 1 +}; + +struct Client { + std::string id; + ceph::buffer::list data; + ObjectSetPosition commit_position; + ClientState state; + + Client() : state(CLIENT_STATE_CONNECTED) {} + Client(const std::string& _id, const ceph::buffer::list &_data, + const ObjectSetPosition &_commit_position = ObjectSetPosition(), + ClientState _state = CLIENT_STATE_CONNECTED) + : id(_id), data(_data), commit_position(_commit_position), state(_state) {} + + inline bool operator==(const Client &rhs) const { + return (id == rhs.id && + data.contents_equal(rhs.data) && + commit_position == rhs.commit_position && + state == rhs.state); + } + inline bool operator<(const Client &rhs) const { + return (id < rhs.id); + } + + void encode(ceph::buffer::list& bl) const; + void decode(ceph::buffer::list::const_iterator& iter); + void dump(ceph::Formatter *f) const; + + static void generate_test_instances(std::list<Client *> &o); +}; + +struct Tag { + static const uint64_t TAG_CLASS_NEW = static_cast<uint64_t>(-1); + + uint64_t tid; + uint64_t tag_class; + ceph::buffer::list data; + + Tag() : tid(0), tag_class(0) {} + Tag(uint64_t tid, uint64_t tag_class, const ceph::buffer::list &data) + : tid(tid), tag_class(tag_class), data(data) {} + + inline bool operator==(const Tag &rhs) const { + return (tid == rhs.tid && + tag_class == rhs.tag_class && + data.contents_equal(rhs.data)); + } + inline bool operator<(const Tag &rhs) const { + return (tid < rhs.tid); + } + + void encode(ceph::buffer::list& bl) const; + void decode(ceph::buffer::list::const_iterator& iter); + void dump(ceph::Formatter *f) const; + + static void generate_test_instances(std::list<Tag *> &o); +}; + +WRITE_CLASS_ENCODER(ObjectPosition); +WRITE_CLASS_ENCODER(ObjectSetPosition); +WRITE_CLASS_ENCODER(Client); +WRITE_CLASS_ENCODER(Tag); + +std::ostream &operator<<(std::ostream &os, const ClientState &state); +std::ostream &operator<<(std::ostream &os, + const ObjectPosition &object_position); +std::ostream &operator<<(std::ostream &os, + const ObjectSetPosition &object_set_position); +std::ostream &operator<<(std::ostream &os, + const Client &client); +std::ostream &operator<<(std::ostream &os, const Tag &tag); + +} // namespace journal +} // namespace cls + +#endif // CEPH_CLS_JOURNAL_TYPES_H |