diff options
Diffstat (limited to 'src/cls/log/cls_log.cc')
-rw-r--r-- | src/cls/log/cls_log.cc | 323 |
1 files changed, 323 insertions, 0 deletions
diff --git a/src/cls/log/cls_log.cc b/src/cls/log/cls_log.cc new file mode 100644 index 000000000..58a8524da --- /dev/null +++ b/src/cls/log/cls_log.cc @@ -0,0 +1,323 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/types.h" +#include "include/utime.h" +#include "objclass/objclass.h" + +#include "cls_log_types.h" +#include "cls_log_ops.h" + +#include "global/global_context.h" +#include "include/compat.h" + +using std::map; +using std::string; + +using ceph::bufferlist; + +CLS_VER(1,0) +CLS_NAME(log) + +static string log_index_prefix = "1_"; + + +static int write_log_entry(cls_method_context_t hctx, string& index, cls_log_entry& entry) +{ + bufferlist bl; + encode(entry, bl); + + int ret = cls_cxx_map_set_val(hctx, index, &bl); + if (ret < 0) + return ret; + + return 0; +} + +static void get_index_time_prefix(utime_t& ts, string& index) +{ + char buf[32]; + snprintf(buf, sizeof(buf), "%010ld.%06ld_", (long)ts.sec(), (long)ts.usec()); + + index = log_index_prefix + buf; +} + +static int read_header(cls_method_context_t hctx, cls_log_header& header) +{ + bufferlist header_bl; + + int ret = cls_cxx_map_read_header(hctx, &header_bl); + if (ret < 0) + return ret; + + if (header_bl.length() == 0) { + header = cls_log_header(); + return 0; + } + + auto iter = header_bl.cbegin(); + try { + decode(header, iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(0, "ERROR: read_header(): failed to decode header"); + } + + return 0; +} + +static int write_header(cls_method_context_t hctx, cls_log_header& header) +{ + bufferlist header_bl; + encode(header, header_bl); + + int ret = cls_cxx_map_write_header(hctx, &header_bl); + if (ret < 0) + return ret; + + return 0; +} + +static void get_index(cls_method_context_t hctx, utime_t& ts, string& index) +{ + get_index_time_prefix(ts, index); + + string unique_id; + + cls_cxx_subop_version(hctx, &unique_id); + + index.append(unique_id); +} + +static int cls_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + + cls_log_add_op op; + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_log_add_op(): failed to decode op"); + return -EINVAL; + } + + cls_log_header header; + + int ret = read_header(hctx, header); + if (ret < 0) + return ret; + + for (auto iter = op.entries.begin(); iter != op.entries.end(); ++iter) { + cls_log_entry& entry = *iter; + + string index; + + utime_t timestamp = entry.timestamp; + if (op.monotonic_inc && timestamp < header.max_time) + timestamp = header.max_time; + else if (timestamp > header.max_time) + header.max_time = timestamp; + + if (entry.id.empty()) { + get_index(hctx, timestamp, index); + entry.id = index; + } else { + index = entry.id; + } + + CLS_LOG(20, "storing entry at %s", index.c_str()); + + + if (index > header.max_marker) + header.max_marker = index; + + ret = write_log_entry(hctx, index, entry); + if (ret < 0) + return ret; + } + + ret = write_header(hctx, header); + if (ret < 0) + return ret; + + return 0; +} + +static int cls_log_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + + cls_log_list_op op; + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_log_list_op(): failed to decode op"); + return -EINVAL; + } + + map<string, bufferlist> keys; + + string from_index; + string to_index; + + if (op.marker.empty()) { + get_index_time_prefix(op.from_time, from_index); + } else { + from_index = op.marker; + } + bool use_time_boundary = (!op.from_time.is_zero() && (op.to_time >= op.from_time)); + + if (use_time_boundary) + get_index_time_prefix(op.to_time, to_index); + +#define MAX_ENTRIES 1000 + size_t max_entries = op.max_entries; + if (!max_entries || max_entries > MAX_ENTRIES) + max_entries = MAX_ENTRIES; + + cls_log_list_ret ret; + + int rc = cls_cxx_map_get_vals(hctx, from_index, log_index_prefix, max_entries, &keys, &ret.truncated); + if (rc < 0) + return rc; + + auto& entries = ret.entries; + auto iter = keys.begin(); + + string marker; + + for (; iter != keys.end(); ++iter) { + const string& index = iter->first; + marker = index; + if (use_time_boundary && index.compare(0, to_index.size(), to_index) >= 0) { + ret.truncated = false; + break; + } + + bufferlist& bl = iter->second; + auto biter = bl.cbegin(); + try { + cls_log_entry e; + decode(e, biter); + entries.push_back(e); + } catch (ceph::buffer::error& err) { + CLS_LOG(0, "ERROR: cls_log_list: could not decode entry, index=%s", index.c_str()); + } + } + + ret.marker = marker; + + encode(ret, *out); + + return 0; +} + + +static int cls_log_trim(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + + cls_log_trim_op op; + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(0, "ERROR: cls_log_trim(): failed to decode entry"); + return -EINVAL; + } + + string from_index; + string to_index; + + if (op.from_marker.empty()) { + get_index_time_prefix(op.from_time, from_index); + } else { + from_index = op.from_marker; + } + + // cls_cxx_map_remove_range() expects one-past-end + if (op.to_marker.empty()) { + auto t = op.to_time; + t.nsec_ref() += 1000; // equivalent to usec() += 1 + t.normalize(); + get_index_time_prefix(t, to_index); + } else { + to_index = op.to_marker; + to_index.append(1, '\0'); + } + + // list a single key to detect whether the range is empty + const size_t max_entries = 1; + std::set<std::string> keys; + bool more = false; + + int rc = cls_cxx_map_get_keys(hctx, from_index, max_entries, &keys, &more); + if (rc < 0) { + CLS_LOG(1, "ERROR: cls_cxx_map_get_keys failed rc=%d", rc); + return rc; + } + + if (keys.empty()) { + CLS_LOG(20, "range is empty from_index=%s", from_index.c_str()); + return -ENODATA; + } + + const std::string& first_key = *keys.begin(); + if (to_index < first_key) { + CLS_LOG(20, "listed key %s past to_index=%s", first_key.c_str(), to_index.c_str()); + return -ENODATA; + } + + CLS_LOG(20, "listed key %s, removing through %s", first_key.c_str(), to_index.c_str()); + + rc = cls_cxx_map_remove_range(hctx, first_key, to_index); + if (rc < 0) { + CLS_LOG(1, "ERROR: cls_cxx_map_remove_range failed rc=%d", rc); + return rc; + } + + return 0; +} + +static int cls_log_info(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + auto in_iter = in->cbegin(); + + cls_log_info_op op; + try { + decode(op, in_iter); + } catch (ceph::buffer::error& err) { + CLS_LOG(1, "ERROR: cls_log_add_op(): failed to decode op"); + return -EINVAL; + } + + cls_log_info_ret ret; + + int rc = read_header(hctx, ret.header); + if (rc < 0) + return rc; + + encode(ret, *out); + + return 0; +} + +CLS_INIT(log) +{ + CLS_LOG(1, "Loaded log class!"); + + cls_handle_t h_class; + cls_method_handle_t h_log_add; + cls_method_handle_t h_log_list; + cls_method_handle_t h_log_trim; + cls_method_handle_t h_log_info; + + cls_register("log", &h_class); + + /* log */ + cls_register_cxx_method(h_class, "add", CLS_METHOD_RD | CLS_METHOD_WR, cls_log_add, &h_log_add); + cls_register_cxx_method(h_class, "list", CLS_METHOD_RD, cls_log_list, &h_log_list); + cls_register_cxx_method(h_class, "trim", CLS_METHOD_RD | CLS_METHOD_WR, cls_log_trim, &h_log_trim); + cls_register_cxx_method(h_class, "info", CLS_METHOD_RD, cls_log_info, &h_log_info); + + return; +} + |