Diffstat (limited to 'src/cls/queue/cls_queue_src.cc')
-rw-r--r--   src/cls/queue/cls_queue_src.cc | 520
1 file changed, 520 insertions(+), 0 deletions(-)
diff --git a/src/cls/queue/cls_queue_src.cc b/src/cls/queue/cls_queue_src.cc
new file mode 100644
index 000000000..8806b5804
--- /dev/null
+++ b/src/cls/queue/cls_queue_src.cc
@@ -0,0 +1,520 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/types.h"
+
+#include "objclass/objclass.h"
+#include "cls/queue/cls_queue_types.h"
+#include "cls/queue/cls_queue_ops.h"
+#include "cls/queue/cls_queue_const.h"
+#include "cls/queue/cls_queue_src.h"
+
+using ceph::bufferlist;
+using ceph::decode;
+using ceph::encode;
+
+int queue_write_head(cls_method_context_t hctx, cls_queue_head& head)
+{
+  bufferlist bl;
+  uint16_t entry_start = QUEUE_HEAD_START;
+  encode(entry_start, bl);
+
+  bufferlist bl_head;
+  encode(head, bl_head);
+
+  uint64_t encoded_len = bl_head.length();
+  encode(encoded_len, bl);
+
+  bl.claim_append(bl_head);
+
+  if (bl.length() > head.max_head_size) {
+    CLS_LOG(0, "ERROR: queue_write_head: invalid head size = %u and urgent data size = %u \n", bl.length(), head.bl_urgent_data.length());
+    return -EINVAL;
+  }
+
+  int ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+  if (ret < 0) {
+    CLS_LOG(5, "ERROR: queue_write_head: failed to write head");
+    return ret;
+  }
+  return 0;
+}
+
+int queue_read_head(cls_method_context_t hctx, cls_queue_head& head)
+{
+  uint64_t chunk_size = 1024, start_offset = 0;
+
+  bufferlist bl_head;
+  const auto ret = cls_cxx_read(hctx, start_offset, chunk_size, &bl_head);
+  if (ret < 0) {
+    CLS_LOG(5, "ERROR: queue_read_head: failed to read head");
+    return ret;
+  }
+  if (ret == 0) {
+    CLS_LOG(20, "INFO: queue_read_head: empty head, not initialized yet");
+    return -EINVAL;
+  }
+
+  //Process the chunk of data read
+  auto it = bl_head.cbegin();
+  // Queue head start
+  uint16_t queue_head_start;
+  try {
+    decode(queue_head_start, it);
+  } catch (const ceph::buffer::error& err) {
+    CLS_LOG(0, "ERROR: queue_read_head: failed to decode queue start: %s", err.what());
+    return -EINVAL;
+  }
+  if (queue_head_start != QUEUE_HEAD_START) {
+    CLS_LOG(0, "ERROR: queue_read_head: invalid queue start");
+    return -EINVAL;
+  }
+
+  uint64_t encoded_len;
+  try {
+    decode(encoded_len, it);
+  } catch (const ceph::buffer::error& err) {
+    CLS_LOG(0, "ERROR: queue_read_head: failed to decode encoded head size: %s", err.what());
+    return -EINVAL;
+  }
+
+  if (encoded_len > (chunk_size - QUEUE_ENTRY_OVERHEAD)) {
+    start_offset = chunk_size;
+    chunk_size = (encoded_len - (chunk_size - QUEUE_ENTRY_OVERHEAD));
+    bufferlist bl_remaining_head;
+    const auto ret = cls_cxx_read2(hctx, start_offset, chunk_size, &bl_remaining_head, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+    if (ret < 0) {
+      CLS_LOG(5, "ERROR: queue_read_head: failed to read remaining part of head");
+      return ret;
+    }
+    bl_head.claim_append(bl_remaining_head);
+  }
+
+  try {
+    decode(head, it);
+  } catch (const ceph::buffer::error& err) {
+    CLS_LOG(0, "ERROR: queue_read_head: failed to decode head: %s", err.what());
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
+int queue_init(cls_method_context_t hctx, const cls_queue_init_op& op)
+{
+  //get head and its size
+  cls_queue_head head;
+  int ret = queue_read_head(hctx, head);
+
+  //head is already initialized
+  if (ret == 0) {
+    return -EEXIST;
+  }
+
+  if (ret < 0 && ret != -EINVAL) {
+    return ret;
+  }
+
+  if (op.bl_urgent_data.length() > 0) {
+    head.bl_urgent_data = op.bl_urgent_data;
+  }
+
+  head.max_head_size = QUEUE_HEAD_SIZE_1K + op.max_urgent_data_size;
+  head.queue_size = op.queue_size + head.max_head_size;
+  head.max_urgent_data_size = op.max_urgent_data_size;
+  head.tail.gen = head.front.gen = 0;
+  head.tail.offset = head.front.offset = head.max_head_size;
+
+  CLS_LOG(20, "INFO: init_queue_op queue actual size %lu", head.queue_size);
+  CLS_LOG(20, "INFO: init_queue_op head size %lu", head.max_head_size);
+  CLS_LOG(20, "INFO: init_queue_op queue front offset %s", head.front.to_str().c_str());
+  CLS_LOG(20, "INFO: init_queue_op queue max urgent data size %lu", head.max_urgent_data_size);
+
+  return queue_write_head(hctx, head);
+}
+
+int queue_get_capacity(cls_method_context_t hctx, cls_queue_get_capacity_ret& op_ret)
+{
+  //get head
+  cls_queue_head head;
+  int ret = queue_read_head(hctx, head);
+  if (ret < 0) {
+    return ret;
+  }
+
+  op_ret.queue_capacity = head.queue_size - head.max_head_size;
+
+  CLS_LOG(20, "INFO: queue_get_capacity: size of queue is %lu", op_ret.queue_capacity);
+
+  return 0;
+}
+
+
+/*
+enqueue of new bufferlist happens in the free spaces of the queue, the queue can be in
+one of two states:
+
+(1) split free space
++-------------+------------------------------------------------------------+
+| object head |            XXXXXXXXXXXXXXXXXXXXXXXXXXXX                    |
+|             |            ^                           ^                   |
+|  front  tail|            |                           |                   |
++---+------+--+------------|---------------------------|-------------------+
+    |      |               |                           |
+    |      +---------------|---------------------------+
+    +----------------------+
+
+(2) continuous free space
++-------------+------------------------------------------------------------+
+| object head |XXXXXXXXXXXXXXXXX                     XXXXXXXXXXXXXXXXXXXXXX|
+|             |                 ^                    ^                     |
+|  front  tail|                 |                    |                     |
++---+------+--+-----------------|--------------------|---------------------+
+    |      |                    |                    |
+    |      +--------------------+                    |
+    +------------------------------------------------+
+*/
+
+int queue_enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head)
+{
+  if ((head.front.offset == head.tail.offset) && (head.tail.gen == head.front.gen + 1)) {
+    CLS_LOG(0, "ERROR: No space left in queue");
+    return -ENOSPC;
+  }
+
+  for (auto& bl_data : op.bl_data_vec) {
+    bufferlist bl;
+    uint16_t entry_start = QUEUE_ENTRY_START;
+    encode(entry_start, bl);
+    uint64_t data_size = bl_data.length();
+    encode(data_size, bl);
+    bl.claim_append(bl_data);
+
+    CLS_LOG(10, "INFO: queue_enqueue(): Total size to be written is %u and data size is %lu", bl.length(), data_size);
+
+    if (head.tail.offset >= head.front.offset) {
+      // check if data can fit in the remaining space in queue
+      if ((head.tail.offset + bl.length()) <= head.queue_size) {
+        CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u", head.tail.to_str().c_str(), bl.length());
+        //write data size and data at tail offset
+        auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+        if (ret < 0) {
+          return ret;
+        }
+        head.tail.offset += bl.length();
+      } else {
+        uint64_t free_space_available = (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size);
+        //Split data if there is free space available
+        if (bl.length() <= free_space_available) {
+          uint64_t size_before_wrap = head.queue_size - head.tail.offset;
+          bufferlist bl_data_before_wrap;
+          bl.splice(0, size_before_wrap, &bl_data_before_wrap);
+          //write spliced (data size and data) at tail offset
+          CLS_LOG(5, "INFO: queue_enqueue: Writing spliced data at offset: %s and data size: %u", head.tail.to_str().c_str(), bl_data_before_wrap.length());
+          auto ret = cls_cxx_write2(hctx, head.tail.offset, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+          if (ret < 0) {
+            return ret;
+          }
+          head.tail.offset = head.max_head_size;
+          head.tail.gen += 1;
+          //write remaining data at tail offset after wrapping around
+          CLS_LOG(5, "INFO: queue_enqueue: Writing remaining data at offset: %s and data size: %u", head.tail.to_str().c_str(), bl.length());
+          ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+          if (ret < 0) {
+            return ret;
+          }
+          head.tail.offset += bl.length();
+        } else {
+          CLS_LOG(0, "ERROR: No space left in queue\n");
+          // return queue full error
+          return -ENOSPC;
+        }
+      }
+    } else if (head.front.offset > head.tail.offset) {
+      if ((head.tail.offset + bl.length()) <= head.front.offset) {
+        CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u", head.tail.to_str().c_str(), bl.length());
+        //write data size and data at tail offset
+        auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
+        if (ret < 0) {
+          return ret;
+        }
+        head.tail.offset += bl.length();
+      } else {
+        CLS_LOG(0, "ERROR: No space left in queue");
+        // return queue full error
+        return -ENOSPC;
+      }
+    }
+
+    if (head.tail.offset == head.queue_size) {
+      head.tail.offset = head.max_head_size;
+      head.tail.gen += 1;
+    }
+    CLS_LOG(20, "INFO: queue_enqueue: New tail offset: %s", head.tail.to_str().c_str());
+  } //end - for
+
+  return 0;
+}
+
+int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, cls_queue_list_ret& op_ret, cls_queue_head& head)
+{
+  // If queue is empty, return from here
+  if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) {
+    CLS_LOG(20, "INFO: queue_list_entries(): Next offset is %s", head.front.to_str().c_str());
+    op_ret.next_marker = head.front.to_str();
+    op_ret.is_truncated = false;
+    return 0;
+  }
+
+  cls_queue_marker start_marker;
+  start_marker.from_str(op.start_marker.c_str());
+  cls_queue_marker next_marker = {0, 0};
+
+  uint64_t start_offset = 0, gen = 0;
+  if (start_marker.offset == 0) {
+    start_offset = head.front.offset;
+    gen = head.front.gen;
+  } else {
+    start_offset = start_marker.offset;
+    gen = start_marker.gen;
+  }
+
+  op_ret.is_truncated = true;
+  uint64_t chunk_size = 1024;
+  uint64_t contiguous_data_size = 0, size_to_read = 0;
+  bool wrap_around = false;
+
+  //Calculate length of contiguous data to be read depending on front, tail and start offset
+  if (head.tail.offset > head.front.offset) {
+    contiguous_data_size = head.tail.offset - start_offset;
+  } else if (head.front.offset >= head.tail.offset) {
+    if (start_offset >= head.front.offset) {
+      contiguous_data_size = head.queue_size - start_offset;
+      wrap_around = true;
+    } else if (start_offset <= head.tail.offset) {
+      contiguous_data_size = head.tail.offset - start_offset;
+    }
+  }
+
+  CLS_LOG(10, "INFO: queue_list_entries(): front is: %s, tail is %s", head.front.to_str().c_str(), head.tail.to_str().c_str());
+
+  bool offset_populated = false, entry_start_processed = false;
+  uint64_t data_size = 0, num_ops = 0;
+  uint16_t entry_start = 0;
+  bufferlist bl;
+  string last_marker;
+  do
+  {
+    CLS_LOG(10, "INFO: queue_list_entries(): start_offset is %lu", start_offset);
+
+    bufferlist bl_chunk;
+    //Read chunk size at a time, if it is less than contiguous data size, else read contiguous data size
+    if (contiguous_data_size > chunk_size) {
+      size_to_read = chunk_size;
+    } else {
+      size_to_read = contiguous_data_size;
+    }
+    CLS_LOG(10, "INFO: queue_list_entries(): size_to_read is %lu", size_to_read);
+    if (size_to_read == 0) {
+      next_marker = head.tail;
+      op_ret.is_truncated = false;
+      CLS_LOG(20, "INFO: queue_list_entries(): size_to_read is 0, hence breaking out!\n");
+      break;
+    }
+
+    auto ret = cls_cxx_read(hctx, start_offset, size_to_read, &bl_chunk);
+    if (ret < 0) {
+      return ret;
+    }
+
+    //If there is leftover data from previous iteration, append new data to leftover data
+    uint64_t entry_start_offset = start_offset - bl.length();
+    CLS_LOG(20, "INFO: queue_list_entries(): Entry start offset accounting for leftover data is %lu", entry_start_offset);
+    bl.claim_append(bl_chunk);
+    bl_chunk = std::move(bl);
+
+    CLS_LOG(20, "INFO: queue_list_entries(): size of chunk %u", bl_chunk.length());
+
+    //Process the chunk of data read
+    unsigned index = 0;
+    auto it = bl_chunk.cbegin();
+    uint64_t size_to_process = bl_chunk.length();
+    do {
+      CLS_LOG(10, "INFO: queue_list_entries(): index: %u, size_to_process: %lu", index, size_to_process);
+      cls_queue_entry entry;
+      ceph_assert(it.get_off() == index);
+      //Use the last marker saved in previous iteration as the marker for this entry
+      if (offset_populated) {
+        entry.marker = last_marker;
+      }
+      //Populate offset if not done in previous iteration
+      if (! offset_populated) {
+        cls_queue_marker marker = {entry_start_offset + index, gen};
+        CLS_LOG(5, "INFO: queue_list_entries(): offset: %s\n", marker.to_str().c_str());
+        entry.marker = marker.to_str();
+      }
+      // Magic number + Data size - process if not done in previous iteration
+      if (! entry_start_processed ) {
+        if (size_to_process >= QUEUE_ENTRY_OVERHEAD) {
+          // Decode magic number at start
+          try {
+            decode(entry_start, it);
+          } catch (const ceph::buffer::error& err) {
+            CLS_LOG(10, "ERROR: queue_list_entries: failed to decode entry start: %s", err.what());
+            return -EINVAL;
+          }
+          if (entry_start != QUEUE_ENTRY_START) {
+            CLS_LOG(5, "ERROR: queue_list_entries: invalid entry start %u", entry_start);
+            return -EINVAL;
+          }
+          index += sizeof(uint16_t);
+          size_to_process -= sizeof(uint16_t);
+          // Decode data size
+          try {
+            decode(data_size, it);
+          } catch (const ceph::buffer::error& err) {
+            CLS_LOG(10, "ERROR: queue_list_entries: failed to decode data size: %s", err.what());
+            return -EINVAL;
+          }
+        } else {
+          // Copy unprocessed data to bl
+          bl_chunk.splice(index, size_to_process, &bl);
+          offset_populated = true;
+          last_marker = entry.marker;
+          CLS_LOG(10, "INFO: queue_list_entries: not enough data to read entry start and data size, breaking out!");
+          break;
+        }
+        CLS_LOG(20, "INFO: queue_list_entries(): data size: %lu", data_size);
+        index += sizeof(uint64_t);
+        size_to_process -= sizeof(uint64_t);
+      }
+      // Data
+      if (data_size <= size_to_process) {
+        it.copy(data_size, entry.data);
+        index += entry.data.length();
+        size_to_process -= entry.data.length();
+      } else {
+        it.copy(size_to_process, bl);
+        offset_populated = true;
+        entry_start_processed = true;
+        last_marker = entry.marker;
+        CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!");
+        break;
+      }
+      op_ret.entries.emplace_back(entry);
+      // Resetting some values
+      offset_populated = false;
+      entry_start_processed = false;
+      data_size = 0;
+      entry_start = 0;
+      num_ops++;
+      last_marker.clear();
+      if (num_ops == op.max) {
+        CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!");
+        break;
+      }
+    } while(index < bl_chunk.length());
+
+    CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max);
+
+    if (num_ops == op.max) {
+      next_marker = cls_queue_marker{(entry_start_offset + index), gen};
+      CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu", next_marker.offset);
+      break;
+    }
+
+    //Calculate new start_offset and contiguous data size
+    start_offset += size_to_read;
+    contiguous_data_size -= size_to_read;
+    if (contiguous_data_size == 0) {
+      if (wrap_around) {
+        start_offset = head.max_head_size;
+        contiguous_data_size = head.tail.offset - head.max_head_size;
+        gen += 1;
+        wrap_around = false;
+      } else {
+        CLS_LOG(10, "INFO: queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!");
+        next_marker = head.tail;
+        op_ret.is_truncated = false;
+        break;
+      }
+    }
+
+  } while(num_ops < op.max);
+
+  //Wrap around next offset if it has reached end of queue
+  if (next_marker.offset == head.queue_size) {
+    next_marker.offset = head.max_head_size;
+    next_marker.gen += 1;
+  }
+  if ((next_marker.offset == head.tail.offset) && (next_marker.gen == head.tail.gen)) {
+    op_ret.is_truncated = false;
+  }
+
+  CLS_LOG(5, "INFO: queue_list_entries(): next offset: %s", next_marker.to_str().c_str());
+  op_ret.next_marker = next_marker.to_str();
+
+  return 0;
+}
+
+int queue_remove_entries(cls_method_context_t hctx, const cls_queue_remove_op& op, cls_queue_head& head)
+{
+  //Queue is empty
+  if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) {
+    return 0;
+  }
+
+  cls_queue_marker end_marker;
+  end_marker.from_str(op.end_marker.c_str());
+
+  CLS_LOG(5, "INFO: queue_remove_entries: op.end_marker = %s", end_marker.to_str().c_str());
+
+  //Zero out the entries that have been removed, to reclaim storage space
+  if (end_marker.offset > head.front.offset && end_marker.gen == head.front.gen) {
+    uint64_t len = end_marker.offset - head.front.offset;
+    if (len > 0) {
+      auto ret = cls_cxx_write_zero(hctx, head.front.offset, len);
+      if (ret < 0) {
+        CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries");
+        CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s", head.front.to_str().c_str());
+        return ret;
+      }
+    }
+  } else if ((head.front.offset >= end_marker.offset) && (end_marker.gen == head.front.gen + 1)) { //start offset > end offset
+    uint64_t len = head.queue_size - head.front.offset;
+    if (len > 0) {
+      auto ret = cls_cxx_write_zero(hctx, head.front.offset, len);
+      if (ret < 0) {
+        CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries");
+        CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s", head.front.to_str().c_str());
+        return ret;
+      }
+    }
+    len = end_marker.offset - head.max_head_size;
+    if (len > 0) {
+      auto ret = cls_cxx_write_zero(hctx, head.max_head_size, len);
+      if (ret < 0) {
+        CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries");
+        CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %lu", head.max_head_size);
+        return ret;
+      }
+    }
+  } else if ((head.front.offset == end_marker.offset) && (head.front.gen == end_marker.gen)) {
+    //no-op
+  } else {
+    CLS_LOG(0, "INFO: queue_remove_entries: Invalid end marker: offset = %s, gen = %lu", end_marker.to_str().c_str(), end_marker.gen);
+    return -EINVAL;
+  }
+
+  head.front = end_marker;
+
+  // Check if it is the end, then wrap around
+  if (head.front.offset == head.queue_size) {
+    head.front.offset = head.max_head_size;
+    head.front.gen += 1;
+  }
+
+  CLS_LOG(20, "INFO: queue_remove_entries: front offset is: %s and tail offset is %s", head.front.to_str().c_str(), head.tail.to_str().c_str());
+
+  return 0;
+}