diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/cls/queue | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/cls/queue')
-rw-r--r-- | src/cls/queue/cls_queue.cc | 145 | ||||
-rw-r--r-- | src/cls/queue/cls_queue_client.cc | 88 | ||||
-rw-r--r-- | src/cls/queue/cls_queue_client.h | 16 | ||||
-rw-r--r-- | src/cls/queue/cls_queue_const.h | 12 | ||||
-rw-r--r-- | src/cls/queue/cls_queue_ops.h | 139 | ||||
-rw-r--r-- | src/cls/queue/cls_queue_src.cc | 519 | ||||
-rw-r--r-- | src/cls/queue/cls_queue_src.h | 16 | ||||
-rw-r--r-- | src/cls/queue/cls_queue_types.h | 120 |
8 files changed, 1055 insertions, 0 deletions
diff --git a/src/cls/queue/cls_queue.cc b/src/cls/queue/cls_queue.cc new file mode 100644 index 000000000..cf4daaac8 --- /dev/null +++ b/src/cls/queue/cls_queue.cc @@ -0,0 +1,145 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/types.h" + +#include <errno.h> + +#include "objclass/objclass.h" +#include "cls/queue/cls_queue_types.h" +#include "cls/queue/cls_queue_ops.h" +#include "cls/queue/cls_queue_const.h" +#include "cls/queue/cls_queue_src.h" + +using ceph::bufferlist; +using ceph::decode; +using ceph::encode; + +CLS_VER(1,0) +CLS_NAME(queue) + +static int cls_queue_init(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + cls_queue_init_op op; + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_init_op(): failed to decode entry\n"); + return -EINVAL; + } + + return queue_init(hctx, op); +} + +static int cls_queue_get_capacity(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + cls_queue_get_capacity_ret op_ret; + auto ret = queue_get_capacity(hctx, op_ret); + if (ret < 0) { + return ret; + } + + encode(op_ret, *out); + return 0; +} + +static int cls_queue_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto iter = in->cbegin(); + cls_queue_enqueue_op op; + try { + decode(op, iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_queue_enqueue: failed to decode input data \n"); + return -EINVAL; + } + + cls_queue_head head; + auto ret = queue_read_head(hctx, head); + if (ret < 0) { + return ret; + } + + ret = queue_enqueue(hctx, op, head); + if (ret < 0) { + return ret; + } + + //Write back head + return queue_write_head(hctx, head); +} + +static int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + cls_queue_list_op op; + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(5, "ERROR: cls_queue_list_entries(): failed to decode input data\n"); + return -EINVAL; + } + + cls_queue_head head; + auto ret = queue_read_head(hctx, head); + if (ret < 0) { + return ret; + } + + cls_queue_list_ret op_ret; + ret = queue_list_entries(hctx, op, op_ret, head); + if (ret < 0) { + return ret; + } + + encode(op_ret, *out); + return 0; +} + +static int cls_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + cls_queue_remove_op op; + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(5, "ERROR: cls_queue_remove_entries: failed to decode input data\n"); + return -EINVAL; + } + + cls_queue_head head; + auto ret = queue_read_head(hctx, head); + if (ret < 0) { + return ret; + } + ret = queue_remove_entries(hctx, op, head); + if (ret < 0) { + return ret; + } + return queue_write_head(hctx, head); +} + +CLS_INIT(queue) +{ + CLS_LOG(1, "Loaded queue class!"); + + cls_handle_t h_class; + cls_method_handle_t h_queue_init; + cls_method_handle_t h_queue_get_capacity; + cls_method_handle_t h_queue_enqueue; + cls_method_handle_t h_queue_list_entries; + cls_method_handle_t h_queue_remove_entries; + + cls_register(QUEUE_CLASS, &h_class); + + /* queue*/ + cls_register_cxx_method(h_class, QUEUE_INIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_init, &h_queue_init); + cls_register_cxx_method(h_class, QUEUE_GET_CAPACITY, CLS_METHOD_RD, cls_queue_get_capacity, &h_queue_get_capacity); + cls_register_cxx_method(h_class, QUEUE_ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_enqueue, &h_queue_enqueue); + cls_register_cxx_method(h_class, QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_queue_list_entries, &h_queue_list_entries); + cls_register_cxx_method(h_class, QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_remove_entries, &h_queue_remove_entries); + + return; +} + diff --git a/src/cls/queue/cls_queue_client.cc b/src/cls/queue/cls_queue_client.cc new file mode 100644 index 000000000..87d17bb9e --- /dev/null +++ b/src/cls/queue/cls_queue_client.cc @@ -0,0 +1,88 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#include <errno.h> + +#include "cls/queue/cls_queue_ops.h" +#include "cls/queue/cls_queue_const.h" +#include "cls/queue/cls_queue_client.h" + +using namespace std; +using namespace librados; + +void cls_queue_init(ObjectWriteOperation& op, const string& queue_name, uint64_t size) +{ + bufferlist in; + cls_queue_init_op call; + call.max_urgent_data_size = 0; + call.queue_size = size; + encode(call, in); + op.exec(QUEUE_CLASS, QUEUE_INIT, in); +} + +int cls_queue_get_capacity(IoCtx& io_ctx, const string& oid, uint64_t& size) +{ + bufferlist in, out; + int r = io_ctx.exec(oid, QUEUE_CLASS, QUEUE_GET_CAPACITY, in, out); + if (r < 0) + return r; + + cls_queue_get_capacity_ret op_ret; + auto iter = out.cbegin(); + try { + decode(op_ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + + size = op_ret.queue_capacity; + + return 0; +} + +void cls_queue_enqueue(ObjectWriteOperation& op, uint32_t expiration_secs, vector<bufferlist> bl_data_vec) +{ + bufferlist in; + cls_queue_enqueue_op call; + call.bl_data_vec = std::move(bl_data_vec); + encode(call, in); + op.exec(QUEUE_CLASS, QUEUE_ENQUEUE, in); +} + +int cls_queue_list_entries(IoCtx& io_ctx, const string& oid, const string& marker, uint32_t max, + vector<cls_queue_entry>& entries, + bool *truncated, string& next_marker) +{ + bufferlist in, out; + cls_queue_list_op op; + op.start_marker = marker; + op.max = max; + encode(op, in); + + int r = io_ctx.exec(oid, QUEUE_CLASS, QUEUE_LIST_ENTRIES, in, out); + if (r < 0) + return r; + + cls_queue_list_ret ret; + auto iter = out.cbegin(); + try { + decode(ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + + entries = std::move(ret.entries); + *truncated = ret.is_truncated; + + next_marker = std::move(ret.next_marker); + + return 0; +} + +void cls_queue_remove_entries(ObjectWriteOperation& op, const string& end_marker) +{ + bufferlist in, out; + cls_queue_remove_op rem_op; + rem_op.end_marker = end_marker; + encode(rem_op, in); + op.exec(QUEUE_CLASS, QUEUE_REMOVE_ENTRIES, in); +} diff --git a/src/cls/queue/cls_queue_client.h b/src/cls/queue/cls_queue_client.h new file mode 100644 index 000000000..895a51c11 --- /dev/null +++ b/src/cls/queue/cls_queue_client.h @@ -0,0 +1,16 @@ +#ifndef CEPH_CLS_QUEUE_CLIENT_H +#define CEPH_CLS_QUEUE_CLIENT_H + +#include "include/rados/librados.hpp" +#include "cls/queue/cls_queue_types.h" +#include "cls_queue_ops.h" +#include "common/ceph_time.h" + +void cls_queue_init(librados::ObjectWriteOperation& op, const std::string& queue_name, uint64_t size); +int cls_queue_get_capacity(librados::IoCtx& io_ctx, const std::string& oid, uint64_t& size); +void cls_queue_enqueue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, std::vector<bufferlist> bl_data_vec); +int cls_queue_list_entries(librados::IoCtx& io_ctx, const std::string& oid, const std::string& marker, uint32_t max, + std::vector<cls_queue_entry>& entries, bool *truncated, std::string& next_marker); +void cls_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker); + +#endif diff --git a/src/cls/queue/cls_queue_const.h b/src/cls/queue/cls_queue_const.h new file mode 100644 index 000000000..3f289abb0 --- /dev/null +++ b/src/cls/queue/cls_queue_const.h @@ -0,0 +1,12 @@ +#ifndef CEPH_CLS_QUEUE_CONSTS_H +#define CEPH_CLS_QUEUE_CONSTS_H + +#define QUEUE_CLASS "queue" + +#define QUEUE_INIT "queue_init" +#define QUEUE_GET_CAPACITY "queue_get_capacity" +#define QUEUE_ENQUEUE "queue_enqueue" +#define QUEUE_LIST_ENTRIES "queue_list_entries" +#define QUEUE_REMOVE_ENTRIES "queue_remove_entries" + +#endif
\ No newline at end of file diff --git a/src/cls/queue/cls_queue_ops.h b/src/cls/queue/cls_queue_ops.h new file mode 100644 index 000000000..64891cffb --- /dev/null +++ b/src/cls/queue/cls_queue_ops.h @@ -0,0 +1,139 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_CLS_QUEUE_OPS_H +#define CEPH_CLS_QUEUE_OPS_H + +#include "cls/queue/cls_queue_types.h" + +struct cls_queue_init_op { + uint64_t queue_size{0}; + uint64_t max_urgent_data_size{0}; + ceph::buffer::list bl_urgent_data; + + cls_queue_init_op() {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(queue_size, bl); + encode(max_urgent_data_size, bl); + encode(bl_urgent_data, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(queue_size, bl); + decode(max_urgent_data_size, bl); + decode(bl_urgent_data, bl); + DECODE_FINISH(bl); + } + +}; +WRITE_CLASS_ENCODER(cls_queue_init_op) + +struct cls_queue_enqueue_op { + std::vector<ceph::buffer::list> bl_data_vec; + + cls_queue_enqueue_op() {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(bl_data_vec, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(bl_data_vec, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_queue_enqueue_op) + +struct cls_queue_list_op { + uint64_t max; + std::string start_marker; + + cls_queue_list_op() {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(max, bl); + encode(start_marker, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(max, bl); + decode(start_marker, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_queue_list_op) + +struct cls_queue_list_ret { + bool is_truncated; + std::string next_marker; + std::vector<cls_queue_entry> entries; + + cls_queue_list_ret() {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(is_truncated, bl); + encode(next_marker, bl); + encode(entries, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(is_truncated, bl); + decode(next_marker, bl); + decode(entries, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_queue_list_ret) + +struct cls_queue_remove_op { + std::string end_marker; + + cls_queue_remove_op() {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(end_marker, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(end_marker, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_queue_remove_op) + +struct cls_queue_get_capacity_ret { + uint64_t queue_capacity; + + cls_queue_get_capacity_ret() {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(queue_capacity, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(queue_capacity, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_queue_get_capacity_ret) + +#endif /* CEPH_CLS_QUEUE_OPS_H */ diff --git a/src/cls/queue/cls_queue_src.cc b/src/cls/queue/cls_queue_src.cc new file mode 100644 index 000000000..b34d9929b --- /dev/null +++ b/src/cls/queue/cls_queue_src.cc @@ -0,0 +1,519 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/types.h" + +#include "objclass/objclass.h" +#include "cls/queue/cls_queue_types.h" +#include "cls/queue/cls_queue_ops.h" +#include "cls/queue/cls_queue_const.h" +#include "cls/queue/cls_queue_src.h" + +using std::string; +using ceph::bufferlist; +using ceph::decode; +using ceph::encode; + +const uint64_t page_size = 4096; +const uint64_t large_chunk_size = 1ul << 22; + +int queue_write_head(cls_method_context_t hctx, cls_queue_head& head) +{ + bufferlist bl; + uint16_t entry_start = QUEUE_HEAD_START; + encode(entry_start, bl); + + bufferlist bl_head; + encode(head, bl_head); + + uint64_t encoded_len = bl_head.length(); + encode(encoded_len, bl); + + bl.claim_append(bl_head); + + if (bl.length() > head.max_head_size) { + CLS_LOG(0, "ERROR: queue_write_head: invalid head size = %u and urgent data size = %u \n", bl.length(), head.bl_urgent_data.length()); + return -EINVAL; + } + + int ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); + if (ret < 0) { + CLS_LOG(5, "ERROR: queue_write_head: failed to write head"); + return ret; + } + return 0; +} + +int queue_read_head(cls_method_context_t hctx, cls_queue_head& head) +{ + uint64_t chunk_size = page_size, start_offset = 0; + + bufferlist bl_head; + const auto ret = cls_cxx_read(hctx, start_offset, chunk_size, &bl_head); + if (ret < 0) { + CLS_LOG(5, "ERROR: queue_read_head: failed to read head"); + return ret; + } + if (ret == 0) { + CLS_LOG(20, "INFO: queue_read_head: empty head, not initialized yet"); + return -EINVAL; + } + + //Process the chunk of data read + auto it = bl_head.cbegin(); + // Queue head start + uint16_t queue_head_start; + try { + decode(queue_head_start, it); + } catch (const ceph::buffer::error& err) { + CLS_LOG(0, "ERROR: queue_read_head: failed to decode queue start: %s", err.what()); + return -EINVAL; + } + if (queue_head_start != QUEUE_HEAD_START) { + CLS_LOG(0, "ERROR: queue_read_head: invalid queue start"); + return -EINVAL; + } + + uint64_t encoded_len; + try { + decode(encoded_len, it); + } catch (const ceph::buffer::error& err) { + CLS_LOG(0, "ERROR: queue_read_head: failed to decode encoded head size: %s", err.what()); + return -EINVAL; + } + + if (encoded_len > (chunk_size - QUEUE_ENTRY_OVERHEAD)) { + start_offset = chunk_size; + chunk_size = (encoded_len - (chunk_size - QUEUE_ENTRY_OVERHEAD)); + bufferlist bl_remaining_head; + const auto ret = cls_cxx_read2(hctx, start_offset, chunk_size, &bl_remaining_head, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + CLS_LOG(5, "ERROR: queue_read_head: failed to read remaining part of head"); + return ret; + } + bl_head.claim_append(bl_remaining_head); + } + + try { + decode(head, it); + } catch (const ceph::buffer::error& err) { + CLS_LOG(0, "ERROR: queue_read_head: failed to decode head: %s", err.what()); + return -EINVAL; + } + + return 0; +} + +int queue_init(cls_method_context_t hctx, const cls_queue_init_op& op) +{ + //get head and its size + cls_queue_head head; + int ret = queue_read_head(hctx, head); + + //head is already initialized + if (ret == 0) { + return -EEXIST; + } + + if (ret < 0 && ret != -EINVAL) { + return ret; + } + + if (op.bl_urgent_data.length() > 0) { + head.bl_urgent_data = op.bl_urgent_data; + } + + head.max_head_size = QUEUE_HEAD_SIZE_1K + op.max_urgent_data_size; + head.queue_size = op.queue_size + head.max_head_size; + head.max_urgent_data_size = op.max_urgent_data_size; + head.tail.gen = head.front.gen = 0; + head.tail.offset = head.front.offset = head.max_head_size; + + CLS_LOG(20, "INFO: init_queue_op queue actual size %lu", head.queue_size); + CLS_LOG(20, "INFO: init_queue_op head size %lu", head.max_head_size); + CLS_LOG(20, "INFO: init_queue_op queue front offset %s", head.front.to_str().c_str()); + CLS_LOG(20, "INFO: init_queue_op queue max urgent data size %lu", head.max_urgent_data_size); + + return queue_write_head(hctx, head); +} + +int queue_get_capacity(cls_method_context_t hctx, cls_queue_get_capacity_ret& op_ret) +{ + //get head + cls_queue_head head; + int ret = queue_read_head(hctx, head); + if (ret < 0) { + return ret; + } + + op_ret.queue_capacity = head.queue_size - head.max_head_size; + + CLS_LOG(20, "INFO: queue_get_capacity: size of queue is %lu", op_ret.queue_capacity); + + return 0; +} + + +/* +enqueue of new bufferlist happens in the free spaces of the queue, the queue can be in +one of two states: + +(1) split free space ++-------------+--------------------------------------------------------------------+ +| object head | XXXXXXXXXXXXXXXXXXXXXXXXXXX | +| | ^ ^ | +| front tail | | | | ++---+------+--+----------------|-------------------------|-------------------------+ + | | | | + | +-------------------|-------------------------+ + +--------------------------+ + +(2) continuous free space ++-------------+--------------------------------------------------------------------+ +| object head |XXXXXXXXXXXXXXXXX XXXXXXXXXXXXXXXXXXXXXXXXXX| +| | ^ ^ | +| front tail | | | | ++---+------+--+----------------|-------------------------|-------------------------+ + | | | | + | +-------------------+ | + +----------------------------------------------------+ +*/ + +int queue_enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head) +{ + if ((head.front.offset == head.tail.offset) && (head.tail.gen == head.front.gen + 1)) { + CLS_LOG(0, "ERROR: No space left in queue"); + return -ENOSPC; + } + + for (auto& bl_data : op.bl_data_vec) { + bufferlist bl; + uint16_t entry_start = QUEUE_ENTRY_START; + encode(entry_start, bl); + uint64_t data_size = bl_data.length(); + encode(data_size, bl); + bl.claim_append(bl_data); + + CLS_LOG(10, "INFO: queue_enqueue(): Total size to be written is %u and data size is %lu", bl.length(), data_size); + + if (head.tail.offset >= head.front.offset) { + // check if data can fit in the remaining space in queue + if ((head.tail.offset + bl.length()) <= head.queue_size) { + CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u", head.tail.to_str().c_str(), bl.length()); + //write data size and data at tail offset + auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + head.tail.offset += bl.length(); + } else { + uint64_t free_space_available = (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size); + //Split data if there is free space available + if (bl.length() <= free_space_available) { + uint64_t size_before_wrap = head.queue_size - head.tail.offset; + bufferlist bl_data_before_wrap; + bl.splice(0, size_before_wrap, &bl_data_before_wrap); + //write spliced (data size and data) at tail offset + CLS_LOG(5, "INFO: queue_enqueue: Writing spliced data at offset: %s and data size: %u", head.tail.to_str().c_str(), bl_data_before_wrap.length()); + auto ret = cls_cxx_write2(hctx, head.tail.offset, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + head.tail.offset = head.max_head_size; + head.tail.gen += 1; + //write remaining data at tail offset after wrapping around + CLS_LOG(5, "INFO: queue_enqueue: Writing remaining data at offset: %s and data size: %u", head.tail.to_str().c_str(), bl.length()); + ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + head.tail.offset += bl.length(); + } else { + CLS_LOG(0, "ERROR: No space left in queue\n"); + // return queue full error + return -ENOSPC; + } + } + } else if (head.front.offset > head.tail.offset) { + if ((head.tail.offset + bl.length()) <= head.front.offset) { + CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u", head.tail.to_str().c_str(), bl.length()); + //write data size and data at tail offset + auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); + if (ret < 0) { + return ret; + } + head.tail.offset += bl.length(); + } else { + CLS_LOG(0, "ERROR: No space left in queue"); + // return queue full error + return -ENOSPC; + } + } + + if (head.tail.offset == head.queue_size) { + head.tail.offset = head.max_head_size; + head.tail.gen += 1; + } + CLS_LOG(20, "INFO: queue_enqueue: New tail offset: %s", head.tail.to_str().c_str()); + } //end - for + + return 0; +} + +int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, cls_queue_list_ret& op_ret, cls_queue_head& head) +{ + // If queue is empty, return from here + if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) { + CLS_LOG(20, "INFO: queue_list_entries(): Next offset is %s", head.front.to_str().c_str()); + op_ret.next_marker = head.front.to_str(); + op_ret.is_truncated = false; + return 0; + } + + cls_queue_marker start_marker; + start_marker.from_str(op.start_marker.c_str()); + cls_queue_marker next_marker = {0, 0}; + + uint64_t start_offset = 0, gen = 0; + if (start_marker.offset == 0) { + start_offset = head.front.offset; + gen = head.front.gen; + } else { + start_offset = start_marker.offset; + gen = start_marker.gen; + } + + op_ret.is_truncated = true; + uint64_t contiguous_data_size = 0, size_to_read = 0; + bool wrap_around = false; + + //Calculate length of contiguous data to be read depending on front, tail and start offset + if (head.tail.offset > head.front.offset) { + contiguous_data_size = head.tail.offset - start_offset; + } else if (head.front.offset >= head.tail.offset) { + if (start_offset >= head.front.offset) { + contiguous_data_size = head.queue_size - start_offset; + wrap_around = true; + } else if (start_offset <= head.tail.offset) { + contiguous_data_size = head.tail.offset - start_offset; + } + } + + CLS_LOG(10, "INFO: queue_list_entries(): front is: %s, tail is %s", head.front.to_str().c_str(), head.tail.to_str().c_str()); + + bool offset_populated = false, entry_start_processed = false; + uint64_t data_size = 0, num_ops = 0; + uint16_t entry_start = 0; + bufferlist bl; + string last_marker; + do + { + CLS_LOG(10, "INFO: queue_list_entries(): start_offset is %lu", start_offset); + + bufferlist bl_chunk; + //Read chunk size at a time, if it is less than contiguous data size, else read contiguous data size + size_to_read = std::min(contiguous_data_size, large_chunk_size); + CLS_LOG(10, "INFO: queue_list_entries(): size_to_read is %lu", size_to_read); + if (size_to_read == 0) { + next_marker = head.tail; + op_ret.is_truncated = false; + CLS_LOG(20, "INFO: queue_list_entries(): size_to_read is 0, hence breaking out!\n"); + break; + } + + auto ret = cls_cxx_read(hctx, start_offset, size_to_read, &bl_chunk); + if (ret < 0) { + return ret; + } + + //If there is leftover data from previous iteration, append new data to leftover data + uint64_t entry_start_offset = start_offset - bl.length(); + CLS_LOG(20, "INFO: queue_list_entries(): Entry start offset accounting for leftover data is %lu", entry_start_offset); + bl.claim_append(bl_chunk); + bl_chunk = std::move(bl); + + CLS_LOG(20, "INFO: queue_list_entries(): size of chunk %u", bl_chunk.length()); + + //Process the chunk of data read + unsigned index = 0; + auto it = bl_chunk.cbegin(); + uint64_t size_to_process = bl_chunk.length(); + do { + CLS_LOG(10, "INFO: queue_list_entries(): index: %u, size_to_process: %lu", index, size_to_process); + cls_queue_entry entry; + ceph_assert(it.get_off() == index); + //Use the last marker saved in previous iteration as the marker for this entry + if (offset_populated) { + entry.marker = last_marker; + } + //Populate offset if not done in previous iteration + if (! offset_populated) { + cls_queue_marker marker = {entry_start_offset + index, gen}; + CLS_LOG(5, "INFO: queue_list_entries(): offset: %s\n", marker.to_str().c_str()); + entry.marker = marker.to_str(); + } + // Magic number + Data size - process if not done in previous iteration + if (! entry_start_processed ) { + if (size_to_process >= QUEUE_ENTRY_OVERHEAD) { + // Decode magic number at start + try { + decode(entry_start, it); + } catch (const ceph::buffer::error& err) { + CLS_LOG(10, "ERROR: queue_list_entries: failed to decode entry start: %s", err.what()); + return -EINVAL; + } + if (entry_start != QUEUE_ENTRY_START) { + CLS_LOG(5, "ERROR: queue_list_entries: invalid entry start %u", entry_start); + return -EINVAL; + } + index += sizeof(uint16_t); + size_to_process -= sizeof(uint16_t); + // Decode data size + try { + decode(data_size, it); + } catch (const ceph::buffer::error& err) { + CLS_LOG(10, "ERROR: queue_list_entries: failed to decode data size: %s", err.what()); + return -EINVAL; + } + } else { + // Copy unprocessed data to bl + bl_chunk.splice(index, size_to_process, &bl); + offset_populated = true; + last_marker = entry.marker; + CLS_LOG(10, "INFO: queue_list_entries: not enough data to read entry start and data size, breaking out!"); + break; + } + CLS_LOG(20, "INFO: queue_list_entries(): data size: %lu", data_size); + index += sizeof(uint64_t); + size_to_process -= sizeof(uint64_t); + } + // Data + if (data_size <= size_to_process) { + it.copy(data_size, entry.data); + index += entry.data.length(); + size_to_process -= entry.data.length(); + } else { + it.copy(size_to_process, bl); + offset_populated = true; + entry_start_processed = true; + last_marker = entry.marker; + CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!"); + break; + } + op_ret.entries.emplace_back(entry); + // Resetting some values + offset_populated = false; + entry_start_processed = false; + data_size = 0; + entry_start = 0; + num_ops++; + last_marker.clear(); + if (num_ops == op.max) { + CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!"); + break; + } + } while(index < bl_chunk.length()); + + CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max); + + if (num_ops == op.max) { + next_marker = cls_queue_marker{(entry_start_offset + index), gen}; + CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu", next_marker.offset); + break; + } + + //Calculate new start_offset and contiguous data size + start_offset += size_to_read; + contiguous_data_size -= size_to_read; + if (contiguous_data_size == 0) { + if (wrap_around) { + start_offset = head.max_head_size; + contiguous_data_size = head.tail.offset - head.max_head_size; + gen += 1; + wrap_around = false; + } else { + CLS_LOG(10, "INFO: queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!"); + next_marker = head.tail; + op_ret.is_truncated = false; + break; + } + } + + } while(num_ops < op.max); + + //Wrap around next offset if it has reached end of queue + if (next_marker.offset == head.queue_size) { + next_marker.offset = head.max_head_size; + next_marker.gen += 1; + } + if ((next_marker.offset == head.tail.offset) && (next_marker.gen == head.tail.gen)) { + op_ret.is_truncated = false; + } + + CLS_LOG(5, "INFO: queue_list_entries(): next offset: %s", next_marker.to_str().c_str()); + op_ret.next_marker = next_marker.to_str(); + + return 0; +} + +int queue_remove_entries(cls_method_context_t hctx, const cls_queue_remove_op& op, cls_queue_head& head) +{ + //Queue is empty + if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) { + return 0; + } + + cls_queue_marker end_marker; + end_marker.from_str(op.end_marker.c_str()); + + CLS_LOG(5, "INFO: queue_remove_entries: op.end_marker = %s", end_marker.to_str().c_str()); + + //Zero out the entries that have been removed, to reclaim storage space + if (end_marker.offset > head.front.offset && end_marker.gen == head.front.gen) { + uint64_t len = end_marker.offset - head.front.offset; + if (len > 0) { + auto ret = cls_cxx_write_zero(hctx, head.front.offset, len); + if (ret < 0) { + CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries"); + CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s", head.front.to_str().c_str()); + return ret; + } + } + } else if ((head.front.offset >= end_marker.offset) && (end_marker.gen == head.front.gen + 1)) { //start offset > end offset + uint64_t len = head.queue_size - head.front.offset; + if (len > 0) { + auto ret = cls_cxx_write_zero(hctx, head.front.offset, len); + if (ret < 0) { + CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries"); + CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s", head.front.to_str().c_str()); + return ret; + } + } + len = end_marker.offset - head.max_head_size; + if (len > 0) { + auto ret = cls_cxx_write_zero(hctx, head.max_head_size, len); + if (ret < 0) { + CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries"); + CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %lu", head.max_head_size); + return ret; + } + } + } else if ((head.front.offset == end_marker.offset) && (head.front.gen == end_marker.gen)) { + //no-op + } else { + CLS_LOG(0, "INFO: queue_remove_entries: Invalid end marker: offset = %s, gen = %lu", end_marker.to_str().c_str(), end_marker.gen); + return -EINVAL; + } + + head.front = end_marker; + + // Check if it is the end, then wrap around + if (head.front.offset == head.queue_size) { + head.front.offset = head.max_head_size; + head.front.gen += 1; + } + + CLS_LOG(20, "INFO: queue_remove_entries: front offset is: %s and tail offset is %s", head.front.to_str().c_str(), head.tail.to_str().c_str()); + + return 0; +} diff --git a/src/cls/queue/cls_queue_src.h b/src/cls/queue/cls_queue_src.h new file mode 100644 index 000000000..9970b98ea --- /dev/null +++ b/src/cls/queue/cls_queue_src.h @@ -0,0 +1,16 @@ +#ifndef CEPH_CLS_QUEUE_SRC_H +#define CEPH_CLS_QUEUE_SRC_H + +#include "objclass/objclass.h" +#include "cls/queue/cls_queue_types.h" +#include "cls/queue/cls_queue_ops.h" + +int queue_write_head(cls_method_context_t hctx, cls_queue_head& head); +int queue_read_head(cls_method_context_t hctx, cls_queue_head& head); +int queue_init(cls_method_context_t hctx, const cls_queue_init_op& op); +int queue_get_capacity(cls_method_context_t hctx, cls_queue_get_capacity_ret& op_ret); +int queue_enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head); +int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, cls_queue_list_ret& op_ret, cls_queue_head& head); +int queue_remove_entries(cls_method_context_t hctx, const cls_queue_remove_op& op, cls_queue_head& head); + +#endif /* CEPH_CLS_QUEUE_SRC_H */ diff --git a/src/cls/queue/cls_queue_types.h b/src/cls/queue/cls_queue_types.h new file mode 100644 index 000000000..cc46df405 --- /dev/null +++ b/src/cls/queue/cls_queue_types.h @@ -0,0 +1,120 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_CLS_QUEUE_TYPES_H +#define CEPH_CLS_QUEUE_TYPES_H + +#include <errno.h> +#include "include/types.h" + +//Size of head leaving out urgent data +#define QUEUE_HEAD_SIZE_1K 1024 + +#define QUEUE_START_OFFSET_1K QUEUE_HEAD_SIZE_1K + +constexpr unsigned int QUEUE_HEAD_START = 0xDEAD; +constexpr unsigned int QUEUE_ENTRY_START = 0xBEEF; +constexpr unsigned int QUEUE_ENTRY_OVERHEAD = sizeof(uint16_t) + sizeof(uint64_t); + +struct cls_queue_entry +{ + ceph::buffer::list data; + std::string marker; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(data, bl); + encode(marker, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(data, bl); + decode(marker, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_queue_entry) + +struct cls_queue_marker +{ + uint64_t offset{0}; + uint64_t gen{0}; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(gen, bl); + encode(offset, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(gen, bl); + decode(offset, bl); + DECODE_FINISH(bl); + } + + std::string to_str() { + return std::to_string(gen) + '/' + std::to_string(offset); + } + + int from_str(const char* str) { + errno = 0; + char* end = nullptr; + gen = ::strtoull(str, &end, 10); + if (errno) { + return errno; + } + if (str == end || *end != '/') { // expects delimiter + return -EINVAL; + } + str = end + 1; + offset = ::strtoull(str, &end, 10); + if (errno) { + return errno; + } + if (str == end || *end != 0) { // expects null terminator + return -EINVAL; + } + return 0; + } + +}; +WRITE_CLASS_ENCODER(cls_queue_marker) + +struct cls_queue_head +{ + uint64_t max_head_size = QUEUE_HEAD_SIZE_1K; + cls_queue_marker front{QUEUE_START_OFFSET_1K, 0}; + cls_queue_marker tail{QUEUE_START_OFFSET_1K, 0}; + uint64_t queue_size{0}; // size of queue requested by user, with head size added to it + uint64_t max_urgent_data_size{0}; + ceph::buffer::list bl_urgent_data; // special data known to application using queue + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(max_head_size, bl); + encode(front, bl); + encode(tail, bl); + encode(queue_size, bl); + encode(max_urgent_data_size, bl); + encode(bl_urgent_data, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(max_head_size, bl); + decode(front, bl); + decode(tail, bl); + decode(queue_size, bl); + decode(max_urgent_data_size, bl); + decode(bl_urgent_data, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_queue_head) + +#endif |