summaryrefslogtreecommitdiffstats
path: root/src/cls/queue/cls_queue_src.cc
blob: 8806b5804971ae7476b31953410a5c6d9f7b30dc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "include/types.h"

#include "objclass/objclass.h"
#include "cls/queue/cls_queue_types.h"
#include "cls/queue/cls_queue_ops.h"
#include "cls/queue/cls_queue_const.h"
#include "cls/queue/cls_queue_src.h"

using ceph::bufferlist;
using ceph::decode;
using ceph::encode;

// Persist the queue head at the start of the object.
//
// Bytes written at offset 0, in order:
//   - QUEUE_HEAD_START marker (uint16_t)
//   - length of the encoded head (uint64_t)
//   - the encoded head
//
// Returns 0 on success, -EINVAL if the marker + length + encoded head is
// larger than head.max_head_size, or the negative value returned by
// cls_cxx_write2().
int queue_write_head(cls_method_context_t hctx, cls_queue_head& head)
{
  // The head is encoded on its own because its length is part of the prefix
  bufferlist encoded_head;
  encode(head, encoded_head);

  bufferlist out;
  const uint16_t head_magic = QUEUE_HEAD_START;
  encode(head_magic, out);
  const uint64_t encoded_head_len = encoded_head.length();
  encode(encoded_head_len, out);
  out.claim_append(encoded_head);

  // Everything written here has to fit in the region reserved for the head
  const bool fits_in_head_region = out.length() <= head.max_head_size;
  if (!fits_in_head_region) {
    CLS_LOG(0, "ERROR: queue_write_head: invalid head size = %u and urgent data size = %u \n", out.length(), head.bl_urgent_data.length());
    return -EINVAL;
  }

  const int rc = cls_cxx_write2(hctx, 0, out.length(), &out, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
  if (rc >= 0) {
    return 0;
  }
  CLS_LOG(5, "ERROR: queue_write_head: failed to write head");
  return rc;
}

// Load and decode the queue head stored at the start of the object.
//
// A first chunk of 1024 bytes is read from offset 0. It is expected to start
// with the QUEUE_HEAD_START marker (uint16_t) followed by the encoded head
// length (uint64_t). If the encoded head does not fit in what is left of the
// first chunk, the missing bytes are fetched with a second read before the
// head itself is decoded.
//
// Returns 0 on success, the negative value of a failed read, or -EINVAL when
// nothing could be read, the marker does not match, or any decode step throws.
int queue_read_head(cls_method_context_t hctx, cls_queue_head& head)
{
  const uint64_t first_chunk_len = 1024;
  const uint64_t head_offset = 0;

  bufferlist head_buf;
  const auto first_read = cls_cxx_read(hctx, head_offset, first_chunk_len, &head_buf);
  if (first_read < 0) {
    CLS_LOG(5, "ERROR: queue_read_head: failed to read head");
    return first_read;
  }
  if (first_read == 0) {
    CLS_LOG(20, "INFO: queue_read_head: empty head, not initialized yet");
    return -EINVAL;
  }

  auto cursor = head_buf.cbegin();

  // Marker identifying the start of the head
  uint16_t head_magic;
  try {
    decode(head_magic, cursor);
  } catch (const ceph::buffer::error& err) {
    CLS_LOG(0, "ERROR: queue_read_head: failed to decode queue start: %s", err.what());
    return -EINVAL;
  }
  if (head_magic != QUEUE_HEAD_START) {
    CLS_LOG(0, "ERROR: queue_read_head: invalid queue start");
    return -EINVAL;
  }

  // Length of the encoded head that follows
  uint64_t encoded_len;
  try {
    decode(encoded_len, cursor);
  } catch (const ceph::buffer::error& err) {
    CLS_LOG(0, "ERROR: queue_read_head: failed to decode encoded head size: %s", err.what());
    return -EINVAL;
  }

  // Part of the first chunk is taken by the marker and the length field;
  // only the rest can hold head bytes
  const uint64_t head_bytes_in_first_chunk = first_chunk_len - QUEUE_ENTRY_OVERHEAD;
  if (encoded_len > head_bytes_in_first_chunk) {
    const uint64_t missing_len = encoded_len - head_bytes_in_first_chunk;
    bufferlist rest_buf;
    const auto rest_read = cls_cxx_read2(hctx, first_chunk_len, missing_len, &rest_buf, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
    if (rest_read < 0) {
      CLS_LOG(5, "ERROR: queue_read_head: failed to read remaining part of head");
      return rest_read;
    }
    head_buf.claim_append(rest_buf);
  }

  try {
    decode(head, cursor);
  } catch (const ceph::buffer::error& err) {
    CLS_LOG(0, "ERROR: queue_read_head: failed to decode head: %s", err.what());
    return -EINVAL;
  }

  return 0;
}

// Create the queue head for an object that does not have one yet.
//
// Returns -EEXIST when a head can already be read, any read error other than
// -EINVAL unchanged, and otherwise the result of queue_write_head() for the
// freshly built head.
int queue_init(cls_method_context_t hctx, const cls_queue_init_op& op)
{
  cls_queue_head head;
  const int read_rc = queue_read_head(hctx, head);

  // A readable head means the queue was initialized before
  if (read_rc == 0) {
    return -EEXIST;
  }

  // -EINVAL is what queue_read_head() reports for a missing or undecodable
  // head; every other failure is passed on to the caller
  if (read_rc != -EINVAL && read_rc < 0) {
    return read_rc;
  }

  if (op.bl_urgent_data.length() > 0) {
    head.bl_urgent_data = op.bl_urgent_data;
  }

  // The head region is a fixed part plus room for the urgent data; the
  // requested queue size is added on top of it
  head.max_urgent_data_size = op.max_urgent_data_size;
  head.max_head_size = QUEUE_HEAD_SIZE_1K + op.max_urgent_data_size;
  head.queue_size = op.queue_size + head.max_head_size;

  // Empty queue: front and tail both sit right after the head region
  head.front.gen = 0;
  head.tail.gen = 0;
  head.front.offset = head.max_head_size;
  head.tail.offset = head.max_head_size;

  CLS_LOG(20, "INFO: init_queue_op queue actual size %lu", head.queue_size);
  CLS_LOG(20, "INFO: init_queue_op head size %lu", head.max_head_size);
  CLS_LOG(20, "INFO: init_queue_op queue front offset %s", head.front.to_str().c_str());
  CLS_LOG(20, "INFO: init_queue_op queue max urgent data size %lu", head.max_urgent_data_size);

  return queue_write_head(hctx, head);
}

// Report how many bytes of the object are available for queue entries.
//
// The capacity is the total queue size minus the region reserved for the
// head. Returns 0 on success or the error from queue_read_head().
int queue_get_capacity(cls_method_context_t hctx, cls_queue_get_capacity_ret& op_ret)
{
  cls_queue_head head;
  const int read_rc = queue_read_head(hctx, head);
  if (read_rc < 0) {
    return read_rc;
  }

  const auto capacity = head.queue_size - head.max_head_size;
  op_ret.queue_capacity = capacity;

  CLS_LOG(20, "INFO: queue_get_capacity: size of queue is %lu", op_ret.queue_capacity);

  return 0;
}


/*
Enqueueing a new bufferlist writes into the free space of the queue. The queue can be in
one of two states (X marks occupied space):

(1) split free space
+-------------+--------------------------------------------------------------------+
| object head |                XXXXXXXXXXXXXXXXXXXXXXXXXXX                         |
|             |                ^                         ^                         |
| front  tail |                |                         |                         |
+---+------+--+----------------|-------------------------|-------------------------+
    |      |                   |                         |
    |      +-------------------|-------------------------+
    +--------------------------+

(2) continuous free space
+-------------+--------------------------------------------------------------------+
| object head |XXXXXXXXXXXXXXXXX                         XXXXXXXXXXXXXXXXXXXXXXXXXX|
|             |                ^                         ^                         |
| front  tail |                |                         |                         |
+---+------+--+----------------|-------------------------|-------------------------+
    |      |                   |                         |
    |      +-------------------+                         |
    +----------------------------------------------------+
*/

// Append every buffer in op.bl_data_vec to the queue, one entry per buffer.
//
// Each entry is written as: QUEUE_ENTRY_START marker (uint16_t), data length
// (uint64_t), data. Entries are written at head.tail; when an entry does not
// fit before the end of the queue it is split and the remainder is written
// right after the head region, bumping the tail generation.
//
// Only the in-memory `head` is updated here (tail offset / generation); this
// function does not write the head back to the object.
//
// Returns 0 on success, -ENOSPC when the queue is full or an entry does not
// fit in the free space, or the negative value of a failed cls_cxx_write2().
int queue_enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head)
{
  // Same offset with the tail one generation ahead of the front means the
  // tail has wrapped all the way round to the front: there is no free byte.
  const auto queue_is_full = [&head]() {
    return (head.front.offset == head.tail.offset) && (head.tail.gen == head.front.gen + 1);
  };

  if (queue_is_full()) {
    CLS_LOG(0, "ERROR: No space left in queue");
    return -ENOSPC;
  }

  for (auto& bl_data : op.bl_data_vec) {
    // An earlier entry of this batch may have used up the last free byte.
    // Without this check the equal offsets below would be taken for an
    // empty queue and the entry would overwrite unread data at the front.
    if (queue_is_full()) {
      CLS_LOG(0, "ERROR: No space left in queue");
      return -ENOSPC;
    }

    bufferlist bl;
    uint16_t entry_start = QUEUE_ENTRY_START;
    encode(entry_start, bl);
    uint64_t data_size = bl_data.length();
    encode(data_size, bl);
    bl.claim_append(bl_data);
  
    CLS_LOG(10, "INFO: queue_enqueue(): Total size to be written is %u and data size is %lu", bl.length(), data_size);

    if (head.tail.offset >= head.front.offset) {
      // check if data can fit in the remaining space in queue
      if ((head.tail.offset + bl.length()) <= head.queue_size) {
        CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u", head.tail.to_str().c_str(), bl.length());
        //write data size and data at tail offset
        auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
        if (ret < 0) {
          return ret;
        }
        head.tail.offset += bl.length();
      } else {
        // Free space is split: the end of the queue plus the gap between the
        // head region and the front
        uint64_t free_space_available = (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size);
        //Split data if there is free space available
        if (bl.length() <= free_space_available) {
          uint64_t size_before_wrap = head.queue_size - head.tail.offset;
          bufferlist bl_data_before_wrap;
          bl.splice(0, size_before_wrap, &bl_data_before_wrap);
          //write spliced (data size and data) at tail offset
          CLS_LOG(5, "INFO: queue_enqueue: Writing spliced data at offset: %s and data size: %u", head.tail.to_str().c_str(), bl_data_before_wrap.length());
          auto ret = cls_cxx_write2(hctx, head.tail.offset, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
          if (ret < 0) {
            return ret;
          }
          head.tail.offset = head.max_head_size;
          head.tail.gen += 1;
          //write remaining data at tail offset after wrapping around
          CLS_LOG(5, "INFO: queue_enqueue: Writing remaining data at offset: %s and data size: %u", head.tail.to_str().c_str(), bl.length());
          ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
          if (ret < 0) {
            return ret;
          }
          head.tail.offset += bl.length();
        } else {
          CLS_LOG(0, "ERROR: No space left in queue\n");
          // return queue full error
          return -ENOSPC;
        }
      }
    } else if (head.front.offset > head.tail.offset) {
      // Tail has already wrapped: the only free space is between tail and front
      if ((head.tail.offset + bl.length()) <= head.front.offset) {
        CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u", head.tail.to_str().c_str(), bl.length());
        //write data size and data at tail offset
        auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
        if (ret < 0) {
          return ret;
        }
        head.tail.offset += bl.length();
      } else {
        CLS_LOG(0, "ERROR: No space left in queue");
        // return queue full error
        return -ENOSPC;
      }
    }

    // A tail sitting exactly at the end of the queue is moved to the first
    // byte after the head region, in the next generation
    if (head.tail.offset == head.queue_size) {
      head.tail.offset = head.max_head_size;
      head.tail.gen += 1;
    }
    CLS_LOG(20, "INFO: queue_enqueue: New tail offset: %s", head.tail.to_str().c_str());
  } //end - for

  return 0;
}

// List up to op.max entries of the queue, starting at op.start_marker.
//
// The queue data is read in chunks of at most 1024 bytes with cls_cxx_read().
// Every entry is expected to be laid out as: QUEUE_ENTRY_START marker
// (uint16_t), data size (uint64_t), data. An entry that is cut off by a chunk
// boundary is kept in `bl` and completed with the next chunk.
//
// Results are returned through op_ret:
//   - entries:      the decoded entries, each with the marker (offset/gen) of
//                   its first byte
//   - next_marker:  where a following call should continue
//   - is_truncated: false once the tail has been reached
//
// Returns 0 on success, the negative value of a failed cls_cxx_read(), or
// -EINVAL when an entry header cannot be decoded or its marker is wrong.
int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, cls_queue_list_ret& op_ret, cls_queue_head& head)
{
  // If queue is empty, return from here
  if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) {
    CLS_LOG(20, "INFO: queue_list_entries(): Next offset is %s", head.front.to_str().c_str());
    op_ret.next_marker = head.front.to_str();
    op_ret.is_truncated = false;
    return 0;
  }

  // NOTE(review): the result of from_str() is not checked here; presumably a
  // marker that cannot be parsed is left with offset 0 - confirm
  cls_queue_marker start_marker;
  start_marker.from_str(op.start_marker.c_str());
  cls_queue_marker next_marker = {0, 0};

  // A start marker with offset 0 means "start from the front of the queue"
  uint64_t start_offset = 0, gen = 0;
  if (start_marker.offset == 0) {
    start_offset = head.front.offset;
    gen = head.front.gen;
  } else {
    start_offset = start_marker.offset;
    gen = start_marker.gen;
  }

  op_ret.is_truncated = true;
  uint64_t chunk_size = 1024;
  uint64_t contiguous_data_size = 0, size_to_read = 0;
  bool wrap_around = false;

  //Calculate length of contiguous data to be read depending on front, tail and start offset
  if (head.tail.offset > head.front.offset) {
    // Data is one contiguous run ending at the tail
    contiguous_data_size = head.tail.offset - start_offset;
  } else if (head.front.offset >= head.tail.offset) {
    // Data wraps around the end of the queue
    if (start_offset >= head.front.offset) {
      // Start lies in the part before the wrap: read up to the end of the
      // queue first, the part after the head region follows later
      contiguous_data_size = head.queue_size - start_offset;
      wrap_around = true;
    } else if (start_offset <= head.tail.offset) {
      // Start lies in the part after the wrap
      contiguous_data_size = head.tail.offset - start_offset;
    }
    // otherwise contiguous_data_size stays 0 and the loop below exits at once
  }

  CLS_LOG(10, "INFO: queue_list_entries(): front is: %s, tail is %s", head.front.to_str().c_str(), head.tail.to_str().c_str());

  // State carried from one chunk to the next while an entry is incomplete:
  //   offset_populated      - the marker of the pending entry is in last_marker
  //   entry_start_processed - marker and data size of the pending entry were
  //                           already decoded (data_size is valid)
  //   bl                    - bytes of the pending entry not consumed so far
  bool offset_populated = false, entry_start_processed = false;
  uint64_t data_size = 0, num_ops = 0;
  uint16_t entry_start = 0;
  bufferlist bl;
  string last_marker;
  do
  {
    CLS_LOG(10, "INFO: queue_list_entries(): start_offset is %lu", start_offset);
  
    bufferlist bl_chunk;
    //Read chunk size at a time, if it is less than contiguous data size, else read contiguous data size
    if (contiguous_data_size > chunk_size) {
      size_to_read = chunk_size;
    } else {
      size_to_read = contiguous_data_size;
    }
    CLS_LOG(10, "INFO: queue_list_entries(): size_to_read is %lu", size_to_read);
    if (size_to_read == 0) {
      next_marker = head.tail;
      op_ret.is_truncated = false;
      CLS_LOG(20, "INFO: queue_list_entries(): size_to_read is 0, hence breaking out!\n");
      break;
    }

    auto ret = cls_cxx_read(hctx, start_offset, size_to_read, &bl_chunk);
    if (ret < 0) {
      return ret;
    }

    //If there is leftover data from previous iteration, append new data to leftover data
    uint64_t entry_start_offset = start_offset - bl.length();
    CLS_LOG(20, "INFO: queue_list_entries(): Entry start offset accounting for leftover data is %lu", entry_start_offset);
    // bl_chunk becomes leftover + new data
    // NOTE(review): this relies on bl being empty after the move - confirm
    bl.claim_append(bl_chunk);
    bl_chunk = std::move(bl);

    CLS_LOG(20, "INFO: queue_list_entries(): size of chunk %u", bl_chunk.length());

    //Process the chunk of data read
    // index is the position inside bl_chunk, size_to_process the bytes left in it
    unsigned index = 0;
    auto it = bl_chunk.cbegin();
    uint64_t size_to_process = bl_chunk.length();
    do {
      CLS_LOG(10, "INFO: queue_list_entries(): index: %u, size_to_process: %lu", index, size_to_process);
      cls_queue_entry entry;
      ceph_assert(it.get_off() == index);
      //Use the last marker saved in previous iteration as the marker for this entry
      if (offset_populated) {
        entry.marker = last_marker;
      }
      //Populate offset if not done in previous iteration
      if (! offset_populated) {
        cls_queue_marker marker = {entry_start_offset + index, gen};
        CLS_LOG(5, "INFO: queue_list_entries(): offset: %s\n", marker.to_str().c_str());
        entry.marker = marker.to_str();
      }
      // Magic number + Data size - process if not done in previous iteration
      if (! entry_start_processed ) {
        if (size_to_process >= QUEUE_ENTRY_OVERHEAD) {
          // Decode magic number at start
          try {
            decode(entry_start, it);
          } catch (const ceph::buffer::error& err) {
            CLS_LOG(10, "ERROR: queue_list_entries: failed to decode entry start: %s", err.what());
            return -EINVAL;
          }
          if (entry_start != QUEUE_ENTRY_START) {
            CLS_LOG(5, "ERROR: queue_list_entries: invalid entry start %u", entry_start);
            return -EINVAL;
          }
          index += sizeof(uint16_t);
          size_to_process -= sizeof(uint16_t);
          // Decode data size
          try {
            decode(data_size, it);
          } catch (const ceph::buffer::error& err) {
            CLS_LOG(10, "ERROR: queue_list_entries: failed to decode data size: %s", err.what());
            return -EINVAL;
          }
        } else {
          // Copy unprocessed data to bl
          bl_chunk.splice(index, size_to_process, &bl);
          offset_populated = true;
          last_marker = entry.marker;
          CLS_LOG(10, "INFO: queue_list_entries: not enough data to read entry start and data size, breaking out!");
          break;
        }
        CLS_LOG(20, "INFO: queue_list_entries(): data size: %lu", data_size);
        index += sizeof(uint64_t);
        size_to_process -= sizeof(uint64_t);
      }
      // Data
      if (data_size <= size_to_process) {
        it.copy(data_size, entry.data);
        index += entry.data.length();
        size_to_process -= entry.data.length();
      } else {
        // Only part of the data is in this chunk: keep it in bl and remember
        // that marker and data size of this entry are already known
        it.copy(size_to_process, bl);
        offset_populated = true;
        entry_start_processed = true;
        last_marker = entry.marker;
        CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!");
        break;
      }
      op_ret.entries.emplace_back(entry);
      // Resetting some values
      offset_populated = false;
      entry_start_processed = false;
      data_size = 0;
      entry_start = 0;
      num_ops++;
      last_marker.clear();
      if (num_ops == op.max) {
        CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!");
        break;
      }
    } while(index < bl_chunk.length());

    CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max);

    if (num_ops == op.max) {
      // The next listing continues right after the last entry returned
      next_marker = cls_queue_marker{(entry_start_offset + index), gen};
      CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu", next_marker.offset);
      break;
    }

    //Calculate new start_offset and contiguous data size
    start_offset += size_to_read;
    contiguous_data_size -= size_to_read;
    if (contiguous_data_size == 0) {
      if (wrap_around) {
        // End of the queue reached: continue with the part stored right
        // after the head region, which belongs to the next generation
        start_offset = head.max_head_size;
        contiguous_data_size = head.tail.offset - head.max_head_size;
        gen += 1;
        wrap_around = false;
      } else {
        CLS_LOG(10, "INFO: queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!");
        next_marker = head.tail;
        op_ret.is_truncated = false;
        break;
      }
    }
    
  } while(num_ops < op.max);

  //Wrap around next offset if it has reached end of queue
  if (next_marker.offset == head.queue_size) {
    next_marker.offset = head.max_head_size;
    next_marker.gen += 1;
  }
  // Nothing is left to list once the next marker is the tail
  if ((next_marker.offset == head.tail.offset) && (next_marker.gen == head.tail.gen)) {
    op_ret.is_truncated = false;
  }

  CLS_LOG(5, "INFO: queue_list_entries(): next offset: %s", next_marker.to_str().c_str());
  op_ret.next_marker = next_marker.to_str();

  return 0;
}

