From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/cls/2pc_queue/cls_2pc_queue.cc | 602 ++++++++++++++++++++++++++++++ src/cls/2pc_queue/cls_2pc_queue_client.cc | 208 +++++++++++ src/cls/2pc_queue/cls_2pc_queue_client.h | 84 +++++ src/cls/2pc_queue/cls_2pc_queue_const.h | 14 + src/cls/2pc_queue/cls_2pc_queue_ops.h | 117 ++++++ src/cls/2pc_queue/cls_2pc_queue_types.h | 62 +++ 6 files changed, 1087 insertions(+) create mode 100644 src/cls/2pc_queue/cls_2pc_queue.cc create mode 100644 src/cls/2pc_queue/cls_2pc_queue_client.cc create mode 100644 src/cls/2pc_queue/cls_2pc_queue_client.h create mode 100644 src/cls/2pc_queue/cls_2pc_queue_const.h create mode 100644 src/cls/2pc_queue/cls_2pc_queue_ops.h create mode 100644 src/cls/2pc_queue/cls_2pc_queue_types.h (limited to 'src/cls/2pc_queue') diff --git a/src/cls/2pc_queue/cls_2pc_queue.cc b/src/cls/2pc_queue/cls_2pc_queue.cc new file mode 100644 index 000000000..fba763955 --- /dev/null +++ b/src/cls/2pc_queue/cls_2pc_queue.cc @@ -0,0 +1,602 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/types.h" + +#include "cls/2pc_queue/cls_2pc_queue_types.h" +#include "cls/2pc_queue/cls_2pc_queue_ops.h" +#include "cls/2pc_queue/cls_2pc_queue_const.h" +#include "cls/queue/cls_queue_ops.h" +#include "cls/queue/cls_queue_src.h" +#include "objclass/objclass.h" + +CLS_VER(1,0) +CLS_NAME(2pc_queue) + +using ceph::bufferlist; +using ceph::decode; +using ceph::encode; + +constexpr auto CLS_QUEUE_URGENT_DATA_XATTR_NAME = "cls_queue_urgent_data"; + +static int cls_2pc_queue_init(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { + auto in_iter = in->cbegin(); + + cls_queue_init_op op; + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_init: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + cls_2pc_urgent_data urgent_data; + + cls_queue_init_op init_op; + + CLS_LOG(20, "INFO: cls_2pc_queue_init: max size is %lu (bytes)", op.queue_size); + + init_op.queue_size = op.queue_size; + init_op.max_urgent_data_size = 23552; // overall head is 24KB ~ pending 1K reservations ops + encode(urgent_data, init_op.bl_urgent_data); + + return queue_init(hctx, init_op); +} + +static int cls_2pc_queue_get_capacity(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + cls_queue_get_capacity_ret op_ret; + auto ret = queue_get_capacity(hctx, op_ret); + if (ret < 0) { + return ret; + } + + encode(op_ret, *out); + return 0; +} + +static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { + cls_2pc_queue_reserve_op res_op; + try { + auto in_iter = in->cbegin(); + decode(res_op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + if (res_op.size == 0) { + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: cannot reserve zero bytes"); + return -EINVAL; + } + if (res_op.entries == 0) { + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: cannot reserve zero entries"); + return -EINVAL; + } + + // get head + cls_queue_head head; + int ret = queue_read_head(hctx, head); + if (ret < 0) { + return ret; + } + + cls_2pc_urgent_data urgent_data; + try { + auto in_iter = head.bl_urgent_data.cbegin(); + decode(urgent_data, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + const auto overhead = res_op.entries*QUEUE_ENTRY_OVERHEAD; + const auto remaining_size = (head.tail.offset >= head.front.offset) ? + (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size) : + head.front.offset - head.tail.offset; + + + if (res_op.size + urgent_data.reserved_size + overhead > remaining_size) { + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: reservations exceeded maximum capacity"); + CLS_LOG(10, "INFO: cls_2pc_queue_reserve: remaining size: %lu (bytes)", remaining_size); + CLS_LOG(10, "INFO: cls_2pc_queue_reserve: current reservations: %lu (bytes)", urgent_data.reserved_size); + CLS_LOG(10, "INFO: cls_2pc_queue_reserve: requested size: %lu (bytes)", res_op.size); + return -ENOSPC; + } + + urgent_data.reserved_size += res_op.size + overhead; + // note that last id is incremented regadless of failures + // to avoid "old reservation" issues below + ++urgent_data.last_id; + bool result; + cls_2pc_reservations::iterator last_reservation; + std::tie(last_reservation, result) = urgent_data.reservations.emplace(std::piecewise_construct, + std::forward_as_tuple(urgent_data.last_id), + std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now())); + if (!result) { + // an old reservation that was never committed or aborted is in the map + // caller should try again assuming other IDs are ok + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: reservation id conflict after rollover: %u", urgent_data.last_id); + return -EAGAIN; + } + + // write back head + head.bl_urgent_data.clear(); + encode(urgent_data, head.bl_urgent_data); + + const uint64_t urgent_data_length = head.bl_urgent_data.length(); + + if (head.max_urgent_data_size < urgent_data_length) { + CLS_LOG(10, "INFO: cls_2pc_queue_reserve: urgent data size: %lu exceeded maximum: %lu using xattrs", urgent_data_length, head.max_urgent_data_size); + // add the last reservation to xattrs + bufferlist bl_xattrs; + auto ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs); + if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) { + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to read xattrs with: %d", ret); + return ret; + } + cls_2pc_reservations xattr_reservations; + if (ret >= 0) { + // xattrs exist + auto iter = bl_xattrs.cbegin(); + try { + decode(xattr_reservations, iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to decode xattrs urgent data map"); + return -EINVAL; + } //end - catch + } + std::tie(std::ignore, result) = xattr_reservations.emplace(std::piecewise_construct, + std::forward_as_tuple(urgent_data.last_id), + std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now())); + if (!result) { + // an old reservation that was never committed or aborted is in the map + // caller should try again assuming other IDs are ok + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: reservation id conflict inside xattrs after rollover: %u", urgent_data.last_id); + return -EAGAIN; + } + bl_xattrs.clear(); + encode(xattr_reservations, bl_xattrs); + ret = cls_cxx_setxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs); + if (ret < 0) { + CLS_LOG(1, "ERROR: cls_2pc_queue_reserve: failed to write xattrs with: %d", ret); + return ret; + } + // remove the last reservation from the reservation list + // and indicate that spillover happened + urgent_data.has_xattrs = true; + urgent_data.reservations.erase(last_reservation); + head.bl_urgent_data.clear(); + encode(urgent_data, head.bl_urgent_data); + } + + ret = queue_write_head(hctx, head); + if (ret < 0) { + return ret; + } + + CLS_LOG(20, "INFO: cls_2pc_queue_reserve: remaining size: %lu (bytes)", remaining_size); + CLS_LOG(20, "INFO: cls_2pc_queue_reserve: current reservations: %lu (bytes)", urgent_data.reserved_size); + CLS_LOG(20, "INFO: cls_2pc_queue_reserve: requested size: %lu (bytes)", res_op.size); + CLS_LOG(20, "INFO: cls_2pc_queue_reserve: urgent data size: %lu (bytes)", urgent_data_length); + + cls_2pc_queue_reserve_ret op_ret; + op_ret.id = urgent_data.last_id; + encode(op_ret, *out); + + return 0; +} + +static int cls_2pc_queue_commit(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { + cls_2pc_queue_commit_op commit_op; + try { + auto in_iter = in->cbegin(); + decode(commit_op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + // get head + cls_queue_head head; + int ret = queue_read_head(hctx, head); + if (ret < 0) { + return ret; + } + + cls_2pc_urgent_data urgent_data; + try { + auto in_iter = head.bl_urgent_data.cbegin(); + decode(urgent_data, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + auto it = urgent_data.reservations.find(commit_op.id); + cls_2pc_reservations xattr_reservations; + bufferlist bl_xattrs; + if (it == urgent_data.reservations.end()) { + if (!urgent_data.has_xattrs) { + CLS_LOG(1, "ERROR: cls_2pc_queue_commit: reservation does not exist: %u", commit_op.id); + return -ENOENT; + } + // try to look for the reservation in xattrs + auto ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs); + if (ret < 0) { + if (ret == -ENOENT || ret == -ENODATA) { + // no xattrs, reservation does not exists + CLS_LOG(1, "ERROR: cls_2pc_queue_commit: reservation does not exist: %u", commit_op.id); + return -ENOENT; + } + CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to read xattrs with: %d", ret); + return ret; + } + auto iter = bl_xattrs.cbegin(); + try { + decode(xattr_reservations, iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to decode xattrs urgent data map"); + return -EINVAL; + } //end - catch + it = xattr_reservations.find(commit_op.id); + if (it == urgent_data.reservations.end()) { + CLS_LOG(1, "ERROR: cls_2pc_queue_commit: reservation does not exist: %u", commit_op.id); + return -ENOENT; + } + } + + auto& res = it->second; + const auto actual_size = std::accumulate(commit_op.bl_data_vec.begin(), + commit_op.bl_data_vec.end(), 0UL, [] (uint64_t sum, const bufferlist& bl) { + return sum + bl.length(); + }); + + if (res.size < actual_size) { + CLS_LOG(1, "ERROR: cls_2pc_queue_commit: trying to commit %lu bytes to a %lu bytes reservation", + actual_size, + res.size); + return -EINVAL; + } + + // commit the data to the queue + cls_queue_enqueue_op enqueue_op; + enqueue_op.bl_data_vec = std::move(commit_op.bl_data_vec); + ret = queue_enqueue(hctx, enqueue_op, head); + if (ret < 0) { + return ret; + } + + urgent_data.reserved_size -= res.size; + + if (xattr_reservations.empty()) { + // remove the reservation from urgent data + urgent_data.reservations.erase(it); + } else { + // remove the reservation from xattrs + xattr_reservations.erase(it); + bl_xattrs.clear(); + encode(xattr_reservations, bl_xattrs); + ret = cls_cxx_setxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs); + if (ret < 0) { + CLS_LOG(1, "ERROR: cls_2pc_queue_commit: failed to write xattrs with: %d", ret); + return ret; + } + } + + CLS_LOG(20, "INFO: cls_2pc_queue_commit: current reservations: %lu (bytes)", urgent_data.reserved_size); + CLS_LOG(20, "INFO: cls_2pc_queue_commit: current reservation entries: %lu", + urgent_data.reservations.size() + xattr_reservations.size()); + + // write back head + head.bl_urgent_data.clear(); + encode(urgent_data, head.bl_urgent_data); + return queue_write_head(hctx, head); +} + +static int cls_2pc_queue_abort(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { + cls_2pc_queue_abort_op abort_op; + try { + auto in_iter = in->cbegin(); + decode(abort_op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + // get head + cls_queue_head head; + int ret = queue_read_head(hctx, head); + if (ret < 0) { + return ret; + } + + cls_2pc_urgent_data urgent_data; + try { + auto in_iter = head.bl_urgent_data.cbegin(); + decode(urgent_data, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + auto it = urgent_data.reservations.find(abort_op.id); + uint64_t reservation_size; + if (it == urgent_data.reservations.end()) { + if (!urgent_data.has_xattrs) { + CLS_LOG(20, "INFO: cls_2pc_queue_abort: reservation does not exist: %u", abort_op.id); + return 0; + } + // try to look for the reservation in xattrs + bufferlist bl_xattrs; + auto ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs); + if (ret < 0) { + if (ret == -ENOENT || ret == -ENODATA) { + // no xattrs, reservation does not exists + CLS_LOG(20, "INFO: cls_2pc_queue_abort: reservation does not exist: %u", abort_op.id); + return 0; + } + CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to read xattrs with: %d", ret); + return ret; + } + auto iter = bl_xattrs.cbegin(); + cls_2pc_reservations xattr_reservations; + try { + decode(xattr_reservations, iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to decode xattrs urgent data map"); + return -EINVAL; + } //end - catch + it = xattr_reservations.find(abort_op.id); + if (it == xattr_reservations.end()) { + CLS_LOG(20, "INFO: cls_2pc_queue_abort: reservation does not exist: %u", abort_op.id); + return 0; + } + reservation_size = it->second.size; + xattr_reservations.erase(it); + bl_xattrs.clear(); + encode(xattr_reservations, bl_xattrs); + ret = cls_cxx_setxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs); + if (ret < 0) { + CLS_LOG(1, "ERROR: cls_2pc_queue_abort: failed to write xattrs with: %d", ret); + return ret; + } + } else { + reservation_size = it->second.size; + urgent_data.reservations.erase(it); + } + + // remove the reservation + urgent_data.reserved_size -= reservation_size; + + CLS_LOG(20, "INFO: cls_2pc_queue_abort: current reservations: %lu (bytes)", urgent_data.reserved_size); + + // write back head + head.bl_urgent_data.clear(); + encode(urgent_data, head.bl_urgent_data); + return queue_write_head(hctx, head); +} + +static int cls_2pc_queue_list_reservations(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { + //get head + cls_queue_head head; + auto ret = queue_read_head(hctx, head); + if (ret < 0) { + return ret; + } + + cls_2pc_urgent_data urgent_data; + try { + auto in_iter = head.bl_urgent_data.cbegin(); + decode(urgent_data, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_list_reservations: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + CLS_LOG(20, "INFO: cls_2pc_queue_list_reservations: %lu reservation entries found", urgent_data.reservations.size()); + cls_2pc_queue_reservations_ret op_ret; + op_ret.reservations = std::move(urgent_data.reservations); + if (urgent_data.has_xattrs) { + // try to look for the reservation in xattrs + cls_2pc_reservations xattr_reservations; + bufferlist bl_xattrs; + ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs); + if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) { + CLS_LOG(1, "ERROR: cls_2pc_queue_list_reservations: failed to read xattrs with: %d", ret); + return ret; + } + if (ret >= 0) { + auto iter = bl_xattrs.cbegin(); + try { + decode(xattr_reservations, iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_list_reservations: failed to decode xattrs urgent data map"); + return -EINVAL; + } //end - catch + CLS_LOG(20, "INFO: cls_2pc_queue_list_reservations: %lu reservation entries found in xatts", xattr_reservations.size()); + op_ret.reservations.merge(xattr_reservations); + } + } + encode(op_ret, *out); + + return 0; +} + +static int cls_2pc_queue_expire_reservations(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { + cls_2pc_queue_expire_op expire_op; + try { + auto in_iter = in->cbegin(); + decode(expire_op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_expire_reservations: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + //get head + cls_queue_head head; + auto ret = queue_read_head(hctx, head); + if (ret < 0) { + return ret; + } + + cls_2pc_urgent_data urgent_data; + try { + auto in_iter = head.bl_urgent_data.cbegin(); + decode(urgent_data, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_expire_reservations: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + CLS_LOG(20, "INFO: cls_2pc_queue_expire_reservations: %lu reservation entries found", urgent_data.reservations.size()); + CLS_LOG(20, "INFO: cls_2pc_queue_expire_reservations: current reservations: %lu (bytes)", urgent_data.reserved_size); + + uint64_t reservation_size = 0U; + auto stale_found = false; + auto xattr_stale_found = false; + + for (auto it = urgent_data.reservations.begin(); it != urgent_data.reservations.end();) { + if (it->second.timestamp < expire_op.stale_time) { + CLS_LOG(5, "WARNING: cls_2pc_queue_expire_reservations: stale reservation %u will be removed", it->first); + reservation_size += it->second.size; + it = urgent_data.reservations.erase(it); + stale_found = true; + } else { + ++it; + } + } + + if (urgent_data.has_xattrs) { + // try to look for the reservation in xattrs + cls_2pc_reservations xattr_reservations; + bufferlist bl_xattrs; + ret = cls_cxx_getxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs); + if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) { + CLS_LOG(1, "ERROR: cls_2pc_queue_expire_reservations: failed to read xattrs with: %d", ret); + return ret; + } + if (ret >= 0) { + auto iter = bl_xattrs.cbegin(); + try { + decode(xattr_reservations, iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_expire_reservations: failed to decode xattrs urgent data map"); + return -EINVAL; + } //end - catch + CLS_LOG(20, "INFO: cls_2pc_queue_expire_reservations: %lu reservation entries found in xatts", xattr_reservations.size()); + for (auto it = xattr_reservations.begin(); it != xattr_reservations.end();) { + if (it->second.timestamp < expire_op.stale_time) { + CLS_LOG(5, "WARNING: cls_2pc_queue_expire_reservations: stale reservation %u will be removed", it->first); + reservation_size += it->second.size; + it = xattr_reservations.erase(it); + xattr_stale_found = true; + } else { + ++it; + } + } + if (xattr_stale_found) { + // write xattr back without stale reservations + bl_xattrs.clear(); + encode(xattr_reservations, bl_xattrs); + ret = cls_cxx_setxattr(hctx, CLS_QUEUE_URGENT_DATA_XATTR_NAME, &bl_xattrs); + if (ret < 0) { + CLS_LOG(1, "ERROR: cls_2pc_queue_expire_reservations: failed to write xattrs with: %d", ret); + return ret; + } + } + } + } + + if (stale_found || xattr_stale_found) { + urgent_data.reserved_size -= reservation_size; + CLS_LOG(20, "INFO: cls_2pc_queue_expire_reservations: reservations after cleanup: %lu (bytes)", urgent_data.reserved_size); + // write back head without stale reservations + head.bl_urgent_data.clear(); + encode(urgent_data, head.bl_urgent_data); + return queue_write_head(hctx, head); + } + + return 0; +} + +static int cls_2pc_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + cls_queue_list_op op; + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_list_entries: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + cls_queue_head head; + auto ret = queue_read_head(hctx, head); + if (ret < 0) { + return ret; + } + + cls_queue_list_ret op_ret; + ret = queue_list_entries(hctx, op, op_ret, head); + if (ret < 0) { + return ret; + } + + encode(op_ret, *out); + return 0; +} + +static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + cls_queue_remove_op op; + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_2pc_queue_remove_entries: failed to decode entry: %s", err.what()); + return -EINVAL; + } + + cls_queue_head head; + auto ret = queue_read_head(hctx, head); + if (ret < 0) { + return ret; + } + ret = queue_remove_entries(hctx, op, head); + if (ret < 0) { + return ret; + } + return queue_write_head(hctx, head); +} + +CLS_INIT(2pc_queue) +{ + CLS_LOG(1, "Loaded 2pc queue class!"); + + cls_handle_t h_class; + cls_method_handle_t h_2pc_queue_init; + cls_method_handle_t h_2pc_queue_get_capacity; + cls_method_handle_t h_2pc_queue_reserve; + cls_method_handle_t h_2pc_queue_commit; + cls_method_handle_t h_2pc_queue_abort; + cls_method_handle_t h_2pc_queue_list_reservations; + cls_method_handle_t h_2pc_queue_list_entries; + cls_method_handle_t h_2pc_queue_remove_entries; + cls_method_handle_t h_2pc_queue_expire_reservations; + + cls_register(TPC_QUEUE_CLASS, &h_class); + + cls_register_cxx_method(h_class, TPC_QUEUE_INIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_init, &h_2pc_queue_init); + cls_register_cxx_method(h_class, TPC_QUEUE_GET_CAPACITY, CLS_METHOD_RD, cls_2pc_queue_get_capacity, &h_2pc_queue_get_capacity); + cls_register_cxx_method(h_class, TPC_QUEUE_RESERVE, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_reserve, &h_2pc_queue_reserve); + cls_register_cxx_method(h_class, TPC_QUEUE_COMMIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_commit, &h_2pc_queue_commit); + cls_register_cxx_method(h_class, TPC_QUEUE_ABORT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_abort, &h_2pc_queue_abort); + cls_register_cxx_method(h_class, TPC_QUEUE_LIST_RESERVATIONS, CLS_METHOD_RD, cls_2pc_queue_list_reservations, &h_2pc_queue_list_reservations); + cls_register_cxx_method(h_class, TPC_QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_2pc_queue_list_entries, &h_2pc_queue_list_entries); + cls_register_cxx_method(h_class, TPC_QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_remove_entries, &h_2pc_queue_remove_entries); + cls_register_cxx_method(h_class, TPC_QUEUE_EXPIRE_RESERVATIONS, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_expire_reservations, &h_2pc_queue_expire_reservations); + + return; +} + diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.cc b/src/cls/2pc_queue/cls_2pc_queue_client.cc new file mode 100644 index 000000000..3eb2d0f6a --- /dev/null +++ b/src/cls/2pc_queue/cls_2pc_queue_client.cc @@ -0,0 +1,208 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "cls/2pc_queue/cls_2pc_queue_client.h" +#include "cls/2pc_queue/cls_2pc_queue_ops.h" +#include "cls/2pc_queue/cls_2pc_queue_const.h" +#include "cls/queue/cls_queue_ops.h" +#include "cls/queue/cls_queue_const.h" + +using namespace librados; + +void cls_2pc_queue_init(ObjectWriteOperation& op, const std::string& queue_name, uint64_t size) { + bufferlist in; + cls_queue_init_op call; + call.queue_size = size; + encode(call, in); + op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_INIT, in); +} + +int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size) { + cls_queue_get_capacity_ret op_ret; + auto iter = bl.cbegin(); + try { + decode(op_ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + + size = op_ret.queue_capacity; + + return 0; +} + +#ifndef CLS_CLIENT_HIDE_IOCTX +int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const string& queue_name, uint64_t& size) { + bufferlist in, out; + const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, out); + if (r < 0 ) { + return r; + } + + return cls_2pc_queue_get_capacity_result(out, size); +} +#endif + +// optionally async method for getting capacity (bytes) +// after answer is received, call cls_2pc_queue_get_capacity_result() to prase the results +void cls_2pc_queue_get_capacity(ObjectReadOperation& op, bufferlist* obl, int* prval) { + bufferlist in; + op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, obl, prval); +} + + +int cls_2pc_queue_reserve_result(const bufferlist& bl, cls_2pc_reservation::id_t& res_id) { + cls_2pc_queue_reserve_ret op_ret; + auto iter = bl.cbegin(); + try { + decode(op_ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + res_id = op_ret.id; + + return 0; +} + +int cls_2pc_queue_reserve(IoCtx& io_ctx, const string& queue_name, + uint64_t res_size, uint32_t entries, cls_2pc_reservation::id_t& res_id) { + bufferlist in, out; + cls_2pc_queue_reserve_op reserve_op; + reserve_op.size = res_size; + reserve_op.entries = entries; + + encode(reserve_op, in); + int rval; + ObjectWriteOperation op; + op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_RESERVE, in, &out, &rval); + const auto r = io_ctx.operate(queue_name, &op, librados::OPERATION_RETURNVEC); + + if (r < 0) { + return r; + } + + return cls_2pc_queue_reserve_result(out, res_id); +} + +void cls_2pc_queue_reserve(ObjectWriteOperation& op, uint64_t res_size, + uint32_t entries, bufferlist* obl, int* prval) { + bufferlist in; + cls_2pc_queue_reserve_op reserve_op; + reserve_op.size = res_size; + reserve_op.entries = entries; + encode(reserve_op, in); + op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_RESERVE, in, obl, prval); +} + +void cls_2pc_queue_commit(ObjectWriteOperation& op, std::vector bl_data_vec, + cls_2pc_reservation::id_t res_id) { + bufferlist in; + cls_2pc_queue_commit_op commit_op; + commit_op.id = res_id; + commit_op.bl_data_vec = std::move(bl_data_vec); + encode(commit_op, in); + op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_COMMIT, in); +} + +void cls_2pc_queue_abort(ObjectWriteOperation& op, cls_2pc_reservation::id_t res_id) { + bufferlist in; + cls_2pc_queue_abort_op abort_op; + abort_op.id = res_id; + encode(abort_op, in); + op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_ABORT, in); +} + +int cls_2pc_queue_list_entries_result(const bufferlist& bl, std::vector& entries, + bool *truncated, std::string& next_marker) { + cls_queue_list_ret ret; + auto iter = bl.cbegin(); + try { + decode(ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + + entries = std::move(ret.entries); + *truncated = ret.is_truncated; + + next_marker = std::move(ret.next_marker); + + return 0; +} + +#ifndef CLS_CLIENT_HIDE_IOCTX +int cls_2pc_queue_list_entries(IoCtx& io_ctx, const string& queue_name, const string& marker, uint32_t max, + std::vector& entries, + bool *truncated, std::string& next_marker) { + bufferlist in, out; + cls_queue_list_op op; + op.start_marker = marker; + op.max = max; + encode(op, in); + + const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, out); + if (r < 0) { + return r; + } + return cls_2pc_queue_list_entries_result(out, entries, truncated, next_marker); +} +#endif + +void cls_2pc_queue_list_entries(ObjectReadOperation& op, const std::string& marker, uint32_t max, bufferlist* obl, int* prval) { + bufferlist in; + cls_queue_list_op list_op; + list_op.start_marker = marker; + list_op.max = max; + encode(list_op, in); + + op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, obl, prval); +} + +int cls_2pc_queue_list_reservations_result(const bufferlist& bl, cls_2pc_reservations& reservations) { + cls_2pc_queue_reservations_ret ret; + auto iter = bl.cbegin(); + try { + decode(ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + + reservations = std::move(ret.reservations); + + return 0; +} + +#ifndef CLS_CLIENT_HIDE_IOCTX +int cls_2pc_queue_list_reservations(IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations) { + bufferlist in, out; + + const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, out); + if (r < 0) { + return r; + } + return cls_2pc_queue_list_reservations_result(out, reservations); +} +#endif + +void cls_2pc_queue_list_reservations(ObjectReadOperation& op, bufferlist* obl, int* prval) { + bufferlist in; + + op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, obl, prval); +} + +void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& end_marker) { + bufferlist in; + cls_queue_remove_op rem_op; + rem_op.end_marker = end_marker; + encode(rem_op, in); + op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_REMOVE_ENTRIES, in); +} + +void cls_2pc_queue_expire_reservations(librados::ObjectWriteOperation& op, ceph::coarse_real_time stale_time) { + bufferlist in; + cls_2pc_queue_expire_op expire_op; + expire_op.stale_time = stale_time; + encode(expire_op, in); + op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_EXPIRE_RESERVATIONS, in); +} + diff --git a/src/cls/2pc_queue/cls_2pc_queue_client.h b/src/cls/2pc_queue/cls_2pc_queue_client.h new file mode 100644 index 000000000..bbdfcc2e0 --- /dev/null +++ b/src/cls/2pc_queue/cls_2pc_queue_client.h @@ -0,0 +1,84 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include +#include "include/rados/librados.hpp" +#include "cls/queue/cls_queue_types.h" +#include "cls/2pc_queue/cls_2pc_queue_types.h" + +// initialize the queue with maximum size (bytes) +// note that the actual size of the queue will be larger, as 24K bytes will be allocated in the head object +// and more may be allocated as xattrs of the object (depending with the number of concurrent reservations) +void cls_2pc_queue_init(librados::ObjectWriteOperation& op, const std::string& queue_name, uint64_t size); + +// these overloads which call io_ctx.operate() or io_ctx.exec() should not be called in the rgw. +// rgw_rados_operate() should be called after the overloads w/o calls to io_ctx.operate()/exec() +#ifndef CLS_CLIENT_HIDE_IOCTX +// return capacity (bytes) +int cls_2pc_queue_get_capacity(librados::IoCtx& io_ctx, const string& queue_name, uint64_t& size); + +// make a reservation on the queue (in bytes) and number of expected entries (to calculate overhead) +// return a reservation id if reservations is possible, 0 otherwise +int cls_2pc_queue_reserve(librados::IoCtx& io_ctx, const std::string& queue_name, + uint64_t res_size, uint32_t entries, cls_2pc_reservation::id_t& res_id); + +// incremental listing of all entries in the queue +int cls_2pc_queue_list_entries(librados::IoCtx& io_ctx, const std::string& queue_name, const std::string& marker, uint32_t max, + std::vector& entries, bool *truncated, std::string& next_marker); + +// list all pending reservations in the queue +int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations); +#endif + +// optionally async method for getting capacity (bytes) +// after answer is received, call cls_2pc_queue_get_capacity_result() to parse the results +void cls_2pc_queue_get_capacity(librados::ObjectReadOperation& op, bufferlist* obl, int* prval); + +int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size); + +// optionally async method for making a reservation on the queue (in bytes) and number of expected entries (to calculate overhead) +// notes: +// (1) make sure that librados::OPERATION_RETURNVEC is passed to the executing function +// (2) multiple operations cannot be executed in a batch (operations both read and write) +// after answer is received, call cls_2pc_queue_reserve_result() to parse the results +void cls_2pc_queue_reserve(librados::ObjectWriteOperation& op, uint64_t res_size, + uint32_t entries, bufferlist* obl, int* prval); + +int cls_2pc_queue_reserve_result(const bufferlist& bl, cls_2pc_reservation::id_t& res_id); + +// commit data using a reservation done beforehand +// res_id must be allocated using cls_2pc_queue_reserve, and could be either committed or aborted once +// the size of bl_data_vec must be equal or smaller to the size reserved for the res_id +// note that the number of entries in bl_data_vec does not have to match the number of entries reserved +// only size (including the overhead of the entries) is checked +void cls_2pc_queue_commit(librados::ObjectWriteOperation& op, std::vector bl_data_vec, + cls_2pc_reservation::id_t res_id); + +// abort a reservation +// res_id must be allocated using cls_2pc_queue_reserve +void cls_2pc_queue_abort(librados::ObjectWriteOperation& op, + cls_2pc_reservation::id_t res_id); + +// optionally async incremental listing of all entries in the queue +// after answer is received, call cls_2pc_queue_list_entries_result() to parse the results +void cls_2pc_queue_list_entries(librados::ObjectReadOperation& op, const std::string& marker, uint32_t max, bufferlist* obl, int* prval); + +int cls_2pc_queue_list_entries_result(const bufferlist& bl, std::vector& entries, + bool *truncated, std::string& next_marker); + +// optionally async listing of all pending reservations in the queue +// after answer is received, call cls_2pc_queue_list_reservations_result() to parse the results +void cls_2pc_queue_list_reservations(librados::ObjectReadOperation& op, bufferlist* obl, int* prval); + +int cls_2pc_queue_list_reservations_result(const librados::bufferlist& bl, cls_2pc_reservations& reservations); + +// expire stale reservations (older than the given time) +void cls_2pc_queue_expire_reservations(librados::ObjectWriteOperation& op, + ceph::coarse_real_time stale_time); + +// remove all entries up to the given marker +void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker); + diff --git a/src/cls/2pc_queue/cls_2pc_queue_const.h b/src/cls/2pc_queue/cls_2pc_queue_const.h new file mode 100644 index 000000000..160c5b66e --- /dev/null +++ b/src/cls/2pc_queue/cls_2pc_queue_const.h @@ -0,0 +1,14 @@ +#pragma once + +#define TPC_QUEUE_CLASS "2pc_queue" + +#define TPC_QUEUE_INIT "2pc_queue_init" +#define TPC_QUEUE_GET_CAPACITY "2pc_queue_get_capacity" +#define TPC_QUEUE_RESERVE "2pc_queue_reserve" +#define TPC_QUEUE_COMMIT "2pc_queue_commit" +#define TPC_QUEUE_ABORT "2pc_queue_abort" +#define TPC_QUEUE_LIST_RESERVATIONS "2pc_queue_list_reservations" +#define TPC_QUEUE_LIST_ENTRIES "2pc_queue_list_entries" +#define TPC_QUEUE_REMOVE_ENTRIES "2pc_queue_remove_entries" +#define TPC_QUEUE_EXPIRE_RESERVATIONS "2pc_queue_expire_reservations" + diff --git a/src/cls/2pc_queue/cls_2pc_queue_ops.h b/src/cls/2pc_queue/cls_2pc_queue_ops.h new file mode 100644 index 000000000..d0b84193d --- /dev/null +++ b/src/cls/2pc_queue/cls_2pc_queue_ops.h @@ -0,0 +1,117 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "include/types.h" +#include "cls_2pc_queue_types.h" + +struct cls_2pc_queue_reserve_op { + uint64_t size; + uint32_t entries; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(size, bl); + encode(entries, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(size, bl); + decode(entries, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_2pc_queue_reserve_op) + +struct cls_2pc_queue_reserve_ret { + cls_2pc_reservation::id_t id; // allocated reservation id + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(id, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(id, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_2pc_queue_reserve_ret) + +struct cls_2pc_queue_commit_op { + cls_2pc_reservation::id_t id; // reservation to commit + std::vector bl_data_vec; // the data to enqueue + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(id, bl); + encode(bl_data_vec, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(id, bl); + decode(bl_data_vec, bl); + DECODE_FINISH(bl); + } + +}; +WRITE_CLASS_ENCODER(cls_2pc_queue_commit_op) + +struct cls_2pc_queue_abort_op { + cls_2pc_reservation::id_t id; // reservation to abort + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(id, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(id, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_2pc_queue_abort_op) + +struct cls_2pc_queue_expire_op { + // any reservation older than this time should expire + ceph::coarse_real_time stale_time; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(stale_time, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(stale_time, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_2pc_queue_expire_op) + +struct cls_2pc_queue_reservations_ret { + cls_2pc_reservations reservations; // reservation list (keyed by id) + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(reservations, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(reservations, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_2pc_queue_reservations_ret) diff --git a/src/cls/2pc_queue/cls_2pc_queue_types.h b/src/cls/2pc_queue/cls_2pc_queue_types.h new file mode 100644 index 000000000..7c94cdebf --- /dev/null +++ b/src/cls/2pc_queue/cls_2pc_queue_types.h @@ -0,0 +1,62 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#pragma once + +#include "include/types.h" + +struct cls_2pc_reservation +{ + using id_t = uint32_t; + inline static const id_t NO_ID{0}; + uint64_t size; // how many entries are reserved + ceph::coarse_real_time timestamp; // when the reservation was done (used for cleaning stale reservations) + + cls_2pc_reservation(uint64_t _size, ceph::coarse_real_time _timestamp) : + size(_size), timestamp(_timestamp) {} + + cls_2pc_reservation() = default; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(size, bl); + encode(timestamp, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(size, bl); + decode(timestamp, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_2pc_reservation) + +using cls_2pc_reservations = ceph::unordered_map; + +struct cls_2pc_urgent_data +{ + uint64_t reserved_size{0}; // pending reservations size in bytes + cls_2pc_reservation::id_t last_id{cls_2pc_reservation::NO_ID}; // last allocated id + cls_2pc_reservations reservations; // reservation list (keyed by id) + bool has_xattrs{false}; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(reserved_size, bl); + encode(last_id, bl); + encode(reservations, bl); + encode(has_xattrs, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(reserved_size, bl); + decode(last_id, bl); + decode(reservations, bl); + decode(has_xattrs, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_2pc_urgent_data) -- cgit v1.2.3