summaryrefslogtreecommitdiffstats
path: root/src/cls/timeindex
diff options
context:
space:
mode:
Diffstat (limited to 'src/cls/timeindex')
-rw-r--r--src/cls/timeindex/cls_timeindex.cc266
-rw-r--r--src/cls/timeindex/cls_timeindex_client.cc120
-rw-r--r--src/cls/timeindex/cls_timeindex_client.h98
-rw-r--r--src/cls/timeindex/cls_timeindex_ops.h115
-rw-r--r--src/cls/timeindex/cls_timeindex_types.cc21
-rw-r--r--src/cls/timeindex/cls_timeindex_types.h46
6 files changed, 666 insertions, 0 deletions
diff --git a/src/cls/timeindex/cls_timeindex.cc b/src/cls/timeindex/cls_timeindex.cc
new file mode 100644
index 000000000..5ad8883d8
--- /dev/null
+++ b/src/cls/timeindex/cls_timeindex.cc
@@ -0,0 +1,266 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <errno.h>
+
+#include "objclass/objclass.h"
+
+#include "cls_timeindex_ops.h"
+
+#include "include/compat.h"
+
+using std::map;
+using std::string;
+
+using ceph::bufferlist;
+
+CLS_VER(1,0)
+CLS_NAME(timeindex)
+
+static const size_t MAX_LIST_ENTRIES = 1000;
+static const size_t MAX_TRIM_ENTRIES = 1000;
+
+static const string TIMEINDEX_PREFIX = "1_";
+
+static void get_index_time_prefix(const utime_t& ts,
+ string& index)
+{
+ char buf[32];
+
+ snprintf(buf, sizeof(buf), "%s%010ld.%06ld_", TIMEINDEX_PREFIX.c_str(),
+ (long)ts.sec(), (long)ts.usec());
+ buf[sizeof(buf) - 1] = '\0';
+
+ index = buf;
+}
+
+static void get_index(cls_method_context_t hctx,
+ const utime_t& key_ts,
+ const string& key_ext,
+ string& index)
+{
+ get_index_time_prefix(key_ts, index);
+ index.append(key_ext);
+}
+
+static int parse_index(const string& index,
+ utime_t& key_ts,
+ string& key_ext)
+{
+ int sec, usec;
+ char keyext[256];
+
+ int ret = sscanf(index.c_str(), "1_%d.%d_%255s", &sec, &usec, keyext);
+
+ key_ts = utime_t(sec, usec);
+ key_ext = string(keyext);
+ return ret;
+}
+
+static int cls_timeindex_add(cls_method_context_t hctx,
+ bufferlist * const in,
+ bufferlist * const out)
+{
+ auto in_iter = in->cbegin();
+
+ cls_timeindex_add_op op;
+ try {
+ decode(op, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_timeindex_add_op(): failed to decode op");
+ return -EINVAL;
+ }
+
+ for (auto iter = op.entries.begin();
+ iter != op.entries.end();
+ ++iter) {
+ cls_timeindex_entry& entry = *iter;
+
+ string index;
+ get_index(hctx, entry.key_ts, entry.key_ext, index);
+
+ CLS_LOG(20, "storing entry at %s", index.c_str());
+
+ int ret = cls_cxx_map_set_val(hctx, index, &entry.value);
+ if (ret < 0) {
+ return ret;
+ }
+ }
+
+ return 0;
+}
+
+static int cls_timeindex_list(cls_method_context_t hctx,
+ bufferlist * const in,
+ bufferlist * const out)
+{
+ auto in_iter = in->cbegin();
+
+ cls_timeindex_list_op op;
+ try {
+ decode(op, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_timeindex_list_op(): failed to decode op");
+ return -EINVAL;
+ }
+
+ map<string, bufferlist> keys;
+
+ string from_index;
+ string to_index;
+
+ if (op.marker.empty()) {
+ get_index_time_prefix(op.from_time, from_index);
+ } else {
+ from_index = op.marker;
+ }
+ const bool use_time_boundary = (op.to_time >= op.from_time);
+
+ if (use_time_boundary) {
+ get_index_time_prefix(op.to_time, to_index);
+ }
+
+ size_t max_entries = op.max_entries;
+ if (max_entries > MAX_LIST_ENTRIES) {
+ max_entries = MAX_LIST_ENTRIES;
+ }
+
+ cls_timeindex_list_ret ret;
+
+ int rc = cls_cxx_map_get_vals(hctx, from_index, TIMEINDEX_PREFIX,
+ max_entries, &keys, &ret.truncated);
+ if (rc < 0) {
+ return rc;
+ }
+
+ auto& entries = ret.entries;
+ auto iter = keys.begin();
+
+ string marker;
+
+ for (; iter != keys.end(); ++iter) {
+ const string& index = iter->first;
+ bufferlist& bl = iter->second;
+
+ if (use_time_boundary && index.compare(0, to_index.size(), to_index) >= 0) {
+ CLS_LOG(20, "DEBUG: cls_timeindex_list: finishing on to_index=%s",
+ to_index.c_str());
+ ret.truncated = false;
+ break;
+ }
+
+ cls_timeindex_entry e;
+
+ if (parse_index(index, e.key_ts, e.key_ext) < 0) {
+ CLS_LOG(0, "ERROR: cls_timeindex_list: could not parse index=%s",
+ index.c_str());
+ } else {
+ CLS_LOG(20, "DEBUG: cls_timeindex_list: index=%s, key_ext=%s, bl.len = %d",
+ index.c_str(), e.key_ext.c_str(), bl.length());
+ e.value = bl;
+ entries.push_back(e);
+ }
+ marker = index;
+ }
+
+ ret.marker = marker;
+
+ encode(ret, *out);
+
+ return 0;
+}
+
+
+static int cls_timeindex_trim(cls_method_context_t hctx,
+ bufferlist * const in,
+ bufferlist * const out)
+{
+ auto in_iter = in->cbegin();
+
+ cls_timeindex_trim_op op;
+ try {
+ decode(op, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_timeindex_trim: failed to decode entry");
+ return -EINVAL;
+ }
+
+ map<string, bufferlist> keys;
+
+ string from_index;
+ string to_index;
+
+ if (op.from_marker.empty()) {
+ get_index_time_prefix(op.from_time, from_index);
+ } else {
+ from_index = op.from_marker;
+ }
+
+ if (op.to_marker.empty()) {
+ get_index_time_prefix(op.to_time, to_index);
+ } else {
+ to_index = op.to_marker;
+ }
+
+ bool more;
+
+ int rc = cls_cxx_map_get_vals(hctx, from_index, TIMEINDEX_PREFIX,
+ MAX_TRIM_ENTRIES, &keys, &more);
+ if (rc < 0) {
+ return rc;
+ }
+
+ auto iter = keys.begin();
+
+ bool removed = false;
+ for (; iter != keys.end(); ++iter) {
+ const string& index = iter->first;
+
+ CLS_LOG(20, "index=%s to_index=%s", index.c_str(), to_index.c_str());
+
+ if (index.compare(0, to_index.size(), to_index) > 0) {
+ CLS_LOG(20, "DEBUG: cls_timeindex_trim: finishing on to_index=%s",
+ to_index.c_str());
+ break;
+ }
+
+ CLS_LOG(20, "removing key: index=%s", index.c_str());
+
+ int rc = cls_cxx_map_remove_key(hctx, index);
+ if (rc < 0) {
+ CLS_LOG(1, "ERROR: cls_cxx_map_remove_key failed rc=%d", rc);
+ return rc;
+ }
+
+ removed = true;
+ }
+
+ if (!removed) {
+ return -ENODATA;
+ }
+
+ return 0;
+}
+
+CLS_INIT(timeindex)
+{
+ CLS_LOG(1, "Loaded timeindex class!");
+
+ cls_handle_t h_class;
+ cls_method_handle_t h_timeindex_add;
+ cls_method_handle_t h_timeindex_list;
+ cls_method_handle_t h_timeindex_trim;
+
+ cls_register("timeindex", &h_class);
+
+ /* timeindex */
+ cls_register_cxx_method(h_class, "add", CLS_METHOD_RD | CLS_METHOD_WR,
+ cls_timeindex_add, &h_timeindex_add);
+ cls_register_cxx_method(h_class, "list", CLS_METHOD_RD,
+ cls_timeindex_list, &h_timeindex_list);
+ cls_register_cxx_method(h_class, "trim", CLS_METHOD_RD | CLS_METHOD_WR,
+ cls_timeindex_trim, &h_timeindex_trim);
+
+ return;
+}
+
diff --git a/src/cls/timeindex/cls_timeindex_client.cc b/src/cls/timeindex/cls_timeindex_client.cc
new file mode 100644
index 000000000..7a38ff5fa
--- /dev/null
+++ b/src/cls/timeindex/cls_timeindex_client.cc
@@ -0,0 +1,120 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <errno.h>
+
+#include "cls/timeindex/cls_timeindex_ops.h"
+#include "cls/timeindex/cls_timeindex_client.h"
+#include "include/compat.h"
+
+void cls_timeindex_add(
+ librados::ObjectWriteOperation& op,
+ std::list<cls_timeindex_entry>& entries)
+{
+ librados::bufferlist in;
+ cls_timeindex_add_op call;
+ call.entries = entries;
+
+ encode(call, in);
+ op.exec("timeindex", "add", in);
+}
+
+void cls_timeindex_add(
+ librados::ObjectWriteOperation& op,
+ cls_timeindex_entry& entry)
+{
+ librados::bufferlist in;
+ cls_timeindex_add_op call;
+ call.entries.push_back(entry);
+
+ encode(call, in);
+ op.exec("timeindex", "add", in);
+}
+
+void cls_timeindex_add_prepare_entry(
+ cls_timeindex_entry& entry,
+ const utime_t& key_timestamp,
+ const std::string& key_ext,
+ const librados::bufferlist& bl)
+{
+ entry.key_ts = key_timestamp;
+ entry.key_ext = key_ext;
+ entry.value = bl;
+}
+
+void cls_timeindex_add(
+ librados::ObjectWriteOperation& op,
+ const utime_t& key_timestamp,
+ const std::string& key_ext,
+ const librados::bufferlist& bl)
+{
+ cls_timeindex_entry entry;
+ cls_timeindex_add_prepare_entry(entry, key_timestamp, key_ext, bl);
+ cls_timeindex_add(op, entry);
+}
+
+void cls_timeindex_trim(
+ librados::ObjectWriteOperation& op,
+ const utime_t& from_time,
+ const utime_t& to_time,
+ const std::string& from_marker,
+ const std::string& to_marker)
+{
+ librados::bufferlist in;
+ cls_timeindex_trim_op call;
+ call.from_time = from_time;
+ call.to_time = to_time;
+ call.from_marker = from_marker;
+ call.to_marker = to_marker;
+
+ encode(call, in);
+
+ op.exec("timeindex", "trim", in);
+}
+
+int cls_timeindex_trim(
+ librados::IoCtx& io_ctx,
+ const std::string& oid,
+ const utime_t& from_time,
+ const utime_t& to_time,
+ const std::string& from_marker,
+ const std::string& to_marker)
+{
+ bool done = false;
+
+ do {
+ librados::ObjectWriteOperation op;
+ cls_timeindex_trim(op, from_time, to_time, from_marker, to_marker);
+ int r = io_ctx.operate(oid, &op);
+
+ if (r == -ENODATA)
+ done = true;
+ else if (r < 0)
+ return r;
+ } while (!done);
+
+ return 0;
+}
+
+void cls_timeindex_list(
+ librados::ObjectReadOperation& op,
+ const utime_t& from,
+ const utime_t& to,
+ const std::string& in_marker,
+ const int max_entries,
+ std::list<cls_timeindex_entry>& entries,
+ std::string *out_marker,
+ bool *truncated)
+{
+ librados::bufferlist in;
+ cls_timeindex_list_op call;
+ call.from_time = from;
+ call.to_time = to;
+ call.marker = in_marker;
+ call.max_entries = max_entries;
+
+ encode(call, in);
+
+ op.exec("timeindex", "list", in,
+ new TimeindexListCtx(&entries, out_marker, truncated));
+}
diff --git a/src/cls/timeindex/cls_timeindex_client.h b/src/cls/timeindex/cls_timeindex_client.h
new file mode 100644
index 000000000..818d4b0c4
--- /dev/null
+++ b/src/cls/timeindex/cls_timeindex_client.h
@@ -0,0 +1,98 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CLS_TIMEINDEX_CLIENT_H
+#define CEPH_CLS_TIMEINDEX_CLIENT_H
+
+#include "include/rados/librados.hpp"
+
+#include "cls_timeindex_ops.h"
+
+/**
+ * timeindex objclass
+ */
+class TimeindexListCtx : public librados::ObjectOperationCompletion {
+ std::list<cls_timeindex_entry> *entries;
+ std::string *marker;
+ bool *truncated;
+
+public:
+ ///* ctor
+ TimeindexListCtx(
+ std::list<cls_timeindex_entry> *_entries,
+ std::string *_marker,
+ bool *_truncated)
+ : entries(_entries), marker(_marker), truncated(_truncated) {}
+
+ ///* dtor
+ ~TimeindexListCtx() {}
+
+ void handle_completion(int r, ceph::buffer::list& bl) override {
+ if (r >= 0) {
+ cls_timeindex_list_ret ret;
+ try {
+ auto iter = bl.cbegin();
+ decode(ret, iter);
+ if (entries)
+ *entries = ret.entries;
+ if (truncated)
+ *truncated = ret.truncated;
+ if (marker)
+ *marker = ret.marker;
+ } catch (ceph::buffer::error& err) {
+ // nothing we can do about it atm
+ }
+ }
+ }
+};
+
+void cls_timeindex_add_prepare_entry(
+ cls_timeindex_entry& entry,
+ const utime_t& key_timestamp,
+ const std::string& key_ext,
+ ceph::buffer::list& bl);
+
+void cls_timeindex_add(
+ librados::ObjectWriteOperation& op,
+ const std::list<cls_timeindex_entry>& entry);
+
+void cls_timeindex_add(
+ librados::ObjectWriteOperation& op,
+ const cls_timeindex_entry& entry);
+
+void cls_timeindex_add(
+ librados::ObjectWriteOperation& op,
+ const utime_t& timestamp,
+ const std::string& name,
+ const ceph::buffer::list& bl);
+
+void cls_timeindex_list(
+ librados::ObjectReadOperation& op,
+ const utime_t& from,
+ const utime_t& to,
+ const std::string& in_marker,
+ const int max_entries,
+ std::list<cls_timeindex_entry>& entries,
+ std::string *out_marker,
+ bool *truncated);
+
+void cls_timeindex_trim(
+ librados::ObjectWriteOperation& op,
+ const utime_t& from_time,
+ const utime_t& to_time,
+ const std::string& from_marker = std::string(),
+ const std::string& to_marker = std::string());
+
+// these overloads which call io_ctx.operate() should not be called in the rgw.
+// rgw_rados_operate() should be called after the overloads w/o calls to io_ctx.operate()
+#ifndef CLS_CLIENT_HIDE_IOCTX
+int cls_timeindex_trim(
+ librados::IoCtx& io_ctx,
+ const std::string& oid,
+ const utime_t& from_time,
+ const utime_t& to_time,
+ const std::string& from_marker = std::string(),
+ const std::string& to_marker = std::string());
+#endif
+
+#endif
diff --git a/src/cls/timeindex/cls_timeindex_ops.h b/src/cls/timeindex/cls_timeindex_ops.h
new file mode 100644
index 000000000..f40058954
--- /dev/null
+++ b/src/cls/timeindex/cls_timeindex_ops.h
@@ -0,0 +1,115 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CLS_TIMEINDEX_OPS_H
+#define CEPH_CLS_TIMEINDEX_OPS_H
+
+#include "cls_timeindex_types.h"
+
+struct cls_timeindex_add_op {
+ std::list<cls_timeindex_entry> entries;
+
+ cls_timeindex_add_op() {}
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(entries, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(entries, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_timeindex_add_op)
+
+struct cls_timeindex_list_op {
+ utime_t from_time;
+ std::string marker; /* if not empty, overrides from_time */
+ utime_t to_time; /* not inclusive */
+ int max_entries; /* upperbound to returned num of entries
+ might return less than that and still be truncated */
+
+ cls_timeindex_list_op() : max_entries(0) {}
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(from_time, bl);
+ encode(marker, bl);
+ encode(to_time, bl);
+ encode(max_entries, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(from_time, bl);
+ decode(marker, bl);
+ decode(to_time, bl);
+ decode(max_entries, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_timeindex_list_op)
+
+struct cls_timeindex_list_ret {
+ std::list<cls_timeindex_entry> entries;
+ std::string marker;
+ bool truncated;
+
+ cls_timeindex_list_ret() : truncated(false) {}
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(entries, bl);
+ encode(marker, bl);
+ encode(truncated, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(entries, bl);
+ decode(marker, bl);
+ decode(truncated, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_timeindex_list_ret)
+
+
+/*
+ * operation will return 0 when successfully removed but not done. Will return
+ * -ENODATA when done, so caller needs to repeat sending request until that.
+ */
+struct cls_timeindex_trim_op {
+ utime_t from_time;
+ utime_t to_time; /* inclusive */
+ std::string from_marker;
+ std::string to_marker;
+
+ cls_timeindex_trim_op() {}
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(from_time, bl);
+ encode(to_time, bl);
+ encode(from_marker, bl);
+ encode(to_marker, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(from_time, bl);
+ decode(to_time, bl);
+ decode(from_marker, bl);
+ decode(to_marker, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_timeindex_trim_op)
+
+#endif /* CEPH_CLS_TIMEINDEX_OPS_H */
diff --git a/src/cls/timeindex/cls_timeindex_types.cc b/src/cls/timeindex/cls_timeindex_types.cc
new file mode 100644
index 000000000..98c374186
--- /dev/null
+++ b/src/cls/timeindex/cls_timeindex_types.cc
@@ -0,0 +1,21 @@
+#include "cls_timeindex_types.h"
+#include "common/Formatter.h"
+
+void cls_timeindex_entry::dump(Formatter *f) const
+{
+ f->dump_stream("key_ts") << key_ts;
+ f->dump_string("key_ext", key_ext);
+ f->dump_string("value", value.to_str());
+}
+
+void cls_timeindex_entry::generate_test_instances(list<cls_timeindex_entry*>& o)
+{
+ cls_timeindex_entry *i = new cls_timeindex_entry;
+ i->key_ts = utime_t(0,0);
+ i->key_ext = "foo";
+ bufferlist bl;
+ bl.append("bar");
+ i->value = bl;
+ o.push_back(i);
+ o.push_back(new cls_timeindex_entry);
+}
diff --git a/src/cls/timeindex/cls_timeindex_types.h b/src/cls/timeindex/cls_timeindex_types.h
new file mode 100644
index 000000000..d33886881
--- /dev/null
+++ b/src/cls/timeindex/cls_timeindex_types.h
@@ -0,0 +1,46 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CLS_TIMEINDEX_TYPES_H
+#define CEPH_CLS_TIMEINDEX_TYPES_H
+
+#include "include/encoding.h"
+#include "include/types.h"
+
+#include "include/utime.h"
+
+class JSONObj;
+
+struct cls_timeindex_entry {
+ /* Mandatory timestamp. Will be part of the key. */
+ utime_t key_ts;
+ /* Not mandatory. The name_ext field, if not empty, will form second
+ * part of the key. */
+ std::string key_ext;
+ /* Become value of OMAP-based mapping. */
+ ceph::buffer::list value;
+
+ cls_timeindex_entry() {}
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(key_ts, bl);
+ encode(key_ext, bl);
+ encode(value, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(key_ts, bl);
+ decode(key_ext, bl);
+ decode(value, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void dump(ceph::Formatter *f) const;
+ static void generate_test_instances(std::list<cls_timeindex_entry*>& o);
+};
+WRITE_CLASS_ENCODER(cls_timeindex_entry)
+
+#endif /* CEPH_CLS_TIMEINDEX_TYPES_H */