diff options
Diffstat (limited to 'src/cls/log')
-rw-r--r-- | src/cls/log/cls_log.cc | 323 | ||||
-rw-r--r-- | src/cls/log/cls_log_client.cc | 160 | ||||
-rw-r--r-- | src/cls/log/cls_log_client.h | 39 | ||||
-rw-r--r-- | src/cls/log/cls_log_ops.h | 156 | ||||
-rw-r--r-- | src/cls/log/cls_log_types.h | 74 |
5 files changed, 752 insertions, 0 deletions
diff --git a/src/cls/log/cls_log.cc b/src/cls/log/cls_log.cc new file mode 100644 index 000000000..58a8524da --- /dev/null +++ b/src/cls/log/cls_log.cc @@ -0,0 +1,323 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/types.h" +#include "include/utime.h" +#include "objclass/objclass.h" + +#include "cls_log_types.h" +#include "cls_log_ops.h" + +#include "global/global_context.h" +#include "include/compat.h" + +using std::map; +using std::string; + +using ceph::bufferlist; + +CLS_VER(1,0) +CLS_NAME(log) + +static string log_index_prefix = "1_"; + + +static int write_log_entry(cls_method_context_t hctx, string& index, cls_log_entry& entry) +{ + bufferlist bl; + encode(entry, bl); + + int ret = cls_cxx_map_set_val(hctx, index, &bl); + if (ret < 0) + return ret; + + return 0; +} + +static void get_index_time_prefix(utime_t& ts, string& index) +{ + char buf[32]; + snprintf(buf, sizeof(buf), "%010ld.%06ld_", (long)ts.sec(), (long)ts.usec()); + + index = log_index_prefix + buf; +} + +static int read_header(cls_method_context_t hctx, cls_log_header& header) +{ + bufferlist header_bl; + + int ret = cls_cxx_map_read_header(hctx, &header_bl); + if (ret < 0) + return ret; + + if (header_bl.length() == 0) { + header = cls_log_header(); + return 0; + } + + auto iter = header_bl.cbegin(); + try { + decode(header, iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(0, "ERROR: read_header(): failed to decode header"); + } + + return 0; +} + +static int write_header(cls_method_context_t hctx, cls_log_header& header) +{ + bufferlist header_bl; + encode(header, header_bl); + + int ret = cls_cxx_map_write_header(hctx, &header_bl); + if (ret < 0) + return ret; + + return 0; +} + +static void get_index(cls_method_context_t hctx, utime_t& ts, string& index) +{ + get_index_time_prefix(ts, index); + + string unique_id; + + cls_cxx_subop_version(hctx, &unique_id); + + index.append(unique_id); +} + +static int cls_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + + cls_log_add_op op; + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_log_add_op(): failed to decode op"); + return -EINVAL; + } + + cls_log_header header; + + int ret = read_header(hctx, header); + if (ret < 0) + return ret; + + for (auto iter = op.entries.begin(); iter != op.entries.end(); ++iter) { + cls_log_entry& entry = *iter; + + string index; + + utime_t timestamp = entry.timestamp; + if (op.monotonic_inc && timestamp < header.max_time) + timestamp = header.max_time; + else if (timestamp > header.max_time) + header.max_time = timestamp; + + if (entry.id.empty()) { + get_index(hctx, timestamp, index); + entry.id = index; + } else { + index = entry.id; + } + + CLS_LOG(20, "storing entry at %s", index.c_str()); + + + if (index > header.max_marker) + header.max_marker = index; + + ret = write_log_entry(hctx, index, entry); + if (ret < 0) + return ret; + } + + ret = write_header(hctx, header); + if (ret < 0) + return ret; + + return 0; +} + +static int cls_log_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + + cls_log_list_op op; + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_log_list_op(): failed to decode op"); + return -EINVAL; + } + + map<string, bufferlist> keys; + + string from_index; + string to_index; + + if (op.marker.empty()) { + get_index_time_prefix(op.from_time, from_index); + } else { + from_index = op.marker; + } + bool use_time_boundary = (!op.from_time.is_zero() && (op.to_time >= op.from_time)); + + if (use_time_boundary) + get_index_time_prefix(op.to_time, to_index); + +#define MAX_ENTRIES 1000 + size_t max_entries = op.max_entries; + if (!max_entries || max_entries > MAX_ENTRIES) + max_entries = MAX_ENTRIES; + + cls_log_list_ret ret; + + int rc = cls_cxx_map_get_vals(hctx, from_index, log_index_prefix, max_entries, &keys, &ret.truncated); + if (rc < 0) + return rc; + + auto& entries = ret.entries; + auto iter = keys.begin(); + + string marker; + + for (; iter != keys.end(); ++iter) { + const string& index = iter->first; + marker = index; + if (use_time_boundary && index.compare(0, to_index.size(), to_index) >= 0) { + ret.truncated = false; + break; + } + + bufferlist& bl = iter->second; + auto biter = bl.cbegin(); + try { + cls_log_entry e; + decode(e, biter); + entries.push_back(e); + } catch (ceph::buffer::error& err) { + CLS_LOG(0, "ERROR: cls_log_list: could not decode entry, index=%s", index.c_str()); + } + } + + ret.marker = marker; + + encode(ret, *out); + + return 0; +} + + +static int cls_log_trim(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + + cls_log_trim_op op; + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(0, "ERROR: cls_log_trim(): failed to decode entry"); + return -EINVAL; + } + + string from_index; + string to_index; + + if (op.from_marker.empty()) { + get_index_time_prefix(op.from_time, from_index); + } else { + from_index = op.from_marker; + } + + // cls_cxx_map_remove_range() expects one-past-end + if (op.to_marker.empty()) { + auto t = op.to_time; + t.nsec_ref() += 1000; // equivalent to usec() += 1 + t.normalize(); + get_index_time_prefix(t, to_index); + } else { + to_index = op.to_marker; + to_index.append(1, '\0'); + } + + // list a single key to detect whether the range is empty + const size_t max_entries = 1; + std::set<std::string> keys; + bool more = false; + + int rc = cls_cxx_map_get_keys(hctx, from_index, max_entries, &keys, &more); + if (rc < 0) { + CLS_LOG(1, "ERROR: cls_cxx_map_get_keys failed rc=%d", rc); + return rc; + } + + if (keys.empty()) { + CLS_LOG(20, "range is empty from_index=%s", from_index.c_str()); + return -ENODATA; + } + + const std::string& first_key = *keys.begin(); + if (to_index < first_key) { + CLS_LOG(20, "listed key %s past to_index=%s", first_key.c_str(), to_index.c_str()); + return -ENODATA; + } + + CLS_LOG(20, "listed key %s, removing through %s", first_key.c_str(), to_index.c_str()); + + rc = cls_cxx_map_remove_range(hctx, first_key, to_index); + if (rc < 0) { + CLS_LOG(1, "ERROR: cls_cxx_map_remove_range failed rc=%d", rc); + return rc; + } + + return 0; +} + +static int cls_log_info(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + + cls_log_info_op op; + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_log_add_op(): failed to decode op"); + return -EINVAL; + } + + cls_log_info_ret ret; + + int rc = read_header(hctx, ret.header); + if (rc < 0) + return rc; + + encode(ret, *out); + + return 0; +} + +CLS_INIT(log) +{ + CLS_LOG(1, "Loaded log class!"); + + cls_handle_t h_class; + cls_method_handle_t h_log_add; + cls_method_handle_t h_log_list; + cls_method_handle_t h_log_trim; + cls_method_handle_t h_log_info; + + cls_register("log", &h_class); + + /* log */ + cls_register_cxx_method(h_class, "add", CLS_METHOD_RD | CLS_METHOD_WR, cls_log_add, &h_log_add); + cls_register_cxx_method(h_class, "list", CLS_METHOD_RD, cls_log_list, &h_log_list); + cls_register_cxx_method(h_class, "trim", CLS_METHOD_RD | CLS_METHOD_WR, cls_log_trim, &h_log_trim); + cls_register_cxx_method(h_class, "info", CLS_METHOD_RD, cls_log_info, &h_log_info); + + return; +} + diff --git a/src/cls/log/cls_log_client.cc b/src/cls/log/cls_log_client.cc new file mode 100644 index 000000000..182bb9fec --- /dev/null +++ b/src/cls/log/cls_log_client.cc @@ -0,0 +1,160 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#include <errno.h> + +#include "cls/log/cls_log_ops.h" +#include "include/rados/librados.hpp" +#include "include/compat.h" + + +using std::list; +using std::string; + +using ceph::bufferlist; + +using namespace librados; + + + +void cls_log_add(librados::ObjectWriteOperation& op, list<cls_log_entry>& entries, bool monotonic_inc) +{ + bufferlist in; + cls_log_add_op call; + call.entries = entries; + encode(call, in); + op.exec("log", "add", in); +} + +void cls_log_add(librados::ObjectWriteOperation& op, cls_log_entry& entry) +{ + bufferlist in; + cls_log_add_op call; + call.entries.push_back(entry); + encode(call, in); + op.exec("log", "add", in); +} + +void cls_log_add_prepare_entry(cls_log_entry& entry, const utime_t& timestamp, + const string& section, const string& name, bufferlist& bl) +{ + entry.timestamp = timestamp; + entry.section = section; + entry.name = name; + entry.data = bl; +} + +void cls_log_add(librados::ObjectWriteOperation& op, const utime_t& timestamp, + const string& section, const string& name, bufferlist& bl) +{ + cls_log_entry entry; + + cls_log_add_prepare_entry(entry, timestamp, section, name, bl); + cls_log_add(op, entry); +} + +void cls_log_trim(librados::ObjectWriteOperation& op, const utime_t& from_time, const utime_t& to_time, + const string& from_marker, const string& to_marker) +{ + bufferlist in; + cls_log_trim_op call; + call.from_time = from_time; + call.to_time = to_time; + call.from_marker = from_marker; + call.to_marker = to_marker; + encode(call, in); + op.exec("log", "trim", in); +} + +int cls_log_trim(librados::IoCtx& io_ctx, const string& oid, const utime_t& from_time, const utime_t& to_time, + const string& from_marker, const string& to_marker) +{ + bool done = false; + + do { + ObjectWriteOperation op; + + cls_log_trim(op, from_time, to_time, from_marker, to_marker); + + int r = io_ctx.operate(oid, &op); + if (r == -ENODATA) + done = true; + else if (r < 0) + return r; + + } while (!done); + + + return 0; +} + +class LogListCtx : public ObjectOperationCompletion { + list<cls_log_entry> *entries; + string *marker; + bool *truncated; +public: + LogListCtx(list<cls_log_entry> *_entries, string *_marker, bool *_truncated) : + entries(_entries), marker(_marker), truncated(_truncated) {} + void handle_completion(int r, bufferlist& outbl) override { + if (r >= 0) { + cls_log_list_ret ret; + try { + auto iter = outbl.cbegin(); + decode(ret, iter); + if (entries) + *entries = std::move(ret.entries); + if (truncated) + *truncated = ret.truncated; + if (marker) + *marker = std::move(ret.marker); + } catch (ceph::buffer::error& err) { + // nothing we can do about it atm + } + } + } +}; + +void cls_log_list(librados::ObjectReadOperation& op, const utime_t& from, + const utime_t& to, const string& in_marker, int max_entries, + list<cls_log_entry>& entries, + string *out_marker, bool *truncated) +{ + bufferlist inbl; + cls_log_list_op call; + call.from_time = from; + call.to_time = to; + call.marker = in_marker; + call.max_entries = max_entries; + + encode(call, inbl); + + op.exec("log", "list", inbl, new LogListCtx(&entries, out_marker, truncated)); +} + +class LogInfoCtx : public ObjectOperationCompletion { + cls_log_header *header; +public: + explicit LogInfoCtx(cls_log_header *_header) : header(_header) {} + void handle_completion(int r, bufferlist& outbl) override { + if (r >= 0) { + cls_log_info_ret ret; + try { + auto iter = outbl.cbegin(); + decode(ret, iter); + if (header) + *header = ret.header; + } catch (ceph::buffer::error& err) { + // nothing we can do about it atm + } + } + } +}; + +void cls_log_info(librados::ObjectReadOperation& op, cls_log_header *header) +{ + bufferlist inbl; + cls_log_info_op call; + + encode(call, inbl); + + op.exec("log", "info", inbl, new LogInfoCtx(header)); +} diff --git a/src/cls/log/cls_log_client.h b/src/cls/log/cls_log_client.h new file mode 100644 index 000000000..2afdabeb3 --- /dev/null +++ b/src/cls/log/cls_log_client.h @@ -0,0 +1,39 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_CLS_LOG_CLIENT_H +#define CEPH_CLS_LOG_CLIENT_H + +#include "include/rados/librados_fwd.hpp" +#include "cls_log_types.h" + +/* + * log objclass + */ + +void cls_log_add_prepare_entry(cls_log_entry& entry, const utime_t& timestamp, + const std::string& section, const std::string& name, ceph::buffer::list& bl); + +void cls_log_add(librados::ObjectWriteOperation& op, std::list<cls_log_entry>& entries, bool monotonic_inc); +void cls_log_add(librados::ObjectWriteOperation& op, cls_log_entry& entry); +void cls_log_add(librados::ObjectWriteOperation& op, const utime_t& timestamp, + const std::string& section, const std::string& name, ceph::buffer::list& bl); + +void cls_log_list(librados::ObjectReadOperation& op, const utime_t& from, + const utime_t& to, const std::string& in_marker, + int max_entries, std::list<cls_log_entry>& entries, + std::string *out_marker, bool *truncated); + +void cls_log_trim(librados::ObjectWriteOperation& op, const utime_t& from_time, const utime_t& to_time, + const std::string& from_marker, const std::string& to_marker); + +// these overloads which call io_ctx.operate() should not be called in the rgw. +// rgw_rados_operate() should be called after the overloads w/o calls to io_ctx.operate() +#ifndef CLS_CLIENT_HIDE_IOCTX +int cls_log_trim(librados::IoCtx& io_ctx, const std::string& oid, const utime_t& from_time, const utime_t& to_time, + const std::string& from_marker, const std::string& to_marker); +#endif + +void cls_log_info(librados::ObjectReadOperation& op, cls_log_header *header); + +#endif diff --git a/src/cls/log/cls_log_ops.h b/src/cls/log/cls_log_ops.h new file mode 100644 index 000000000..0cedc8802 --- /dev/null +++ b/src/cls/log/cls_log_ops.h @@ -0,0 +1,156 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_CLS_LOG_OPS_H +#define CEPH_CLS_LOG_OPS_H + +#include "cls_log_types.h" + +struct cls_log_add_op { + std::list<cls_log_entry> entries; + bool monotonic_inc; + + cls_log_add_op() : monotonic_inc(true) {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(2, 1, bl); + encode(entries, bl); + encode(monotonic_inc, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(2, bl); + decode(entries, bl); + if (struct_v >= 2) { + decode(monotonic_inc, bl); + } + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_log_add_op) + +struct cls_log_list_op { + utime_t from_time; + std::string marker; /* if not empty, overrides from_time */ + utime_t to_time; /* not inclusive */ + int max_entries; /* upperbound to returned num of entries + might return less than that and still be truncated */ + + cls_log_list_op() : max_entries(0) {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(from_time, bl); + encode(marker, bl); + encode(to_time, bl); + encode(max_entries, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(from_time, bl); + decode(marker, bl); + decode(to_time, bl); + decode(max_entries, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_log_list_op) + +struct cls_log_list_ret { + std::list<cls_log_entry> entries; + std::string marker; + bool truncated; + + cls_log_list_ret() : truncated(false) {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(entries, bl); + encode(marker, bl); + encode(truncated, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(entries, bl); + decode(marker, bl); + decode(truncated, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_log_list_ret) + + +/* + * operation will return 0 when successfully removed but not done. Will return + * -ENODATA when done, so caller needs to repeat sending request until that. + */ +struct cls_log_trim_op { + utime_t from_time; + utime_t to_time; /* inclusive */ + std::string from_marker; + std::string to_marker; + + cls_log_trim_op() {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(2, 1, bl); + encode(from_time, bl); + encode(to_time, bl); + encode(from_marker, bl); + encode(to_marker, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(2, bl); + decode(from_time, bl); + decode(to_time, bl); + if (struct_v >= 2) { + decode(from_marker, bl); + decode(to_marker, bl); + } + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_log_trim_op) + +struct cls_log_info_op { + cls_log_info_op() {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + // currently empty request + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + // currently empty request + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_log_info_op) + +struct cls_log_info_ret { + cls_log_header header; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(header, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(header, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_log_info_ret) + +#endif diff --git a/src/cls/log/cls_log_types.h b/src/cls/log/cls_log_types.h new file mode 100644 index 000000000..1746d243e --- /dev/null +++ b/src/cls/log/cls_log_types.h @@ -0,0 +1,74 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#ifndef CEPH_CLS_LOG_TYPES_H +#define CEPH_CLS_LOG_TYPES_H + +#include "include/encoding.h" +#include "include/types.h" + +#include "include/utime.h" + +class JSONObj; + + +struct cls_log_entry { + std::string id; + std::string section; + std::string name; + utime_t timestamp; + ceph::buffer::list data; + + cls_log_entry() {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(2, 1, bl); + encode(section, bl); + encode(name, bl); + encode(timestamp, bl); + encode(data, bl); + encode(id, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(2, bl); + decode(section, bl); + decode(name, bl); + decode(timestamp, bl); + decode(data, bl); + if (struct_v >= 2) + decode(id, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(cls_log_entry) + +struct cls_log_header { + std::string max_marker; + utime_t max_time; + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(max_marker, bl); + encode(max_time, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(max_marker, bl); + decode(max_time, bl); + DECODE_FINISH(bl); + } +}; +inline bool operator ==(const cls_log_header& lhs, const cls_log_header& rhs) { + return (lhs.max_marker == rhs.max_marker && + lhs.max_time == rhs.max_time); +} +inline bool operator !=(const cls_log_header& lhs, const cls_log_header& rhs) { + return !(lhs == rhs); +} +WRITE_CLASS_ENCODER(cls_log_header) + + +#endif |