Diffstat (limited to 'src/cls/queue')
 src/cls/queue/cls_queue.cc        | 145
 src/cls/queue/cls_queue_client.cc | 87
 src/cls/queue/cls_queue_client.h  | 16
 src/cls/queue/cls_queue_const.h   | 12
 src/cls/queue/cls_queue_ops.h     | 139
 src/cls/queue/cls_queue_src.cc    | 520
 src/cls/queue/cls_queue_src.h     | 16
 src/cls/queue/cls_queue_types.h   | 120
 8 files changed, 1055 insertions(+), 0 deletions(-)
diff --git a/src/cls/queue/cls_queue.cc b/src/cls/queue/cls_queue.cc
new file mode 100644
index 000000000..cf4daaac8
--- /dev/null
+++ b/src/cls/queue/cls_queue.cc
@@ -0,0 +1,145 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/types.h"
+
+#include <errno.h>
+
+#include "objclass/objclass.h"
+#include "cls/queue/cls_queue_types.h"
+#include "cls/queue/cls_queue_ops.h"
+#include "cls/queue/cls_queue_const.h"
+#include "cls/queue/cls_queue_src.h"
+
+using ceph::bufferlist;
+using ceph::decode;
+using ceph::encode;
+
+CLS_VER(1,0)
+CLS_NAME(queue)
+
+static int cls_queue_init(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ auto in_iter = in->cbegin();
+ cls_queue_init_op op;
+ try {
+ decode(op, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_queue_init_op(): failed to decode entry\n");
+ return -EINVAL;
+ }
+
+ return queue_init(hctx, op);
+}
+
+static int cls_queue_get_capacity(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ cls_queue_get_capacity_ret op_ret;
+ auto ret = queue_get_capacity(hctx, op_ret);
+ if (ret < 0) {
+ return ret;
+ }
+
+ encode(op_ret, *out);
+ return 0;
+}
+
+static int cls_queue_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ auto iter = in->cbegin();
+ cls_queue_enqueue_op op;
+ try {
+ decode(op, iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_queue_enqueue: failed to decode input data \n");
+ return -EINVAL;
+ }
+
+ cls_queue_head head;
+ auto ret = queue_read_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ ret = queue_enqueue(hctx, op, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ //Write back head
+ return queue_write_head(hctx, head);
+}
+
+static int cls_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ auto in_iter = in->cbegin();
+ cls_queue_list_op op;
+ try {
+ decode(op, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(5, "ERROR: cls_queue_list_entries(): failed to decode input data\n");
+ return -EINVAL;
+ }
+
+ cls_queue_head head;
+ auto ret = queue_read_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ cls_queue_list_ret op_ret;
+ ret = queue_list_entries(hctx, op, op_ret, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ encode(op_ret, *out);
+ return 0;
+}
+
+static int cls_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ auto in_iter = in->cbegin();
+ cls_queue_remove_op op;
+ try {
+ decode(op, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(5, "ERROR: cls_queue_remove_entries: failed to decode input data\n");
+ return -EINVAL;
+ }
+
+ cls_queue_head head;
+ auto ret = queue_read_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+ ret = queue_remove_entries(hctx, op, head);
+ if (ret < 0) {
+ return ret;
+ }
+ return queue_write_head(hctx, head);
+}
+
+CLS_INIT(queue)
+{
+ CLS_LOG(1, "Loaded queue class!");
+
+ cls_handle_t h_class;
+ cls_method_handle_t h_queue_init;
+ cls_method_handle_t h_queue_get_capacity;
+ cls_method_handle_t h_queue_enqueue;
+ cls_method_handle_t h_queue_list_entries;
+ cls_method_handle_t h_queue_remove_entries;
+
+ cls_register(QUEUE_CLASS, &h_class);
+
+ /* queue*/
+ cls_register_cxx_method(h_class, QUEUE_INIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_init, &h_queue_init);
+ cls_register_cxx_method(h_class, QUEUE_GET_CAPACITY, CLS_METHOD_RD, cls_queue_get_capacity, &h_queue_get_capacity);
+ cls_register_cxx_method(h_class, QUEUE_ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_enqueue, &h_queue_enqueue);
+ cls_register_cxx_method(h_class, QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_queue_list_entries, &h_queue_list_entries);
+ cls_register_cxx_method(h_class, QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_queue_remove_entries, &h_queue_remove_entries);
+
+ return;
+}
+
diff --git a/src/cls/queue/cls_queue_client.cc b/src/cls/queue/cls_queue_client.cc
new file mode 100644
index 000000000..97f67d3d7
--- /dev/null
+++ b/src/cls/queue/cls_queue_client.cc
@@ -0,0 +1,87 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#include <errno.h>
+
+#include "cls/queue/cls_queue_ops.h"
+#include "cls/queue/cls_queue_const.h"
+#include "cls/queue/cls_queue_client.h"
+
+using namespace librados;
+
+void cls_queue_init(ObjectWriteOperation& op, const string& queue_name, uint64_t size)
+{
+ bufferlist in;
+ cls_queue_init_op call;
+ call.max_urgent_data_size = 0;
+ call.queue_size = size;
+ encode(call, in);
+ op.exec(QUEUE_CLASS, QUEUE_INIT, in);
+}
+
+int cls_queue_get_capacity(IoCtx& io_ctx, const string& oid, uint64_t& size)
+{
+ bufferlist in, out;
+ int r = io_ctx.exec(oid, QUEUE_CLASS, QUEUE_GET_CAPACITY, in, out);
+ if (r < 0)
+ return r;
+
+ cls_queue_get_capacity_ret op_ret;
+ auto iter = out.cbegin();
+ try {
+ decode(op_ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ size = op_ret.queue_capacity;
+
+ return 0;
+}
+
+void cls_queue_enqueue(ObjectWriteOperation& op, uint32_t expiration_secs, vector<bufferlist> bl_data_vec)
+{
+ bufferlist in;
+ cls_queue_enqueue_op call;
+ call.bl_data_vec = std::move(bl_data_vec);
+ encode(call, in);
+ op.exec(QUEUE_CLASS, QUEUE_ENQUEUE, in);
+}
+
+int cls_queue_list_entries(IoCtx& io_ctx, const string& oid, const string& marker, uint32_t max,
+ vector<cls_queue_entry>& entries,
+ bool *truncated, string& next_marker)
+{
+ bufferlist in, out;
+ cls_queue_list_op op;
+ op.start_marker = marker;
+ op.max = max;
+ encode(op, in);
+
+ int r = io_ctx.exec(oid, QUEUE_CLASS, QUEUE_LIST_ENTRIES, in, out);
+ if (r < 0)
+ return r;
+
+ cls_queue_list_ret ret;
+ auto iter = out.cbegin();
+ try {
+ decode(ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ entries = std::move(ret.entries);
+ *truncated = ret.is_truncated;
+
+ next_marker = std::move(ret.next_marker);
+
+ return 0;
+}
+
+void cls_queue_remove_entries(ObjectWriteOperation& op, const string& end_marker)
+{
+ bufferlist in, out;
+ cls_queue_remove_op rem_op;
+ rem_op.end_marker = end_marker;
+ encode(rem_op, in);
+ op.exec(QUEUE_CLASS, QUEUE_REMOVE_ENTRIES, in);
+}
diff --git a/src/cls/queue/cls_queue_client.h b/src/cls/queue/cls_queue_client.h
new file mode 100644
index 000000000..59d32bfa1
--- /dev/null
+++ b/src/cls/queue/cls_queue_client.h
@@ -0,0 +1,16 @@
+#ifndef CEPH_CLS_QUEUE_CLIENT_H
+#define CEPH_CLS_QUEUE_CLIENT_H
+
+#include "include/rados/librados.hpp"
+#include "cls/queue/cls_queue_types.h"
+#include "cls_queue_ops.h"
+#include "common/ceph_time.h"
+
+void cls_queue_init(librados::ObjectWriteOperation& op, const string& queue_name, uint64_t size);
+int cls_queue_get_capacity(librados::IoCtx& io_ctx, const string& oid, uint64_t& size);
+void cls_queue_enqueue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, vector<bufferlist> bl_data_vec);
+int cls_queue_list_entries(librados::IoCtx& io_ctx, const string& oid, const string& marker, uint32_t max,
+ vector<cls_queue_entry>& entries, bool *truncated, string& next_marker);
+void cls_queue_remove_entries(librados::ObjectWriteOperation& op, const string& end_marker);
+
+#endif
\ No newline at end of file
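
The client wrappers declared above map one helper onto each registered class method. As a rough usage sketch only (not part of this change: the object name, the already-connected IoCtx, and the helper name are placeholders, and error handling is abbreviated), a librados caller might drive the queue like this:

  #include <cerrno>
  #include <string>
  #include <vector>
  #include "include/rados/librados.hpp"
  #include "cls/queue/cls_queue_client.h"

  int queue_demo(librados::IoCtx& io_ctx)      // io_ctx assumed already opened on a pool
  {
    const std::string oid = "my_queue";        // hypothetical queue object name

    // Create the queue head with 1 MiB of usable capacity.
    librados::ObjectWriteOperation init_op;
    cls_queue_init(init_op, oid, 1024 * 1024);
    int r = io_ctx.operate(oid, &init_op);
    if (r < 0 && r != -EEXIST)
      return r;

    // Enqueue two entries in one operation (expiration_secs is unused by the wrapper).
    std::vector<ceph::bufferlist> data(2);
    data[0].append("first");
    data[1].append("second");
    librados::ObjectWriteOperation enq_op;
    cls_queue_enqueue(enq_op, 0, std::move(data));
    r = io_ctx.operate(oid, &enq_op);
    if (r < 0)
      return r;

    // List up to 100 entries from the front, then trim everything that was listed.
    std::vector<cls_queue_entry> entries;
    bool truncated = false;
    std::string next_marker;
    r = cls_queue_list_entries(io_ctx, oid, "", 100, entries, &truncated, next_marker);
    if (r < 0)
      return r;

    librados::ObjectWriteOperation rm_op;
    cls_queue_remove_entries(rm_op, next_marker);
    return io_ctx.operate(oid, &rm_op);
  }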
diff --git a/src/cls/queue/cls_queue_const.h b/src/cls/queue/cls_queue_const.h
new file mode 100644
index 000000000..3f289abb0
--- /dev/null
+++ b/src/cls/queue/cls_queue_const.h
@@ -0,0 +1,12 @@
+#ifndef CEPH_CLS_QUEUE_CONSTS_H
+#define CEPH_CLS_QUEUE_CONSTS_H
+
+#define QUEUE_CLASS "queue"
+
+#define QUEUE_INIT "queue_init"
+#define QUEUE_GET_CAPACITY "queue_get_capacity"
+#define QUEUE_ENQUEUE "queue_enqueue"
+#define QUEUE_LIST_ENTRIES "queue_list_entries"
+#define QUEUE_REMOVE_ENTRIES "queue_remove_entries"
+
+#endif
\ No newline at end of file
diff --git a/src/cls/queue/cls_queue_ops.h b/src/cls/queue/cls_queue_ops.h
new file mode 100644
index 000000000..64891cffb
--- /dev/null
+++ b/src/cls/queue/cls_queue_ops.h
@@ -0,0 +1,139 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CLS_QUEUE_OPS_H
+#define CEPH_CLS_QUEUE_OPS_H
+
+#include "cls/queue/cls_queue_types.h"
+
+struct cls_queue_init_op {
+ uint64_t queue_size{0};
+ uint64_t max_urgent_data_size{0};
+ ceph::buffer::list bl_urgent_data;
+
+ cls_queue_init_op() {}
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(queue_size, bl);
+ encode(max_urgent_data_size, bl);
+ encode(bl_urgent_data, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(queue_size, bl);
+ decode(max_urgent_data_size, bl);
+ decode(bl_urgent_data, bl);
+ DECODE_FINISH(bl);
+ }
+
+};
+WRITE_CLASS_ENCODER(cls_queue_init_op)
+
+struct cls_queue_enqueue_op {
+ std::vector<ceph::buffer::list> bl_data_vec;
+
+ cls_queue_enqueue_op() {}
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(bl_data_vec, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(bl_data_vec, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_queue_enqueue_op)
+
+struct cls_queue_list_op {
+ uint64_t max;
+ std::string start_marker;
+
+ cls_queue_list_op() {}
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(max, bl);
+ encode(start_marker, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(max, bl);
+ decode(start_marker, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_queue_list_op)
+
+struct cls_queue_list_ret {
+ bool is_truncated;
+ std::string next_marker;
+ std::vector<cls_queue_entry> entries;
+
+ cls_queue_list_ret() {}
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(is_truncated, bl);
+ encode(next_marker, bl);
+ encode(entries, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(is_truncated, bl);
+ decode(next_marker, bl);
+ decode(entries, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_queue_list_ret)
+
+struct cls_queue_remove_op {
+ std::string end_marker;
+
+ cls_queue_remove_op() {}
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(end_marker, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(end_marker, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_queue_remove_op)
+
+struct cls_queue_get_capacity_ret {
+ uint64_t queue_capacity;
+
+ cls_queue_get_capacity_ret() {}
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(queue_capacity, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(queue_capacity, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_queue_get_capacity_ret)
+
+#endif /* CEPH_CLS_QUEUE_OPS_H */
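
Each request/response struct above carries its own versioned encoder, so the payload a client sends to the class is simply the struct encoded into a bufferlist. A minimal round-trip sketch (the helper name is a placeholder; it assumes Ceph's encoding machinery pulled in by the header above):

  #include "cls/queue/cls_queue_ops.h"

  void list_op_round_trip()
  {
    ceph::bufferlist payload;
    cls_queue_list_op req;
    req.max = 10;                  // an empty start_marker is treated as "list from the front"
    encode(req, payload);          // free function generated by WRITE_CLASS_ENCODER

    cls_queue_list_op copy;
    auto it = payload.cbegin();
    decode(copy, it);              // copy.max == 10 again
  }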
diff --git a/src/cls/queue/cls_queue_src.cc b/src/cls/queue/cls_queue_src.cc
new file mode 100644
index 000000000..8806b5804
--- /dev/null
+++ b/src/cls/queue/cls_queue_src.cc
@@ -0,0 +1,520 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/types.h"
+
+#include "objclass/objclass.h"
+#include "cls/queue/cls_queue_types.h"
+#include "cls/queue/cls_queue_ops.h"
+#include "cls/queue/cls_queue_const.h"
+#include "cls/queue/cls_queue_src.h"
+
+using ceph::bufferlist;
+using ceph::decode;
+using ceph::encode;
+
+int queue_write_head(cls_method_context_t hctx, cls_queue_head& head)
+{
+ bufferlist bl;
+ uint16_t entry_start = QUEUE_HEAD_START;
+ encode(entry_start, bl);
+
+ bufferlist bl_head;
+ encode(head, bl_head);
+
+ uint64_t encoded_len = bl_head.length();
+ encode(encoded_len, bl);
+
+ bl.claim_append(bl_head);
+
+ if (bl.length() > head.max_head_size) {
+ CLS_LOG(0, "ERROR: queue_write_head: invalid head size = %u and urgent data size = %u \n", bl.length(), head.bl_urgent_data.length());
+ return -EINVAL;
+ }
+
+ int ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+ if (ret < 0) {
+ CLS_LOG(5, "ERROR: queue_write_head: failed to write head");
+ return ret;
+ }
+ return 0;
+}
+
+int queue_read_head(cls_method_context_t hctx, cls_queue_head& head)
+{
+ uint64_t chunk_size = 1024, start_offset = 0;
+
+ bufferlist bl_head;
+ const auto ret = cls_cxx_read(hctx, start_offset, chunk_size, &bl_head);
+ if (ret < 0) {
+ CLS_LOG(5, "ERROR: queue_read_head: failed to read head");
+ return ret;
+ }
+ if (ret == 0) {
+ CLS_LOG(20, "INFO: queue_read_head: empty head, not initialized yet");
+ return -EINVAL;
+ }
+
+ //Process the chunk of data read
+ auto it = bl_head.cbegin();
+ // Queue head start
+ uint16_t queue_head_start;
+ try {
+ decode(queue_head_start, it);
+ } catch (const ceph::buffer::error& err) {
+ CLS_LOG(0, "ERROR: queue_read_head: failed to decode queue start: %s", err.what());
+ return -EINVAL;
+ }
+ if (queue_head_start != QUEUE_HEAD_START) {
+ CLS_LOG(0, "ERROR: queue_read_head: invalid queue start");
+ return -EINVAL;
+ }
+
+ uint64_t encoded_len;
+ try {
+ decode(encoded_len, it);
+ } catch (const ceph::buffer::error& err) {
+ CLS_LOG(0, "ERROR: queue_read_head: failed to decode encoded head size: %s", err.what());
+ return -EINVAL;
+ }
+
+ if (encoded_len > (chunk_size - QUEUE_ENTRY_OVERHEAD)) {
+ start_offset = chunk_size;
+ chunk_size = (encoded_len - (chunk_size - QUEUE_ENTRY_OVERHEAD));
+ bufferlist bl_remaining_head;
+ const auto ret = cls_cxx_read2(hctx, start_offset, chunk_size, &bl_remaining_head, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+ if (ret < 0) {
+ CLS_LOG(5, "ERROR: queue_read_head: failed to read remaining part of head");
+ return ret;
+ }
+ bl_head.claim_append(bl_remaining_head);
+ }
+
+ try {
+ decode(head, it);
+ } catch (const ceph::buffer::error& err) {
+ CLS_LOG(0, "ERROR: queue_read_head: failed to decode head: %s", err.what());
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+int queue_init(cls_method_context_t hctx, const cls_queue_init_op& op)
+{
+ //get head and its size
+ cls_queue_head head;
+ int ret = queue_read_head(hctx, head);
+
+ //head is already initialized
+ if (ret == 0) {
+ return -EEXIST;
+ }
+
+ if (ret < 0 && ret != -EINVAL) {
+ return ret;
+ }
+
+ if (op.bl_urgent_data.length() > 0) {
+ head.bl_urgent_data = op.bl_urgent_data;
+ }
+
+ head.max_head_size = QUEUE_HEAD_SIZE_1K + op.max_urgent_data_size;
+ head.queue_size = op.queue_size + head.max_head_size;
+ head.max_urgent_data_size = op.max_urgent_data_size;
+ head.tail.gen = head.front.gen = 0;
+ head.tail.offset = head.front.offset = head.max_head_size;
+
+ CLS_LOG(20, "INFO: init_queue_op queue actual size %lu", head.queue_size);
+ CLS_LOG(20, "INFO: init_queue_op head size %lu", head.max_head_size);
+ CLS_LOG(20, "INFO: init_queue_op queue front offset %s", head.front.to_str().c_str());
+ CLS_LOG(20, "INFO: init_queue_op queue max urgent data size %lu", head.max_urgent_data_size);
+
+ return queue_write_head(hctx, head);
+}
+
+int queue_get_capacity(cls_method_context_t hctx, cls_queue_get_capacity_ret& op_ret)
+{
+ //get head
+ cls_queue_head head;
+ int ret = queue_read_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+
+ op_ret.queue_capacity = head.queue_size - head.max_head_size;
+
+ CLS_LOG(20, "INFO: queue_get_capacity: size of queue is %lu", op_ret.queue_capacity);
+
+ return 0;
+}
+
+
+/*
+enqueue of new bufferlist happens in the free spaces of the queue, the queue can be in
+one of two states:
+
+(1) split free space
++-------------+--------------------------------------------------------------------+
+| object head | XXXXXXXXXXXXXXXXXXXXXXXXXXX |
+| | ^ ^ |
+| front tail | | | |
++---+------+--+----------------|-------------------------|-------------------------+
+ | | | |
+ | +-------------------|-------------------------+
+ +--------------------------+
+
+(2) continuous free space
++-------------+--------------------------------------------------------------------+
+| object head |XXXXXXXXXXXXXXXXX XXXXXXXXXXXXXXXXXXXXXXXXXX|
+| | ^ ^ |
+| front tail | | | |
++---+------+--+----------------|-------------------------|-------------------------+
+ | | | |
+ | +-------------------+ |
+ +----------------------------------------------------+
+*/
+
+int queue_enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head)
+{
+ if ((head.front.offset == head.tail.offset) && (head.tail.gen == head.front.gen + 1)) {
+ CLS_LOG(0, "ERROR: No space left in queue");
+ return -ENOSPC;
+ }
+
+ for (auto& bl_data : op.bl_data_vec) {
+ bufferlist bl;
+ uint16_t entry_start = QUEUE_ENTRY_START;
+ encode(entry_start, bl);
+ uint64_t data_size = bl_data.length();
+ encode(data_size, bl);
+ bl.claim_append(bl_data);
+
+ CLS_LOG(10, "INFO: queue_enqueue(): Total size to be written is %u and data size is %lu", bl.length(), data_size);
+
+ if (head.tail.offset >= head.front.offset) {
+ // check if data can fit in the remaining space in queue
+ if ((head.tail.offset + bl.length()) <= head.queue_size) {
+ CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u", head.tail.to_str().c_str(), bl.length());
+ //write data size and data at tail offset
+ auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+ if (ret < 0) {
+ return ret;
+ }
+ head.tail.offset += bl.length();
+ } else {
+ uint64_t free_space_available = (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size);
+ //Split data if there is free space available
+ if (bl.length() <= free_space_available) {
+ uint64_t size_before_wrap = head.queue_size - head.tail.offset;
+ bufferlist bl_data_before_wrap;
+ bl.splice(0, size_before_wrap, &bl_data_before_wrap);
+ //write spliced (data size and data) at tail offset
+ CLS_LOG(5, "INFO: queue_enqueue: Writing spliced data at offset: %s and data size: %u", head.tail.to_str().c_str(), bl_data_before_wrap.length());
+ auto ret = cls_cxx_write2(hctx, head.tail.offset, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+ if (ret < 0) {
+ return ret;
+ }
+ head.tail.offset = head.max_head_size;
+ head.tail.gen += 1;
+ //write remaining data at tail offset after wrapping around
+ CLS_LOG(5, "INFO: queue_enqueue: Writing remaining data at offset: %s and data size: %u", head.tail.to_str().c_str(), bl.length());
+ ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+ if (ret < 0) {
+ return ret;
+ }
+ head.tail.offset += bl.length();
+ } else {
+ CLS_LOG(0, "ERROR: No space left in queue\n");
+ // return queue full error
+ return -ENOSPC;
+ }
+ }
+ } else if (head.front.offset > head.tail.offset) {
+ if ((head.tail.offset + bl.length()) <= head.front.offset) {
+ CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u", head.tail.to_str().c_str(), bl.length());
+ //write data size and data at tail offset
+ auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+ if (ret < 0) {
+ return ret;
+ }
+ head.tail.offset += bl.length();
+ } else {
+ CLS_LOG(0, "ERROR: No space left in queue");
+ // return queue full error
+ return -ENOSPC;
+ }
+ }
+
+ if (head.tail.offset == head.queue_size) {
+ head.tail.offset = head.max_head_size;
+ head.tail.gen += 1;
+ }
+ CLS_LOG(20, "INFO: queue_enqueue: New tail offset: %s", head.tail.to_str().c_str());
+ } //end - for
+
+ return 0;
+}
+
+int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, cls_queue_list_ret& op_ret, cls_queue_head& head)
+{
+ // If queue is empty, return from here
+ if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) {
+ CLS_LOG(20, "INFO: queue_list_entries(): Next offset is %s", head.front.to_str().c_str());
+ op_ret.next_marker = head.front.to_str();
+ op_ret.is_truncated = false;
+ return 0;
+ }
+
+ cls_queue_marker start_marker;
+ start_marker.from_str(op.start_marker.c_str());
+ cls_queue_marker next_marker = {0, 0};
+
+ uint64_t start_offset = 0, gen = 0;
+ if (start_marker.offset == 0) {
+ start_offset = head.front.offset;
+ gen = head.front.gen;
+ } else {
+ start_offset = start_marker.offset;
+ gen = start_marker.gen;
+ }
+
+ op_ret.is_truncated = true;
+ uint64_t chunk_size = 1024;
+ uint64_t contiguous_data_size = 0, size_to_read = 0;
+ bool wrap_around = false;
+
+ //Calculate length of contiguous data to be read depending on front, tail and start offset
+ if (head.tail.offset > head.front.offset) {
+ contiguous_data_size = head.tail.offset - start_offset;
+ } else if (head.front.offset >= head.tail.offset) {
+ if (start_offset >= head.front.offset) {
+ contiguous_data_size = head.queue_size - start_offset;
+ wrap_around = true;
+ } else if (start_offset <= head.tail.offset) {
+ contiguous_data_size = head.tail.offset - start_offset;
+ }
+ }
+
+ CLS_LOG(10, "INFO: queue_list_entries(): front is: %s, tail is %s", head.front.to_str().c_str(), head.tail.to_str().c_str());
+
+ bool offset_populated = false, entry_start_processed = false;
+ uint64_t data_size = 0, num_ops = 0;
+ uint16_t entry_start = 0;
+ bufferlist bl;
+ string last_marker;
+ do
+ {
+ CLS_LOG(10, "INFO: queue_list_entries(): start_offset is %lu", start_offset);
+
+ bufferlist bl_chunk;
+ //Read chunk size at a time, if it is less than contiguous data size, else read contiguous data size
+ if (contiguous_data_size > chunk_size) {
+ size_to_read = chunk_size;
+ } else {
+ size_to_read = contiguous_data_size;
+ }
+ CLS_LOG(10, "INFO: queue_list_entries(): size_to_read is %lu", size_to_read);
+ if (size_to_read == 0) {
+ next_marker = head.tail;
+ op_ret.is_truncated = false;
+ CLS_LOG(20, "INFO: queue_list_entries(): size_to_read is 0, hence breaking out!\n");
+ break;
+ }
+
+ auto ret = cls_cxx_read(hctx, start_offset, size_to_read, &bl_chunk);
+ if (ret < 0) {
+ return ret;
+ }
+
+ //If there is leftover data from previous iteration, append new data to leftover data
+ uint64_t entry_start_offset = start_offset - bl.length();
+ CLS_LOG(20, "INFO: queue_list_entries(): Entry start offset accounting for leftover data is %lu", entry_start_offset);
+ bl.claim_append(bl_chunk);
+ bl_chunk = std::move(bl);
+
+ CLS_LOG(20, "INFO: queue_list_entries(): size of chunk %u", bl_chunk.length());
+
+ //Process the chunk of data read
+ unsigned index = 0;
+ auto it = bl_chunk.cbegin();
+ uint64_t size_to_process = bl_chunk.length();
+ do {
+ CLS_LOG(10, "INFO: queue_list_entries(): index: %u, size_to_process: %lu", index, size_to_process);
+ cls_queue_entry entry;
+ ceph_assert(it.get_off() == index);
+ //Use the last marker saved in previous iteration as the marker for this entry
+ if (offset_populated) {
+ entry.marker = last_marker;
+ }
+ //Populate offset if not done in previous iteration
+ if (! offset_populated) {
+ cls_queue_marker marker = {entry_start_offset + index, gen};
+ CLS_LOG(5, "INFO: queue_list_entries(): offset: %s\n", marker.to_str().c_str());
+ entry.marker = marker.to_str();
+ }
+ // Magic number + Data size - process if not done in previous iteration
+ if (! entry_start_processed ) {
+ if (size_to_process >= QUEUE_ENTRY_OVERHEAD) {
+ // Decode magic number at start
+ try {
+ decode(entry_start, it);
+ } catch (const ceph::buffer::error& err) {
+ CLS_LOG(10, "ERROR: queue_list_entries: failed to decode entry start: %s", err.what());
+ return -EINVAL;
+ }
+ if (entry_start != QUEUE_ENTRY_START) {
+ CLS_LOG(5, "ERROR: queue_list_entries: invalid entry start %u", entry_start);
+ return -EINVAL;
+ }
+ index += sizeof(uint16_t);
+ size_to_process -= sizeof(uint16_t);
+ // Decode data size
+ try {
+ decode(data_size, it);
+ } catch (const ceph::buffer::error& err) {
+ CLS_LOG(10, "ERROR: queue_list_entries: failed to decode data size: %s", err.what());
+ return -EINVAL;
+ }
+ } else {
+ // Copy unprocessed data to bl
+ bl_chunk.splice(index, size_to_process, &bl);
+ offset_populated = true;
+ last_marker = entry.marker;
+ CLS_LOG(10, "INFO: queue_list_entries: not enough data to read entry start and data size, breaking out!");
+ break;
+ }
+ CLS_LOG(20, "INFO: queue_list_entries(): data size: %lu", data_size);
+ index += sizeof(uint64_t);
+ size_to_process -= sizeof(uint64_t);
+ }
+ // Data
+ if (data_size <= size_to_process) {
+ it.copy(data_size, entry.data);
+ index += entry.data.length();
+ size_to_process -= entry.data.length();
+ } else {
+ it.copy(size_to_process, bl);
+ offset_populated = true;
+ entry_start_processed = true;
+ last_marker = entry.marker;
+ CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!");
+ break;
+ }
+ op_ret.entries.emplace_back(entry);
+ // Resetting some values
+ offset_populated = false;
+ entry_start_processed = false;
+ data_size = 0;
+ entry_start = 0;
+ num_ops++;
+ last_marker.clear();
+ if (num_ops == op.max) {
+ CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!");
+ break;
+ }
+ } while(index < bl_chunk.length());
+
+ CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max);
+
+ if (num_ops == op.max) {
+ next_marker = cls_queue_marker{(entry_start_offset + index), gen};
+ CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu", next_marker.offset);
+ break;
+ }
+
+ //Calculate new start_offset and contiguous data size
+ start_offset += size_to_read;
+ contiguous_data_size -= size_to_read;
+ if (contiguous_data_size == 0) {
+ if (wrap_around) {
+ start_offset = head.max_head_size;
+ contiguous_data_size = head.tail.offset - head.max_head_size;
+ gen += 1;
+ wrap_around = false;
+ } else {
+ CLS_LOG(10, "INFO: queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!");
+ next_marker = head.tail;
+ op_ret.is_truncated = false;
+ break;
+ }
+ }
+
+ } while(num_ops < op.max);
+
+ //Wrap around next offset if it has reached end of queue
+ if (next_marker.offset == head.queue_size) {
+ next_marker.offset = head.max_head_size;
+ next_marker.gen += 1;
+ }
+ if ((next_marker.offset == head.tail.offset) && (next_marker.gen == head.tail.gen)) {
+ op_ret.is_truncated = false;
+ }
+
+ CLS_LOG(5, "INFO: queue_list_entries(): next offset: %s", next_marker.to_str().c_str());
+ op_ret.next_marker = next_marker.to_str();
+
+ return 0;
+}
+
+int queue_remove_entries(cls_method_context_t hctx, const cls_queue_remove_op& op, cls_queue_head& head)
+{
+ //Queue is empty
+ if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) {
+ return 0;
+ }
+
+ cls_queue_marker end_marker;
+ end_marker.from_str(op.end_marker.c_str());
+
+ CLS_LOG(5, "INFO: queue_remove_entries: op.end_marker = %s", end_marker.to_str().c_str());
+
+ //Zero out the entries that have been removed, to reclaim storage space
+ if (end_marker.offset > head.front.offset && end_marker.gen == head.front.gen) {
+ uint64_t len = end_marker.offset - head.front.offset;
+ if (len > 0) {
+ auto ret = cls_cxx_write_zero(hctx, head.front.offset, len);
+ if (ret < 0) {
+ CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries");
+ CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s", head.front.to_str().c_str());
+ return ret;
+ }
+ }
+ } else if ((head.front.offset >= end_marker.offset) && (end_marker.gen == head.front.gen + 1)) { //start offset > end offset
+ uint64_t len = head.queue_size - head.front.offset;
+ if (len > 0) {
+ auto ret = cls_cxx_write_zero(hctx, head.front.offset, len);
+ if (ret < 0) {
+ CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries");
+ CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s", head.front.to_str().c_str());
+ return ret;
+ }
+ }
+ len = end_marker.offset - head.max_head_size;
+ if (len > 0) {
+ auto ret = cls_cxx_write_zero(hctx, head.max_head_size, len);
+ if (ret < 0) {
+ CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries");
+ CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %lu", head.max_head_size);
+ return ret;
+ }
+ }
+ } else if ((head.front.offset == end_marker.offset) && (head.front.gen == end_marker.gen)) {
+ //no-op
+ } else {
+ CLS_LOG(0, "INFO: queue_remove_entries: Invalid end marker: offset = %s, gen = %lu", end_marker.to_str().c_str(), end_marker.gen);
+ return -EINVAL;
+ }
+
+ head.front = end_marker;
+
+ // Check if it is the end, then wrap around
+ if (head.front.offset == head.queue_size) {
+ head.front.offset = head.max_head_size;
+ head.front.gen += 1;
+ }
+
+ CLS_LOG(20, "INFO: queue_remove_entries: front offset is: %s and tail offset is %s", head.front.to_str().c_str(), head.tail.to_str().c_str());
+
+ return 0;
+}
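
The on-disk framing used by queue_enqueue() above is a 16-bit QUEUE_ENTRY_START marker, a 64-bit payload length, and then the payload, i.e. QUEUE_ENTRY_OVERHEAD (10 bytes) per entry. A stand-alone sketch of the wrap-around arithmetic, with purely illustrative sizes and ignoring the free-space check against the front marker (the helper name and values are not from this change):

  #include <cstdint>

  // Wrap-around arithmetic mirrored from queue_enqueue(): 1 KiB head, 4 KiB object,
  // tail sitting 56 bytes short of the end of the object.
  void enqueue_offsets_sketch()
  {
    uint64_t max_head_size = 1024;
    uint64_t queue_size    = 4096;
    uint64_t tail_offset   = 4040;
    uint64_t tail_gen      = 0;

    // A 100-byte payload is framed as 2 (marker) + 8 (length) + 100 = 110 bytes.
    uint64_t entry_len = sizeof(uint16_t) + sizeof(uint64_t) + 100;

    if (tail_offset + entry_len > queue_size) {
      uint64_t before_wrap = queue_size - tail_offset;   // 56 bytes land at offsets 4040..4095
      uint64_t remainder   = entry_len - before_wrap;    // remaining 54 bytes land at max_head_size
      tail_offset = max_head_size + remainder;           // new tail offset = 1078
      tail_gen += 1;                                     // the generation bumps on every wrap
    } else {
      tail_offset += entry_len;                          // contiguous case: no wrap needed
    }
  }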
diff --git a/src/cls/queue/cls_queue_src.h b/src/cls/queue/cls_queue_src.h
new file mode 100644
index 000000000..9970b98ea
--- /dev/null
+++ b/src/cls/queue/cls_queue_src.h
@@ -0,0 +1,16 @@
+#ifndef CEPH_CLS_QUEUE_SRC_H
+#define CEPH_CLS_QUEUE_SRC_H
+
+#include "objclass/objclass.h"
+#include "cls/queue/cls_queue_types.h"
+#include "cls/queue/cls_queue_ops.h"
+
+int queue_write_head(cls_method_context_t hctx, cls_queue_head& head);
+int queue_read_head(cls_method_context_t hctx, cls_queue_head& head);
+int queue_init(cls_method_context_t hctx, const cls_queue_init_op& op);
+int queue_get_capacity(cls_method_context_t hctx, cls_queue_get_capacity_ret& op_ret);
+int queue_enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head);
+int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, cls_queue_list_ret& op_ret, cls_queue_head& head);
+int queue_remove_entries(cls_method_context_t hctx, const cls_queue_remove_op& op, cls_queue_head& head);
+
+#endif /* CEPH_CLS_QUEUE_SRC_H */
diff --git a/src/cls/queue/cls_queue_types.h b/src/cls/queue/cls_queue_types.h
new file mode 100644
index 000000000..cc46df405
--- /dev/null
+++ b/src/cls/queue/cls_queue_types.h
@@ -0,0 +1,120 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CLS_QUEUE_TYPES_H
+#define CEPH_CLS_QUEUE_TYPES_H
+
+#include <errno.h>
+#include "include/types.h"
+
+//Size of head leaving out urgent data
+#define QUEUE_HEAD_SIZE_1K 1024
+
+#define QUEUE_START_OFFSET_1K QUEUE_HEAD_SIZE_1K
+
+constexpr unsigned int QUEUE_HEAD_START = 0xDEAD;
+constexpr unsigned int QUEUE_ENTRY_START = 0xBEEF;
+constexpr unsigned int QUEUE_ENTRY_OVERHEAD = sizeof(uint16_t) + sizeof(uint64_t);
+
+struct cls_queue_entry
+{
+ ceph::buffer::list data;
+ std::string marker;
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(data, bl);
+ encode(marker, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(data, bl);
+ decode(marker, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_queue_entry)
+
+struct cls_queue_marker
+{
+ uint64_t offset{0};
+ uint64_t gen{0};
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(gen, bl);
+ encode(offset, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(gen, bl);
+ decode(offset, bl);
+ DECODE_FINISH(bl);
+ }
+
+ std::string to_str() {
+ return std::to_string(gen) + '/' + std::to_string(offset);
+ }
+
+ int from_str(const char* str) {
+ errno = 0;
+ char* end = nullptr;
+ gen = ::strtoull(str, &end, 10);
+ if (errno) {
+ return errno;
+ }
+ if (str == end || *end != '/') { // expects delimiter
+ return -EINVAL;
+ }
+ str = end + 1;
+ offset = ::strtoull(str, &end, 10);
+ if (errno) {
+ return errno;
+ }
+ if (str == end || *end != 0) { // expects null terminator
+ return -EINVAL;
+ }
+ return 0;
+ }
+
+};
+WRITE_CLASS_ENCODER(cls_queue_marker)
+
+struct cls_queue_head
+{
+ uint64_t max_head_size = QUEUE_HEAD_SIZE_1K;
+ cls_queue_marker front{QUEUE_START_OFFSET_1K, 0};
+ cls_queue_marker tail{QUEUE_START_OFFSET_1K, 0};
+ uint64_t queue_size{0}; // size of queue requested by user, with head size added to it
+ uint64_t max_urgent_data_size{0};
+ ceph::buffer::list bl_urgent_data; // special data known to application using queue
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(max_head_size, bl);
+ encode(front, bl);
+ encode(tail, bl);
+ encode(queue_size, bl);
+ encode(max_urgent_data_size, bl);
+ encode(bl_urgent_data, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(max_head_size, bl);
+ decode(front, bl);
+ decode(tail, bl);
+ decode(queue_size, bl);
+ decode(max_urgent_data_size, bl);
+ decode(bl_urgent_data, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_queue_head)
+
+#endif
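
Markers exchanged between the client and the class are the string form produced by cls_queue_marker::to_str(), i.e. "<gen>/<offset>", parsed back with from_str(). A minimal round-trip sketch, assuming the header above is on the include path (the helper name is a placeholder):

  #include <string>
  #include "cls/queue/cls_queue_types.h"

  void marker_round_trip()
  {
    cls_queue_marker m;
    m.gen = 2;
    m.offset = 4096;
    std::string s = m.to_str();            // yields "2/4096"

    cls_queue_marker parsed;
    int r = parsed.from_str(s.c_str());    // r == 0; parsed.gen == 2, parsed.offset == 4096
    (void)r;                               // sketch only
  }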