// Remove all entries from the front of the queue up to op.end_marker.
//
// The removed range is zeroed with cls_cxx_write_zero() and head.front is
// moved to the end marker (wrapped to the first byte after the head region
// when it sits exactly at the end of the queue). Only the in-memory `head`
// is updated; this function does not write the head back to the object.
//
// Returns 0 on success (including when the queue is empty), -EINVAL when the
// end marker lies outside the queue data region or does not match the
// front's generation or the one following it, or the negative value of a
// failed cls_cxx_write_zero().
int queue_remove_entries(cls_method_context_t hctx, const cls_queue_remove_op& op, cls_queue_head& head)
{
  //Queue is empty
  if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) {
    return 0;
  }

  cls_queue_marker end_marker;
  end_marker.from_str(op.end_marker.c_str());

  CLS_LOG(5, "INFO: queue_remove_entries: op.end_marker = %s", end_marker.to_str().c_str());

  // Entries only live in [max_head_size, queue_size]. A marker outside that
  // range would make the unsigned length computations below wrap around (or
  // zero bytes past the queue) and would move the front into the head region.
  if ((end_marker.offset < head.max_head_size) || (end_marker.offset > head.queue_size)) {
    CLS_LOG(0, "ERROR: queue_remove_entries: end marker %s is outside of the queue data region", end_marker.to_str().c_str());
    return -EINVAL;
  }

  //Zero out the entries that have been removed, to reclaim storage space
  if (end_marker.offset > head.front.offset && end_marker.gen == head.front.gen) {
    // End marker is ahead of the front in the same generation: one range
    uint64_t len = end_marker.offset - head.front.offset;
    if (len > 0) {
      auto ret = cls_cxx_write_zero(hctx, head.front.offset, len);
      if (ret < 0) {
        CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries");
        CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s", head.front.to_str().c_str());
        return ret;
      }
    }
  } else if ((head.front.offset >= end_marker.offset) && (end_marker.gen == head.front.gen + 1)) { //start offset > end offset
    // End marker is in the next generation: zero from the front to the end
    // of the queue, then from the end of the head region to the end marker
    uint64_t len = head.queue_size - head.front.offset;
    if (len > 0) {
      auto ret = cls_cxx_write_zero(hctx, head.front.offset, len);
      if (ret < 0) {
        CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries");
        CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s", head.front.to_str().c_str());
        return ret;
      }
    }
    len = end_marker.offset - head.max_head_size;
    if (len > 0) {
      auto ret = cls_cxx_write_zero(hctx, head.max_head_size, len);
      if (ret < 0) {
        CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries");
        CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %lu", head.max_head_size);
        return ret;
      }
    }
  } else if ((head.front.offset == end_marker.offset) && (head.front.gen == end_marker.gen)) {
    //no-op
  } else {
    CLS_LOG(0, "INFO: queue_remove_entries: Invalid end marker: offset = %s, gen = %lu", end_marker.to_str().c_str(), end_marker.gen);
    return -EINVAL;
  }

  head.front = end_marker;

  // Check if it is the end, then wrap around
  if (head.front.offset == head.queue_size) {
    head.front.offset = head.max_head_size;
    head.front.gen += 1;
  }

  CLS_LOG(20, "INFO: queue_remove_entries: front offset is: %s and tail offset is %s", head.front.to_str().c_str(), head.tail.to_str().c_str());

  return 0;
}