summaryrefslogtreecommitdiffstats
path: root/src/cls/cephfs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/cls/cephfs/cls_cephfs.cc210
-rw-r--r--src/cls/cephfs/cls_cephfs.h147
-rw-r--r--src/cls/cephfs/cls_cephfs_client.cc177
-rw-r--r--src/cls/cephfs/cls_cephfs_client.h33
4 files changed, 567 insertions, 0 deletions
diff --git a/src/cls/cephfs/cls_cephfs.cc b/src/cls/cephfs/cls_cephfs.cc
new file mode 100644
index 00000000..7e3214b9
--- /dev/null
+++ b/src/cls/cephfs/cls_cephfs.cc
@@ -0,0 +1,210 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#include <string>
+#include <errno.h>
+
+#include "objclass/objclass.h"
+
+#include "cls_cephfs.h"
+
+CLS_VER(1,0)
+CLS_NAME(cephfs)
+
+
+std::ostream &operator<<(std::ostream &out, const ObjCeiling &in)
+{
+ out << "id: " << in.id << " size: " << in.size;
+ return out;
+}
+
+
+/**
+ * Set a named xattr to a given value, if and only if the xattr
+ * is not already set to a greater value.
+ *
+ * If the xattr is missing, then it is set to the input integer.
+ *
+ * @param xattr_name: name of xattr to compare against and set
+ * @param input_val: candidate new value, of encode()'able type
+ * @returns 0 on success (irrespective of whether our new value
+ * was used) else an error code
+ */
+template <typename A>
+static int set_if_greater(cls_method_context_t hctx,
+ const std::string &xattr_name, const A input_val)
+{
+ bufferlist existing_val_bl;
+
+ bool set_val = false;
+ int r = cls_cxx_getxattr(hctx, xattr_name.c_str(), &existing_val_bl);
+ if (r == -ENOENT || existing_val_bl.length() == 0) {
+ set_val = true;
+ } else if (r >= 0) {
+ auto existing_p = existing_val_bl.cbegin();
+ try {
+ A existing_val;
+ decode(existing_val, existing_p);
+ if (!existing_p.end()) {
+ // Trailing junk? Consider it invalid and overwrite
+ set_val = true;
+ } else {
+ // Valid existing value, do comparison
+ set_val = input_val > existing_val;
+ }
+ } catch (const buffer::error &err) {
+ // Corrupt or empty existing value, overwrite it
+ set_val = true;
+ }
+ } else {
+ return r;
+ }
+
+ // Conditionally set the new xattr
+ if (set_val) {
+ bufferlist set_bl;
+ encode(input_val, set_bl);
+ return cls_cxx_setxattr(hctx, xattr_name.c_str(), &set_bl);
+ } else {
+ return 0;
+ }
+}
+
+static int accumulate_inode_metadata(cls_method_context_t hctx,
+ bufferlist *in, bufferlist *out)
+{
+ ceph_assert(in != NULL);
+ ceph_assert(out != NULL);
+
+ int r = 0;
+
+ // Decode `in`
+ auto q = in->cbegin();
+ AccumulateArgs args;
+ try {
+ args.decode(q);
+ } catch (const buffer::error &err) {
+ return -EINVAL;
+ }
+
+ ObjCeiling ceiling(args.obj_index, args.obj_size);
+ r = set_if_greater(hctx, args.obj_xattr_name, ceiling);
+ if (r < 0) {
+ return r;
+ }
+
+ r = set_if_greater(hctx, args.mtime_xattr_name, args.mtime);
+ if (r < 0) {
+ return r;
+ }
+
+ r = set_if_greater(hctx, args.obj_size_xattr_name, args.obj_size);
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+}
+
+// I want to select objects that have a name ending 00000000
+// and an xattr (scrub_tag) not equal to a specific value.
+// This is so special case that we can't really pretend it's
+// generic, so just fess up and call this the cephfs filter.
+class PGLSCephFSFilter : public PGLSFilter {
+protected:
+ std::string scrub_tag;
+public:
+ int init(bufferlist::const_iterator& params) override {
+ try {
+ InodeTagFilterArgs args;
+ args.decode(params);
+ scrub_tag = args.scrub_tag;
+ } catch (buffer::error &e) {
+ return -EINVAL;
+ }
+
+ if (scrub_tag.empty()) {
+ xattr = "";
+ } else {
+ xattr = "_scrub_tag";
+ }
+
+ return 0;
+ }
+
+ ~PGLSCephFSFilter() override {}
+ bool reject_empty_xattr() override { return false; }
+ bool filter(const hobject_t &obj, bufferlist& xattr_data,
+ bufferlist& outdata) override;
+};
+
+bool PGLSCephFSFilter::filter(const hobject_t &obj,
+ bufferlist& xattr_data, bufferlist& outdata)
+{
+ const std::string need_ending = ".00000000";
+ const std::string &obj_name = obj.oid.name;
+
+ if (obj_name.length() < need_ending.length()) {
+ return false;
+ }
+
+ const bool match = obj_name.compare (obj_name.length() - need_ending.length(), need_ending.length(), need_ending) == 0;
+ if (!match) {
+ return false;
+ }
+
+ if (!scrub_tag.empty() && xattr_data.length() > 0) {
+ std::string tag_ondisk;
+ auto q = xattr_data.cbegin();
+ try {
+ decode(tag_ondisk, q);
+ if (tag_ondisk == scrub_tag)
+ return false;
+ } catch (const buffer::error &err) {
+ }
+ }
+
+ return true;
+}
+
+PGLSFilter *inode_tag_filter()
+{
+ return new PGLSCephFSFilter();
+}
+
+/**
+ * initialize class
+ *
+ * We do two things here: we register the new class, and then register
+ * all of the class's methods.
+ */
+CLS_INIT(cephfs)
+{
+ // this log message, at level 0, will always appear in the ceph-osd
+ // log file.
+ CLS_LOG(0, "loading cephfs");
+
+ cls_handle_t h_class;
+ cls_method_handle_t h_accumulate_inode_metadata;
+
+ cls_register("cephfs", &h_class);
+ cls_register_cxx_method(h_class, "accumulate_inode_metadata",
+ CLS_METHOD_WR | CLS_METHOD_RD,
+ accumulate_inode_metadata, &h_accumulate_inode_metadata);
+
+ // A PGLS filter
+ cls_register_cxx_filter(h_class, "inode_tag", inode_tag_filter);
+}
+
diff --git a/src/cls/cephfs/cls_cephfs.h b/src/cls/cephfs/cls_cephfs.h
new file mode 100644
index 00000000..89f4dab4
--- /dev/null
+++ b/src/cls/cephfs/cls_cephfs.h
@@ -0,0 +1,147 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "include/encoding.h"
+
+/**
+ * Value class for the xattr we'll use to accumulate
+ * the highest object seen for a given inode
+ */
+class ObjCeiling {
+ public:
+ uint64_t id;
+ uint64_t size;
+
+ ObjCeiling()
+ : id(0), size(0)
+ {}
+
+ ObjCeiling(uint64_t id_, uint64_t size_)
+ : id(id_), size(size_)
+ {}
+
+ bool operator >(ObjCeiling const &rhs) const
+ {
+ return id > rhs.id;
+ }
+
+ void encode(bufferlist &bl) const
+ {
+ ENCODE_START(1, 1, bl);
+ encode(id, bl);
+ encode(size, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator &p)
+ {
+ DECODE_START(1, p);
+ decode(id, p);
+ decode(size, p);
+ DECODE_FINISH(p);
+ }
+};
+WRITE_CLASS_ENCODER(ObjCeiling)
+
+class AccumulateArgs
+{
+public:
+ uint64_t obj_index;
+ uint64_t obj_size;
+ int64_t mtime;
+ std::string obj_xattr_name;
+ std::string mtime_xattr_name;
+ std::string obj_size_xattr_name;
+
+ AccumulateArgs(
+ uint64_t obj_index_,
+ uint64_t obj_size_,
+ time_t mtime_,
+ const std::string &obj_xattr_name_,
+ const std::string &mtime_xattr_name_,
+ const std::string &obj_size_xattr_name_)
+ : obj_index(obj_index_),
+ obj_size(obj_size_),
+ mtime(mtime_),
+ obj_xattr_name(obj_xattr_name_),
+ mtime_xattr_name(mtime_xattr_name_),
+ obj_size_xattr_name(obj_size_xattr_name_)
+ {}
+
+ AccumulateArgs()
+ : obj_index(0), obj_size(0), mtime(0)
+ {}
+
+ void encode(bufferlist &bl) const
+ {
+ ENCODE_START(1, 1, bl);
+ encode(obj_xattr_name, bl);
+ encode(mtime_xattr_name, bl);
+ encode(obj_size_xattr_name, bl);
+ encode(obj_index, bl);
+ encode(obj_size, bl);
+ encode(mtime, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator &bl)
+ {
+ DECODE_START(1, bl);
+ decode(obj_xattr_name, bl);
+ decode(mtime_xattr_name, bl);
+ decode(obj_size_xattr_name, bl);
+ decode(obj_index, bl);
+ decode(obj_size, bl);
+ decode(mtime, bl);
+ DECODE_FINISH(bl);
+ }
+};
+
+class InodeTagFilterArgs
+{
+ public:
+ std::string scrub_tag;
+
+ void encode(bufferlist &bl) const
+ {
+ ENCODE_START(1, 1, bl);
+ encode(scrub_tag, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator &bl)
+ {
+ DECODE_START(1, bl);
+ decode(scrub_tag, bl);
+ DECODE_FINISH(bl);
+ }
+};
+
+class AccumulateResult
+{
+public:
+ // Index of the highest-indexed object seen
+ uint64_t ceiling_obj_index;
+ // Size of the highest-index object seen
+ uint64_t ceiling_obj_size;
+ // Largest object seen
+ uint64_t max_obj_size;
+ // Highest mtime seen
+ int64_t max_mtime;
+
+ AccumulateResult()
+ : ceiling_obj_index(0), ceiling_obj_size(0), max_obj_size(0), max_mtime(0)
+ {}
+};
+
diff --git a/src/cls/cephfs/cls_cephfs_client.cc b/src/cls/cephfs/cls_cephfs_client.cc
new file mode 100644
index 00000000..988eab41
--- /dev/null
+++ b/src/cls/cephfs/cls_cephfs_client.cc
@@ -0,0 +1,177 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+
+#include "include/rados/librados.hpp"
+#include "mds/CInode.h"
+
+#include "cls_cephfs_client.h"
+
+#define XATTR_CEILING "scan_ceiling"
+#define XATTR_MAX_MTIME "scan_max_mtime"
+#define XATTR_MAX_SIZE "scan_max_size"
+
+int ClsCephFSClient::accumulate_inode_metadata(
+ librados::IoCtx &ctx,
+ inodeno_t inode_no,
+ const uint64_t obj_index,
+ const uint64_t obj_size,
+ const time_t mtime)
+{
+ AccumulateArgs args(
+ obj_index,
+ obj_size,
+ mtime,
+ XATTR_CEILING,
+ XATTR_MAX_MTIME,
+ XATTR_MAX_SIZE);
+
+ // Generate 0th object name, where we will accumulate sizes/mtimes
+ object_t zeroth_object = InodeStore::get_object_name(inode_no, frag_t(), "");
+
+ // Construct a librados operation invoking our class method
+ librados::ObjectReadOperation op;
+ bufferlist inbl;
+ args.encode(inbl);
+ op.exec("cephfs", "accumulate_inode_metadata", inbl);
+
+ // Execute op
+ bufferlist outbl;
+ return ctx.operate(zeroth_object.name, &op, &outbl);
+}
+
+int ClsCephFSClient::delete_inode_accumulate_result(
+ librados::IoCtx &ctx,
+ const std::string &oid)
+{
+ librados::ObjectWriteOperation op;
+
+ // Remove xattrs from object
+ //
+ op.rmxattr(XATTR_CEILING);
+ op.rmxattr(XATTR_MAX_SIZE);
+ op.rmxattr(XATTR_MAX_MTIME);
+
+ return (ctx.operate(oid, &op));
+}
+
+int ClsCephFSClient::fetch_inode_accumulate_result(
+ librados::IoCtx &ctx,
+ const std::string &oid,
+ inode_backtrace_t *backtrace,
+ file_layout_t *layout,
+ AccumulateResult *result)
+{
+ ceph_assert(backtrace != NULL);
+ ceph_assert(result != NULL);
+
+ librados::ObjectReadOperation op;
+
+ int scan_ceiling_r = 0;
+ bufferlist scan_ceiling_bl;
+ op.getxattr(XATTR_CEILING, &scan_ceiling_bl, &scan_ceiling_r);
+
+ int scan_max_size_r = 0;
+ bufferlist scan_max_size_bl;
+ op.getxattr(XATTR_MAX_SIZE, &scan_max_size_bl, &scan_max_size_r);
+
+ int scan_max_mtime_r = 0;
+ bufferlist scan_max_mtime_bl;
+ op.getxattr(XATTR_MAX_MTIME, &scan_max_mtime_bl, &scan_max_mtime_r);
+
+ int parent_r = 0;
+ bufferlist parent_bl;
+ op.getxattr("parent", &parent_bl, &parent_r);
+ op.set_op_flags2(librados::OP_FAILOK);
+
+ int layout_r = 0;
+ bufferlist layout_bl;
+ op.getxattr("layout", &layout_bl, &layout_r);
+ op.set_op_flags2(librados::OP_FAILOK);
+
+ bufferlist op_bl;
+ int r = ctx.operate(oid, &op, &op_bl);
+ if (r < 0) {
+ return r;
+ }
+
+ // Load scan_ceiling
+ try {
+ auto scan_ceiling_bl_iter = scan_ceiling_bl.cbegin();
+ ObjCeiling ceiling;
+ ceiling.decode(scan_ceiling_bl_iter);
+ result->ceiling_obj_index = ceiling.id;
+ result->ceiling_obj_size = ceiling.size;
+ } catch (const buffer::error &err) {
+ //dout(4) << "Invalid size attr on '" << oid << "'" << dendl;
+ return -EINVAL;
+ }
+
+ // Load scan_max_size
+ try {
+ auto scan_max_size_bl_iter = scan_max_size_bl.cbegin();
+ decode(result->max_obj_size, scan_max_size_bl_iter);
+ } catch (const buffer::error &err) {
+ //dout(4) << "Invalid size attr on '" << oid << "'" << dendl;
+ return -EINVAL;
+ }
+
+ // Load scan_max_mtime
+ try {
+ auto scan_max_mtime_bl_iter = scan_max_mtime_bl.cbegin();
+ decode(result->max_mtime, scan_max_mtime_bl_iter);
+ } catch (const buffer::error &err) {
+ //dout(4) << "Invalid size attr on '" << oid << "'" << dendl;
+ return -EINVAL;
+ }
+
+ // Deserialize backtrace
+ if (parent_bl.length()) {
+ try {
+ auto q = parent_bl.cbegin();
+ backtrace->decode(q);
+ } catch (buffer::error &e) {
+ //dout(4) << "Corrupt backtrace on '" << oid << "': " << e << dendl;
+ return -EINVAL;
+ }
+ }
+
+ // Deserialize layout
+ if (layout_bl.length()) {
+ try {
+ auto q = layout_bl.cbegin();
+ decode(*layout, q);
+ } catch (buffer::error &e) {
+ return -EINVAL;
+ }
+ }
+
+ return 0;
+}
+
+void ClsCephFSClient::build_tag_filter(
+ const std::string &scrub_tag,
+ bufferlist *out_bl)
+{
+ ceph_assert(out_bl != NULL);
+
+ // Leading part of bl is un-versioned string naming the filter
+ encode(std::string("cephfs.inode_tag"), *out_bl);
+
+ // Filter-specific part of the bl: in our case this is a versioned structure
+ InodeTagFilterArgs args;
+ args.scrub_tag = scrub_tag;
+ args.encode(*out_bl);
+}
diff --git a/src/cls/cephfs/cls_cephfs_client.h b/src/cls/cephfs/cls_cephfs_client.h
new file mode 100644
index 00000000..744c0aed
--- /dev/null
+++ b/src/cls/cephfs/cls_cephfs_client.h
@@ -0,0 +1,33 @@
+
+#include "include/rados/librados_fwd.hpp"
+#include "mds/mdstypes.h"
+#include "cls_cephfs.h"
+
+class AccumulateArgs;
+
+class ClsCephFSClient
+{
+ public:
+ static int accumulate_inode_metadata(
+ librados::IoCtx &ctx,
+ inodeno_t inode_no,
+ const uint64_t obj_index,
+ const uint64_t obj_size,
+ const time_t mtime);
+
+ static int fetch_inode_accumulate_result(
+ librados::IoCtx &ctx,
+ const std::string &oid,
+ inode_backtrace_t *backtrace,
+ file_layout_t *layout,
+ AccumulateResult *result);
+
+ static int delete_inode_accumulate_result(
+ librados::IoCtx &ctx,
+ const std::string &oid);
+
+ static void build_tag_filter(
+ const std::string &scrub_tag,
+ bufferlist *out_bl);
+};
+