summaryrefslogtreecommitdiffstats
path: root/src/cls/journal
diff options
context:
space:
mode:
Diffstat (limited to 'src/cls/journal')
-rw-r--r--src/cls/journal/cls_journal.cc1230
-rw-r--r--src/cls/journal/cls_journal_client.cc498
-rw-r--r--src/cls/journal/cls_journal_client.h107
-rw-r--r--src/cls/journal/cls_journal_types.cc196
-rw-r--r--src/cls/journal/cls_journal_types.h154
5 files changed, 2185 insertions, 0 deletions
diff --git a/src/cls/journal/cls_journal.cc b/src/cls/journal/cls_journal.cc
new file mode 100644
index 00000000..ff909bea
--- /dev/null
+++ b/src/cls/journal/cls_journal.cc
@@ -0,0 +1,1230 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/encoding.h"
+#include "common/errno.h"
+#include "objclass/objclass.h"
+#include "cls/journal/cls_journal_types.h"
+#include <errno.h>
+#include <map>
+#include <string>
+#include <sstream>
+
+CLS_VER(1, 0)
+CLS_NAME(journal)
+
+namespace {
+
+static const uint64_t MAX_KEYS_READ = 64;
+
+static const std::string HEADER_KEY_ORDER = "order";
+static const std::string HEADER_KEY_SPLAY_WIDTH = "splay_width";
+static const std::string HEADER_KEY_POOL_ID = "pool_id";
+static const std::string HEADER_KEY_MINIMUM_SET = "minimum_set";
+static const std::string HEADER_KEY_ACTIVE_SET = "active_set";
+static const std::string HEADER_KEY_NEXT_TAG_TID = "next_tag_tid";
+static const std::string HEADER_KEY_NEXT_TAG_CLASS = "next_tag_class";
+static const std::string HEADER_KEY_CLIENT_PREFIX = "client_";
+static const std::string HEADER_KEY_TAG_PREFIX = "tag_";
+
+std::string to_hex(uint64_t value) {
+ std::ostringstream oss;
+ oss << std::setw(16) << std::setfill('0') << std::hex << value;
+ return oss.str();
+}
+
+std::string key_from_client_id(const std::string &client_id) {
+ return HEADER_KEY_CLIENT_PREFIX + client_id;
+}
+
+std::string key_from_tag_tid(uint64_t tag_tid) {
+ return HEADER_KEY_TAG_PREFIX + to_hex(tag_tid);
+}
+
+uint64_t tag_tid_from_key(const std::string &key) {
+ std::istringstream iss(key);
+ uint64_t id;
+ iss.ignore(HEADER_KEY_TAG_PREFIX.size()) >> std::hex >> id;
+ return id;
+}
+
+template <typename T>
+int read_key(cls_method_context_t hctx, const string &key, T *t,
+ bool ignore_enoent = false) {
+ bufferlist bl;
+ int r = cls_cxx_map_get_val(hctx, key, &bl);
+ if (r == -ENOENT) {
+ if (ignore_enoent) {
+ r = 0;
+ }
+ return r;
+ } else if (r < 0) {
+ CLS_ERR("failed to get omap key: %s", key.c_str());
+ return r;
+ }
+
+ try {
+ auto iter = bl.cbegin();
+ decode(*t, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode input parameters: %s", err.what());
+ return -EINVAL;
+ }
+ return 0;
+}
+
+template <typename T>
+int write_key(cls_method_context_t hctx, const string &key, const T &t) {
+ bufferlist bl;
+ encode(t, bl);
+
+ int r = cls_cxx_map_set_val(hctx, key, &bl);
+ if (r < 0) {
+ CLS_ERR("failed to set omap key: %s", key.c_str());
+ return r;
+ }
+ return 0;
+}
+
+int remove_key(cls_method_context_t hctx, const string &key) {
+ int r = cls_cxx_map_remove_key(hctx, key);
+ if (r < 0 && r != -ENOENT) {
+ CLS_ERR("failed to remove key: %s", key.c_str());
+ return r;
+ }
+ return 0;
+}
+
+int expire_tags(cls_method_context_t hctx, const std::string *skip_client_id) {
+
+ std::string skip_client_key;
+ if (skip_client_id != nullptr) {
+ skip_client_key = key_from_client_id(*skip_client_id);
+ }
+
+ uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
+ std::string last_read = "";
+ bool more;
+ do {
+ std::map<std::string, bufferlist> vals;
+ int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
+ MAX_KEYS_READ, &vals, &more);
+ if (r < 0 && r != -ENOENT) {
+ CLS_ERR("failed to retrieve registered clients: %s",
+ cpp_strerror(r).c_str());
+ return r;
+ }
+
+ for (auto &val : vals) {
+ // if we are removing a client, skip its commit positions
+ if (val.first == skip_client_key) {
+ continue;
+ }
+
+ cls::journal::Client client;
+ auto iter = val.second.cbegin();
+ try {
+ decode(client, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("error decoding registered client: %s",
+ val.first.c_str());
+ return -EIO;
+ }
+
+ if (client.state == cls::journal::CLIENT_STATE_DISCONNECTED) {
+ // don't allow a disconnected client to prevent pruning
+ continue;
+ } else if (client.commit_position.object_positions.empty()) {
+ // cannot prune if one or more clients has an empty commit history
+ return 0;
+ }
+
+ for (auto object_position : client.commit_position.object_positions) {
+ minimum_tag_tid = std::min(minimum_tag_tid, object_position.tag_tid);
+ }
+ }
+ if (!vals.empty()) {
+ last_read = vals.rbegin()->first;
+ }
+ } while (more);
+
+ // cannot expire tags if a client hasn't committed yet
+ if (minimum_tag_tid == std::numeric_limits<uint64_t>::max()) {
+ return 0;
+ }
+
+ // compute the minimum in-use tag for each class
+ std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
+ typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
+ TAG_PASS_SCRUB,
+ TAG_PASS_DONE } TagPass;
+ int tag_pass = TAG_PASS_CALCULATE_MINIMUMS;
+ last_read = HEADER_KEY_TAG_PREFIX;
+ do {
+ std::map<std::string, bufferlist> vals;
+ int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
+ MAX_KEYS_READ, &vals, &more);
+ if (r < 0 && r != -ENOENT) {
+ CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
+ return r;
+ }
+
+ for (auto &val : vals) {
+ cls::journal::Tag tag;
+ auto iter = val.second.cbegin();
+ try {
+ decode(tag, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("error decoding tag: %s", val.first.c_str());
+ return -EIO;
+ }
+
+ if (tag.tid != tag_tid_from_key(val.first)) {
+ CLS_ERR("tag tid mismatched: %s", val.first.c_str());
+ return -EINVAL;
+ }
+
+ if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
+ minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
+ } else if (tag_pass == TAG_PASS_SCRUB &&
+ tag.tid < minimum_tag_class_to_tids[tag.tag_class]) {
+ r = remove_key(hctx, val.first);
+ if (r < 0) {
+ return r;
+ }
+ }
+
+ if (tag.tid >= minimum_tag_tid) {
+ // no need to check for tag classes beyond this point
+ vals.clear();
+ more = false;
+ break;
+ }
+ }
+
+ if (tag_pass != TAG_PASS_DONE && !more) {
+ last_read = HEADER_KEY_TAG_PREFIX;
+ ++tag_pass;
+ } else if (!vals.empty()) {
+ last_read = vals.rbegin()->first;
+ }
+ } while (tag_pass != TAG_PASS_DONE);
+ return 0;
+}
+
+int get_client_list_range(cls_method_context_t hctx,
+ std::set<cls::journal::Client> *clients,
+ std::string start_after, uint64_t max_return) {
+ std::string last_read;
+ if (!start_after.empty()) {
+ last_read = key_from_client_id(start_after);
+ }
+
+ std::map<std::string, bufferlist> vals;
+ bool more;
+ int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
+ max_return, &vals, &more);
+ if (r < 0) {
+ CLS_ERR("failed to retrieve omap values: %s", cpp_strerror(r).c_str());
+ return r;
+ }
+
+ for (std::map<std::string, bufferlist>::iterator it = vals.begin();
+ it != vals.end(); ++it) {
+ try {
+ auto iter = it->second.cbegin();
+
+ cls::journal::Client client;
+ decode(client, iter);
+ clients->insert(client);
+ } catch (const buffer::error &err) {
+ CLS_ERR("could not decode client '%s': %s", it->first.c_str(),
+ err.what());
+ return -EIO;
+ }
+ }
+
+ return 0;
+}
+
+int find_min_commit_position(cls_method_context_t hctx,
+ cls::journal::ObjectSetPosition *minset) {
+ int r;
+ bool valid = false;
+ std::string start_after = "";
+ uint64_t tag_tid = 0, entry_tid = 0;
+
+ while (true) {
+ std::set<cls::journal::Client> batch;
+
+ r = get_client_list_range(hctx, &batch, start_after, cls::journal::JOURNAL_MAX_RETURN);
+ if ((r < 0) || batch.empty()) {
+ break;
+ }
+
+ start_after = batch.rbegin()->id;
+
+ // update the (minimum) commit position from this batch of clients
+ for(std::set<cls::journal::Client>::iterator it = batch.begin();
+ it != batch.end(); ++it) {
+ cls::journal::ObjectSetPosition object_set_position = (*it).commit_position;
+ if (object_set_position.object_positions.empty()) {
+ *minset = cls::journal::ObjectSetPosition();
+ break;
+ }
+ cls::journal::ObjectPosition first = object_set_position.object_positions.front();
+
+ // least tag_tid (or least entry_tid for matching tag_tid)
+ if (!valid || (tag_tid > first.tag_tid) || ((tag_tid == first.tag_tid) && (entry_tid > first.entry_tid))) {
+ tag_tid = first.tag_tid;
+ entry_tid = first.entry_tid;
+ *minset = cls::journal::ObjectSetPosition(object_set_position);
+ valid = true;
+ }
+ }
+
+ // got the last batch, we're done
+ if (batch.size() < cls::journal::JOURNAL_MAX_RETURN) {
+ break;
+ }
+ }
+
+ return r;
+}
+
+} // anonymous namespace
+
+/**
+ * Input:
+ * @param order (uint8_t) - bits to shift to compute the object max size
+ * @param splay width (uint8_t) - number of active journal objects
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_create(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
+ uint8_t order;
+ uint8_t splay_width;
+ int64_t pool_id;
+ try {
+ auto iter = in->cbegin();
+ decode(order, iter);
+ decode(splay_width, iter);
+ decode(pool_id, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode input parameters: %s", err.what());
+ return -EINVAL;
+ }
+
+ bufferlist stored_orderbl;
+ int r = cls_cxx_map_get_val(hctx, HEADER_KEY_ORDER, &stored_orderbl);
+ if (r >= 0) {
+ CLS_ERR("journal already exists");
+ return -EEXIST;
+ } else if (r != -ENOENT) {
+ return r;
+ }
+
+ r = write_key(hctx, HEADER_KEY_ORDER, order);
+ if (r < 0) {
+ return r;
+ }
+
+ r = write_key(hctx, HEADER_KEY_SPLAY_WIDTH, splay_width);
+ if (r < 0) {
+ return r;
+ }
+
+ r = write_key(hctx, HEADER_KEY_POOL_ID, pool_id);
+ if (r < 0) {
+ return r;
+ }
+
+ uint64_t object_set = 0;
+ r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set);
+ if (r < 0) {
+ return r;
+ }
+
+ r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set);
+ if (r < 0) {
+ return r;
+ }
+
+ uint64_t tag_id = 0;
+ r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_id);
+ if (r < 0) {
+ return r;
+ }
+
+ r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_id);
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
+
+/**
+ * Input:
+ * none
+ *
+ * Output:
+ * order (uint8_t)
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_get_order(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ uint8_t order;
+ int r = read_key(hctx, HEADER_KEY_ORDER, &order);
+ if (r < 0) {
+ return r;
+ }
+
+ encode(order, *out);
+ return 0;
+}
+
+/**
+ * Input:
+ * none
+ *
+ * Output:
+ * splay_width (uint8_t)
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_get_splay_width(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ uint8_t splay_width;
+ int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width);
+ if (r < 0) {
+ return r;
+ }
+
+ encode(splay_width, *out);
+ return 0;
+}
+
+/**
+ * Input:
+ * none
+ *
+ * Output:
+ * pool_id (int64_t)
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_get_pool_id(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ int64_t pool_id = 0;
+ int r = read_key(hctx, HEADER_KEY_POOL_ID, &pool_id);
+ if (r < 0) {
+ return r;
+ }
+
+ encode(pool_id, *out);
+ return 0;
+}
+
+/**
+ * Input:
+ * none
+ *
+ * Output:
+ * object set (uint64_t)
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_get_minimum_set(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ uint64_t minimum_set;
+ int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &minimum_set);
+ if (r < 0) {
+ return r;
+ }
+
+ encode(minimum_set, *out);
+ return 0;
+}
+
+/**
+ * Input:
+ * @param object set (uint64_t)
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_set_minimum_set(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ uint64_t object_set;
+ try {
+ auto iter = in->cbegin();
+ decode(object_set, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode input parameters: %s", err.what());
+ return -EINVAL;
+ }
+
+ uint64_t current_active_set;
+ int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &current_active_set);
+ if (r < 0) {
+ return r;
+ }
+
+ if (current_active_set < object_set) {
+ CLS_LOG(10, "active object set earlier than minimum: %" PRIu64
+ " < %" PRIu64, current_active_set, object_set);
+ return -EINVAL;
+ }
+
+ uint64_t current_minimum_set;
+ r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &current_minimum_set);
+ if (r < 0) {
+ return r;
+ }
+
+ if (object_set == current_minimum_set) {
+ return 0;
+ } else if (object_set < current_minimum_set) {
+ CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64,
+ object_set, current_minimum_set);
+ return -ESTALE;
+ }
+
+ r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set);
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
+
+/**
+ * Input:
+ * none
+ *
+ * Output:
+ * object set (uint64_t)
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_get_active_set(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ uint64_t active_set;
+ int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &active_set);
+ if (r < 0) {
+ return r;
+ }
+
+ encode(active_set, *out);
+ return 0;
+}
+
+/**
+ * Input:
+ * @param object set (uint64_t)
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_set_active_set(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ uint64_t object_set;
+ try {
+ auto iter = in->cbegin();
+ decode(object_set, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode input parameters: %s", err.what());
+ return -EINVAL;
+ }
+
+ uint64_t current_minimum_set;
+ int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &current_minimum_set);
+ if (r < 0) {
+ return r;
+ }
+
+ if (current_minimum_set > object_set) {
+ CLS_ERR("minimum object set later than active: %" PRIu64
+ " > %" PRIu64, current_minimum_set, object_set);
+ return -EINVAL;
+ }
+
+ uint64_t current_active_set;
+ r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &current_active_set);
+ if (r < 0) {
+ return r;
+ }
+
+ if (object_set == current_active_set) {
+ return 0;
+ } else if (object_set < current_active_set) {
+ CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64,
+ object_set, current_active_set);
+ return -ESTALE;
+ }
+
+ r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set);
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
+
+/**
+ * Input:
+ * @param id (string) - unique client id
+ *
+ * Output:
+ * cls::journal::Client
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_get_client(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ std::string id;
+ try {
+ auto iter = in->cbegin();
+ decode(id, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode input parameters: %s", err.what());
+ return -EINVAL;
+ }
+
+ std::string key(key_from_client_id(id));
+ cls::journal::Client client;
+ int r = read_key(hctx, key, &client);
+ if (r < 0) {
+ return r;
+ }
+
+ encode(client, *out);
+ return 0;
+}
+
+/**
+ * Input:
+ * @param id (string) - unique client id
+ * @param data (bufferlist) - opaque data associated to client
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_client_register(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ std::string id;
+ bufferlist data;
+ try {
+ auto iter = in->cbegin();
+ decode(id, iter);
+ decode(data, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode input parameters: %s", err.what());
+ return -EINVAL;
+ }
+
+ uint8_t order;
+ int r = read_key(hctx, HEADER_KEY_ORDER, &order);
+ if (r < 0) {
+ return r;
+ }
+
+ std::string key(key_from_client_id(id));
+ bufferlist stored_clientbl;
+ r = cls_cxx_map_get_val(hctx, key, &stored_clientbl);
+ if (r >= 0) {
+ CLS_ERR("duplicate client id: %s", id.c_str());
+ return -EEXIST;
+ } else if (r != -ENOENT) {
+ return r;
+ }
+
+ cls::journal::ObjectSetPosition minset;
+ r = find_min_commit_position(hctx, &minset);
+ if (r < 0)
+ return r;
+
+ cls::journal::Client client(id, data, minset);
+ r = write_key(hctx, key, client);
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
+
+/**
+ * Input:
+ * @param id (string) - unique client id
+ * @param data (bufferlist) - opaque data associated to client
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_client_update_data(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ std::string id;
+ bufferlist data;
+ try {
+ auto iter = in->cbegin();
+ decode(id, iter);
+ decode(data, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode input parameters: %s", err.what());
+ return -EINVAL;
+ }
+
+ std::string key(key_from_client_id(id));
+ cls::journal::Client client;
+ int r = read_key(hctx, key, &client);
+ if (r < 0) {
+ return r;
+ }
+
+ client.data = data;
+ r = write_key(hctx, key, client);
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
+
+/**
+ * Input:
+ * @param id (string) - unique client id
+ * @param state (uint8_t) - client state
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_client_update_state(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ std::string id;
+ cls::journal::ClientState state;
+ bufferlist data;
+ try {
+ auto iter = in->cbegin();
+ decode(id, iter);
+ uint8_t state_raw;
+ decode(state_raw, iter);
+ state = static_cast<cls::journal::ClientState>(state_raw);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode input parameters: %s", err.what());
+ return -EINVAL;
+ }
+
+ std::string key(key_from_client_id(id));
+ cls::journal::Client client;
+ int r = read_key(hctx, key, &client);
+ if (r < 0) {
+ return r;
+ }
+
+ client.state = state;
+ r = write_key(hctx, key, client);
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
+
+/**
+ * Input:
+ * @param id (string) - unique client id
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_client_unregister(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ std::string id;
+ try {
+ auto iter = in->cbegin();
+ decode(id, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode input parameters: %s", err.what());
+ return -EINVAL;
+ }
+
+ std::string key(key_from_client_id(id));
+ bufferlist bl;
+ int r = cls_cxx_map_get_val(hctx, key, &bl);
+ if (r < 0) {
+ CLS_ERR("client is not registered: %s", id.c_str());
+ return r;
+ }
+
+ r = cls_cxx_map_remove_key(hctx, key);
+ if (r < 0) {
+ CLS_ERR("failed to remove omap key: %s", key.c_str());
+ return r;
+ }
+
+ // prune expired tags
+ r = expire_tags(hctx, &id);
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
+
+/**
+ * Input:
+ * @param client_id (uint64_t) - unique client id
+ * @param commit_position (ObjectSetPosition)
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_client_commit(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ std::string id;
+ cls::journal::ObjectSetPosition commit_position;
+ try {
+ auto iter = in->cbegin();
+ decode(id, iter);
+ decode(commit_position, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode input parameters: %s", err.what());
+ return -EINVAL;
+ }
+
+ uint8_t splay_width;
+ int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width);
+ if (r < 0) {
+ return r;
+ }
+ if (commit_position.object_positions.size() > splay_width) {
+ CLS_ERR("too many object positions");
+ return -EINVAL;
+ }
+
+ std::string key(key_from_client_id(id));
+ cls::journal::Client client;
+ r = read_key(hctx, key, &client);
+ if (r < 0) {
+ return r;
+ }
+
+ if (client.commit_position == commit_position) {
+ return 0;
+ }
+
+ client.commit_position = commit_position;
+ r = write_key(hctx, key, client);
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
+
+/**
+ * Input:
+ * @param start_after (string)
+ * @param max_return (uint64_t)
+ *
+ * Output:
+ * clients (set<cls::journal::Client>) - collection of registered clients
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_client_list(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ std::string start_after;
+ uint64_t max_return;
+ try {
+ auto iter = in->cbegin();
+ decode(start_after, iter);
+ decode(max_return, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode input parameters: %s", err.what());
+ return -EINVAL;
+ }
+
+ std::set<cls::journal::Client> clients;
+ int r = get_client_list_range(hctx, &clients, start_after, max_return);
+ if (r < 0)
+ return r;
+
+ encode(clients, *out);
+ return 0;
+}
+
+/**
+ * Input:
+ * none
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_get_next_tag_tid(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ uint64_t tag_tid;
+ int r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &tag_tid);
+ if (r < 0) {
+ return r;
+ }
+
+ encode(tag_tid, *out);
+ return 0;
+}
+
+/**
+ * Input:
+ * @param tag_tid (uint64_t)
+ *
+ * Output:
+ * cls::journal::Tag
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_get_tag(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ uint64_t tag_tid;
+ try {
+ auto iter = in->cbegin();
+ decode(tag_tid, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode input parameters: %s", err.what());
+ return -EINVAL;
+ }
+
+ std::string key(key_from_tag_tid(tag_tid));
+ cls::journal::Tag tag;
+ int r = read_key(hctx, key, &tag);
+ if (r < 0) {
+ return r;
+ }
+
+ encode(tag, *out);
+ return 0;
+}
+
+/**
+ * Input:
+ * @param tag_tid (uint64_t)
+ * @param tag_class (uint64_t)
+ * @param data (bufferlist)
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_tag_create(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ uint64_t tag_tid;
+ uint64_t tag_class;
+ bufferlist data;
+ try {
+ auto iter = in->cbegin();
+ decode(tag_tid, iter);
+ decode(tag_class, iter);
+ decode(data, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode input parameters: %s", err.what());
+ return -EINVAL;
+ }
+
+ std::string key(key_from_tag_tid(tag_tid));
+ bufferlist stored_tag_bl;
+ int r = cls_cxx_map_get_val(hctx, key, &stored_tag_bl);
+ if (r >= 0) {
+ CLS_ERR("duplicate tag id: %" PRIu64, tag_tid);
+ return -EEXIST;
+ } else if (r != -ENOENT) {
+ return r;
+ }
+
+ // verify tag tid ordering
+ uint64_t next_tag_tid;
+ r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &next_tag_tid);
+ if (r < 0) {
+ return r;
+ }
+ if (tag_tid != next_tag_tid) {
+ CLS_LOG(5, "out-of-order tag sequence: %" PRIu64, tag_tid);
+ return -ESTALE;
+ }
+
+ uint64_t next_tag_class;
+ r = read_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, &next_tag_class);
+ if (r < 0) {
+ return r;
+ }
+
+ if (tag_class == cls::journal::Tag::TAG_CLASS_NEW) {
+ // allocate a new tag class
+ tag_class = next_tag_class;
+ r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_class + 1);
+ if (r < 0) {
+ return r;
+ }
+ } else {
+ // verify tag class range
+ if (tag_class >= next_tag_class) {
+ CLS_ERR("out-of-sequence tag class: %" PRIu64, tag_class);
+ return -EINVAL;
+ }
+ }
+
+ // prune expired tags
+ r = expire_tags(hctx, nullptr);
+ if (r < 0) {
+ return r;
+ }
+
+ // update tag tid sequence
+ r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_tid + 1);
+ if (r < 0) {
+ return r;
+ }
+
+ // write tag structure
+ cls::journal::Tag tag(tag_tid, tag_class, data);
+ key = key_from_tag_tid(tag_tid);
+ r = write_key(hctx, key, tag);
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
+
+/**
+ * Input:
+ * @param start_after_tag_tid (uint64_t) - first tag tid
+ * @param max_return (uint64_t) - max tags to return
+ * @param client_id (std::string) - client id filter
+ * @param tag_class (boost::optional<uint64_t> - optional tag class filter
+ *
+ * Output:
+ * std::set<cls::journal::Tag> - collection of tags
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_tag_list(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ uint64_t start_after_tag_tid;
+ uint64_t max_return;
+ std::string client_id;
+ boost::optional<uint64_t> tag_class(0);
+
+ // handle compiler false positive about use-before-init
+ tag_class = boost::none;
+ try {
+ auto iter = in->cbegin();
+ decode(start_after_tag_tid, iter);
+ decode(max_return, iter);
+ decode(client_id, iter);
+ decode(tag_class, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode input parameters: %s", err.what());
+ return -EINVAL;
+ }
+
+ // calculate the minimum tag within client's commit position
+ uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
+ cls::journal::Client client;
+ int r = read_key(hctx, key_from_client_id(client_id), &client);
+ if (r < 0) {
+ return r;
+ }
+
+ for (auto object_position : client.commit_position.object_positions) {
+ minimum_tag_tid = std::min(minimum_tag_tid, object_position.tag_tid);
+ }
+
+ // compute minimum tags in use per-class
+ std::set<cls::journal::Tag> tags;
+ std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
+ typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
+ TAG_PASS_LIST,
+ TAG_PASS_DONE } TagPass;
+ int tag_pass = (minimum_tag_tid == std::numeric_limits<uint64_t>::max() ?
+ TAG_PASS_LIST : TAG_PASS_CALCULATE_MINIMUMS);
+ std::string last_read = HEADER_KEY_TAG_PREFIX;
+ do {
+ std::map<std::string, bufferlist> vals;
+ bool more;
+ r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
+ MAX_KEYS_READ, &vals, &more);
+ if (r < 0 && r != -ENOENT) {
+ CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
+ return r;
+ }
+
+ for (auto &val : vals) {
+ cls::journal::Tag tag;
+ auto iter = val.second.cbegin();
+ try {
+ decode(tag, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("error decoding tag: %s", val.first.c_str());
+ return -EIO;
+ }
+
+ if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
+ minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
+
+ // completed calculation of tag class minimums
+ if (tag.tid >= minimum_tag_tid) {
+ vals.clear();
+ more = false;
+ break;
+ }
+ } else if (tag_pass == TAG_PASS_LIST) {
+ if (start_after_tag_tid != 0 && tag.tid <= start_after_tag_tid) {
+ continue;
+ }
+
+ if (tag.tid >= minimum_tag_class_to_tids[tag.tag_class] &&
+ (!tag_class || *tag_class == tag.tag_class)) {
+ tags.insert(tag);
+ }
+ if (tags.size() >= max_return) {
+ tag_pass = TAG_PASS_DONE;
+ }
+ }
+ }
+
+ if (tag_pass != TAG_PASS_DONE && !more) {
+ last_read = HEADER_KEY_TAG_PREFIX;
+ ++tag_pass;
+ } else if (!vals.empty()) {
+ last_read = vals.rbegin()->first;
+ }
+ } while (tag_pass != TAG_PASS_DONE);
+
+ encode(tags, *out);
+ return 0;
+}
+
+/**
+ * Input:
+ * @param soft_max_size (uint64_t)
+ *
+ * Output:
+ * @returns 0 if object size less than max, negative error code otherwise
+ */
+int journal_object_guard_append(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ uint64_t soft_max_size;
+ try {
+ auto iter = in->cbegin();
+ decode(soft_max_size, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode input parameters: %s", err.what());
+ return -EINVAL;
+ }
+
+ uint64_t size;
+ time_t mtime;
+ int r = cls_cxx_stat(hctx, &size, &mtime);
+ if (r == -ENOENT) {
+ return 0;
+ } else if (r < 0) {
+ CLS_ERR("failed to stat object: %s", cpp_strerror(r).c_str());
+ return r;
+ }
+
+ if (size >= soft_max_size) {
+ CLS_LOG(5, "journal object full: %" PRIu64 " >= %" PRIu64,
+ size, soft_max_size);
+ return -EOVERFLOW;
+ }
+ return 0;
+}
+
+CLS_INIT(journal)
+{
+ CLS_LOG(20, "Loaded journal class!");
+
+ cls_handle_t h_class;
+ cls_method_handle_t h_journal_create;
+ cls_method_handle_t h_journal_get_order;
+ cls_method_handle_t h_journal_get_splay_width;
+ cls_method_handle_t h_journal_get_pool_id;
+ cls_method_handle_t h_journal_get_minimum_set;
+ cls_method_handle_t h_journal_set_minimum_set;
+ cls_method_handle_t h_journal_get_active_set;
+ cls_method_handle_t h_journal_set_active_set;
+ cls_method_handle_t h_journal_get_client;
+ cls_method_handle_t h_journal_client_register;
+ cls_method_handle_t h_journal_client_update_data;
+ cls_method_handle_t h_journal_client_update_state;
+ cls_method_handle_t h_journal_client_unregister;
+ cls_method_handle_t h_journal_client_commit;
+ cls_method_handle_t h_journal_client_list;
+ cls_method_handle_t h_journal_get_next_tag_tid;
+ cls_method_handle_t h_journal_get_tag;
+ cls_method_handle_t h_journal_tag_create;
+ cls_method_handle_t h_journal_tag_list;
+ cls_method_handle_t h_journal_object_guard_append;
+
+ cls_register("journal", &h_class);
+
+ /// methods for journal.$journal_id objects
+ cls_register_cxx_method(h_class, "create",
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ journal_create, &h_journal_create);
+ cls_register_cxx_method(h_class, "get_order",
+ CLS_METHOD_RD,
+ journal_get_order, &h_journal_get_order);
+ cls_register_cxx_method(h_class, "get_splay_width",
+ CLS_METHOD_RD,
+ journal_get_splay_width, &h_journal_get_splay_width);
+ cls_register_cxx_method(h_class, "get_pool_id",
+ CLS_METHOD_RD,
+ journal_get_pool_id, &h_journal_get_pool_id);
+ cls_register_cxx_method(h_class, "get_minimum_set",
+ CLS_METHOD_RD,
+ journal_get_minimum_set,
+ &h_journal_get_minimum_set);
+ cls_register_cxx_method(h_class, "set_minimum_set",
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ journal_set_minimum_set,
+ &h_journal_set_minimum_set);
+ cls_register_cxx_method(h_class, "get_active_set",
+ CLS_METHOD_RD,
+ journal_get_active_set,
+ &h_journal_get_active_set);
+ cls_register_cxx_method(h_class, "set_active_set",
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ journal_set_active_set,
+ &h_journal_set_active_set);
+
+ cls_register_cxx_method(h_class, "get_client",
+ CLS_METHOD_RD,
+ journal_get_client, &h_journal_get_client);
+ cls_register_cxx_method(h_class, "client_register",
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ journal_client_register, &h_journal_client_register);
+ cls_register_cxx_method(h_class, "client_update_data",
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ journal_client_update_data,
+ &h_journal_client_update_data);
+ cls_register_cxx_method(h_class, "client_update_state",
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ journal_client_update_state,
+ &h_journal_client_update_state);
+ cls_register_cxx_method(h_class, "client_unregister",
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ journal_client_unregister,
+ &h_journal_client_unregister);
+ cls_register_cxx_method(h_class, "client_commit",
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ journal_client_commit, &h_journal_client_commit);
+ cls_register_cxx_method(h_class, "client_list",
+ CLS_METHOD_RD,
+ journal_client_list, &h_journal_client_list);
+
+ cls_register_cxx_method(h_class, "get_next_tag_tid",
+ CLS_METHOD_RD,
+ journal_get_next_tag_tid,
+ &h_journal_get_next_tag_tid);
+ cls_register_cxx_method(h_class, "get_tag",
+ CLS_METHOD_RD,
+ journal_get_tag, &h_journal_get_tag);
+ cls_register_cxx_method(h_class, "tag_create",
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ journal_tag_create, &h_journal_tag_create);
+ cls_register_cxx_method(h_class, "tag_list",
+ CLS_METHOD_RD,
+ journal_tag_list, &h_journal_tag_list);
+
+ /// methods for journal_data.$journal_id.$object_id objects
+ cls_register_cxx_method(h_class, "guard_append",
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ journal_object_guard_append,
+ &h_journal_object_guard_append);
+}
diff --git a/src/cls/journal/cls_journal_client.cc b/src/cls/journal/cls_journal_client.cc
new file mode 100644
index 00000000..c22a32cf
--- /dev/null
+++ b/src/cls/journal/cls_journal_client.cc
@@ -0,0 +1,498 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "cls/journal/cls_journal_client.h"
+#include "include/rados/librados.hpp"
+#include "include/buffer.h"
+#include "include/Context.h"
+#include "common/Cond.h"
+#include <errno.h>
+
+namespace cls {
+namespace journal {
+namespace client {
+using ceph::encode;
+using ceph::decode;
+
+namespace {
+
+struct C_AioExec : public Context {
+ librados::IoCtx &ioctx;
+ std::string oid;
+
+ C_AioExec(librados::IoCtx &_ioctx, const std::string &_oid)
+ : ioctx(_ioctx), oid(_oid) {
+ }
+
+ static void rados_callback(rados_completion_t c, void *arg) {
+ Context *ctx = reinterpret_cast<Context *>(arg);
+ ctx->complete(rados_aio_get_return_value(c));
+ }
+};
+
+struct C_ClientList : public C_AioExec {
+ std::set<cls::journal::Client> *clients;
+ Context *on_finish;
+ bufferlist outbl;
+
+ C_ClientList(librados::IoCtx &_ioctx, const std::string &_oid,
+ std::set<cls::journal::Client> *_clients,
+ Context *_on_finish)
+ : C_AioExec(_ioctx, _oid), clients(_clients), on_finish(_on_finish) {}
+
+ void send(const std::string &start_after) {
+ bufferlist inbl;
+ encode(start_after, inbl);
+ encode(JOURNAL_MAX_RETURN, inbl);
+
+ librados::ObjectReadOperation op;
+ op.exec("journal", "client_list", inbl);
+
+ outbl.clear();
+ librados::AioCompletion *rados_completion =
+ librados::Rados::aio_create_completion(this, rados_callback, NULL);
+ int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl);
+ ceph_assert(r == 0);
+ rados_completion->release();
+ }
+
+ void complete(int r) override {
+ if (r < 0) {
+ finish(r);
+ return;
+ }
+
+ try {
+ auto iter = outbl.cbegin();
+ std::set<cls::journal::Client> partial_clients;
+ decode(partial_clients, iter);
+
+ std::string start_after;
+ if (!partial_clients.empty()) {
+ start_after = partial_clients.rbegin()->id;
+ clients->insert(partial_clients.begin(), partial_clients.end());
+ }
+
+ if (partial_clients.size() < JOURNAL_MAX_RETURN) {
+ finish(0);
+ } else {
+ send(start_after);
+ }
+ } catch (const buffer::error &err) {
+ finish(-EBADMSG);
+ }
+ }
+
+ void finish(int r) override {
+ on_finish->complete(r);
+ delete this;
+ }
+};
+
+struct C_ImmutableMetadata : public C_AioExec {
+ uint8_t *order;
+ uint8_t *splay_width;
+ int64_t *pool_id;
+ Context *on_finish;
+ bufferlist outbl;
+
+ C_ImmutableMetadata(librados::IoCtx &_ioctx, const std::string &_oid,
+ uint8_t *_order, uint8_t *_splay_width,
+ int64_t *_pool_id, Context *_on_finish)
+ : C_AioExec(_ioctx, _oid), order(_order), splay_width(_splay_width),
+ pool_id(_pool_id), on_finish(_on_finish) {
+ }
+
+ void send() {
+ librados::ObjectReadOperation op;
+ bufferlist inbl;
+ op.exec("journal", "get_order", inbl);
+ op.exec("journal", "get_splay_width", inbl);
+ op.exec("journal", "get_pool_id", inbl);
+
+ librados::AioCompletion *rados_completion =
+ librados::Rados::aio_create_completion(this, rados_callback, NULL);
+ int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl);
+ ceph_assert(r == 0);
+ rados_completion->release();
+ }
+
+ void finish(int r) override {
+ if (r == 0) {
+ try {
+ auto iter = outbl.cbegin();
+ decode(*order, iter);
+ decode(*splay_width, iter);
+ decode(*pool_id, iter);
+ } catch (const buffer::error &err) {
+ r = -EBADMSG;
+ }
+ }
+ on_finish->complete(r);
+ }
+};
+
+struct C_MutableMetadata : public C_AioExec {
+ uint64_t *minimum_set;
+ uint64_t *active_set;
+ C_ClientList *client_list;
+ bufferlist outbl;
+
+ C_MutableMetadata(librados::IoCtx &_ioctx, const std::string &_oid,
+ uint64_t *_minimum_set, uint64_t *_active_set,
+ C_ClientList *_client_list)
+ : C_AioExec(_ioctx, _oid), minimum_set(_minimum_set),
+ active_set(_active_set), client_list(_client_list) {}
+
+ void send() {
+ librados::ObjectReadOperation op;
+ bufferlist inbl;
+ op.exec("journal", "get_minimum_set", inbl);
+ op.exec("journal", "get_active_set", inbl);
+
+ librados::AioCompletion *rados_completion =
+ librados::Rados::aio_create_completion(this, rados_callback, NULL);
+ int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl);
+ ceph_assert(r == 0);
+ rados_completion->release();
+ }
+
+ void finish(int r) override {
+ if (r == 0) {
+ try {
+ auto iter = outbl.cbegin();
+ decode(*minimum_set, iter);
+ decode(*active_set, iter);
+ client_list->send("");
+ } catch (const buffer::error &err) {
+ r = -EBADMSG;
+ }
+ }
+ if (r < 0) {
+ client_list->complete(r);
+ }
+ }
+};
+
+
+} // anonymous namespace
+
+void create(librados::ObjectWriteOperation *op,
+ uint8_t order, uint8_t splay, int64_t pool_id) {
+ bufferlist bl;
+ encode(order, bl);
+ encode(splay, bl);
+ encode(pool_id, bl);
+
+ op->exec("journal", "create", bl);
+}
+
+int create(librados::IoCtx &ioctx, const std::string &oid, uint8_t order,
+ uint8_t splay, int64_t pool_id) {
+ librados::ObjectWriteOperation op;
+ create(&op, order, splay, pool_id);
+
+ int r = ioctx.operate(oid, &op);
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
+
+void get_immutable_metadata(librados::IoCtx &ioctx, const std::string &oid,
+ uint8_t *order, uint8_t *splay_width,
+ int64_t *pool_id, Context *on_finish) {
+ C_ImmutableMetadata *metadata = new C_ImmutableMetadata(ioctx, oid, order,
+ splay_width, pool_id,
+ on_finish);
+ metadata->send();
+}
+
+void get_mutable_metadata(librados::IoCtx &ioctx, const std::string &oid,
+ uint64_t *minimum_set, uint64_t *active_set,
+ std::set<cls::journal::Client> *clients,
+ Context *on_finish) {
+ C_ClientList *client_list = new C_ClientList(ioctx, oid, clients, on_finish);
+ C_MutableMetadata *metadata = new C_MutableMetadata(
+ ioctx, oid, minimum_set, active_set, client_list);
+ metadata->send();
+}
+
+void set_minimum_set(librados::ObjectWriteOperation *op, uint64_t object_set) {
+ bufferlist bl;
+ encode(object_set, bl);
+ op->exec("journal", "set_minimum_set", bl);
+}
+
+void set_active_set(librados::ObjectWriteOperation *op, uint64_t object_set) {
+ bufferlist bl;
+ encode(object_set, bl);
+ op->exec("journal", "set_active_set", bl);
+}
+
+int get_client(librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &id, cls::journal::Client *client) {
+ librados::ObjectReadOperation op;
+ get_client_start(&op, id);
+
+ bufferlist out_bl;
+ int r = ioctx.operate(oid, &op, &out_bl);
+ if (r < 0) {
+ return r;
+ }
+
+ auto iter = out_bl.cbegin();
+ r = get_client_finish(&iter, client);
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
+
+void get_client_start(librados::ObjectReadOperation *op,
+ const std::string &id) {
+ bufferlist bl;
+ encode(id, bl);
+ op->exec("journal", "get_client", bl);
+}
+
+int get_client_finish(bufferlist::const_iterator *iter,
+ cls::journal::Client *client) {
+ try {
+ decode(*client, *iter);
+ } catch (const buffer::error &err) {
+ return -EBADMSG;
+ }
+ return 0;
+}
+
+int client_register(librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &id, const bufferlist &data) {
+ librados::ObjectWriteOperation op;
+ client_register(&op, id, data);
+ return ioctx.operate(oid, &op);
+}
+
+void client_register(librados::ObjectWriteOperation *op,
+ const std::string &id, const bufferlist &data) {
+ bufferlist bl;
+ encode(id, bl);
+ encode(data, bl);
+ op->exec("journal", "client_register", bl);
+}
+
+int client_update_data(librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &id, const bufferlist &data) {
+ librados::ObjectWriteOperation op;
+ client_update_data(&op, id, data);
+ return ioctx.operate(oid, &op);
+}
+
+void client_update_data(librados::ObjectWriteOperation *op,
+ const std::string &id, const bufferlist &data) {
+ bufferlist bl;
+ encode(id, bl);
+ encode(data, bl);
+ op->exec("journal", "client_update_data", bl);
+}
+
+int client_update_state(librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &id, cls::journal::ClientState state) {
+ librados::ObjectWriteOperation op;
+ client_update_state(&op, id, state);
+ return ioctx.operate(oid, &op);
+}
+
+void client_update_state(librados::ObjectWriteOperation *op,
+ const std::string &id,
+ cls::journal::ClientState state) {
+ bufferlist bl;
+ encode(id, bl);
+ encode(static_cast<uint8_t>(state), bl);
+ op->exec("journal", "client_update_state", bl);
+}
+
+int client_unregister(librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &id) {
+ librados::ObjectWriteOperation op;
+ client_unregister(&op, id);
+ return ioctx.operate(oid, &op);
+}
+
+void client_unregister(librados::ObjectWriteOperation *op,
+ const std::string &id) {
+
+ bufferlist bl;
+ encode(id, bl);
+ op->exec("journal", "client_unregister", bl);
+}
+
+void client_commit(librados::ObjectWriteOperation *op, const std::string &id,
+ const cls::journal::ObjectSetPosition &commit_position) {
+ bufferlist bl;
+ encode(id, bl);
+ encode(commit_position, bl);
+ op->exec("journal", "client_commit", bl);
+}
+
+int client_list(librados::IoCtx &ioctx, const std::string &oid,
+ std::set<cls::journal::Client> *clients) {
+ C_SaferCond cond;
+ client_list(ioctx, oid, clients, &cond);
+ return cond.wait();
+}
+
+void client_list(librados::IoCtx &ioctx, const std::string &oid,
+ std::set<cls::journal::Client> *clients, Context *on_finish) {
+ C_ClientList *client_list = new C_ClientList(ioctx, oid, clients, on_finish);
+ client_list->send("");
+}
+
+int get_next_tag_tid(librados::IoCtx &ioctx, const std::string &oid,
+ uint64_t *tag_tid) {
+ librados::ObjectReadOperation op;
+ get_next_tag_tid_start(&op);
+
+ bufferlist out_bl;
+ int r = ioctx.operate(oid, &op, &out_bl);
+ if (r < 0) {
+ return r;
+ }
+
+ auto iter = out_bl.cbegin();
+ r = get_next_tag_tid_finish(&iter, tag_tid);
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
+
+void get_next_tag_tid_start(librados::ObjectReadOperation *op) {
+ bufferlist bl;
+ op->exec("journal", "get_next_tag_tid", bl);
+}
+
+int get_next_tag_tid_finish(bufferlist::const_iterator *iter,
+ uint64_t *tag_tid) {
+ try {
+ decode(*tag_tid, *iter);
+ } catch (const buffer::error &err) {
+ return -EBADMSG;
+ }
+ return 0;
+}
+
+int get_tag(librados::IoCtx &ioctx, const std::string &oid,
+ uint64_t tag_tid, cls::journal::Tag *tag) {
+ librados::ObjectReadOperation op;
+ get_tag_start(&op, tag_tid);
+
+ bufferlist out_bl;
+ int r = ioctx.operate(oid, &op, &out_bl);
+ if (r < 0) {
+ return r;
+ }
+
+ auto iter = out_bl.cbegin();
+ r = get_tag_finish(&iter, tag);
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
+
+void get_tag_start(librados::ObjectReadOperation *op,
+ uint64_t tag_tid) {
+ bufferlist bl;
+ encode(tag_tid, bl);
+ op->exec("journal", "get_tag", bl);
+}
+
+int get_tag_finish(bufferlist::const_iterator *iter, cls::journal::Tag *tag) {
+ try {
+ decode(*tag, *iter);
+ } catch (const buffer::error &err) {
+ return -EBADMSG;
+ }
+ return 0;
+}
+
+int tag_create(librados::IoCtx &ioctx, const std::string &oid,
+ uint64_t tag_tid, uint64_t tag_class,
+ const bufferlist &data) {
+ librados::ObjectWriteOperation op;
+ tag_create(&op, tag_tid, tag_class, data);
+ return ioctx.operate(oid, &op);
+}
+
+void tag_create(librados::ObjectWriteOperation *op, uint64_t tag_tid,
+ uint64_t tag_class, const bufferlist &data) {
+ bufferlist bl;
+ encode(tag_tid, bl);
+ encode(tag_class, bl);
+ encode(data, bl);
+ op->exec("journal", "tag_create", bl);
+}
+
+int tag_list(librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &client_id, boost::optional<uint64_t> tag_class,
+ std::set<cls::journal::Tag> *tags) {
+ tags->clear();
+ uint64_t start_after_tag_tid = 0;
+ while (true) {
+ librados::ObjectReadOperation op;
+ tag_list_start(&op, start_after_tag_tid, JOURNAL_MAX_RETURN, client_id,
+ tag_class);
+
+ bufferlist out_bl;
+ int r = ioctx.operate(oid, &op, &out_bl);
+ if (r < 0) {
+ return r;
+ }
+
+ auto iter = out_bl.cbegin();
+ std::set<cls::journal::Tag> decode_tags;
+ r = tag_list_finish(&iter, &decode_tags);
+ if (r < 0) {
+ return r;
+ }
+
+ tags->insert(decode_tags.begin(), decode_tags.end());
+ if (decode_tags.size() < JOURNAL_MAX_RETURN) {
+ break;
+ }
+ }
+ return 0;
+}
+
+void tag_list_start(librados::ObjectReadOperation *op,
+ uint64_t start_after_tag_tid, uint64_t max_return,
+ const std::string &client_id,
+ boost::optional<uint64_t> tag_class) {
+ bufferlist bl;
+ encode(start_after_tag_tid, bl);
+ encode(max_return, bl);
+ encode(client_id, bl);
+ encode(tag_class, bl);
+ op->exec("journal", "tag_list", bl);
+}
+
+int tag_list_finish(bufferlist::const_iterator *iter,
+ std::set<cls::journal::Tag> *tags) {
+ try {
+ decode(*tags, *iter);
+ } catch (const buffer::error &err) {
+ return -EBADMSG;
+ }
+ return 0;
+}
+
+void guard_append(librados::ObjectWriteOperation *op, uint64_t soft_max_size) {
+ bufferlist bl;
+ encode(soft_max_size, bl);
+ op->exec("journal", "guard_append", bl);
+}
+
+} // namespace client
+} // namespace journal
+} // namespace cls
diff --git a/src/cls/journal/cls_journal_client.h b/src/cls/journal/cls_journal_client.h
new file mode 100644
index 00000000..50579cfc
--- /dev/null
+++ b/src/cls/journal/cls_journal_client.h
@@ -0,0 +1,107 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CLS_JOURNAL_CLIENT_H
+#define CEPH_CLS_JOURNAL_CLIENT_H
+
+#include "include/rados/librados_fwd.hpp"
+#include "cls/journal/cls_journal_types.h"
+#include <set>
+#include <boost/optional.hpp>
+
+class Context;
+
+namespace cls {
+namespace journal {
+namespace client {
+
+void create(librados::ObjectWriteOperation *op,
+ uint8_t order, uint8_t splay, int64_t pool_id);
+int create(librados::IoCtx &ioctx, const std::string &oid, uint8_t order,
+ uint8_t splay, int64_t pool_id);
+
+void get_immutable_metadata(librados::IoCtx &ioctx, const std::string &oid,
+ uint8_t *order, uint8_t *splay_width,
+ int64_t *pool_id, Context *on_finish);
+void get_mutable_metadata(librados::IoCtx &ioctx, const std::string &oid,
+ uint64_t *minimum_set, uint64_t *active_set,
+ std::set<cls::journal::Client> *clients,
+ Context *on_finish);
+
+void set_minimum_set(librados::ObjectWriteOperation *op, uint64_t object_set);
+void set_active_set(librados::ObjectWriteOperation *op, uint64_t object_set);
+
+// journal client helpers
+int get_client(librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &id, cls::journal::Client *client);
+void get_client_start(librados::ObjectReadOperation *op,
+ const std::string &id);
+int get_client_finish(bufferlist::const_iterator *iter,
+ cls::journal::Client *client);
+
+int client_register(librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &id, const bufferlist &data);
+void client_register(librados::ObjectWriteOperation *op,
+ const std::string &id, const bufferlist &data);
+
+int client_update_data(librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &id, const bufferlist &data);
+void client_update_data(librados::ObjectWriteOperation *op,
+ const std::string &id, const bufferlist &data);
+int client_update_state(librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &id, cls::journal::ClientState state);
+void client_update_state(librados::ObjectWriteOperation *op,
+ const std::string &id,
+ cls::journal::ClientState state);
+
+int client_unregister(librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &id);
+void client_unregister(librados::ObjectWriteOperation *op,
+ const std::string &id);
+
+void client_commit(librados::ObjectWriteOperation *op, const std::string &id,
+ const cls::journal::ObjectSetPosition &commit_position);
+
+int client_list(librados::IoCtx &ioctx, const std::string &oid,
+ std::set<cls::journal::Client> *clients);
+void client_list(librados::IoCtx &ioctx, const std::string &oid,
+ std::set<cls::journal::Client> *clients, Context *on_finish);
+
+// journal tag helpers
+int get_next_tag_tid(librados::IoCtx &ioctx, const std::string &oid,
+ uint64_t *tag_tid);
+void get_next_tag_tid_start(librados::ObjectReadOperation *op);
+int get_next_tag_tid_finish(bufferlist::const_iterator *iter,
+ uint64_t *tag_tid);
+
+int get_tag(librados::IoCtx &ioctx, const std::string &oid,
+ uint64_t tag_tid, cls::journal::Tag *tag);
+void get_tag_start(librados::ObjectReadOperation *op,
+ uint64_t tag_tid);
+int get_tag_finish(bufferlist::const_iterator *iter, cls::journal::Tag *tag);
+
+int tag_create(librados::IoCtx &ioctx, const std::string &oid,
+ uint64_t tag_tid, uint64_t tag_class,
+ const bufferlist &data);
+void tag_create(librados::ObjectWriteOperation *op,
+ uint64_t tag_tid, uint64_t tag_class,
+ const bufferlist &data);
+
+int tag_list(librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &client_id, boost::optional<uint64_t> tag_class,
+ std::set<cls::journal::Tag> *tags);
+void tag_list_start(librados::ObjectReadOperation *op,
+ uint64_t start_after_tag_tid, uint64_t max_return,
+ const std::string &client_id,
+ boost::optional<uint64_t> tag_class);
+int tag_list_finish(bufferlist::const_iterator *iter,
+ std::set<cls::journal::Tag> *tags);
+
+// journal entry helpers
+void guard_append(librados::ObjectWriteOperation *op, uint64_t soft_max_size);
+
+} // namespace client
+} // namespace journal
+} // namespace cls
+
+#endif // CEPH_CLS_JOURNAL_CLIENT_H
diff --git a/src/cls/journal/cls_journal_types.cc b/src/cls/journal/cls_journal_types.cc
new file mode 100644
index 00000000..6976304d
--- /dev/null
+++ b/src/cls/journal/cls_journal_types.cc
@@ -0,0 +1,196 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "cls/journal/cls_journal_types.h"
+#include "include/stringify.h"
+#include "common/Formatter.h"
+
+namespace cls {
+namespace journal {
+
+void ObjectPosition::encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(object_number, bl);
+ encode(tag_tid, bl);
+ encode(entry_tid, bl);
+ ENCODE_FINISH(bl);
+}
+
+void ObjectPosition::decode(bufferlist::const_iterator& iter) {
+ DECODE_START(1, iter);
+ decode(object_number, iter);
+ decode(tag_tid, iter);
+ decode(entry_tid, iter);
+ DECODE_FINISH(iter);
+}
+
+void ObjectPosition::dump(Formatter *f) const {
+ f->dump_unsigned("object_number", object_number);
+ f->dump_unsigned("tag_tid", tag_tid);
+ f->dump_unsigned("entry_tid", entry_tid);
+}
+
+void ObjectPosition::generate_test_instances(std::list<ObjectPosition *> &o) {
+ o.push_back(new ObjectPosition());
+ o.push_back(new ObjectPosition(1, 2, 3));
+}
+
+void ObjectSetPosition::encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(object_positions, bl);
+ ENCODE_FINISH(bl);
+}
+
+void ObjectSetPosition::decode(bufferlist::const_iterator& iter) {
+ DECODE_START(1, iter);
+ decode(object_positions, iter);
+ DECODE_FINISH(iter);
+}
+
+void ObjectSetPosition::dump(Formatter *f) const {
+ f->open_array_section("object_positions");
+ for (auto &pos : object_positions) {
+ f->open_object_section("object_position");
+ pos.dump(f);
+ f->close_section();
+ }
+ f->close_section();
+}
+
+void ObjectSetPosition::generate_test_instances(
+ std::list<ObjectSetPosition *> &o) {
+ o.push_back(new ObjectSetPosition());
+ o.push_back(new ObjectSetPosition({{0, 1, 120}, {121, 2, 121}}));
+}
+
+void Client::encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(id, bl);
+ encode(data, bl);
+ encode(commit_position, bl);
+ encode(static_cast<uint8_t>(state), bl);
+ ENCODE_FINISH(bl);
+}
+
+void Client::decode(bufferlist::const_iterator& iter) {
+ DECODE_START(1, iter);
+ decode(id, iter);
+ decode(data, iter);
+ decode(commit_position, iter);
+
+ uint8_t state_raw;
+ decode(state_raw, iter);
+ state = static_cast<ClientState>(state_raw);
+ DECODE_FINISH(iter);
+}
+
+void Client::dump(Formatter *f) const {
+ f->dump_string("id", id);
+
+ std::stringstream data_ss;
+ data.hexdump(data_ss);
+ f->dump_string("data", data_ss.str());
+
+ f->open_object_section("commit_position");
+ commit_position.dump(f);
+ f->close_section();
+
+ f->dump_string("state", stringify(state));
+}
+
+void Client::generate_test_instances(std::list<Client *> &o) {
+ bufferlist data;
+ data.append(std::string(128, '1'));
+
+ o.push_back(new Client());
+ o.push_back(new Client("id", data));
+ o.push_back(new Client("id", data, {{{1, 2, 120}, {2, 3, 121}}}));
+}
+
+void Tag::encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(tid, bl);
+ encode(tag_class, bl);
+ encode(data, bl);
+ ENCODE_FINISH(bl);
+}
+
+void Tag::decode(bufferlist::const_iterator& iter) {
+ DECODE_START(1, iter);
+ decode(tid, iter);
+ decode(tag_class, iter);
+ decode(data, iter);
+ DECODE_FINISH(iter);
+}
+
+void Tag::dump(Formatter *f) const {
+ f->dump_unsigned("tid", tid);
+ f->dump_unsigned("tag_class", tag_class);
+
+ std::stringstream data_ss;
+ data.hexdump(data_ss);
+ f->dump_string("data", data_ss.str());
+}
+
+void Tag::generate_test_instances(std::list<Tag *> &o) {
+ o.push_back(new Tag());
+
+ bufferlist data;
+ data.append(std::string(128, '1'));
+ o.push_back(new Tag(123, 234, data));
+}
+
+std::ostream &operator<<(std::ostream &os, const ClientState &state) {
+ switch (state) {
+ case CLIENT_STATE_CONNECTED:
+ os << "connected";
+ break;
+ case CLIENT_STATE_DISCONNECTED:
+ os << "disconnected";
+ break;
+ default:
+ os << "unknown (" << static_cast<uint32_t>(state) << ")";
+ break;
+ }
+ return os;
+}
+
+std::ostream &operator<<(std::ostream &os,
+ const ObjectPosition &object_position) {
+ os << "["
+ << "object_number=" << object_position.object_number << ", "
+ << "tag_tid=" << object_position.tag_tid << ", "
+ << "entry_tid=" << object_position.entry_tid << "]";
+ return os;
+}
+
+std::ostream &operator<<(std::ostream &os,
+ const ObjectSetPosition &object_set_position) {
+ os << "[positions=[";
+ std::string delim;
+ for (auto &object_position : object_set_position.object_positions) {
+ os << delim << object_position;
+ delim = ", ";
+ }
+ os << "]]";
+ return os;
+}
+
+std::ostream &operator<<(std::ostream &os, const Client &client) {
+ os << "[id=" << client.id << ", "
+ << "commit_position=" << client.commit_position << ", "
+ << "state=" << client.state << "]";
+ return os;
+}
+
+std::ostream &operator<<(std::ostream &os, const Tag &tag) {
+ os << "[tid=" << tag.tid << ", "
+ << "tag_class=" << tag.tag_class << ", "
+ << "data=";
+ tag.data.hexdump(os);
+ os << "]";
+ return os;
+}
+
+} // namespace journal
+} // namespace cls
diff --git a/src/cls/journal/cls_journal_types.h b/src/cls/journal/cls_journal_types.h
new file mode 100644
index 00000000..2a617698
--- /dev/null
+++ b/src/cls/journal/cls_journal_types.h
@@ -0,0 +1,154 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CLS_JOURNAL_TYPES_H
+#define CEPH_CLS_JOURNAL_TYPES_H
+
+#include "include/int_types.h"
+#include "include/buffer_fwd.h"
+#include "include/encoding.h"
+#include <iosfwd>
+#include <list>
+#include <string>
+
+namespace ceph {
+class Formatter;
+}
+
+namespace cls {
+namespace journal {
+
+static const uint64_t JOURNAL_MAX_RETURN = 256;
+
+struct ObjectPosition {
+ uint64_t object_number;
+ uint64_t tag_tid;
+ uint64_t entry_tid;
+
+ ObjectPosition() : object_number(0), tag_tid(0), entry_tid(0) {}
+ ObjectPosition(uint64_t _object_number, uint64_t _tag_tid,
+ uint64_t _entry_tid)
+ : object_number(_object_number), tag_tid(_tag_tid), entry_tid(_entry_tid) {}
+
+ inline bool operator==(const ObjectPosition& rhs) const {
+ return (object_number == rhs.object_number &&
+ tag_tid == rhs.tag_tid &&
+ entry_tid == rhs.entry_tid);
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::const_iterator& iter);
+ void dump(Formatter *f) const;
+
+ inline bool operator<(const ObjectPosition &rhs) const {
+ if (object_number != rhs.object_number) {
+ return object_number < rhs.object_number;
+ } else if (tag_tid != rhs.tag_tid) {
+ return tag_tid < rhs.tag_tid;
+ }
+ return entry_tid < rhs.entry_tid;
+ }
+
+ static void generate_test_instances(std::list<ObjectPosition *> &o);
+};
+
+typedef std::list<ObjectPosition> ObjectPositions;
+
+struct ObjectSetPosition {
+ // stored in most-recent -> least recent committed entry order
+ ObjectPositions object_positions;
+
+ ObjectSetPosition() {}
+ ObjectSetPosition(const ObjectPositions &_object_positions)
+ : object_positions(_object_positions) {}
+
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::const_iterator& iter);
+ void dump(Formatter *f) const;
+
+ inline bool operator==(const ObjectSetPosition &rhs) const {
+ return (object_positions == rhs.object_positions);
+ }
+
+ static void generate_test_instances(std::list<ObjectSetPosition *> &o);
+};
+
+enum ClientState {
+ CLIENT_STATE_CONNECTED = 0,
+ CLIENT_STATE_DISCONNECTED = 1
+};
+
+struct Client {
+ std::string id;
+ bufferlist data;
+ ObjectSetPosition commit_position;
+ ClientState state;
+
+ Client() : state(CLIENT_STATE_CONNECTED) {}
+ Client(const std::string& _id, const bufferlist &_data,
+ const ObjectSetPosition &_commit_position = ObjectSetPosition(),
+ ClientState _state = CLIENT_STATE_CONNECTED)
+ : id(_id), data(_data), commit_position(_commit_position), state(_state) {}
+
+ inline bool operator==(const Client &rhs) const {
+ return (id == rhs.id &&
+ data.contents_equal(rhs.data) &&
+ commit_position == rhs.commit_position &&
+ state == rhs.state);
+ }
+ inline bool operator<(const Client &rhs) const {
+ return (id < rhs.id);
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::const_iterator& iter);
+ void dump(Formatter *f) const;
+
+ static void generate_test_instances(std::list<Client *> &o);
+};
+
+struct Tag {
+ static const uint64_t TAG_CLASS_NEW = static_cast<uint64_t>(-1);
+
+ uint64_t tid;
+ uint64_t tag_class;
+ bufferlist data;
+
+ Tag() : tid(0), tag_class(0) {}
+ Tag(uint64_t tid, uint64_t tag_class, const bufferlist &data)
+ : tid(tid), tag_class(tag_class), data(data) {}
+
+ inline bool operator==(const Tag &rhs) const {
+ return (tid == rhs.tid &&
+ tag_class == rhs.tag_class &&
+ data.contents_equal(rhs.data));
+ }
+ inline bool operator<(const Tag &rhs) const {
+ return (tid < rhs.tid);
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::const_iterator& iter);
+ void dump(Formatter *f) const;
+
+ static void generate_test_instances(std::list<Tag *> &o);
+};
+
+WRITE_CLASS_ENCODER(ObjectPosition);
+WRITE_CLASS_ENCODER(ObjectSetPosition);
+WRITE_CLASS_ENCODER(Client);
+WRITE_CLASS_ENCODER(Tag);
+
+std::ostream &operator<<(std::ostream &os, const ClientState &state);
+std::ostream &operator<<(std::ostream &os,
+ const ObjectPosition &object_position);
+std::ostream &operator<<(std::ostream &os,
+ const ObjectSetPosition &object_set_position);
+std::ostream &operator<<(std::ostream &os,
+ const Client &client);
+std::ostream &operator<<(std::ostream &os, const Tag &tag);
+
+} // namespace journal
+} // namespace cls
+
+#endif // CEPH_CLS_JOURNAL_TYPES_H