1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include <errno.h>
#include "cls/queue/cls_queue_ops.h"
#include "cls/queue/cls_queue_const.h"
#include "cls/queue/cls_queue_client.h"
using namespace librados;
// Queue a QUEUE_INIT class call on the given write operation.
//
// A cls_queue_init_op is filled in with queue_size = size and
// max_urgent_data_size = 0, encoded, and handed to op.exec() under
// QUEUE_CLASS / QUEUE_INIT. Nothing is sent here; the call is only
// appended to `op`.
//
// op         - write operation that receives the exec call
// queue_name - not referenced by this function
// size       - value stored in the request's queue_size field
void cls_queue_init(ObjectWriteOperation& op, const string& queue_name, uint64_t size)
{
  cls_queue_init_op init_req;
  init_req.queue_size = size;
  init_req.max_urgent_data_size = 0;

  bufferlist payload;
  encode(init_req, payload);

  op.exec(QUEUE_CLASS, QUEUE_INIT, payload);
}
// Execute QUEUE_GET_CAPACITY on object `oid` and report the result.
//
// io_ctx - I/O context used to run the class call
// oid    - object the call is executed against
// size   - set to the decoded queue_capacity on success; left untouched
//          on any error path
//
// Returns 0 on success, the negative value returned by io_ctx.exec() if
// the call fails, or -EIO if the reply cannot be decoded.
int cls_queue_get_capacity(IoCtx& io_ctx, const string& oid, uint64_t& size)
{
  bufferlist request;   // sent empty: no input is encoded for this call
  bufferlist reply;

  const int rc = io_ctx.exec(oid, QUEUE_CLASS, QUEUE_GET_CAPACITY, request, reply);
  if (rc < 0) {
    return rc;
  }

  cls_queue_get_capacity_ret result;
  auto pos = reply.cbegin();
  try {
    decode(result, pos);
  } catch (const buffer::error&) {
    // A reply that does not decode is reported as an I/O error.
    return -EIO;
  }

  size = result.queue_capacity;
  return 0;
}
// Queue a QUEUE_ENQUEUE class call on the given write operation.
//
// The supplied buffers are moved into a cls_queue_enqueue_op, which is
// encoded and passed to op.exec() under QUEUE_CLASS / QUEUE_ENQUEUE.
//
// op              - write operation that receives the exec call
// expiration_secs - not referenced by this function
// bl_data_vec     - entries to enqueue; taken by value and moved into
//                   the request
void cls_queue_enqueue(ObjectWriteOperation& op, uint32_t expiration_secs, vector<bufferlist> bl_data_vec)
{
  cls_queue_enqueue_op enqueue_req;
  enqueue_req.bl_data_vec = std::move(bl_data_vec);

  bufferlist payload;
  encode(enqueue_req, payload);

  op.exec(QUEUE_CLASS, QUEUE_ENQUEUE, payload);
}
// Execute QUEUE_LIST_ENTRIES on object `oid` and return the listed entries.
//
// io_ctx      - I/O context used to run the class call
// oid         - object the call is executed against
// marker      - sent as the request's start_marker
// max         - sent as the request's max
// entries     - receives the decoded entries on success
// truncated   - if non-null, receives the reply's is_truncated flag;
//               may be nullptr when the caller does not need it
// next_marker - receives the reply's next_marker on success
//
// Returns 0 on success, the negative value returned by io_ctx.exec() if
// the call fails, or -EIO if the reply cannot be decoded. Output
// arguments are only written on success.
int cls_queue_list_entries(IoCtx& io_ctx, const string& oid, const string& marker, uint32_t max,
                           vector<cls_queue_entry>& entries,
                           bool *truncated, string& next_marker)
{
  bufferlist in, out;
  cls_queue_list_op op;
  op.start_marker = marker;
  op.max = max;
  encode(op, in);

  int r = io_ctx.exec(oid, QUEUE_CLASS, QUEUE_LIST_ENTRIES, in, out);
  if (r < 0)
    return r;

  cls_queue_list_ret ret;
  auto iter = out.cbegin();
  try {
    decode(ret, iter);
  } catch (const buffer::error&) {
    // A reply that does not decode is reported as an I/O error.
    return -EIO;
  }

  entries = std::move(ret.entries);
  // Guard the optional out-pointer: dereferencing a null `truncated`
  // would crash after an otherwise successful call.
  if (truncated != nullptr) {
    *truncated = ret.is_truncated;
  }
  next_marker = std::move(ret.next_marker);
  return 0;
}
// Queue a QUEUE_REMOVE_ENTRIES class call on the given write operation.
//
// A cls_queue_remove_op carrying `end_marker` is encoded and passed to
// op.exec() under QUEUE_CLASS / QUEUE_REMOVE_ENTRIES.
//
// op         - write operation that receives the exec call
// end_marker - copied into the request's end_marker field
void cls_queue_remove_entries(ObjectWriteOperation& op, const string& end_marker)
{
  bufferlist in;
  cls_queue_remove_op rem_op;
  rem_op.end_marker = end_marker;
  encode(rem_op, in);
  op.exec(QUEUE_CLASS, QUEUE_REMOVE_ENTRIES, in);
}
|