From 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 27 Apr 2024 20:24:20 +0200 Subject: Adding upstream version 14.2.21. Signed-off-by: Daniel Baumann --- src/tools/ceph_dedup_tool.cc | 834 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 834 insertions(+) create mode 100644 src/tools/ceph_dedup_tool.cc (limited to 'src/tools/ceph_dedup_tool.cc') diff --git a/src/tools/ceph_dedup_tool.cc b/src/tools/ceph_dedup_tool.cc new file mode 100644 index 00000000..1713cde4 --- /dev/null +++ b/src/tools/ceph_dedup_tool.cc @@ -0,0 +1,834 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Author: Myoungwon Oh + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#include "include/types.h" + +#include "include/rados/buffer.h" +#include "include/rados/librados.hpp" +#include "include/rados/rados_types.hpp" + +#include "acconfig.h" + +#include "common/config.h" +#include "common/ceph_argparse.h" +#include "global/global_init.h" +#include "common/Cond.h" +#include "common/debug.h" +#include "common/errno.h" +#include "common/Formatter.h" +#include "common/obj_bencher.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "tools/RadosDump.h" +#include "cls/cas/cls_cas_client.h" +#include "include/stringify.h" +#include "global/signal_handler.h" + +using namespace librados; +unsigned default_op_size = 1 << 22; +unsigned default_max_thread = 2; +int32_t default_report_period = 2; +map< string, pair > chunk_statistics; // < key, > +Mutex glock("chunk_statistics::Locker"); + +void usage() +{ + cout << " usage: [--op ] [--pool ] " << std::endl; + cout << " --object " << std::endl; + cout << " --chunk-size chunk-size (byte) " << std::endl; + cout << " --chunk-algorithm " << std::endl; + cout << " --fingerprint-algorithm " << std::endl; + cout << " --chunk-pool " << std::endl; + cout << " --max-thread " << std::endl; + cout << " --report-perioid " << std::endl; + exit(1); +} + +[[noreturn]] static void usage_exit() +{ + usage(); + exit(1); +} + +template +static int rados_sistrtoll(I &i, T *val) { + std::string err; + *val = strict_iecstrtoll(i->second.c_str(), &err); + if (err != "") { + cerr << "Invalid value for " << i->first << ": " << err << std::endl; + return -EINVAL; + } else { + return 0; + } +} + +class EstimateDedupRatio; +class ChunkScrub; +class EstimateThread : public Thread +{ + IoCtx io_ctx; + int n; + int m; + ObjectCursor begin; + ObjectCursor end; + Mutex m_lock; + Cond m_cond; + int32_t timeout; + bool m_stop = false; + uint64_t total_bytes = 0; + uint64_t examined_objects = 0; + uint64_t total_objects = 0; +#define COND_WAIT_INTERVAL 10 + +public: + EstimateThread(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end, int32_t timeout): + io_ctx(io_ctx), n(n), m(m), begin(begin), end(end), m_lock("EstimateThread::Locker"), timeout(timeout) + {} + void signal(int signum) { + Mutex::Locker l(m_lock); + m_stop = true; + m_cond.Signal(); + } + virtual void print_status(Formatter *f, ostream &out) = 0; + uint64_t count_objects(IoCtx &ioctx, ObjectCursor &begin, ObjectCursor &end); + uint64_t get_examined_objects() { return examined_objects; } + uint64_t get_total_bytes() { return total_bytes; } + uint64_t get_total_objects() { return total_objects; } + friend class EstimateDedupRatio; + friend class ChunkScrub; +}; + +class EstimateDedupRatio : public EstimateThread +{ + string chunk_algo; + string fp_algo; + uint64_t chunk_size; + map< string, pair > local_chunk_statistics; // < key, > + +public: + EstimateDedupRatio(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end, + string chunk_algo, string fp_algo, uint64_t chunk_size, int32_t timeout): + EstimateThread(io_ctx, n, m, begin, end, timeout), chunk_algo(chunk_algo), fp_algo(fp_algo), + chunk_size(chunk_size) { } + + void* entry() { + count_objects(io_ctx, begin, end); + estimate_dedup_ratio(); + return NULL; + } + void estimate_dedup_ratio(); + void print_status(Formatter *f, ostream &out); + map< string, pair > &get_chunk_statistics() { return local_chunk_statistics; } + uint64_t fixed_chunk(string oid, uint64_t offset); +}; + +class ChunkScrub: public EstimateThread +{ + IoCtx chunk_io_ctx; + int fixed_objects = 0; + +public: + ChunkScrub(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end, + IoCtx& chunk_io_ctx, int32_t timeout): + EstimateThread(io_ctx, n, m, begin, end, timeout), chunk_io_ctx(chunk_io_ctx) + { } + void* entry() { + count_objects(chunk_io_ctx, begin, end); + chunk_scrub_common(); + return NULL; + } + void chunk_scrub_common(); + int get_fixed_objects() { return fixed_objects; } + void print_status(Formatter *f, ostream &out); +}; + +vector> estimate_threads; + +uint64_t EstimateThread::count_objects(IoCtx &ioctx, ObjectCursor &begin, ObjectCursor &end) +{ + ObjectCursor shard_start; + ObjectCursor shard_end; + uint64_t count = 0; + + ioctx.object_list_slice( + begin, + end, + n, + m, + &shard_start, + &shard_end); + + ObjectCursor c(shard_start); + while(c < shard_end) + { + std::vector result; + int r = ioctx.object_list(c, shard_end, 12, {}, &result, &c); + if (r < 0 ) { + cerr << "error object_list : " << cpp_strerror(r) << std::endl; + return 0; + } + count += result.size(); + total_objects += result.size(); + } + return count; +} + +static void print_dedup_estimate(bool debug = false) +{ + uint64_t total_size = 0; + uint64_t dedup_size = 0; + uint64_t examined_objects = 0; + uint64_t total_objects = 0; + EstimateDedupRatio *ratio = NULL; + for (auto &et : estimate_threads) { + Mutex::Locker l(glock); + ratio = dynamic_cast(et.get()); + assert(ratio); + for (auto p : ratio->get_chunk_statistics()) { + auto c = chunk_statistics.find(p.first); + if (c != chunk_statistics.end()) { + c->second.first += p.second.first; + } else { + chunk_statistics.insert(p); + } + } + } + + if (debug) { + for (auto p : chunk_statistics) { + cout << " -- " << std::endl; + cout << " key: " << p.first << std::endl; + cout << " count: " << p.second.first << std::endl; + cout << " chunk_size: " << p.second.second << std::endl; + dedup_size += p.second.second; + cout << " -- " << std::endl; + } + } else { + for (auto p : chunk_statistics) { + dedup_size += p.second.second; + } + + } + + for (auto &et : estimate_threads) { + total_size += et->get_total_bytes(); + examined_objects += et->get_examined_objects(); + total_objects += et->get_total_objects(); + } + + cout << " result: " << total_size << " | " << dedup_size << " (total size | deduped size) " << std::endl; + cout << " Dedup ratio: " << (100 - (double)(dedup_size)/total_size*100) << " % " << std::endl; + cout << " Examined objects: " << examined_objects << std::endl; + cout << " Total objects: " << total_objects << std::endl; +} + +static void handle_signal(int signum) +{ + Mutex::Locker l(glock); + for (auto &p : estimate_threads) { + p->signal(signum); + } +} + +void EstimateDedupRatio::print_status(Formatter *f, ostream &out) +{ + if (f) { + f->open_array_section("estimate_dedup_ratio"); + f->dump_string("PID", stringify(get_pid())); + for (auto p : local_chunk_statistics) { + f->open_object_section("fingerprint object"); + f->dump_string("fingperint", p.first); + f->dump_string("count", stringify(p.second.first)); + f->dump_string("chunk_size", stringify(p.second.second)); + } + f->close_section(); + f->open_object_section("Status"); + f->dump_string("Total bytes", stringify(total_bytes)); + f->dump_string("Examined objectes", stringify(examined_objects)); + f->close_section(); + f->flush(out); + cout << std::endl; + } +} + +void EstimateDedupRatio::estimate_dedup_ratio() +{ + ObjectCursor shard_start; + ObjectCursor shard_end; + utime_t cur_time = ceph_clock_now(); + + io_ctx.object_list_slice( + begin, + end, + n, + m, + &shard_start, + &shard_end); + + ObjectCursor c(shard_start); + while(c < shard_end) + { + std::vector result; + int r = io_ctx.object_list(c, shard_end, 12, {}, &result, &c); + if (r < 0 ){ + cerr << "error object_list : " << cpp_strerror(r) << std::endl; + return; + } + + for (const auto & i : result) { + const auto &oid = i.oid; + uint64_t offset = 0; + while (true) { + Mutex::Locker l(m_lock); + if (m_stop) { + Formatter *formatter = Formatter::create("json-pretty"); + print_status(formatter, cout); + delete formatter; + return; + } + + uint64_t next_offset; + if (chunk_algo == "fixed") { + next_offset = fixed_chunk(oid, offset); + } else { + // CDC .. + ceph_assert(0 == "no support chunk algorithm"); + } + + if (!next_offset) { + break; + } + offset += next_offset; + m_cond.WaitInterval(m_lock,utime_t(0, COND_WAIT_INTERVAL)); + if (cur_time + utime_t(timeout, 0) < ceph_clock_now()) { + Formatter *formatter = Formatter::create("json-pretty"); + print_status(formatter, cout); + delete formatter; + cur_time = ceph_clock_now(); + } + } + examined_objects++; + } + } +} + +uint64_t EstimateDedupRatio::fixed_chunk(string oid, uint64_t offset) +{ + unsigned op_size = default_op_size; + int ret; + bufferlist outdata; + ret = io_ctx.read(oid, outdata, op_size, offset); + if (ret <= 0) { + return 0; + } + + if (fp_algo == "sha1") { + uint64_t c_offset = 0; + while (c_offset < outdata.length()) { + bufferlist chunk; + if (outdata.length() - c_offset > chunk_size) { + bufferptr bptr(chunk_size); + chunk.push_back(std::move(bptr)); + chunk.copy_in(0, chunk_size, outdata.c_str()); + } else { + bufferptr bptr(outdata.length() - c_offset); + chunk.push_back(std::move(bptr)); + chunk.copy_in(0, outdata.length() - c_offset, outdata.c_str()); + } + sha1_digest_t sha1_val = chunk.sha1(); + string fp = sha1_val.to_str(); + auto p = local_chunk_statistics.find(fp); + if (p != local_chunk_statistics.end()) { + uint64_t count = p->second.first; + count++; + local_chunk_statistics[fp] = make_pair(count, chunk.length()); + } else { + local_chunk_statistics[fp] = make_pair(1, chunk.length()); + } + total_bytes += chunk.length(); + c_offset = c_offset + chunk_size; + } + } else { + ceph_assert(0 == "no support fingerperint algorithm"); + } + + if (outdata.length() < op_size) { + return 0; + } + return outdata.length(); +} + +void ChunkScrub::chunk_scrub_common() +{ + ObjectCursor shard_start; + ObjectCursor shard_end; + int ret; + utime_t cur_time = ceph_clock_now(); + + chunk_io_ctx.object_list_slice( + begin, + end, + n, + m, + &shard_start, + &shard_end); + + ObjectCursor c(shard_start); + while(c < shard_end) + { + std::vector result; + int r = chunk_io_ctx.object_list(c, shard_end, 12, {}, &result, &c); + if (r < 0 ){ + cerr << "error object_list : " << cpp_strerror(r) << std::endl; + return; + } + + for (const auto & i : result) { + Mutex::Locker l(m_lock); + if (m_stop) { + Formatter *formatter = Formatter::create("json-pretty"); + print_status(formatter, cout); + delete formatter; + return; + } + auto oid = i.oid; + set refs; + set real_refs; + ret = cls_chunk_refcount_read(chunk_io_ctx, oid, &refs); + if (ret < 0) { + continue; + } + + for (auto pp : refs) { + ret = cls_chunk_has_chunk(io_ctx, pp.oid.name, oid); + if (ret != -ENOENT) { + real_refs.insert(pp); + } + } + + if (refs.size() != real_refs.size()) { + ObjectWriteOperation op; + cls_chunk_refcount_set(op, real_refs); + ret = chunk_io_ctx.operate(oid, &op); + if (ret < 0) { + continue; + } + fixed_objects++; + } + examined_objects++; + m_cond.WaitInterval(m_lock,utime_t(0, COND_WAIT_INTERVAL)); + if (cur_time + utime_t(timeout, 0) < ceph_clock_now()) { + Formatter *formatter = Formatter::create("json-pretty"); + print_status(formatter, cout); + delete formatter; + cur_time = ceph_clock_now(); + } + } + } +} + +void ChunkScrub::print_status(Formatter *f, ostream &out) +{ + if (f) { + f->open_array_section("chunk_scrub"); + f->dump_string("PID", stringify(get_pid())); + f->open_object_section("Status"); + f->dump_string("Total object", stringify(total_objects)); + f->dump_string("Examined objectes", stringify(examined_objects)); + f->dump_string("Fixed objectes", stringify(fixed_objects)); + f->close_section(); + f->flush(out); + cout << std::endl; + } +} + +int estimate_dedup_ratio(const std::map < std::string, std::string > &opts, + std::vector &nargs) +{ + Rados rados; + IoCtx io_ctx; + std::string chunk_algo; + string fp_algo; + string pool_name; + uint64_t chunk_size = 0; + unsigned max_thread = default_max_thread; + uint32_t report_period = default_report_period; + int ret; + std::map::const_iterator i; + bool debug = false; + ObjectCursor begin; + ObjectCursor end; + + i = opts.find("pool"); + if (i != opts.end()) { + pool_name = i->second.c_str(); + } + i = opts.find("chunk-algorithm"); + if (i != opts.end()) { + chunk_algo = i->second.c_str(); + if (chunk_algo != "fixed") { + usage_exit(); + } + } else { + usage_exit(); + } + + i = opts.find("fingerprint-algorithm"); + if (i != opts.end()) { + fp_algo = i->second.c_str(); + if (fp_algo != "sha1") { + usage_exit(); + } + } else { + usage_exit(); + } + + i = opts.find("chunk-size"); + if (i != opts.end()) { + if (rados_sistrtoll(i, &chunk_size)) { + return -EINVAL; + } + } else { + usage_exit(); + } + + i = opts.find("max-thread"); + if (i != opts.end()) { + if (rados_sistrtoll(i, &max_thread)) { + return -EINVAL; + } + } + + i = opts.find("report-period"); + if (i != opts.end()) { + if (rados_sistrtoll(i, &report_period)) { + return -EINVAL; + } + } + i = opts.find("debug"); + if (i != opts.end()) { + debug = true; + } + + i = opts.find("pgid"); + boost::optional pgid(i != opts.end(), pg_t()); + + ret = rados.init_with_context(g_ceph_context); + if (ret < 0) { + cerr << "couldn't initialize rados: " << cpp_strerror(ret) << std::endl; + goto out; + } + ret = rados.connect(); + if (ret) { + cerr << "couldn't connect to cluster: " << cpp_strerror(ret) << std::endl; + ret = -1; + goto out; + } + if (pool_name.empty()) { + cerr << "--create-pool requested but pool_name was not specified!" << std::endl; + usage_exit(); + } + ret = rados.ioctx_create(pool_name.c_str(), io_ctx); + if (ret < 0) { + cerr << "error opening pool " + << pool_name << ": " + << cpp_strerror(ret) << std::endl; + goto out; + } + + glock.Lock(); + begin = io_ctx.object_list_begin(); + end = io_ctx.object_list_end(); + for (unsigned i = 0; i < max_thread; i++) { + std::unique_ptr ptr (new EstimateDedupRatio(io_ctx, i, max_thread, begin, end, + chunk_algo, fp_algo, chunk_size, + report_period)); + ptr->create("estimate_thread"); + estimate_threads.push_back(move(ptr)); + } + glock.Unlock(); + + for (auto &p : estimate_threads) { + p->join(); + } + + print_dedup_estimate(debug); + + out: + return (ret < 0) ? 1 : 0; +} + +static void print_chunk_scrub() +{ + uint64_t total_objects = 0; + uint64_t examined_objects = 0; + int fixed_objects = 0; + + for (auto &et : estimate_threads) { + total_objects += et->get_total_objects(); + examined_objects += et->get_examined_objects(); + ChunkScrub *ptr = static_cast(et.get()); + fixed_objects += ptr->get_fixed_objects(); + } + + cout << " Total object : " << total_objects << std::endl; + cout << " Examined object : " << examined_objects << std::endl; + cout << " Fixed object : " << fixed_objects << std::endl; +} + +int chunk_scrub_common(const std::map < std::string, std::string > &opts, + std::vector &nargs) +{ + Rados rados; + IoCtx io_ctx, chunk_io_ctx; + std::string object_name, target_object_name; + string pool_name, chunk_pool_name, op_name; + int ret; + unsigned max_thread = default_max_thread; + std::map::const_iterator i; + uint32_t report_period = default_report_period; + ObjectCursor begin; + ObjectCursor end; + + i = opts.find("pool"); + if (i != opts.end()) { + pool_name = i->second.c_str(); + } else { + usage_exit(); + } + i = opts.find("op_name"); + if (i != opts.end()) { + op_name= i->second.c_str(); + } else { + usage_exit(); + } + + i = opts.find("chunk-pool"); + if (i != opts.end()) { + chunk_pool_name = i->second.c_str(); + } else { + usage_exit(); + } + i = opts.find("max-thread"); + if (i != opts.end()) { + if (rados_sistrtoll(i, &max_thread)) { + return -EINVAL; + } + } + i = opts.find("report-period"); + if (i != opts.end()) { + if (rados_sistrtoll(i, &report_period)) { + return -EINVAL; + } + } + i = opts.find("pgid"); + boost::optional pgid(i != opts.end(), pg_t()); + + ret = rados.init_with_context(g_ceph_context); + if (ret < 0) { + cerr << "couldn't initialize rados: " << cpp_strerror(ret) << std::endl; + goto out; + } + ret = rados.connect(); + if (ret) { + cerr << "couldn't connect to cluster: " << cpp_strerror(ret) << std::endl; + ret = -1; + goto out; + } + if (pool_name.empty()) { + cerr << "--create-pool requested but pool_name was not specified!" << std::endl; + usage_exit(); + } + ret = rados.ioctx_create(pool_name.c_str(), io_ctx); + if (ret < 0) { + cerr << "error opening pool " + << pool_name << ": " + << cpp_strerror(ret) << std::endl; + goto out; + } + ret = rados.ioctx_create(chunk_pool_name.c_str(), chunk_io_ctx); + if (ret < 0) { + cerr << "error opening pool " + << chunk_pool_name << ": " + << cpp_strerror(ret) << std::endl; + goto out; + } + + if (op_name == "add_chunk_ref") { + string target_object_name; + i = opts.find("object"); + if (i != opts.end()) { + object_name = i->second.c_str(); + } else { + usage_exit(); + } + i = opts.find("target-ref"); + if (i != opts.end()) { + target_object_name = i->second.c_str(); + } else { + usage_exit(); + } + + set refs; + ret = cls_chunk_refcount_read(chunk_io_ctx, object_name, &refs); + if (ret < 0) { + cerr << " cls_chunk_refcount_read fail : " << cpp_strerror(ret) << std::endl; + return ret; + } + for (auto p : refs) { + cout << " " << p.oid.name << " "; + } + + uint32_t hash; + ret = chunk_io_ctx.get_object_hash_position2(object_name, &hash); + if (ret < 0) { + return ret; + } + hobject_t oid(sobject_t(target_object_name, CEPH_NOSNAP), "", hash, -1, ""); + refs.insert(oid); + + ObjectWriteOperation op; + cls_chunk_refcount_set(op, refs); + ret = chunk_io_ctx.operate(object_name, &op); + if (ret < 0) { + cerr << " operate fail : " << cpp_strerror(ret) << std::endl; + } + + return ret; + + } else if (op_name == "get_chunk_ref") { + i = opts.find("object"); + if (i != opts.end()) { + object_name = i->second.c_str(); + } else { + usage_exit(); + } + set refs; + cout << " refs: " << std::endl; + ret = cls_chunk_refcount_read(chunk_io_ctx, object_name, &refs); + for (auto p : refs) { + cout << " " << p.oid.name << " "; + } + cout << std::endl; + return ret; + } + + glock.Lock(); + begin = io_ctx.object_list_begin(); + end = io_ctx.object_list_end(); + for (unsigned i = 0; i < max_thread; i++) { + std::unique_ptr ptr (new ChunkScrub(io_ctx, i, max_thread, begin, end, chunk_io_ctx, + report_period)); + ptr->create("estimate_thread"); + estimate_threads.push_back(move(ptr)); + } + glock.Unlock(); + + for (auto &p : estimate_threads) { + p->join(); + } + + print_chunk_scrub(); + +out: + return (ret < 0) ? 1 : 0; +} + +int main(int argc, const char **argv) +{ + vector args; + argv_to_vec(argc, argv, args); + if (args.empty()) { + cerr << argv[0] << ": -h or --help for usage" << std::endl; + exit(1); + } + if (ceph_argparse_need_usage(args)) { + usage(); + exit(0); + } + + std::string fn; + string op_name; + + auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, 0); + common_init_finish(g_ceph_context); + init_async_signal_handler(); + register_async_signal_handler_oneshot(SIGINT, handle_signal); + register_async_signal_handler_oneshot(SIGTERM, handle_signal); + std::map < std::string, std::string > opts; + std::string val; + std::vector::iterator i; + for (i = args.begin(); i != args.end(); ) { + if (ceph_argparse_double_dash(args, i)) { + break; + } else if (ceph_argparse_witharg(args, i, &val, "--op", (char*)NULL)) { + opts["op_name"] = val; + op_name = val; + } else if (ceph_argparse_witharg(args, i, &val, "--pool", (char*)NULL)) { + opts["pool"] = val; + } else if (ceph_argparse_witharg(args, i, &val, "--object", (char*)NULL)) { + opts["object"] = val; + } else if (ceph_argparse_witharg(args, i, &val, "--chunk-algorithm", (char*)NULL)) { + opts["chunk-algorithm"] = val; + } else if (ceph_argparse_witharg(args, i, &val, "--chunk-size", (char*)NULL)) { + opts["chunk-size"] = val; + } else if (ceph_argparse_witharg(args, i, &val, "--fingerprint-algorithm", (char*)NULL)) { + opts["fingerprint-algorithm"] = val; + } else if (ceph_argparse_witharg(args, i, &val, "--chunk-pool", (char*)NULL)) { + opts["chunk-pool"] = val; + } else if (ceph_argparse_witharg(args, i, &val, "--target-ref", (char*)NULL)) { + opts["target-ref"] = val; + } else if (ceph_argparse_witharg(args, i, &val, "--max-thread", (char*)NULL)) { + opts["max-thread"] = val; + } else if (ceph_argparse_witharg(args, i, &val, "--report-period", (char*)NULL)) { + opts["report-period"] = val; + } else if (ceph_argparse_flag(args, i, "--debug", (char*)NULL)) { + opts["debug"] = "true"; + } else { + if (val[0] == '-') + usage_exit(); + ++i; + } + } + + if (op_name == "estimate") { + return estimate_dedup_ratio(opts, args); + } else if (op_name == "chunk_scrub") { + return chunk_scrub_common(opts, args); + } else if (op_name == "add_chunk_ref") { + return chunk_scrub_common(opts, args); + } else if (op_name == "get_chunk_ref") { + return chunk_scrub_common(opts, args); + } else { + usage(); + exit(0); + } + + unregister_async_signal_handler(SIGINT, handle_signal); + unregister_async_signal_handler(SIGTERM, handle_signal); + shutdown_async_signal_handler(); + + return 0; +} -- cgit v1.2.3