summaryrefslogtreecommitdiffstats
path: root/src/cls/2pc_queue
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/cls/2pc_queue
parentInitial commit. (diff)
downloadceph-upstream/18.2.2.tar.xz
ceph-upstream/18.2.2.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/cls/2pc_queue')
-rw-r--r--src/cls/2pc_queue/cls_2pc_queue.cc602
-rw-r--r--src/cls/2pc_queue/cls_2pc_queue_client.cc210
-rw-r--r--src/cls/2pc_queue/cls_2pc_queue_client.h84
-rw-r--r--src/cls/2pc_queue/cls_2pc_queue_const.h14
-rw-r--r--src/cls/2pc_queue/cls_2pc_queue_ops.h117
-rw-r--r--src/cls/2pc_queue/cls_2pc_queue_types.h62
6 files changed, 1089 insertions, 0 deletions
diff --git a/src/cls/2pc_queue/cls_2pc_queue.cc b/src/cls/2pc_queue/cls_2pc_queue.cc
new file mode 100644
index 000000000..fba763955
--- /dev/null
+++ b/src/cls/2pc_queue/cls_2pc_queue.cc
@@ -0,0 +1,602 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/types.h"
+
+#include "cls/2pc_queue/cls_2pc_queue_types.h"
+#include "cls/2pc_queue/cls_2pc_queue_ops.h"
+#include "cls/2pc_queue/cls_2pc_queue_const.h"
+#include "cls/queue/cls_queue_ops.h"
+#include "cls/queue/cls_queue_src.h"
+#include "objclass/objclass.h"
+
+CLS_VER(1,0)
+CLS_NAME(2pc_queue)
+
+using ceph::bufferlist;
+using ceph::decode;
+using ceph::encode;
+
+constexpr auto CLS_QUEUE_URGENT_DATA_XATTR_NAME = "cls_queue_urgent_data";
+
+static int cls_2pc_queue_init(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
+ auto in_iter = in->cbegin();
+
+ cls_queue_init_op op;
+ try {
+ decode(op, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_init: failed to decode entry: %s", err.what());
+ return -EINVAL;
+ }
+
+ cls_2pc_urgent_data urgent_data;
+
+ cls_queue_init_op init_op;
+
+ CLS_LOG(20, "INFO: cls_2pc_queue_init: max size is %lu (bytes)", op.queue_size);
+
+ init_op.queue_size = op.queue_size;
+ init_op.max_urgent_data_size = 23552; // overall head is 24KB ~ pending 1K reservations ops
+ encode(urgent_data, init_op.bl_urgent_data);
+
+ return queue_init(hctx, init_op);
+}
+
+static int cls_2pc_queue_get_capacity(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ cls_queue_get_capacity_ret op_ret;
+ auto ret = queue_get_capacity(hctx, op_ret);
+ if (ret < 0) {
+ return ret;
+ }
+
+ encode(op_ret, *out);
+ return 0;
+}
+
+static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
+ cls_2pc_queue_reserve_op res_op;
+ try {
+ auto in_iter = in->cbegin();
+ decode(res_op, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to decode entry: %s", err.what());
+ return -EINVAL;
+ }
+
+ if (res_op.size == 0) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: cannot reserve zero bytes");
+ return -EINVAL;
+ }
+ if (res_op.entries == 0) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: cannot reserve zero entries");
+ return -EINVAL;
+ }
+
+ // get head
+ cls_queue_head head;
+ int ret = queue_read_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ cls_2pc_urgent_data urgent_data;
+ try {
+ auto in_iter = head.bl_urgent_data.cbegin();
+ decode(urgent_data, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to decode entry: %s", err.what());
+ return -EINVAL;
+ }
+
+ const auto overhead = res_op.entries*QUEUE_ENTRY_OVERHEAD;
+ const auto remaining_size = (head.tail.offset >= head.front.offset) ?
+ (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size) :
+ head.front.offset - head.tail.offset;
+
+
+ if (res_op.size + urgent_data.reserved_size + overhead > remaining_size) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: reservations exceeded maximum capacity");
+ CLS_LOG(10, "INFO: cls_2pc_queue_reserve: remaining size: %lu (bytes)", remaining_size);
+ CLS_LOG(10, "INFO: cls_2pc_queue_reserve: current reservations: %lu (bytes)", urgent_data.reserved_size);
+ CLS_LOG(10, "INFO: cls_2pc_queue_reserve: requested size: %lu (bytes)", res_op.size);
+ return -ENOSPC;
+ }
+
+ urgent_data.reserved_size += res_op.size + overhead;
+ // note that last id is incremented regadless of failures
+ // to avoid "old reservation" issues below
+ ++urgent_data.last_id;
+ bool result;
+ cls_2pc_reservations::iterator last_reservation;
+ std::tie(last_reservation, result) = urgent_data.reservations.emplace(std::piecewise_construct,
+ std::forward_as_tuple(urgent_data.last_id),
+ std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now()));
+ if (!result) {
+ // an old reservation that was never committed or aborted is in the map
+ // caller should try again assuming other IDs are ok
+ CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: reservation id conflict after rollover: %u", urgent_data.last_id);
+ return -EAGAIN;
+ }
+
+ // write back head
+ head.bl_urgent_data.clear();
+ encode(urgent_data, head.bl_urgent_data);
+
+ const uint64_t urgent_data_length = head.bl_urgent_data.length();
+
+ if (head.max_urgent_data_size < urgent_data_length) {
+ CLS_LOG(10, "INFO: cls_2pc_queue_reserve: urgent data size: %lu exceeded maximum: %lu using xattrs", urgent_data_length, head.max_urgent_data_size);
+ // add the last reservation to xattrs
+ bufferlist bl_xattrs;
+ auto ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+ if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to read xattrs with: %d", ret);
+ return ret;
+ }
+ cls_2pc_reservations xattr_reservations;
+ if (ret >= 0) {
+ // xattrs exist
+ auto iter = bl_xattrs.cbegin();
+ try {
+ decode(xattr_reservations, iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to decode xattrs urgent data map");
+ return -EINVAL;
+ } //end - catch
+ }
+ std::tie(std::ignore, result) = xattr_reservations.emplace(std::piecewise_construct,
+ std::forward_as_tuple(urgent_data.last_id),
+ std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now()));
+ if (!result) {
+ // an old reservation that was never committed or aborted is in the map
+ // caller should try again assuming other IDs are ok
+ CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: reservation id conflict inside xattrs after rollover: %u", urgent_data.last_id);
+ return -EAGAIN;
+ }
+ bl_xattrs.clear();
+ encode(xattr_reservations, bl_xattrs);
+ ret = cls_cxx_setxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+ if (ret < 0) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to write xattrs with: %d", ret);
+ return ret;
+ }
+ // remove the last reservation from the reservation list
+ // and indicate that spillover happened
+ urgent_data.has_xattrs = true;
+ urgent_data.reservations.erase(last_reservation);
+ head.bl_urgent_data.clear();
+ encode(urgent_data, head.bl_urgent_data);
+ }
+
+ ret = queue_write_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ CLS_LOG(20, "INFO: cls_2pc_queue_reserve: remaining size: %lu (bytes)", remaining_size);
+ CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservations: %lu (bytes)", urgent_data.reserved_size);
+ CLS_LOG(20, "INFO: cls_2pc_queue_reserve: requested size: %lu (bytes)", res_op.size);
+ CLS_LOG(20, "INFO: cls_2pc_queue_reserve: urgent data size: %lu (bytes)", urgent_data_length);
+
+ cls_2pc_queue_reserve_ret op_ret;
+ op_ret.id = urgent_data.last_id;
+ encode(op_ret, *out);
+
+ return 0;
+}
+
+static int cls_2pc_queue_commit(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
+ cls_2pc_queue_commit_op commit_op;
+ try {
+ auto in_iter = in->cbegin();
+ decode(commit_op, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to decode entry: %s", err.what());
+ return -EINVAL;
+ }
+
+ // get head
+ cls_queue_head head;
+ int ret = queue_read_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ cls_2pc_urgent_data urgent_data;
+ try {
+ auto in_iter = head.bl_urgent_data.cbegin();
+ decode(urgent_data, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to decode entry: %s", err.what());
+ return -EINVAL;
+ }
+
+ auto it = urgent_data.reservations.find(commit_op.id);
+ cls_2pc_reservations xattr_reservations;
+ bufferlist bl_xattrs;
+ if (it == urgent_data.reservations.end()) {
+ if (!urgent_data.has_xattrs) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_commit: reservation does not exist: %u", commit_op.id);
+ return -ENOENT;
+ }
+ // try to look for the reservation in xattrs
+ auto ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+ if (ret < 0) {
+ if (ret == -ENOENT || ret == -ENODATA) {
+ // no xattrs, reservation does not exists
+ CLS_LOG(1, "ERROR: cls_2pc_queue_commit: reservation does not exist: %u", commit_op.id);
+ return -ENOENT;
+ }
+ CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to read xattrs with: %d", ret);
+ return ret;
+ }
+ auto iter = bl_xattrs.cbegin();
+ try {
+ decode(xattr_reservations, iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to decode xattrs urgent data map");
+ return -EINVAL;
+ } //end - catch
+ it = xattr_reservations.find(commit_op.id);
+ if (it == urgent_data.reservations.end()) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_commit: reservation does not exist: %u", commit_op.id);
+ return -ENOENT;
+ }
+ }
+
+ auto& res = it->second;
+ const auto actual_size = std::accumulate(commit_op.bl_data_vec.begin(),
+ commit_op.bl_data_vec.end(), 0UL, [] (uint64_t sum, const bufferlist& bl) {
+ return sum + bl.length();
+ });
+
+ if (res.size < actual_size) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_commit: trying to commit %lu bytes to a %lu bytes reservation",
+ actual_size,
+ res.size);
+ return -EINVAL;
+ }
+
+ // commit the data to the queue
+ cls_queue_enqueue_op enqueue_op;
+ enqueue_op.bl_data_vec = std::move(commit_op.bl_data_vec);
+ ret = queue_enqueue(hctx, enqueue_op, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ urgent_data.reserved_size -= res.size;
+
+ if (xattr_reservations.empty()) {
+ // remove the reservation from urgent data
+ urgent_data.reservations.erase(it);
+ } else {
+ // remove the reservation from xattrs
+ xattr_reservations.erase(it);
+ bl_xattrs.clear();
+ encode(xattr_reservations, bl_xattrs);
+ ret = cls_cxx_setxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+ if (ret < 0) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to write xattrs with: %d", ret);
+ return ret;
+ }
+ }
+
+ CLS_LOG(20, "INFO: cls_2pc_queue_commit: current reservations: %lu (bytes)", urgent_data.reserved_size);
+ CLS_LOG(20, "INFO: cls_2pc_queue_commit: current reservation entries: %lu",
+ urgent_data.reservations.size() + xattr_reservations.size());
+
+ // write back head
+ head.bl_urgent_data.clear();
+ encode(urgent_data, head.bl_urgent_data);
+ return queue_write_head(hctx, head);
+}
+
+static int cls_2pc_queue_abort(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
+ cls_2pc_queue_abort_op abort_op;
+ try {
+ auto in_iter = in->cbegin();
+ decode(abort_op, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to decode entry: %s", err.what());
+ return -EINVAL;
+ }
+
+ // get head
+ cls_queue_head head;
+ int ret = queue_read_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ cls_2pc_urgent_data urgent_data;
+ try {
+ auto in_iter = head.bl_urgent_data.cbegin();
+ decode(urgent_data, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to decode entry: %s", err.what());
+ return -EINVAL;
+ }
+
+ auto it = urgent_data.reservations.find(abort_op.id);
+ uint64_t reservation_size;
+ if (it == urgent_data.reservations.end()) {
+ if (!urgent_data.has_xattrs) {
+ CLS_LOG(20, "INFO: cls_2pc_queue_abort: reservation does not exist: %u", abort_op.id);
+ return 0;
+ }
+ // try to look for the reservation in xattrs
+ bufferlist bl_xattrs;
+ auto ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+ if (ret < 0) {
+ if (ret == -ENOENT || ret == -ENODATA) {
+ // no xattrs, reservation does not exists
+ CLS_LOG(20, "INFO: cls_2pc_queue_abort: reservation does not exist: %u", abort_op.id);
+ return 0;
+ }
+ CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to read xattrs with: %d", ret);
+ return ret;
+ }
+ auto iter = bl_xattrs.cbegin();
+ cls_2pc_reservations xattr_reservations;
+ try {
+ decode(xattr_reservations, iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to decode xattrs urgent data map");
+ return -EINVAL;
+ } //end - catch
+ it = xattr_reservations.find(abort_op.id);
+ if (it == xattr_reservations.end()) {
+ CLS_LOG(20, "INFO: cls_2pc_queue_abort: reservation does not exist: %u", abort_op.id);
+ return 0;
+ }
+ reservation_size = it->second.size;
+ xattr_reservations.erase(it);
+ bl_xattrs.clear();
+ encode(xattr_reservations, bl_xattrs);
+ ret = cls_cxx_setxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+ if (ret < 0) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to write xattrs with: %d", ret);
+ return ret;
+ }
+ } else {
+ reservation_size = it->second.size;
+ urgent_data.reservations.erase(it);
+ }
+
+ // remove the reservation
+ urgent_data.reserved_size -= reservation_size;
+
+ CLS_LOG(20, "INFO: cls_2pc_queue_abort: current reservations: %lu (bytes)", urgent_data.reserved_size);
+
+ // write back head
+ head.bl_urgent_data.clear();
+ encode(urgent_data, head.bl_urgent_data);
+ return queue_write_head(hctx, head);
+}
+
+static int cls_2pc_queue_list_reservations(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
+ //get head
+ cls_queue_head head;
+ auto ret = queue_read_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ cls_2pc_urgent_data urgent_data;
+ try {
+ auto in_iter = head.bl_urgent_data.cbegin();
+ decode(urgent_data, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_list_reservations: failed to decode entry: %s", err.what());
+ return -EINVAL;
+ }
+
+ CLS_LOG(20, "INFO: cls_2pc_queue_list_reservations: %lu reservation entries found", urgent_data.reservations.size());
+ cls_2pc_queue_reservations_ret op_ret;
+ op_ret.reservations = std::move(urgent_data.reservations);
+ if (urgent_data.has_xattrs) {
+ // try to look for the reservation in xattrs
+ cls_2pc_reservations xattr_reservations;
+ bufferlist bl_xattrs;
+ ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+ if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_list_reservations: failed to read xattrs with: %d", ret);
+ return ret;
+ }
+ if (ret >= 0) {
+ auto iter = bl_xattrs.cbegin();
+ try {
+ decode(xattr_reservations, iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_list_reservations: failed to decode xattrs urgent data map");
+ return -EINVAL;
+ } //end - catch
+ CLS_LOG(20, "INFO: cls_2pc_queue_list_reservations: %lu reservation entries found in xatts", xattr_reservations.size());
+ op_ret.reservations.merge(xattr_reservations);
+ }
+ }
+ encode(op_ret, *out);
+
+ return 0;
+}
+
+static int cls_2pc_queue_expire_reservations(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
+ cls_2pc_queue_expire_op expire_op;
+ try {
+ auto in_iter = in->cbegin();
+ decode(expire_op, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_expire_reservations: failed to decode entry: %s", err.what());
+ return -EINVAL;
+ }
+
+ //get head
+ cls_queue_head head;
+ auto ret = queue_read_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ cls_2pc_urgent_data urgent_data;
+ try {
+ auto in_iter = head.bl_urgent_data.cbegin();
+ decode(urgent_data, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_expire_reservations: failed to decode entry: %s", err.what());
+ return -EINVAL;
+ }
+
+ CLS_LOG(20, "INFO: cls_2pc_queue_expire_reservations: %lu reservation entries found", urgent_data.reservations.size());
+ CLS_LOG(20, "INFO: cls_2pc_queue_expire_reservations: current reservations: %lu (bytes)", urgent_data.reserved_size);
+
+ uint64_t reservation_size = 0U;
+ auto stale_found = false;
+ auto xattr_stale_found = false;
+
+ for (auto it = urgent_data.reservations.begin(); it != urgent_data.reservations.end();) {
+ if (it->second.timestamp < expire_op.stale_time) {
+ CLS_LOG(5, "WARNING: cls_2pc_queue_expire_reservations: stale reservation %u will be removed", it->first);
+ reservation_size += it->second.size;
+ it = urgent_data.reservations.erase(it);
+ stale_found = true;
+ } else {
+ ++it;
+ }
+ }
+
+ if (urgent_data.has_xattrs) {
+ // try to look for the reservation in xattrs
+ cls_2pc_reservations xattr_reservations;
+ bufferlist bl_xattrs;
+ ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+ if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_expire_reservations: failed to read xattrs with: %d", ret);
+ return ret;
+ }
+ if (ret >= 0) {
+ auto iter = bl_xattrs.cbegin();
+ try {
+ decode(xattr_reservations, iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_expire_reservations: failed to decode xattrs urgent data map");
+ return -EINVAL;
+ } //end - catch
+ CLS_LOG(20, "INFO: cls_2pc_queue_expire_reservations: %lu reservation entries found in xatts", xattr_reservations.size());
+ for (auto it = xattr_reservations.begin(); it != xattr_reservations.end();) {
+ if (it->second.timestamp < expire_op.stale_time) {
+ CLS_LOG(5, "WARNING: cls_2pc_queue_expire_reservations: stale reservation %u will be removed", it->first);
+ reservation_size += it->second.size;
+ it = xattr_reservations.erase(it);
+ xattr_stale_found = true;
+ } else {
+ ++it;
+ }
+ }
+ if (xattr_stale_found) {
+ // write xattr back without stale reservations
+ bl_xattrs.clear();
+ encode(xattr_reservations, bl_xattrs);
+ ret = cls_cxx_setxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs);
+ if (ret < 0) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_expire_reservations: failed to write xattrs with: %d", ret);
+ return ret;
+ }
+ }
+ }
+ }
+
+ if (stale_found || xattr_stale_found) {
+ urgent_data.reserved_size -= reservation_size;
+ CLS_LOG(20, "INFO: cls_2pc_queue_expire_reservations: reservations after cleanup: %lu (bytes)", urgent_data.reserved_size);
+ // write back head without stale reservations
+ head.bl_urgent_data.clear();
+ encode(urgent_data, head.bl_urgent_data);
+ return queue_write_head(hctx, head);
+ }
+
+ return 0;
+}
+
+static int cls_2pc_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ auto in_iter = in->cbegin();
+ cls_queue_list_op op;
+ try {
+ decode(op, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_list_entries: failed to decode entry: %s", err.what());
+ return -EINVAL;
+ }
+
+ cls_queue_head head;
+ auto ret = queue_read_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ cls_queue_list_ret op_ret;
+ ret = queue_list_entries(hctx, op, op_ret, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ encode(op_ret, *out);
+ return 0;
+}
+
+static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ auto in_iter = in->cbegin();
+ cls_queue_remove_op op;
+ try {
+ decode(op, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_remove_entries: failed to decode entry: %s", err.what());
+ return -EINVAL;
+ }
+
+ cls_queue_head head;
+ auto ret = queue_read_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+ ret = queue_remove_entries(hctx, op, head);
+ if (ret < 0) {
+ return ret;
+ }
+ return queue_write_head(hctx, head);
+}
+
+CLS_INIT(2pc_queue)
+{
+ CLS_LOG(1, "Loaded 2pc queue class!");
+
+ cls_handle_t h_class;
+ cls_method_handle_t h_2pc_queue_init;
+ cls_method_handle_t h_2pc_queue_get_capacity;
+ cls_method_handle_t h_2pc_queue_reserve;
+ cls_method_handle_t h_2pc_queue_commit;
+ cls_method_handle_t h_2pc_queue_abort;
+ cls_method_handle_t h_2pc_queue_list_reservations;
+ cls_method_handle_t h_2pc_queue_list_entries;
+ cls_method_handle_t h_2pc_queue_remove_entries;
+ cls_method_handle_t h_2pc_queue_expire_reservations;
+
+ cls_register(TPC_QUEUE_CLASS, &h_class);
+
+ cls_register_cxx_method(h_class, TPC_QUEUE_INIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_init, &h_2pc_queue_init);
+ cls_register_cxx_method(h_class, TPC_QUEUE_GET_CAPACITY, CLS_METHOD_RD, cls_2pc_queue_get_capacity, &h_2pc_queue_get_capacity);
+ cls_register_cxx_method(h_class, TPC_QUEUE_RESERVE, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_reserve, &h_2pc_queue_reserve);
+ cls_register_cxx_method(h_class, TPC_QUEUE_COMMIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_commit, &h_2pc_queue_commit);
+ cls_register_cxx_method(h_class, TPC_QUEUE_ABORT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_abort, &h_2pc_queue_abort);
+ cls_register_cxx_method(h_class, TPC_QUEUE_LIST_RESERVATIONS, CLS_METHOD_RD, cls_2pc_queue_list_reservations, &h_2pc_queue_list_reservations);
+ cls_register_cxx_method(h_class, TPC_QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_2pc_queue_list_entries, &h_2pc_queue_list_entries);
+ cls_register_cxx_method(h_class, TPC_QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_remove_entries, &h_2pc_queue_remove_entries);
+ cls_register_cxx_method(h_class, TPC_QUEUE_EXPIRE_RESERVATIONS, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_expire_reservations, &h_2pc_queue_expire_reservations);
+
+ return;
+}
+
diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.cc b/src/cls/2pc_queue/cls_2pc_queue_client.cc
new file mode 100644
index 000000000..6868b2b6f
--- /dev/null
+++ b/src/cls/2pc_queue/cls_2pc_queue_client.cc
@@ -0,0 +1,210 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "cls/2pc_queue/cls_2pc_queue_client.h"
+#include "cls/2pc_queue/cls_2pc_queue_ops.h"
+#include "cls/2pc_queue/cls_2pc_queue_const.h"
+#include "cls/queue/cls_queue_ops.h"
+#include "cls/queue/cls_queue_const.h"
+
+using namespace librados;
+
+void cls_2pc_queue_init(ObjectWriteOperation& op, const std::string& queue_name, uint64_t size) {
+ bufferlist in;
+ cls_queue_init_op call;
+ call.queue_size = size;
+ encode(call, in);
+ op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_INIT, in);
+}
+
+int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size) {
+ cls_queue_get_capacity_ret op_ret;
+ auto iter = bl.cbegin();
+ try {
+ decode(op_ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ size = op_ret.queue_capacity;
+
+ return 0;
+}
+
+#ifndef CLS_CLIENT_HIDE_IOCTX
+int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const std::string& queue_name, uint64_t& size) {
+ bufferlist in, out;
+ const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, out);
+ if (r < 0 ) {
+ return r;
+ }
+
+ return cls_2pc_queue_get_capacity_result(out, size);
+}
+#endif
+
+// optionally async method for getting capacity (bytes)
+// after answer is received, call cls_2pc_queue_get_capacity_result() to prase the results
+void cls_2pc_queue_get_capacity(ObjectReadOperation& op, bufferlist* obl, int* prval) {
+ bufferlist in;
+ op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, obl, prval);
+}
+
+
+int cls_2pc_queue_reserve_result(const bufferlist& bl, cls_2pc_reservation::id_t& res_id) {
+ cls_2pc_queue_reserve_ret op_ret;
+ auto iter = bl.cbegin();
+ try {
+ decode(op_ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+ res_id = op_ret.id;
+
+ return 0;
+}
+
+int cls_2pc_queue_reserve(IoCtx& io_ctx, const std::string& queue_name,
+ uint64_t res_size, uint32_t entries, cls_2pc_reservation::id_t& res_id) {
+ bufferlist in, out;
+ cls_2pc_queue_reserve_op reserve_op;
+ reserve_op.size = res_size;
+ reserve_op.entries = entries;
+
+ encode(reserve_op, in);
+ int rval;
+ ObjectWriteOperation op;
+ op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_RESERVE, in, &out, &rval);
+ const auto r = io_ctx.operate(queue_name, &op, librados::OPERATION_RETURNVEC);
+
+ if (r < 0) {
+ return r;
+ }
+
+ return cls_2pc_queue_reserve_result(out, res_id);
+}
+
+void cls_2pc_queue_reserve(ObjectWriteOperation& op, uint64_t res_size,
+ uint32_t entries, bufferlist* obl, int* prval) {
+ bufferlist in;
+ cls_2pc_queue_reserve_op reserve_op;
+ reserve_op.size = res_size;
+ reserve_op.entries = entries;
+ encode(reserve_op, in);
+ op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_RESERVE, in, obl, prval);
+}
+
+void cls_2pc_queue_commit(ObjectWriteOperation& op, std::vector<bufferlist> bl_data_vec,
+ cls_2pc_reservation::id_t res_id) {
+ bufferlist in;
+ cls_2pc_queue_commit_op commit_op;
+ commit_op.id = res_id;
+ commit_op.bl_data_vec = std::move(bl_data_vec);
+ encode(commit_op, in);
+ op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_COMMIT, in);
+}
+
+void cls_2pc_queue_abort(ObjectWriteOperation& op, cls_2pc_reservation::id_t res_id) {
+ bufferlist in;
+ cls_2pc_queue_abort_op abort_op;
+ abort_op.id = res_id;
+ encode(abort_op, in);
+ op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_ABORT, in);
+}
+
+int cls_2pc_queue_list_entries_result(const bufferlist& bl, std::vector<cls_queue_entry>& entries,
+ bool *truncated, std::string& next_marker) {
+ cls_queue_list_ret ret;
+ auto iter = bl.cbegin();
+ try {
+ decode(ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ entries = std::move(ret.entries);
+ *truncated = ret.is_truncated;
+
+ next_marker = std::move(ret.next_marker);
+
+ return 0;
+}
+
+#ifndef CLS_CLIENT_HIDE_IOCTX
+int cls_2pc_queue_list_entries(IoCtx& io_ctx,
+ const std::string& queue_name,
+ const std::string& marker, uint32_t max,
+ std::vector<cls_queue_entry>& entries,
+ bool *truncated, std::string& next_marker) {
+ bufferlist in, out;
+ cls_queue_list_op op;
+ op.start_marker = marker;
+ op.max = max;
+ encode(op, in);
+
+ const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, out);
+ if (r < 0) {
+ return r;
+ }
+ return cls_2pc_queue_list_entries_result(out, entries, truncated, next_marker);
+}
+#endif
+
+void cls_2pc_queue_list_entries(ObjectReadOperation& op, const std::string& marker, uint32_t max, bufferlist* obl, int* prval) {
+ bufferlist in;
+ cls_queue_list_op list_op;
+ list_op.start_marker = marker;
+ list_op.max = max;
+ encode(list_op, in);
+
+ op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, obl, prval);
+}
+
+int cls_2pc_queue_list_reservations_result(const bufferlist& bl, cls_2pc_reservations& reservations) {
+ cls_2pc_queue_reservations_ret ret;
+ auto iter = bl.cbegin();
+ try {
+ decode(ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ reservations = std::move(ret.reservations);
+
+ return 0;
+}
+
+#ifndef CLS_CLIENT_HIDE_IOCTX
+int cls_2pc_queue_list_reservations(IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations) {
+ bufferlist in, out;
+
+ const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, out);
+ if (r < 0) {
+ return r;
+ }
+ return cls_2pc_queue_list_reservations_result(out, reservations);
+}
+#endif
+
+void cls_2pc_queue_list_reservations(ObjectReadOperation& op, bufferlist* obl, int* prval) {
+ bufferlist in;
+
+ op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, obl, prval);
+}
+
+void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& end_marker) {
+ bufferlist in;
+ cls_queue_remove_op rem_op;
+ rem_op.end_marker = end_marker;
+ encode(rem_op, in);
+ op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_REMOVE_ENTRIES, in);
+}
+
+void cls_2pc_queue_expire_reservations(librados::ObjectWriteOperation& op, ceph::coarse_real_time stale_time) {
+ bufferlist in;
+ cls_2pc_queue_expire_op expire_op;
+ expire_op.stale_time = stale_time;
+ encode(expire_op, in);
+ op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_EXPIRE_RESERVATIONS, in);
+}
+
diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.h b/src/cls/2pc_queue/cls_2pc_queue_client.h
new file mode 100644
index 000000000..e0bdeafd5
--- /dev/null
+++ b/src/cls/2pc_queue/cls_2pc_queue_client.h
@@ -0,0 +1,84 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <string>
+#include <vector>
+#include "include/rados/librados.hpp"
+#include "cls/queue/cls_queue_types.h"
+#include "cls/2pc_queue/cls_2pc_queue_types.h"
+
+// initialize the queue with maximum size (bytes)
+// note that the actual size of the queue will be larger, as 24K bytes will be allocated in the head object
+// and more may be allocated as xattrs of the object (depending with the number of concurrent reservations)
+void cls_2pc_queue_init(librados::ObjectWriteOperation& op, const std::string& queue_name, uint64_t size);
+
+// these overloads which call io_ctx.operate() or io_ctx.exec() should not be called in the rgw.
+// rgw_rados_operate() should be called after the overloads w/o calls to io_ctx.operate()/exec()
+#ifndef CLS_CLIENT_HIDE_IOCTX
+// return capacity (bytes)
+int cls_2pc_queue_get_capacity(librados::IoCtx& io_ctx, const std::string& queue_name, uint64_t& size);
+
+// make a reservation on the queue (in bytes) and number of expected entries (to calculate overhead)
+// return a reservation id if reservations is possible, 0 otherwise
+int cls_2pc_queue_reserve(librados::IoCtx& io_ctx, const std::string& queue_name,
+ uint64_t res_size, uint32_t entries, cls_2pc_reservation::id_t& res_id);
+
+// incremental listing of all entries in the queue
+int cls_2pc_queue_list_entries(librados::IoCtx& io_ctx, const std::string& queue_name, const std::string& marker, uint32_t max,
+ std::vector<cls_queue_entry>& entries, bool *truncated, std::string& next_marker);
+
+// list all pending reservations in the queue
+int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations);
+#endif
+
+// optionally async method for getting capacity (bytes)
+// after answer is received, call cls_2pc_queue_get_capacity_result() to parse the results
+void cls_2pc_queue_get_capacity(librados::ObjectReadOperation& op, bufferlist* obl, int* prval);
+
+int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size);
+
+// optionally async method for making a reservation on the queue (in bytes) and number of expected entries (to calculate overhead)
+// notes:
+// (1) make sure that librados::OPERATION_RETURNVEC is passed to the executing function
+// (2) multiple operations cannot be executed in a batch (operations both read and write)
+// after answer is received, call cls_2pc_queue_reserve_result() to parse the results
+void cls_2pc_queue_reserve(librados::ObjectWriteOperation& op, uint64_t res_size,
+ uint32_t entries, bufferlist* obl, int* prval);
+
+int cls_2pc_queue_reserve_result(const bufferlist& bl, cls_2pc_reservation::id_t& res_id);
+
+// commit data using a reservation done beforehand
+// res_id must be allocated using cls_2pc_queue_reserve, and could be either committed or aborted once
+// the size of bl_data_vec must be equal or smaller to the size reserved for the res_id
+// note that the number of entries in bl_data_vec does not have to match the number of entries reserved
+// only size (including the overhead of the entries) is checked
+void cls_2pc_queue_commit(librados::ObjectWriteOperation& op, std::vector<bufferlist> bl_data_vec,
+ cls_2pc_reservation::id_t res_id);
+
+// abort a reservation
+// res_id must be allocated using cls_2pc_queue_reserve
+void cls_2pc_queue_abort(librados::ObjectWriteOperation& op,
+ cls_2pc_reservation::id_t res_id);
+
+// optionally async incremental listing of all entries in the queue
+// after answer is received, call cls_2pc_queue_list_entries_result() to parse the results
+void cls_2pc_queue_list_entries(librados::ObjectReadOperation& op, const std::string& marker, uint32_t max, bufferlist* obl, int* prval);
+
+int cls_2pc_queue_list_entries_result(const bufferlist& bl, std::vector<cls_queue_entry>& entries,
+ bool *truncated, std::string& next_marker);
+
+// optionally async listing of all pending reservations in the queue
+// after answer is received, call cls_2pc_queue_list_reservations_result() to parse the results
+void cls_2pc_queue_list_reservations(librados::ObjectReadOperation& op, bufferlist* obl, int* prval);
+
+int cls_2pc_queue_list_reservations_result(const librados::bufferlist& bl, cls_2pc_reservations& reservations);
+
+// expire stale reservations (older than the given time)
+void cls_2pc_queue_expire_reservations(librados::ObjectWriteOperation& op,
+ ceph::coarse_real_time stale_time);
+
+// remove all entries up to the given marker
+void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker);
+
diff --git a/src/cls/2pc_queue/cls_2pc_queue_const.h b/src/cls/2pc_queue/cls_2pc_queue_const.h
new file mode 100644
index 000000000..160c5b66e
--- /dev/null
+++ b/src/cls/2pc_queue/cls_2pc_queue_const.h
@@ -0,0 +1,14 @@
+#pragma once
+
+#define TPC_QUEUE_CLASS "2pc_queue"
+
+#define TPC_QUEUE_INIT "2pc_queue_init"
+#define TPC_QUEUE_GET_CAPACITY "2pc_queue_get_capacity"
+#define TPC_QUEUE_RESERVE "2pc_queue_reserve"
+#define TPC_QUEUE_COMMIT "2pc_queue_commit"
+#define TPC_QUEUE_ABORT "2pc_queue_abort"
+#define TPC_QUEUE_LIST_RESERVATIONS "2pc_queue_list_reservations"
+#define TPC_QUEUE_LIST_ENTRIES "2pc_queue_list_entries"
+#define TPC_QUEUE_REMOVE_ENTRIES "2pc_queue_remove_entries"
+#define TPC_QUEUE_EXPIRE_RESERVATIONS "2pc_queue_expire_reservations"
+
diff --git a/src/cls/2pc_queue/cls_2pc_queue_ops.h b/src/cls/2pc_queue/cls_2pc_queue_ops.h
new file mode 100644
index 000000000..d0b84193d
--- /dev/null
+++ b/src/cls/2pc_queue/cls_2pc_queue_ops.h
@@ -0,0 +1,117 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "include/types.h"
+#include "cls_2pc_queue_types.h"
+
+struct cls_2pc_queue_reserve_op {
+ uint64_t size;
+ uint32_t entries;
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(size, bl);
+ encode(entries, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(size, bl);
+ decode(entries, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_2pc_queue_reserve_op)
+
+struct cls_2pc_queue_reserve_ret {
+ cls_2pc_reservation::id_t id; // allocated reservation id
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(id, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(id, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_2pc_queue_reserve_ret)
+
+struct cls_2pc_queue_commit_op {
+ cls_2pc_reservation::id_t id; // reservation to commit
+ std::vector<ceph::buffer::list> bl_data_vec; // the data to enqueue
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(id, bl);
+ encode(bl_data_vec, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(id, bl);
+ decode(bl_data_vec, bl);
+ DECODE_FINISH(bl);
+ }
+
+};
+WRITE_CLASS_ENCODER(cls_2pc_queue_commit_op)
+
+struct cls_2pc_queue_abort_op {
+ cls_2pc_reservation::id_t id; // reservation to abort
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(id, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(id, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_2pc_queue_abort_op)
+
+struct cls_2pc_queue_expire_op {
+ // any reservation older than this time should expire
+ ceph::coarse_real_time stale_time;
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(stale_time, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(stale_time, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_2pc_queue_expire_op)
+
+struct cls_2pc_queue_reservations_ret {
+ cls_2pc_reservations reservations; // reservation list (keyed by id)
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(reservations, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(reservations, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_2pc_queue_reservations_ret)
diff --git a/src/cls/2pc_queue/cls_2pc_queue_types.h b/src/cls/2pc_queue/cls_2pc_queue_types.h
new file mode 100644
index 000000000..7c94cdebf
--- /dev/null
+++ b/src/cls/2pc_queue/cls_2pc_queue_types.h
@@ -0,0 +1,62 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#pragma once
+
+#include "include/types.h"
+
+struct cls_2pc_reservation
+{
+ using id_t = uint32_t;
+ inline static const id_t NO_ID{0};
+ uint64_t size; // how many entries are reserved
+ ceph::coarse_real_time timestamp; // when the reservation was done (used for cleaning stale reservations)
+
+ cls_2pc_reservation(uint64_t _size, ceph::coarse_real_time _timestamp) :
+ size(_size), timestamp(_timestamp) {}
+
+ cls_2pc_reservation() = default;
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(size, bl);
+ encode(timestamp, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(size, bl);
+ decode(timestamp, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_2pc_reservation)
+
+using cls_2pc_reservations = ceph::unordered_map<cls_2pc_reservation::id_t, cls_2pc_reservation>;
+
+struct cls_2pc_urgent_data
+{
+ uint64_t reserved_size{0}; // pending reservations size in bytes
+ cls_2pc_reservation::id_t last_id{cls_2pc_reservation::NO_ID}; // last allocated id
+ cls_2pc_reservations reservations; // reservation list (keyed by id)
+ bool has_xattrs{false};
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(reserved_size, bl);
+ encode(last_id, bl);
+ encode(reservations, bl);
+ encode(has_xattrs, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(reserved_size, bl);
+ decode(last_id, bl);
+ decode(reservations, bl);
+ decode(has_xattrs, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_2pc_urgent_data)