summaryrefslogtreecommitdiffstats
path: root/src/journal/JournalMetadata.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/journal/JournalMetadata.cc')
-rw-r--r--src/journal/JournalMetadata.cc1165
1 files changed, 1165 insertions, 0 deletions
diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc
new file mode 100644
index 000000000..bf04d4e1c
--- /dev/null
+++ b/src/journal/JournalMetadata.cc
@@ -0,0 +1,1165 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalMetadata.h"
+#include "journal/Utils.h"
+#include "common/errno.h"
+#include "common/Timer.h"
+#include "cls/journal/cls_journal_client.h"
+#include <functional>
+#include <set>
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "JournalMetadata: " << this << " "
+
+namespace journal {
+
+using namespace cls::journal;
+
+namespace {
+
+struct C_GetClient : public Context {
+ CephContext *cct;
+ librados::IoCtx &ioctx;
+ const std::string &oid;
+ AsyncOpTracker &async_op_tracker;
+ std::string client_id;
+ cls::journal::Client *client;
+ Context *on_finish;
+
+ bufferlist out_bl;
+
+ C_GetClient(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
+ AsyncOpTracker &async_op_tracker, const std::string &client_id,
+ cls::journal::Client *client, Context *on_finish)
+ : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
+ client_id(client_id), client(client), on_finish(on_finish) {
+ async_op_tracker.start_op();
+ }
+ ~C_GetClient() override {
+ async_op_tracker.finish_op();
+ }
+
+ virtual void send() {
+ send_get_client();
+ }
+
+ void send_get_client() {
+ ldout(cct, 20) << "C_GetClient: " << __func__ << dendl;
+
+ librados::ObjectReadOperation op;
+ client::get_client_start(&op, client_id);
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ this, &utils::rados_state_callback<
+ C_GetClient, &C_GetClient::handle_get_client>);
+
+ int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+ ceph_assert(r == 0);
+ comp->release();
+ }
+
+ void handle_get_client(int r) {
+ ldout(cct, 20) << "C_GetClient: " << __func__ << ": r=" << r << dendl;
+
+ if (r == 0) {
+ auto it = out_bl.cbegin();
+ r = client::get_client_finish(&it, client);
+ }
+ complete(r);
+ }
+
+ void finish(int r) override {
+ on_finish->complete(r);
+ }
+};
+
+struct C_AllocateTag : public Context {
+ CephContext *cct;
+ librados::IoCtx &ioctx;
+ const std::string &oid;
+ AsyncOpTracker &async_op_tracker;
+ uint64_t tag_class;
+ Tag *tag;
+ Context *on_finish;
+
+ bufferlist out_bl;
+
+ C_AllocateTag(CephContext *cct, librados::IoCtx &ioctx,
+ const std::string &oid, AsyncOpTracker &async_op_tracker,
+ uint64_t tag_class, const bufferlist &data, Tag *tag,
+ Context *on_finish)
+ : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
+ tag_class(tag_class), tag(tag), on_finish(on_finish) {
+ async_op_tracker.start_op();
+ tag->data = data;
+ }
+ ~C_AllocateTag() override {
+ async_op_tracker.finish_op();
+ }
+
+ void send() {
+ send_get_next_tag_tid();
+ }
+
+ void send_get_next_tag_tid() {
+ ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
+
+ librados::ObjectReadOperation op;
+ client::get_next_tag_tid_start(&op);
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ this, &utils::rados_state_callback<
+ C_AllocateTag, &C_AllocateTag::handle_get_next_tag_tid>);
+
+ out_bl.clear();
+ int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+ ceph_assert(r == 0);
+ comp->release();
+ }
+
+ void handle_get_next_tag_tid(int r) {
+ ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
+
+ if (r == 0) {
+ auto iter = out_bl.cbegin();
+ r = client::get_next_tag_tid_finish(&iter, &tag->tid);
+ }
+ if (r < 0) {
+ complete(r);
+ return;
+ }
+ send_tag_create();
+ }
+
+ void send_tag_create() {
+ ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
+
+ librados::ObjectWriteOperation op;
+ client::tag_create(&op, tag->tid, tag_class, tag->data);
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ this, &utils::rados_state_callback<
+ C_AllocateTag, &C_AllocateTag::handle_tag_create>);
+
+ int r = ioctx.aio_operate(oid, comp, &op);
+ ceph_assert(r == 0);
+ comp->release();
+ }
+
+ void handle_tag_create(int r) {
+ ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
+
+ if (r == -ESTALE) {
+ send_get_next_tag_tid();
+ return;
+ } else if (r < 0) {
+ complete(r);
+ return;
+ }
+
+ send_get_tag();
+ }
+
+ void send_get_tag() {
+ ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
+
+ librados::ObjectReadOperation op;
+ client::get_tag_start(&op, tag->tid);
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ this, &utils::rados_state_callback<
+ C_AllocateTag, &C_AllocateTag::handle_get_tag>);
+
+ out_bl.clear();
+ int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+ ceph_assert(r == 0);
+ comp->release();
+ }
+
+ void handle_get_tag(int r) {
+ ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
+
+ if (r == 0) {
+ auto iter = out_bl.cbegin();
+
+ cls::journal::Tag journal_tag;
+ r = client::get_tag_finish(&iter, &journal_tag);
+ if (r == 0) {
+ *tag = journal_tag;
+ }
+ }
+ complete(r);
+ }
+
+ void finish(int r) override {
+ on_finish->complete(r);
+ }
+};
+
+struct C_GetTag : public Context {
+ CephContext *cct;
+ librados::IoCtx &ioctx;
+ const std::string &oid;
+ AsyncOpTracker &async_op_tracker;
+ uint64_t tag_tid;
+ JournalMetadata::Tag *tag;
+ Context *on_finish;
+
+ bufferlist out_bl;
+
+ C_GetTag(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
+ AsyncOpTracker &async_op_tracker, uint64_t tag_tid,
+ JournalMetadata::Tag *tag, Context *on_finish)
+ : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
+ tag_tid(tag_tid), tag(tag), on_finish(on_finish) {
+ async_op_tracker.start_op();
+ }
+ ~C_GetTag() override {
+ async_op_tracker.finish_op();
+ }
+
+ void send() {
+ send_get_tag();
+ }
+
+ void send_get_tag() {
+ librados::ObjectReadOperation op;
+ client::get_tag_start(&op, tag_tid);
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ this, &utils::rados_state_callback<
+ C_GetTag, &C_GetTag::handle_get_tag>);
+
+ int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+ ceph_assert(r == 0);
+ comp->release();
+ }
+
+ void handle_get_tag(int r) {
+ if (r == 0) {
+ auto iter = out_bl.cbegin();
+ r = client::get_tag_finish(&iter, tag);
+ }
+ complete(r);
+ }
+
+ void finish(int r) override {
+ on_finish->complete(r);
+ }
+};
+
+struct C_GetTags : public Context {
+ CephContext *cct;
+ librados::IoCtx &ioctx;
+ const std::string &oid;
+ const std::string &client_id;
+ AsyncOpTracker &async_op_tracker;
+ uint64_t start_after_tag_tid;
+ boost::optional<uint64_t> tag_class;
+ JournalMetadata::Tags *tags;
+ Context *on_finish;
+
+ const uint64_t MAX_RETURN = 64;
+ bufferlist out_bl;
+
+ C_GetTags(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &client_id, AsyncOpTracker &async_op_tracker,
+ uint64_t start_after_tag_tid,
+ const boost::optional<uint64_t> &tag_class,
+ JournalMetadata::Tags *tags, Context *on_finish)
+ : cct(cct), ioctx(ioctx), oid(oid), client_id(client_id),
+ async_op_tracker(async_op_tracker),
+ start_after_tag_tid(start_after_tag_tid), tag_class(tag_class),
+ tags(tags), on_finish(on_finish) {
+ async_op_tracker.start_op();
+ }
+ ~C_GetTags() override {
+ async_op_tracker.finish_op();
+ }
+
+ void send() {
+ send_tag_list();
+ }
+
+ void send_tag_list() {
+ librados::ObjectReadOperation op;
+ client::tag_list_start(&op, start_after_tag_tid, MAX_RETURN, client_id,
+ tag_class);
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ this, &utils::rados_state_callback<
+ C_GetTags, &C_GetTags::handle_tag_list>);
+
+ out_bl.clear();
+ int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+ ceph_assert(r == 0);
+ comp->release();
+ }
+
+ void handle_tag_list(int r) {
+ if (r == 0) {
+ std::set<cls::journal::Tag> journal_tags;
+ auto iter = out_bl.cbegin();
+ r = client::tag_list_finish(&iter, &journal_tags);
+ if (r == 0) {
+ for (auto &journal_tag : journal_tags) {
+ tags->push_back(journal_tag);
+ start_after_tag_tid = journal_tag.tid;
+ }
+
+ if (journal_tags.size() == MAX_RETURN) {
+ send_tag_list();
+ return;
+ }
+ }
+ }
+ complete(r);
+ }
+
+ void finish(int r) override {
+ on_finish->complete(r);
+ }
+};
+
+struct C_FlushCommitPosition : public Context {
+ Context *commit_position_ctx;
+ Context *on_finish;
+
+ C_FlushCommitPosition(Context *commit_position_ctx, Context *on_finish)
+ : commit_position_ctx(commit_position_ctx), on_finish(on_finish) {
+ }
+ void finish(int r) override {
+ if (commit_position_ctx != nullptr) {
+ commit_position_ctx->complete(r);
+ }
+ on_finish->complete(r);
+ }
+};
+
+struct C_AssertActiveTag : public Context {
+ CephContext *cct;
+ librados::IoCtx &ioctx;
+ const std::string &oid;
+ AsyncOpTracker &async_op_tracker;
+ std::string client_id;
+ uint64_t tag_tid;
+ Context *on_finish;
+
+ bufferlist out_bl;
+
+ C_AssertActiveTag(CephContext *cct, librados::IoCtx &ioctx,
+ const std::string &oid, AsyncOpTracker &async_op_tracker,
+ const std::string &client_id, uint64_t tag_tid,
+ Context *on_finish)
+ : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
+ client_id(client_id), tag_tid(tag_tid), on_finish(on_finish) {
+ async_op_tracker.start_op();
+ }
+ ~C_AssertActiveTag() override {
+ async_op_tracker.finish_op();
+ }
+
+ void send() {
+ ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << dendl;
+
+ librados::ObjectReadOperation op;
+ client::tag_list_start(&op, tag_tid, 2, client_id, boost::none);
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ this, &utils::rados_state_callback<
+ C_AssertActiveTag, &C_AssertActiveTag::handle_send>);
+
+ int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+ ceph_assert(r == 0);
+ comp->release();
+ }
+
+ void handle_send(int r) {
+ ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << ": r=" << r << dendl;
+
+ std::set<cls::journal::Tag> tags;
+ if (r == 0) {
+ auto it = out_bl.cbegin();
+ r = client::tag_list_finish(&it, &tags);
+ }
+
+ // NOTE: since 0 is treated as an uninitialized list filter, we need to
+ // load to entries and look at the last tid
+ if (r == 0 && !tags.empty() && tags.rbegin()->tid > tag_tid) {
+ r = -ESTALE;
+ }
+ complete(r);
+ }
+
+ void finish(int r) override {
+ on_finish->complete(r);
+ }
+};
+
+} // anonymous namespace
+
+JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
+ ceph::mutex *timer_lock, librados::IoCtx &ioctx,
+ const std::string &oid,
+ const std::string &client_id,
+ const Settings &settings)
+ : m_oid(oid),
+ m_client_id(client_id), m_settings(settings),
+ m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
+ m_watch_ctx(this)
+{
+ m_ioctx.dup(ioctx);
+ m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
+}
+
+JournalMetadata::~JournalMetadata() {
+ std::lock_guard locker{m_lock};
+ ceph_assert(!m_initialized);
+}
+
+void JournalMetadata::init(Context *on_finish) {
+ {
+ std::lock_guard locker{m_lock};
+ ceph_assert(!m_initialized);
+ m_initialized = true;
+ }
+
+ // chain the init sequence (reverse order)
+ on_finish = utils::create_async_context_callback(
+ this, on_finish);
+ on_finish = new C_ImmutableMetadata(this, on_finish);
+ on_finish = new LambdaContext([this, on_finish](int r) {
+ if (r < 0) {
+ lderr(m_cct) << __func__ << ": failed to watch journal"
+ << cpp_strerror(r) << dendl;
+ std::lock_guard locker{m_lock};
+ m_watch_handle = 0;
+ on_finish->complete(r);
+ return;
+ }
+
+ get_immutable_metadata(&m_order, &m_splay_width, &m_pool_id, on_finish);
+ });
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ on_finish, utils::rados_ctx_callback);
+ int r = m_ioctx.aio_watch(m_oid, comp, &m_watch_handle, &m_watch_ctx);
+ ceph_assert(r == 0);
+ comp->release();
+}
+
+void JournalMetadata::shut_down(Context *on_finish) {
+
+ ldout(m_cct, 20) << __func__ << dendl;
+
+ uint64_t watch_handle = 0;
+ {
+ std::lock_guard locker{m_lock};
+ m_initialized = false;
+ std::swap(watch_handle, m_watch_handle);
+ }
+
+ // chain the shut down sequence (reverse order)
+ on_finish = utils::create_async_context_callback(
+ this, on_finish);
+ on_finish = new LambdaContext([this, on_finish](int r) {
+ ldout(m_cct, 20) << "shut_down: waiting for ops" << dendl;
+ m_async_op_tracker.wait_for_ops(on_finish);
+ });
+ on_finish = new LambdaContext([this, on_finish](int r) {
+ ldout(m_cct, 20) << "shut_down: flushing watch" << dendl;
+ librados::Rados rados(m_ioctx);
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ on_finish, utils::rados_ctx_callback);
+ r = rados.aio_watch_flush(comp);
+ ceph_assert(r == 0);
+ comp->release();
+ });
+ on_finish = new LambdaContext([this, on_finish](int r) {
+ flush_commit_position(on_finish);
+ });
+ if (watch_handle != 0) {
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ on_finish, utils::rados_ctx_callback);
+ int r = m_ioctx.aio_unwatch(watch_handle, comp);
+ ceph_assert(r == 0);
+ comp->release();
+ } else {
+ on_finish->complete(0);
+ }
+}
+
+void JournalMetadata::get_immutable_metadata(uint8_t *order,
+ uint8_t *splay_width,
+ int64_t *pool_id,
+ Context *on_finish) {
+ client::get_immutable_metadata(m_ioctx, m_oid, order, splay_width, pool_id,
+ on_finish);
+}
+
+void JournalMetadata::get_mutable_metadata(uint64_t *minimum_set,
+ uint64_t *active_set,
+ RegisteredClients *clients,
+ Context *on_finish) {
+ client::get_mutable_metadata(m_ioctx, m_oid, minimum_set, active_set, clients,
+ on_finish);
+}
+
+void JournalMetadata::register_client(const bufferlist &data,
+ Context *on_finish) {
+ ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
+ librados::ObjectWriteOperation op;
+ client::client_register(&op, m_client_id, data);
+
+ C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
+
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(ctx,
+ utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ ceph_assert(r == 0);
+ comp->release();
+}
+
+void JournalMetadata::update_client(const bufferlist &data,
+ Context *on_finish) {
+ ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
+ librados::ObjectWriteOperation op;
+ client::client_update_data(&op, m_client_id, data);
+
+ C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
+
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ ceph_assert(r == 0);
+ comp->release();
+}
+
+void JournalMetadata::unregister_client(Context *on_finish) {
+ ceph_assert(!m_client_id.empty());
+
+ ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
+ librados::ObjectWriteOperation op;
+ client::client_unregister(&op, m_client_id);
+
+ C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
+
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ ceph_assert(r == 0);
+ comp->release();
+}
+
+void JournalMetadata::allocate_tag(uint64_t tag_class, const bufferlist &data,
+ Tag *tag, Context *on_finish) {
+ on_finish = new C_NotifyUpdate(this, on_finish);
+ C_AllocateTag *ctx = new C_AllocateTag(m_cct, m_ioctx, m_oid,
+ m_async_op_tracker, tag_class,
+ data, tag, on_finish);
+ ctx->send();
+}
+
+void JournalMetadata::get_client(const std::string &client_id,
+ cls::journal::Client *client,
+ Context *on_finish) {
+ C_GetClient *ctx = new C_GetClient(m_cct, m_ioctx, m_oid, m_async_op_tracker,
+ client_id, client, on_finish);
+ ctx->send();
+}
+
+void JournalMetadata::get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish) {
+ C_GetTag *ctx = new C_GetTag(m_cct, m_ioctx, m_oid, m_async_op_tracker,
+ tag_tid, tag, on_finish);
+ ctx->send();
+}
+
+void JournalMetadata::get_tags(uint64_t start_after_tag_tid,
+ const boost::optional<uint64_t> &tag_class,
+ Tags *tags, Context *on_finish) {
+ C_GetTags *ctx = new C_GetTags(m_cct, m_ioctx, m_oid, m_client_id,
+ m_async_op_tracker, start_after_tag_tid,
+ tag_class, tags, on_finish);
+ ctx->send();
+}
+
+void JournalMetadata::add_listener(JournalMetadataListener *listener) {
+ std::unique_lock locker{m_lock};
+ m_update_cond.wait(locker, [this] {
+ return m_update_notifications <= 0;
+ });
+ m_listeners.push_back(listener);
+}
+
+void JournalMetadata::remove_listener(JournalMetadataListener *listener) {
+ std::unique_lock locker{m_lock};
+ m_update_cond.wait(locker, [this] {
+ return m_update_notifications <= 0;
+ });
+ m_listeners.remove(listener);
+}
+
+void JournalMetadata::set_minimum_set(uint64_t object_set) {
+ std::lock_guard locker{m_lock};
+
+ ldout(m_cct, 20) << __func__ << ": current=" << m_minimum_set
+ << ", new=" << object_set << dendl;
+ if (m_minimum_set >= object_set) {
+ return;
+ }
+
+ librados::ObjectWriteOperation op;
+ client::set_minimum_set(&op, object_set);
+
+ C_NotifyUpdate *ctx = new C_NotifyUpdate(this);
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ ceph_assert(r == 0);
+ comp->release();
+
+ m_minimum_set = object_set;
+}
+
+int JournalMetadata::set_active_set(uint64_t object_set) {
+ C_SaferCond ctx;
+ set_active_set(object_set, &ctx);
+ return ctx.wait();
+}
+
+void JournalMetadata::set_active_set(uint64_t object_set, Context *on_finish) {
+ std::lock_guard locker{m_lock};
+
+ ldout(m_cct, 20) << __func__ << ": current=" << m_active_set
+ << ", new=" << object_set << dendl;
+ if (m_active_set >= object_set) {
+ m_work_queue->queue(on_finish, 0);
+ return;
+ }
+
+ librados::ObjectWriteOperation op;
+ client::set_active_set(&op, object_set);
+
+ C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ ceph_assert(r == 0);
+ comp->release();
+
+ m_active_set = object_set;
+}
+
+void JournalMetadata::assert_active_tag(uint64_t tag_tid, Context *on_finish) {
+ std::lock_guard locker{m_lock};
+
+ C_AssertActiveTag *ctx = new C_AssertActiveTag(m_cct, m_ioctx, m_oid,
+ m_async_op_tracker,
+ m_client_id, tag_tid,
+ on_finish);
+ ctx->send();
+}
+
+void JournalMetadata::flush_commit_position() {
+ ldout(m_cct, 20) << __func__ << dendl;
+
+ C_SaferCond ctx;
+ flush_commit_position(&ctx);
+ ctx.wait();
+}
+
+void JournalMetadata::flush_commit_position(Context *on_safe) {
+ ldout(m_cct, 20) << __func__ << dendl;
+
+ std::scoped_lock locker{*m_timer_lock, m_lock};
+ if (m_commit_position_ctx == nullptr && m_flush_commits_in_progress == 0) {
+ // nothing to flush
+ if (on_safe != nullptr) {
+ m_work_queue->queue(on_safe, 0);
+ }
+ return;
+ }
+
+ if (on_safe != nullptr) {
+ m_flush_commit_position_ctxs.push_back(on_safe);
+ }
+ if (m_commit_position_ctx == nullptr) {
+ return;
+ }
+
+ cancel_commit_task();
+ handle_commit_position_task();
+}
+
+void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) {
+ std::lock_guard locker{m_lock};
+ uint64_t &allocated_entry_tid = m_allocated_entry_tids[tag_tid];
+ if (allocated_entry_tid <= entry_tid) {
+ allocated_entry_tid = entry_tid + 1;
+ }
+}
+
+bool JournalMetadata::get_last_allocated_entry_tid(uint64_t tag_tid,
+ uint64_t *entry_tid) const {
+ std::lock_guard locker{m_lock};
+
+ AllocatedEntryTids::const_iterator it = m_allocated_entry_tids.find(tag_tid);
+ if (it == m_allocated_entry_tids.end()) {
+ return false;
+ }
+
+ ceph_assert(it->second > 0);
+ *entry_tid = it->second - 1;
+ return true;
+}
+
+void JournalMetadata::handle_immutable_metadata(int r, Context *on_init) {
+ if (r < 0) {
+ lderr(m_cct) << "failed to initialize immutable metadata: "
+ << cpp_strerror(r) << dendl;
+ on_init->complete(r);
+ return;
+ }
+
+ ldout(m_cct, 10) << "initialized immutable metadata" << dendl;
+ refresh(on_init);
+}
+
+void JournalMetadata::refresh(Context *on_complete) {
+ ldout(m_cct, 10) << "refreshing mutable metadata" << dendl;
+
+ {
+ std::lock_guard locker{m_lock};
+ if (on_complete != nullptr) {
+ m_refresh_ctxs.push_back(on_complete);
+ }
+ ++m_refreshes_in_progress;
+ }
+
+ auto refresh = new C_Refresh(this);
+ get_mutable_metadata(&refresh->minimum_set, &refresh->active_set,
+ &refresh->registered_clients, refresh);
+}
+
+void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
+ ldout(m_cct, 10) << "refreshed mutable metadata: r=" << r << dendl;
+
+ m_lock.lock();
+ if (r == 0) {
+ Client client(m_client_id, bufferlist());
+ RegisteredClients::iterator it = refresh->registered_clients.find(client);
+ if (it != refresh->registered_clients.end()) {
+ if (it->state == cls::journal::CLIENT_STATE_DISCONNECTED) {
+ ldout(m_cct, 0) << "client flagged disconnected: " << m_client_id
+ << dendl;
+ }
+ m_minimum_set = std::max(m_minimum_set, refresh->minimum_set);
+ m_active_set = std::max(m_active_set, refresh->active_set);
+ m_registered_clients = refresh->registered_clients;
+ m_client = *it;
+
+ ++m_update_notifications;
+ m_lock.unlock();
+ for (Listeners::iterator it = m_listeners.begin();
+ it != m_listeners.end(); ++it) {
+ (*it)->handle_update(this);
+ }
+ m_lock.lock();
+ if (--m_update_notifications == 0) {
+ m_update_cond.notify_all();
+ }
+ } else {
+ lderr(m_cct) << "failed to locate client: " << m_client_id << dendl;
+ r = -ENOENT;
+ }
+ }
+
+ Contexts refresh_ctxs;
+ ceph_assert(m_refreshes_in_progress > 0);
+ --m_refreshes_in_progress;
+ if (m_refreshes_in_progress == 0) {
+ std::swap(refresh_ctxs, m_refresh_ctxs);
+ }
+ m_lock.unlock();
+
+ for (auto ctx : refresh_ctxs) {
+ ctx->complete(r);
+ }
+}
+
+void JournalMetadata::cancel_commit_task() {
+ ldout(m_cct, 20) << __func__ << dendl;
+
+ ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+ ceph_assert(m_commit_position_ctx != nullptr);
+ ceph_assert(m_commit_position_task_ctx != nullptr);
+ m_timer->cancel_event(m_commit_position_task_ctx);
+ m_commit_position_task_ctx = NULL;
+}
+
+void JournalMetadata::schedule_commit_task() {
+ ldout(m_cct, 20) << __func__ << dendl;
+
+ ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+ ceph_assert(m_commit_position_ctx != nullptr);
+ if (m_commit_position_task_ctx == nullptr) {
+ m_commit_position_task_ctx =
+ m_timer->add_event_after(m_settings.commit_interval,
+ new C_CommitPositionTask(this));
+ }
+}
+
+void JournalMetadata::handle_commit_position_task() {
+ ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+ ldout(m_cct, 20) << __func__ << ": "
+ << "client_id=" << m_client_id << ", "
+ << "commit_position=" << m_commit_position << dendl;
+
+ m_commit_position_task_ctx = nullptr;
+ Context* commit_position_ctx = nullptr;
+ std::swap(commit_position_ctx, m_commit_position_ctx);
+
+ m_async_op_tracker.start_op();
+ ++m_flush_commits_in_progress;
+
+ Context* ctx = new LambdaContext([this, commit_position_ctx](int r) {
+ Contexts flush_commit_position_ctxs;
+ m_lock.lock();
+ ceph_assert(m_flush_commits_in_progress > 0);
+ --m_flush_commits_in_progress;
+ if (m_flush_commits_in_progress == 0) {
+ std::swap(flush_commit_position_ctxs, m_flush_commit_position_ctxs);
+ }
+ m_lock.unlock();
+
+ commit_position_ctx->complete(0);
+ for (auto ctx : flush_commit_position_ctxs) {
+ ctx->complete(0);
+ }
+ m_async_op_tracker.finish_op();
+ });
+ ctx = new C_NotifyUpdate(this, ctx);
+ ctx = new LambdaContext([this, ctx](int r) {
+ // manually kick of a refresh in case the notification is missed
+ // and ignore the next notification that we are about to send
+ m_lock.lock();
+ ++m_ignore_watch_notifies;
+ m_lock.unlock();
+
+ refresh(ctx);
+ });
+ ctx = new LambdaContext([this, ctx](int r) {
+ schedule_laggy_clients_disconnect(ctx);
+ });
+
+ librados::ObjectWriteOperation op;
+ client::client_commit(&op, m_client_id, m_commit_position);
+
+ auto comp = librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ ceph_assert(r == 0);
+ comp->release();
+}
+
+void JournalMetadata::schedule_watch_reset() {
+ ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+ m_timer->add_event_after(1, new C_WatchReset(this));
+}
+
+void JournalMetadata::handle_watch_reset() {
+ ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+ if (!m_initialized) {
+ return;
+ }
+
+ int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
+ if (r < 0) {
+ if (r == -ENOENT) {
+ ldout(m_cct, 5) << __func__ << ": journal header not found" << dendl;
+ } else if (r == -EBLOCKLISTED) {
+ ldout(m_cct, 5) << __func__ << ": client blocklisted" << dendl;
+ } else {
+ lderr(m_cct) << __func__ << ": failed to watch journal: "
+ << cpp_strerror(r) << dendl;
+ }
+ schedule_watch_reset();
+ } else {
+ ldout(m_cct, 10) << __func__ << ": reset journal watch" << dendl;
+ refresh(NULL);
+ }
+}
+
+void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) {
+ ldout(m_cct, 10) << "journal header updated" << dendl;
+
+ bufferlist bl;
+ m_ioctx.notify_ack(m_oid, notify_id, cookie, bl);
+
+ {
+ std::lock_guard locker{m_lock};
+ if (m_ignore_watch_notifies > 0) {
+ --m_ignore_watch_notifies;
+ return;
+ }
+ }
+
+ refresh(NULL);
+}
+
+void JournalMetadata::handle_watch_error(int err) {
+ if (err == -ENOTCONN) {
+ ldout(m_cct, 5) << "journal watch error: header removed" << dendl;
+ } else if (err == -EBLOCKLISTED) {
+ lderr(m_cct) << "journal watch error: client blocklisted" << dendl;
+ } else {
+ lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl;
+ }
+
+ std::scoped_lock locker{*m_timer_lock, m_lock};
+
+ // release old watch on error
+ if (m_watch_handle != 0) {
+ m_ioctx.unwatch2(m_watch_handle);
+ m_watch_handle = 0;
+ }
+
+ if (m_initialized && err != -ENOENT) {
+ schedule_watch_reset();
+ }
+}
+
+uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num,
+ uint64_t tag_tid,
+ uint64_t entry_tid) {
+ std::lock_guard locker{m_lock};
+ uint64_t commit_tid = ++m_commit_tid;
+ m_pending_commit_tids[commit_tid] = CommitEntry(object_num, tag_tid,
+ entry_tid);
+
+ ldout(m_cct, 20) << "allocated commit tid: commit_tid=" << commit_tid << " ["
+ << "object_num=" << object_num << ", "
+ << "tag_tid=" << tag_tid << ", "
+ << "entry_tid=" << entry_tid << "]"
+ << dendl;
+ return commit_tid;
+}
+
+void JournalMetadata::overflow_commit_tid(uint64_t commit_tid,
+ uint64_t object_num) {
+ std::lock_guard locker{m_lock};
+
+ auto it = m_pending_commit_tids.find(commit_tid);
+ ceph_assert(it != m_pending_commit_tids.end());
+ ceph_assert(it->second.object_num < object_num);
+
+ ldout(m_cct, 20) << __func__ << ": "
+ << "commit_tid=" << commit_tid << ", "
+ << "old_object_num=" << it->second.object_num << ", "
+ << "new_object_num=" << object_num << dendl;
+ it->second.object_num = object_num;
+}
+
+void JournalMetadata::get_commit_entry(uint64_t commit_tid,
+ uint64_t *object_num,
+ uint64_t *tag_tid, uint64_t *entry_tid) {
+ std::lock_guard locker{m_lock};
+
+ auto it = m_pending_commit_tids.find(commit_tid);
+ ceph_assert(it != m_pending_commit_tids.end());
+
+ *object_num = it->second.object_num;
+ *tag_tid = it->second.tag_tid;
+ *entry_tid = it->second.entry_tid;
+}
+
+void JournalMetadata::committed(uint64_t commit_tid,
+ const CreateContext &create_context) {
+ ldout(m_cct, 20) << "committed tid=" << commit_tid << dendl;
+
+ ObjectSetPosition commit_position;
+ Context *stale_ctx = nullptr;
+ {
+ std::scoped_lock locker{*m_timer_lock, m_lock};
+ ceph_assert(commit_tid > m_commit_position_tid);
+
+ if (!m_commit_position.object_positions.empty()) {
+ // in-flight commit position update
+ commit_position = m_commit_position;
+ } else {
+ // safe commit position
+ commit_position = m_client.commit_position;
+ }
+
+ CommitTids::iterator it = m_pending_commit_tids.find(commit_tid);
+ ceph_assert(it != m_pending_commit_tids.end());
+
+ CommitEntry &commit_entry = it->second;
+ commit_entry.committed = true;
+
+ bool update_commit_position = false;
+ while (!m_pending_commit_tids.empty()) {
+ CommitTids::iterator it = m_pending_commit_tids.begin();
+ CommitEntry &commit_entry = it->second;
+ if (!commit_entry.committed) {
+ break;
+ }
+
+ commit_position.object_positions.emplace_front(
+ commit_entry.object_num, commit_entry.tag_tid,
+ commit_entry.entry_tid);
+ m_pending_commit_tids.erase(it);
+ update_commit_position = true;
+ }
+
+ if (!update_commit_position) {
+ return;
+ }
+
+ // prune the position to have one position per splay offset
+ std::set<uint8_t> in_use_splay_offsets;
+ ObjectPositions::iterator ob_it = commit_position.object_positions.begin();
+ while (ob_it != commit_position.object_positions.end()) {
+ uint8_t splay_offset = ob_it->object_number % m_splay_width;
+ if (!in_use_splay_offsets.insert(splay_offset).second) {
+ ob_it = commit_position.object_positions.erase(ob_it);
+ } else {
+ ++ob_it;
+ }
+ }
+
+ stale_ctx = m_commit_position_ctx;
+ m_commit_position_ctx = create_context();
+ m_commit_position = commit_position;
+ m_commit_position_tid = commit_tid;
+
+ ldout(m_cct, 20) << "updated commit position: " << commit_position << ", "
+ << "on_safe=" << m_commit_position_ctx << dendl;
+ schedule_commit_task();
+ }
+
+
+ if (stale_ctx != nullptr) {
+ ldout(m_cct, 20) << "canceling stale commit: on_safe=" << stale_ctx
+ << dendl;
+ stale_ctx->complete(-ESTALE);
+ }
+}
+
+void JournalMetadata::notify_update() {
+ ldout(m_cct, 10) << "notifying journal header update" << dendl;
+
+ bufferlist bl;
+ m_ioctx.notify2(m_oid, bl, 5000, NULL);
+}
+
+void JournalMetadata::async_notify_update(Context *on_safe) {
+ ldout(m_cct, 10) << "async notifying journal header update" << dendl;
+
+ C_AioNotify *ctx = new C_AioNotify(this, on_safe);
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback);
+
+ bufferlist bl;
+ int r = m_ioctx.aio_notify(m_oid, comp, bl, 5000, NULL);
+ ceph_assert(r == 0);
+
+ comp->release();
+}
+
+void JournalMetadata::wait_for_ops() {
+ C_SaferCond ctx;
+ m_async_op_tracker.wait_for_ops(&ctx);
+ ctx.wait();
+}
+
+void JournalMetadata::handle_notified(int r) {
+ ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl;
+}
+
+void JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
+ ldout(m_cct, 20) << __func__ << dendl;
+ if (m_settings.max_concurrent_object_sets <= 0) {
+ on_finish->complete(0);
+ return;
+ }
+
+ Context *ctx = on_finish;
+ {
+ std::lock_guard locker{m_lock};
+ for (auto &c : m_registered_clients) {
+ if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
+ c.id == m_client_id ||
+ m_settings.ignored_laggy_clients.count(c.id) > 0) {
+ continue;
+ }
+ const std::string &client_id = c.id;
+ uint64_t object_set = 0;
+ if (!c.commit_position.object_positions.empty()) {
+ auto &position = *(c.commit_position.object_positions.begin());
+ object_set = position.object_number / m_splay_width;
+ }
+
+ if (m_active_set > object_set + m_settings.max_concurrent_object_sets) {
+ ldout(m_cct, 1) << __func__ << ": " << client_id
+ << ": scheduling disconnect" << dendl;
+
+ ctx = new LambdaContext([this, client_id, ctx](int r1) {
+ ldout(m_cct, 10) << __func__ << ": " << client_id
+ << ": flagging disconnected" << dendl;
+
+ librados::ObjectWriteOperation op;
+ client::client_update_state(
+ &op, client_id, cls::journal::CLIENT_STATE_DISCONNECTED);
+
+ auto comp = librados::Rados::aio_create_completion(
+ ctx, utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ ceph_assert(r == 0);
+ comp->release();
+ });
+ }
+ }
+ }
+
+ if (ctx == on_finish) {
+ ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl;
+ }
+ ctx->complete(0);
+}
+
+std::ostream &operator<<(std::ostream &os,
+ const JournalMetadata::RegisteredClients &clients) {
+ os << "[";
+ for (JournalMetadata::RegisteredClients::const_iterator c = clients.begin();
+ c != clients.end(); ++c) {
+ os << (c == clients.begin() ? "" : ", " ) << *c;
+ }
+ os << "]";
+ return os;
+}
+
+std::ostream &operator<<(std::ostream &os,
+ const JournalMetadata &jm) {
+ std::lock_guard locker{jm.m_lock};
+ os << "[oid=" << jm.m_oid << ", "
+ << "initialized=" << jm.m_initialized << ", "
+ << "order=" << (int)jm.m_order << ", "
+ << "splay_width=" << (int)jm.m_splay_width << ", "
+ << "pool_id=" << jm.m_pool_id << ", "
+ << "minimum_set=" << jm.m_minimum_set << ", "
+ << "active_set=" << jm.m_active_set << ", "
+ << "client_id=" << jm.m_client_id << ", "
+ << "commit_tid=" << jm.m_commit_tid << ", "
+ << "commit_interval=" << jm.m_settings.commit_interval << ", "
+ << "commit_position=" << jm.m_commit_position << ", "
+ << "registered_clients=" << jm.m_registered_clients << "]";
+ return os;
+}
+
+} // namespace journal