diff options
Diffstat (limited to 'src/cls/timeindex')
-rw-r--r-- | src/cls/timeindex/cls_timeindex.cc | 266 | ||||
-rw-r--r-- | src/cls/timeindex/cls_timeindex_client.cc | 120 | ||||
-rw-r--r-- | src/cls/timeindex/cls_timeindex_client.h | 98 | ||||
-rw-r--r-- | src/cls/timeindex/cls_timeindex_ops.h | 115 | ||||
-rw-r--r-- | src/cls/timeindex/cls_timeindex_types.cc | 21 | ||||
-rw-r--r-- | src/cls/timeindex/cls_timeindex_types.h | 46 |
6 files changed, 666 insertions, 0 deletions
diff --git a/src/cls/timeindex/cls_timeindex.cc b/src/cls/timeindex/cls_timeindex.cc new file mode 100644 index 000000000..5ad8883d8 --- /dev/null +++ b/src/cls/timeindex/cls_timeindex.cc @@ -0,0 +1,266 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <errno.h> + +#include "objclass/objclass.h" + +#include "cls_timeindex_ops.h" + +#include "include/compat.h" + +using std::map; +using std::string; + +using ceph::bufferlist; + +CLS_VER(1,0) +CLS_NAME(timeindex) + +static const size_t MAX_LIST_ENTRIES = 1000; +static const size_t MAX_TRIM_ENTRIES = 1000; + +static const string TIMEINDEX_PREFIX = "1_"; + +static void get_index_time_prefix(const utime_t& ts, + string& index) +{ + char buf[32]; + + snprintf(buf, sizeof(buf), "%s%010ld.%06ld_", TIMEINDEX_PREFIX.c_str(), + (long)ts.sec(), (long)ts.usec()); + buf[sizeof(buf) - 1] = '\0'; + + index = buf; +} + +static void get_index(cls_method_context_t hctx, + const utime_t& key_ts, + const string& key_ext, + string& index) +{ + get_index_time_prefix(key_ts, index); + index.append(key_ext); +} + +static int parse_index(const string& index, + utime_t& key_ts, + string& key_ext) +{ + int sec, usec; + char keyext[256]; + + int ret = sscanf(index.c_str(), "1_%d.%d_%255s", &sec, &usec, keyext); + + key_ts = utime_t(sec, usec); + key_ext = string(keyext); + return ret; +} + +static int cls_timeindex_add(cls_method_context_t hctx, + bufferlist * const in, + bufferlist * const out) +{ + auto in_iter = in->cbegin(); + + cls_timeindex_add_op op; + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_timeindex_add_op(): failed to decode op"); + return -EINVAL; + } + + for (auto iter = op.entries.begin(); + iter != op.entries.end(); + ++iter) { + cls_timeindex_entry& entry = *iter; + + string index; + get_index(hctx, entry.key_ts, entry.key_ext, index); + + CLS_LOG(20, "storing entry at %s", index.c_str()); + + int ret = cls_cxx_map_set_val(hctx, index, &entry.value); + if (ret < 0) { + return ret; + } + } + + return 0; +} + +static int cls_timeindex_list(cls_method_context_t hctx, + bufferlist * const in, + bufferlist * const out) +{ + auto in_iter = in->cbegin(); + + cls_timeindex_list_op op; + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_timeindex_list_op(): failed to decode op"); + return -EINVAL; + } + + map<string, bufferlist> keys; + + string from_index; + string to_index; + + if (op.marker.empty()) { + get_index_time_prefix(op.from_time, from_index); + } else { + from_index = op.marker; + } + const bool use_time_boundary = (op.to_time >= op.from_time); + + if (use_time_boundary) { + get_index_time_prefix(op.to_time, to_index); + } + + size_t max_entries = op.max_entries; + if (max_entries > MAX_LIST_ENTRIES) { + max_entries = MAX_LIST_ENTRIES; + } + + cls_timeindex_list_ret ret; + + int rc = cls_cxx_map_get_vals(hctx, from_index, TIMEINDEX_PREFIX, + max_entries, &keys, &ret.truncated); + if (rc < 0) { + return rc; + } + + auto& entries = ret.entries; + auto iter = keys.begin(); + + string marker; + + for (; iter != keys.end(); ++iter) { + const string& index = iter->first; + bufferlist& bl = iter->second; + + if (use_time_boundary && index.compare(0, to_index.size(), to_index) >= 0) { + CLS_LOG(20, "DEBUG: cls_timeindex_list: finishing on to_index=%s", + to_index.c_str()); + ret.truncated = false; + break; + } + + cls_timeindex_entry e; + + if (parse_index(index, e.key_ts, e.key_ext) < 0) { + CLS_LOG(0, "ERROR: cls_timeindex_list: could not parse index=%s", + index.c_str()); + } else { + CLS_LOG(20, "DEBUG: cls_timeindex_list: index=%s, key_ext=%s, bl.len = %d", + index.c_str(), e.key_ext.c_str(), bl.length()); + e.value = bl; + entries.push_back(e); + } + marker = index; + } + + ret.marker = marker; + + encode(ret, *out); + + return 0; +} + + +static int cls_timeindex_trim(cls_method_context_t hctx, + bufferlist * const in, + bufferlist * const out) +{ + auto in_iter = in->cbegin(); + + cls_timeindex_trim_op op; + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_timeindex_trim: failed to decode entry"); + return -EINVAL; + } + + map<string, bufferlist> keys; + + string from_index; + string to_index; + + if (op.from_marker.empty()) { + get_index_time_prefix(op.from_time, from_index); + } else { + from_index = op.from_marker; + } + + if (op.to_marker.empty()) { + get_index_time_prefix(op.to_time, to_index); + } else { + to_index = op.to_marker; + } + + bool more; + + int rc = cls_cxx_map_get_vals(hctx, from_index, TIMEINDEX_PREFIX, + MAX_TRIM_ENTRIES, &keys, &more); + if (rc < 0) { + return rc; + } + + auto iter = keys.begin(); + + bool removed = false; + for (; iter != keys.end(); ++iter) { + const string& index = iter->first; + + CLS_LOG(20, "index=%s to_index=%s", index.c_str(), to_index.c_str()); + + if (index.compare(0, to_index.size(), to_index) > 0) { + CLS_LOG(20, "DEBUG: cls_timeindex_trim: finishing on to_index=%s", + to_index.c_str()); + break; + } + + CLS_LOG(20, "removing key: index=%s", index.c_str()); + + int rc = cls_cxx_map_remove_key(hctx, index); + if (rc < 0) { + CLS_LOG(1, "ERROR: cls_cxx_map_remove_key failed rc=%d", rc); + return rc; + } + + removed = true; + } + + if (!removed) { + return -ENODATA; + } + + return 0; +} + +CLS_INIT(timeindex) +{ + CLS_LOG(1, "Loaded timeindex class!"); + + cls_handle_t h_class; + cls_method_handle_t h_timeindex_add; + cls_method_handle_t h_timeindex_list; + cls_method_handle_t h_timeindex_trim; + + cls_register("timeindex", &h_class); + + /* timeindex */ + cls_register_cxx_method(h_class, "add", CLS_METHOD_RD | CLS_METHOD_WR, + cls_timeindex_add, &h_timeindex_add); + cls_register_cxx_method(h_class, "list", CLS_METHOD_RD, + cls_timeindex_list, &h_timeindex_list); + cls_register_cxx_method(h_class, "trim", CLS_METHOD_RD | CLS_METHOD_WR, + cls_timeindex_trim, &h_timeindex_trim); + + return; +} + diff --git a/src/cls/timeindex/cls_timeindex_client.cc b/src/cls/timeindex/cls_timeindex_client.cc new file mode 100644 index 000000000..7a38ff5fa --- /dev/null +++ b/src/cls/timeindex/cls_timeindex_client.cc @@ -0,0 +1,120 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <errno.h> + +#include "cls/timeindex/cls_timeindex_ops.h" +#include "cls/timeindex/cls_timeindex_client.h" +#include "include/compat.h" + +void cls_timeindex_add( + librados::ObjectWriteOperation& op, + std::list<cls_timeindex_entry>& entries) +{ + librados::bufferlist in; + cls_timeindex_add_op call; + call.entries = entries; + + encode(call, in); + op.exec("timeindex", "add", in); +} + +void cls_timeindex_add( + librados::ObjectWriteOperation& op, + cls_timeindex_entry& entry) +{ + librados::bufferlist in; + cls_timeindex_add_op call; + call.entries.push_back(entry); + + encode(call, in); + op.exec("timeindex", "add", in); +} + +void cls_timeindex_add_prepare_entry( + cls_timeindex_entry& entry, + const utime_t& key_timestamp, + const std::string& key_ext, + const librados::bufferlist& bl) +{ + entry.key_ts = key_timestamp; + entry.key_ext = key_ext; + entry.value = bl; +} + +void cls_timeindex_add( + librados::ObjectWriteOperation& op, + const utime_t& key_timestamp, + const std::string& key_ext, + const librados::bufferlist& bl) +{ + cls_timeindex_entry entry; + cls_timeindex_add_prepare_entry(entry, key_timestamp, key_ext, bl); + cls_timeindex_add(op, entry); +} + +void cls_timeindex_trim( + librados::ObjectWriteOperation& op, + const utime_t& from_time, + const utime_t& to_time, + const std::string& from_marker, + const std::string& to_marker) +{ + librados::bufferlist in; + cls_timeindex_trim_op call; + call.from_time = from_time; + call.to_time = to_time; + call.from_marker = from_marker; + call.to_marker = to_marker; + + encode(call, in); + + op.exec("timeindex", "trim", in); +} + +int cls_timeindex_trim( + librados::IoCtx& io_ctx, + const std::string& oid, + const utime_t& from_time, + const utime_t& to_time, + const std::string& from_marker, + const std::string& to_marker) +{ + bool done = false; + + do { + librados::ObjectWriteOperation op; + cls_timeindex_trim(op, from_time, to_time, from_marker, to_marker); + int r = io_ctx.operate(oid, &op); + + if (r == -ENODATA) + done = true; + else if (r < 0) + return r; + } while (!done); + + return 0; +} + +void cls_timeindex_list( + librados::ObjectReadOperation& op, + const utime_t& from, + const utime_t& to, + const std::string& in_marker, + const int max_entries, + std::list<cls_timeindex_entry>& entries, + std::string *out_marker, + bool *truncated) +{ + librados::bufferlist in; + cls_timeindex_list_op call; + call.from_time = from; + call.to_time = to; + call.marker = in_marker; + call.max_entries = max_entries; + + encode(call, in); + + op.exec("timeindex", "list", in, + new TimeindexListCtx(&entries, out_marker, truncated)); +} diff --git a/src/cls/timeindex/cls_timeindex_client.h b/src/cls/timeindex/cls_timeindex_client.h new file mode 100644 index 000000000..818d4b0c4 --- /dev/null +++ b/src/cls/timeindex/cls_timeindex_client.h @@ -0,0 +1,98 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_CLS_TIMEINDEX_CLIENT_H +#define CEPH_CLS_TIMEINDEX_CLIENT_H + +#include "include/rados/librados.hpp" + +#include "cls_timeindex_ops.h" + +/** + * timeindex objclass + */ +class TimeindexListCtx : public librados::ObjectOperationCompletion { + std::list<cls_timeindex_entry> *entries; + std::string *marker; + bool *truncated; + +public: + ///* ctor + TimeindexListCtx( + std::list<cls_timeindex_entry> *_entries, + std::string *_marker, + bool *_truncated) + : entries(_entries), marker(_marker), truncated(_truncated) {} + + ///* dtor + ~TimeindexListCtx() {} + + void handle_completion(int r, ceph::buffer::list& bl) override { + if (r >= 0) { + cls_timeindex_list_ret ret; + try { + auto iter = bl.cbegin(); + decode(ret, iter); + if (entries) + *entries = ret.entries; + if (truncated) + *truncated = ret.truncated; + if (marker) + *marker = ret.marker; + } catch (ceph::buffer::error& err) { + // nothing we can do about it atm + } + } + } +}; + +void cls_timeindex_add_prepare_entry( + cls_timeindex_entry& entry, + const utime_t& key_timestamp, + const std::string& key_ext, + ceph::buffer::list& bl); + +void cls_timeindex_add( + librados::ObjectWriteOperation& op, + const std::list<cls_timeindex_entry>& entry); + +void cls_timeindex_add( + librados::ObjectWriteOperation& op, + const cls_timeindex_entry& entry); + +void cls_timeindex_add( + librados::ObjectWriteOperation& op, + const utime_t& timestamp, + const std::string& name, + const ceph::buffer::list& bl); + +void cls_timeindex_list( + librados::ObjectReadOperation& op, + const utime_t& from, + const utime_t& to, + const std::string& in_marker, + const int max_entries, + std::list<cls_timeindex_entry>& entries, + std::string *out_marker, + bool *truncated); + +void cls_timeindex_trim( + librados::ObjectWriteOperation& op, + const utime_t& from_time, + const utime_t& to_time, + const std::string& from_marker = std::string(), + const std::string& to_marker = std::string()); + +// these overloads which call io_ctx.operate() should not be called in the rgw. +// rgw_rados_operate() should be called after the overloads w/o calls to io_ctx.operate() +#ifndef CLS_CLIENT_HIDE_IOCTX +int cls_timeindex_trim( + librados::IoCtx& io_ctx, + const std::string& oid, + const utime_t& from_time, + const utime_t& to_time, + const std::string& from_marker = std::string(), + const std::string& to_marker = std::string()); +#endif + +#endif diff --git a/src/cls/timeindex/cls_timeindex_ops.h b/src/cls/timeindex/cls_timeindex_ops.h new file mode 100644 index 000000000..f40058954 --- /dev/null +++ b/src/cls/timeindex/cls_timeindex_ops.h @@ -0,0 +1,115 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_CLS_TIMEINDEX_OPS_H +#define CEPH_CLS_TIMEINDEX_OPS_H + +#include "cls_timeindex_types.h" + +struct cls_timeindex_add_op { + std::list<cls_timeindex_entry> entries; + + cls_timeindex_add_op() {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(entries, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(entries, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_timeindex_add_op) + +struct cls_timeindex_list_op { + utime_t from_time; + std::string marker; /* if not empty, overrides from_time */ + utime_t to_time; /* not inclusive */ + int max_entries; /* upperbound to returned num of entries + might return less than that and still be truncated */ + + cls_timeindex_list_op() : max_entries(0) {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(from_time, bl); + encode(marker, bl); + encode(to_time, bl); + encode(max_entries, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(from_time, bl); + decode(marker, bl); + decode(to_time, bl); + decode(max_entries, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_timeindex_list_op) + +struct cls_timeindex_list_ret { + std::list<cls_timeindex_entry> entries; + std::string marker; + bool truncated; + + cls_timeindex_list_ret() : truncated(false) {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(entries, bl); + encode(marker, bl); + encode(truncated, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(entries, bl); + decode(marker, bl); + decode(truncated, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_timeindex_list_ret) + + +/* + * operation will return 0 when successfully removed but not done. Will return + * -ENODATA when done, so caller needs to repeat sending request until that. + */ +struct cls_timeindex_trim_op { + utime_t from_time; + utime_t to_time; /* inclusive */ + std::string from_marker; + std::string to_marker; + + cls_timeindex_trim_op() {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(from_time, bl); + encode(to_time, bl); + encode(from_marker, bl); + encode(to_marker, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(from_time, bl); + decode(to_time, bl); + decode(from_marker, bl); + decode(to_marker, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_timeindex_trim_op) + +#endif /* CEPH_CLS_TIMEINDEX_OPS_H */ diff --git a/src/cls/timeindex/cls_timeindex_types.cc b/src/cls/timeindex/cls_timeindex_types.cc new file mode 100644 index 000000000..98c374186 --- /dev/null +++ b/src/cls/timeindex/cls_timeindex_types.cc @@ -0,0 +1,21 @@ +#include "cls_timeindex_types.h" +#include "common/Formatter.h" + +void cls_timeindex_entry::dump(Formatter *f) const +{ + f->dump_stream("key_ts") << key_ts; + f->dump_string("key_ext", key_ext); + f->dump_string("value", value.to_str()); +} + +void cls_timeindex_entry::generate_test_instances(list<cls_timeindex_entry*>& o) +{ + cls_timeindex_entry *i = new cls_timeindex_entry; + i->key_ts = utime_t(0,0); + i->key_ext = "foo"; + bufferlist bl; + bl.append("bar"); + i->value = bl; + o.push_back(i); + o.push_back(new cls_timeindex_entry); +} diff --git a/src/cls/timeindex/cls_timeindex_types.h b/src/cls/timeindex/cls_timeindex_types.h new file mode 100644 index 000000000..d33886881 --- /dev/null +++ b/src/cls/timeindex/cls_timeindex_types.h @@ -0,0 +1,46 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_CLS_TIMEINDEX_TYPES_H +#define CEPH_CLS_TIMEINDEX_TYPES_H + +#include "include/encoding.h" +#include "include/types.h" + +#include "include/utime.h" + +class JSONObj; + +struct cls_timeindex_entry { + /* Mandatory timestamp. Will be part of the key. */ + utime_t key_ts; + /* Not mandatory. The name_ext field, if not empty, will form second + * part of the key. */ + std::string key_ext; + /* Become value of OMAP-based mapping. */ + ceph::buffer::list value; + + cls_timeindex_entry() {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(key_ts, bl); + encode(key_ext, bl); + encode(value, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(key_ts, bl); + decode(key_ext, bl); + decode(value, bl); + DECODE_FINISH(bl); + } + + void dump(ceph::Formatter *f) const; + static void generate_test_instances(std::list<cls_timeindex_entry*>& o); +}; +WRITE_CLASS_ENCODER(cls_timeindex_entry) + +#endif /* CEPH_CLS_TIMEINDEX_TYPES_H */ |