summaryrefslogtreecommitdiffstats
path: root/src/tools/rbd_mirror/ImageDeleter.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/tools/rbd_mirror/ImageDeleter.cc')
-rw-r--r--src/tools/rbd_mirror/ImageDeleter.cc548
1 files changed, 548 insertions, 0 deletions
diff --git a/src/tools/rbd_mirror/ImageDeleter.cc b/src/tools/rbd_mirror/ImageDeleter.cc
new file mode 100644
index 000000000..fcdd1baad
--- /dev/null
+++ b/src/tools/rbd_mirror/ImageDeleter.cc
@@ -0,0 +1,548 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 SUSE LINUX GmbH
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "include/rados/librados.hpp"
+#include "common/Formatter.h"
+#include "common/admin_socket.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/Timer.h"
+#include "global/global_context.h"
+#include "librbd/internal.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/ImageState.h"
+#include "librbd/Operations.h"
+#include "librbd/asio/ContextWQ.h"
+#include "cls/rbd/cls_rbd_client.h"
+#include "cls/rbd/cls_rbd_types.h"
+#include "librbd/Utils.h"
+#include "ImageDeleter.h"
+#include "tools/rbd_mirror/Threads.h"
+#include "tools/rbd_mirror/Throttler.h"
+#include "tools/rbd_mirror/image_deleter/TrashMoveRequest.h"
+#include "tools/rbd_mirror/image_deleter/TrashRemoveRequest.h"
+#include "tools/rbd_mirror/image_deleter/TrashWatcher.h"
+#include <map>
+#include <sstream>
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd_mirror
+
+using std::string;
+using std::stringstream;
+using std::vector;
+using std::pair;
+using std::make_pair;
+
+using librados::IoCtx;
+using namespace librbd;
+
+namespace rbd {
+namespace mirror {
+
+using librbd::util::create_async_context_callback;
+
+namespace {
+
+class ImageDeleterAdminSocketCommand {
+public:
+ virtual ~ImageDeleterAdminSocketCommand() {}
+ virtual int call(Formatter *f) = 0;
+};
+
+template <typename I>
+class StatusCommand : public ImageDeleterAdminSocketCommand {
+public:
+ explicit StatusCommand(ImageDeleter<I> *image_del) : image_del(image_del) {}
+
+ int call(Formatter *f) override {
+ image_del->print_status(f);
+ return 0;
+ }
+
+private:
+ ImageDeleter<I> *image_del;
+};
+
+} // anonymous namespace
+
+template <typename I>
+class ImageDeleterAdminSocketHook : public AdminSocketHook {
+public:
+ ImageDeleterAdminSocketHook(CephContext *cct, const std::string& pool_name,
+ ImageDeleter<I> *image_del) :
+ admin_socket(cct->get_admin_socket()) {
+
+ std::string command;
+ int r;
+
+ command = "rbd mirror deletion status " + pool_name;
+ r = admin_socket->register_command(command, this,
+ "get status for image deleter");
+ if (r == 0) {
+ commands[command] = new StatusCommand<I>(image_del);
+ }
+
+ }
+
+ ~ImageDeleterAdminSocketHook() override {
+ (void)admin_socket->unregister_commands(this);
+ for (Commands::const_iterator i = commands.begin(); i != commands.end();
+ ++i) {
+ delete i->second;
+ }
+ }
+
+ int call(std::string_view command, const cmdmap_t& cmdmap,
+ Formatter *f,
+ std::ostream& errss,
+ bufferlist& out) override {
+ Commands::const_iterator i = commands.find(command);
+ ceph_assert(i != commands.end());
+ return i->second->call(f);
+ }
+
+private:
+ typedef std::map<std::string, ImageDeleterAdminSocketCommand*,
+ std::less<>> Commands;
+ AdminSocket *admin_socket;
+ Commands commands;
+};
+
+template <typename I>
+ImageDeleter<I>::ImageDeleter(
+ librados::IoCtx& local_io_ctx, Threads<librbd::ImageCtx>* threads,
+ Throttler<librbd::ImageCtx>* image_deletion_throttler,
+ ServiceDaemon<librbd::ImageCtx>* service_daemon)
+ : m_local_io_ctx(local_io_ctx), m_threads(threads),
+ m_image_deletion_throttler(image_deletion_throttler),
+ m_service_daemon(service_daemon), m_trash_listener(this),
+ m_lock(ceph::make_mutex(
+ librbd::util::unique_lock_name("rbd::mirror::ImageDeleter::m_lock",
+ this))) {
+}
+
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::ImageDeleter: " << " " \
+ << __func__ << ": "
+
+template <typename I>
+void ImageDeleter<I>::trash_move(librados::IoCtx& local_io_ctx,
+ const std::string& global_image_id,
+ bool resync,
+ librbd::asio::ContextWQ* work_queue,
+ Context* on_finish) {
+ dout(10) << "global_image_id=" << global_image_id << ", "
+ << "resync=" << resync << dendl;
+
+ auto req = rbd::mirror::image_deleter::TrashMoveRequest<>::create(
+ local_io_ctx, global_image_id, resync, work_queue, on_finish);
+ req->send();
+}
+
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::ImageDeleter: " << this << " " \
+ << __func__ << ": "
+
+template <typename I>
+void ImageDeleter<I>::init(Context* on_finish) {
+ dout(10) << dendl;
+
+ m_asok_hook = new ImageDeleterAdminSocketHook<I>(
+ g_ceph_context, m_local_io_ctx.get_pool_name(), this);
+
+ m_trash_watcher = image_deleter::TrashWatcher<I>::create(m_local_io_ctx,
+ m_threads,
+ m_trash_listener);
+ m_trash_watcher->init(on_finish);
+}
+
+template <typename I>
+void ImageDeleter<I>::shut_down(Context* on_finish) {
+ dout(10) << dendl;
+
+ delete m_asok_hook;
+ m_asok_hook = nullptr;
+
+ m_image_deletion_throttler->drain(m_local_io_ctx.get_namespace(),
+ -ESTALE);
+
+ shut_down_trash_watcher(on_finish);
+}
+
+template <typename I>
+void ImageDeleter<I>::shut_down_trash_watcher(Context* on_finish) {
+ dout(10) << dendl;
+ ceph_assert(m_trash_watcher);
+ auto ctx = new LambdaContext([this, on_finish](int r) {
+ delete m_trash_watcher;
+ m_trash_watcher = nullptr;
+
+ wait_for_ops(on_finish);
+ });
+ m_trash_watcher->shut_down(ctx);
+}
+
+template <typename I>
+void ImageDeleter<I>::wait_for_ops(Context* on_finish) {
+ {
+ std::scoped_lock locker{m_threads->timer_lock, m_lock};
+ m_running = false;
+ cancel_retry_timer();
+ }
+
+ auto ctx = new LambdaContext([this, on_finish](int) {
+ cancel_all_deletions(on_finish);
+ });
+ m_async_op_tracker.wait_for_ops(ctx);
+}
+
+template <typename I>
+void ImageDeleter<I>::cancel_all_deletions(Context* on_finish) {
+ m_image_deletion_throttler->drain(m_local_io_ctx.get_namespace(),
+ -ECANCELED);
+ {
+ std::lock_guard locker{m_lock};
+ // wake up any external state machines waiting on deletions
+ ceph_assert(m_in_flight_delete_queue.empty());
+ for (auto& queue : {&m_delete_queue, &m_retry_delete_queue}) {
+ for (auto& info : *queue) {
+ notify_on_delete(info->image_id, -ECANCELED);
+ }
+ queue->clear();
+ }
+ }
+ on_finish->complete(0);
+}
+
+template <typename I>
+void ImageDeleter<I>::wait_for_deletion(const std::string& image_id,
+ bool scheduled_only,
+ Context* on_finish) {
+ dout(5) << "image_id=" << image_id << dendl;
+
+ on_finish = new LambdaContext([this, on_finish](int r) {
+ m_threads->work_queue->queue(on_finish, r);
+ });
+
+ std::lock_guard locker{m_lock};
+ auto del_info = find_delete_info(image_id);
+ if (!del_info && scheduled_only) {
+ // image not scheduled for deletion
+ on_finish->complete(0);
+ return;
+ }
+
+ notify_on_delete(image_id, -ESTALE);
+ m_on_delete_contexts[image_id] = on_finish;
+}
+
+template <typename I>
+void ImageDeleter<I>::complete_active_delete(DeleteInfoRef* delete_info,
+ int r) {
+ dout(20) << "info=" << *delete_info << ", r=" << r << dendl;
+ std::lock_guard locker{m_lock};
+ notify_on_delete((*delete_info)->image_id, r);
+ delete_info->reset();
+}
+
+template <typename I>
+void ImageDeleter<I>::enqueue_failed_delete(DeleteInfoRef* delete_info,
+ int error_code,
+ double retry_delay) {
+ dout(20) << "info=" << *delete_info << ", r=" << error_code << dendl;
+ if (error_code == -EBLOCKLISTED) {
+ std::lock_guard locker{m_lock};
+ derr << "blocklisted while deleting local image" << dendl;
+ complete_active_delete(delete_info, error_code);
+ return;
+ }
+
+ std::scoped_lock locker{m_threads->timer_lock, m_lock};
+ auto& delete_info_ref = *delete_info;
+ notify_on_delete(delete_info_ref->image_id, error_code);
+ delete_info_ref->error_code = error_code;
+ ++delete_info_ref->retries;
+ delete_info_ref->retry_time = (clock_t::now() +
+ ceph::make_timespan(retry_delay));
+ m_retry_delete_queue.push_back(delete_info_ref);
+
+ schedule_retry_timer();
+}
+
+template <typename I>
+typename ImageDeleter<I>::DeleteInfoRef
+ImageDeleter<I>::find_delete_info(const std::string &image_id) {
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+ DeleteQueue delete_queues[] = {m_in_flight_delete_queue,
+ m_retry_delete_queue,
+ m_delete_queue};
+
+ DeleteInfo delete_info{image_id};
+ for (auto& queue : delete_queues) {
+ auto it = std::find_if(queue.begin(), queue.end(),
+ [&delete_info](const DeleteInfoRef& ref) {
+ return delete_info == *ref;
+ });
+ if (it != queue.end()) {
+ return *it;
+ }
+ }
+ return {};
+}
+
+template <typename I>
+void ImageDeleter<I>::print_status(Formatter *f) {
+ dout(20) << dendl;
+
+ f->open_object_section("image_deleter_status");
+ f->open_array_section("delete_images_queue");
+
+ std::lock_guard l{m_lock};
+ for (const auto& image : m_delete_queue) {
+ image->print_status(f);
+ }
+
+ f->close_section();
+ f->open_array_section("failed_deletes_queue");
+ for (const auto& image : m_retry_delete_queue) {
+ image->print_status(f, true);
+ }
+
+ f->close_section();
+ f->close_section();
+}
+
+template <typename I>
+vector<string> ImageDeleter<I>::get_delete_queue_items() {
+ vector<string> items;
+
+ std::lock_guard l{m_lock};
+ for (const auto& del_info : m_delete_queue) {
+ items.push_back(del_info->image_id);
+ }
+
+ return items;
+}
+
+template <typename I>
+vector<pair<string, int> > ImageDeleter<I>::get_failed_queue_items() {
+ vector<pair<string, int> > items;
+
+ std::lock_guard l{m_lock};
+ for (const auto& del_info : m_retry_delete_queue) {
+ items.push_back(make_pair(del_info->image_id,
+ del_info->error_code));
+ }
+
+ return items;
+}
+
+template <typename I>
+void ImageDeleter<I>::remove_images() {
+ dout(10) << dendl;
+
+ std::lock_guard locker{m_lock};
+ while (m_running && !m_delete_queue.empty()) {
+
+ DeleteInfoRef delete_info = m_delete_queue.front();
+ m_delete_queue.pop_front();
+
+ ceph_assert(delete_info);
+
+ auto on_start = create_async_context_callback(
+ m_threads->work_queue, new LambdaContext(
+ [this, delete_info](int r) {
+ if (r < 0) {
+ notify_on_delete(delete_info->image_id, r);
+ return;
+ }
+ remove_image(delete_info);
+ }));
+
+ m_image_deletion_throttler->start_op(m_local_io_ctx.get_namespace(),
+ delete_info->image_id, on_start);
+ }
+}
+
+template <typename I>
+void ImageDeleter<I>::remove_image(DeleteInfoRef delete_info) {
+ dout(10) << "info=" << *delete_info << dendl;
+
+ std::lock_guard locker{m_lock};
+
+ m_in_flight_delete_queue.push_back(delete_info);
+ m_async_op_tracker.start_op();
+
+ auto ctx = new LambdaContext([this, delete_info](int r) {
+ handle_remove_image(delete_info, r);
+ m_async_op_tracker.finish_op();
+ });
+
+ auto req = image_deleter::TrashRemoveRequest<I>::create(
+ m_local_io_ctx, delete_info->image_id, &delete_info->error_result,
+ m_threads->work_queue, ctx);
+ req->send();
+}
+
+template <typename I>
+void ImageDeleter<I>::handle_remove_image(DeleteInfoRef delete_info,
+ int r) {
+ dout(10) << "info=" << *delete_info << ", r=" << r << dendl;
+
+ m_image_deletion_throttler->finish_op(m_local_io_ctx.get_namespace(),
+ delete_info->image_id);
+ {
+ std::lock_guard locker{m_lock};
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+ auto it = std::find(m_in_flight_delete_queue.begin(),
+ m_in_flight_delete_queue.end(), delete_info);
+ ceph_assert(it != m_in_flight_delete_queue.end());
+ m_in_flight_delete_queue.erase(it);
+ }
+
+ if (r < 0) {
+ if (delete_info->error_result == image_deleter::ERROR_RESULT_COMPLETE) {
+ complete_active_delete(&delete_info, r);
+ } else if (delete_info->error_result ==
+ image_deleter::ERROR_RESULT_RETRY_IMMEDIATELY) {
+ enqueue_failed_delete(&delete_info, r, m_busy_interval);
+ } else {
+ auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
+ double failed_interval = cct->_conf.get_val<double>(
+ "rbd_mirror_delete_retry_interval");
+ enqueue_failed_delete(&delete_info, r, failed_interval);
+ }
+ } else {
+ complete_active_delete(&delete_info, 0);
+ }
+
+ // process the next queued image to delete
+ remove_images();
+}
+
+template <typename I>
+void ImageDeleter<I>::schedule_retry_timer() {
+ ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+ if (!m_running || m_timer_ctx != nullptr || m_retry_delete_queue.empty()) {
+ return;
+ }
+
+ dout(10) << dendl;
+ auto &delete_info = m_retry_delete_queue.front();
+ m_timer_ctx = new LambdaContext([this](int r) {
+ handle_retry_timer();
+ });
+ m_threads->timer->add_event_at(delete_info->retry_time, m_timer_ctx);
+}
+
+template <typename I>
+void ImageDeleter<I>::cancel_retry_timer() {
+ dout(10) << dendl;
+ ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+ if (m_timer_ctx != nullptr) {
+ bool canceled = m_threads->timer->cancel_event(m_timer_ctx);
+ m_timer_ctx = nullptr;
+ ceph_assert(canceled);
+ }
+}
+
+template <typename I>
+void ImageDeleter<I>::handle_retry_timer() {
+ dout(10) << dendl;
+ ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
+ std::lock_guard locker{m_lock};
+
+ ceph_assert(m_timer_ctx != nullptr);
+ m_timer_ctx = nullptr;
+
+ ceph_assert(m_running);
+ ceph_assert(!m_retry_delete_queue.empty());
+
+ // move all ready-to-ready items back to main queue
+ auto now = clock_t::now();
+ while (!m_retry_delete_queue.empty()) {
+ auto &delete_info = m_retry_delete_queue.front();
+ if (delete_info->retry_time > now) {
+ break;
+ }
+
+ m_delete_queue.push_back(delete_info);
+ m_retry_delete_queue.pop_front();
+ }
+
+ // schedule wake up for any future retries
+ schedule_retry_timer();
+
+ // start (concurrent) removal of images
+ m_async_op_tracker.start_op();
+ auto ctx = new LambdaContext([this](int r) {
+ remove_images();
+ m_async_op_tracker.finish_op();
+ });
+ m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void ImageDeleter<I>::handle_trash_image(const std::string& image_id,
+ const ImageDeleter<I>::clock_t::time_point& deferment_end_time) {
+ std::scoped_lock locker{m_threads->timer_lock, m_lock};
+
+ auto del_info = find_delete_info(image_id);
+ if (del_info != nullptr) {
+ dout(20) << "image " << image_id << " "
+ << "was already scheduled for deletion" << dendl;
+ return;
+ }
+
+ dout(10) << "image_id=" << image_id << ", "
+ << "deferment_end_time=" << utime_t{deferment_end_time} << dendl;
+
+ del_info.reset(new DeleteInfo(image_id));
+ del_info->retry_time = deferment_end_time;
+ m_retry_delete_queue.push_back(del_info);
+
+ schedule_retry_timer();
+}
+
+template <typename I>
+void ImageDeleter<I>::notify_on_delete(const std::string& image_id,
+ int r) {
+ dout(10) << "image_id=" << image_id << ", r=" << r << dendl;
+ auto it = m_on_delete_contexts.find(image_id);
+ if (it == m_on_delete_contexts.end()) {
+ return;
+ }
+
+ it->second->complete(r);
+ m_on_delete_contexts.erase(it);
+}
+
+template <typename I>
+void ImageDeleter<I>::DeleteInfo::print_status(Formatter *f,
+ bool print_failure_info) {
+ f->open_object_section("delete_info");
+ f->dump_string("image_id", image_id);
+ if (print_failure_info) {
+ f->dump_string("error_code", cpp_strerror(error_code));
+ f->dump_int("retries", retries);
+ }
+ f->close_section();
+}
+
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::ImageDeleter<librbd::ImageCtx>;