Diffstat (limited to 'src/journal/JournalMetadata.cc')
-rw-r--r-- | src/journal/JournalMetadata.cc | 1165 |
1 file changed, 1165 insertions, 0 deletions
diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc
new file mode 100644
index 000000000..bf04d4e1c
--- /dev/null
+++ b/src/journal/JournalMetadata.cc
@@ -0,0 +1,1165 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalMetadata.h"
+#include "journal/Utils.h"
+#include "common/errno.h"
+#include "common/Timer.h"
+#include "cls/journal/cls_journal_client.h"
+#include <functional>
+#include <set>
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "JournalMetadata: " << this << " "
+
+namespace journal {
+
+using namespace cls::journal;
+
+namespace {
+
+struct C_GetClient : public Context {
+  CephContext *cct;
+  librados::IoCtx &ioctx;
+  const std::string &oid;
+  AsyncOpTracker &async_op_tracker;
+  std::string client_id;
+  cls::journal::Client *client;
+  Context *on_finish;
+
+  bufferlist out_bl;
+
+  C_GetClient(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
+              AsyncOpTracker &async_op_tracker, const std::string &client_id,
+              cls::journal::Client *client, Context *on_finish)
+    : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
+      client_id(client_id), client(client), on_finish(on_finish) {
+    async_op_tracker.start_op();
+  }
+  ~C_GetClient() override {
+    async_op_tracker.finish_op();
+  }
+
+  virtual void send() {
+    send_get_client();
+  }
+
+  void send_get_client() {
+    ldout(cct, 20) << "C_GetClient: " << __func__ << dendl;
+
+    librados::ObjectReadOperation op;
+    client::get_client_start(&op, client_id);
+
+    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+      this, &utils::rados_state_callback<
+        C_GetClient, &C_GetClient::handle_get_client>);
+
+    int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+    ceph_assert(r == 0);
+    comp->release();
+  }
+
+  void handle_get_client(int r) {
+    ldout(cct, 20) << "C_GetClient: " << __func__ << ": r=" << r << dendl;
+
+    if (r == 0) {
+      auto it = out_bl.cbegin();
+      r = client::get_client_finish(&it, client);
+    }
+    complete(r);
+  }
+
+  void finish(int r) override {
+    on_finish->complete(r);
+  }
+};
+
+struct C_AllocateTag : public Context {
+  CephContext *cct;
+  librados::IoCtx &ioctx;
+  const std::string &oid;
+  AsyncOpTracker &async_op_tracker;
+  uint64_t tag_class;
+  Tag *tag;
+  Context *on_finish;
+
+  bufferlist out_bl;
+
+  C_AllocateTag(CephContext *cct, librados::IoCtx &ioctx,
+                const std::string &oid, AsyncOpTracker &async_op_tracker,
+                uint64_t tag_class, const bufferlist &data, Tag *tag,
+                Context *on_finish)
+    : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
+      tag_class(tag_class), tag(tag), on_finish(on_finish) {
+    async_op_tracker.start_op();
+    tag->data = data;
+  }
+  ~C_AllocateTag() override {
+    async_op_tracker.finish_op();
+  }
+
+  void send() {
+    send_get_next_tag_tid();
+  }
+
+  void send_get_next_tag_tid() {
+    ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
+
+    librados::ObjectReadOperation op;
+    client::get_next_tag_tid_start(&op);
+
+    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+      this, &utils::rados_state_callback<
+        C_AllocateTag, &C_AllocateTag::handle_get_next_tag_tid>);
+
+    out_bl.clear();
+    int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+    ceph_assert(r == 0);
+    comp->release();
+  }
+
+  void handle_get_next_tag_tid(int r) {
+    ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
+
+    if (r == 0) {
+      auto iter = out_bl.cbegin();
+      r = client::get_next_tag_tid_finish(&iter, &tag->tid);
+    }
+    if (r < 0) {
+      complete(r);
+      return;
+    }
+    send_tag_create();
+  }
+
+  void send_tag_create() {
+    ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
+
+    librados::ObjectWriteOperation op;
+    client::tag_create(&op, tag->tid, tag_class, tag->data);
+
+    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+      this, &utils::rados_state_callback<
+        C_AllocateTag, &C_AllocateTag::handle_tag_create>);
+
+    int r = ioctx.aio_operate(oid, comp, &op);
+    ceph_assert(r == 0);
+    comp->release();
+  }
+
+  void handle_tag_create(int r) {
+    ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
+
+    if (r == -ESTALE) {
+      send_get_next_tag_tid();
+      return;
+    } else if (r < 0) {
+      complete(r);
+      return;
+    }
+
+    send_get_tag();
+  }
+
+  void send_get_tag() {
+    ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
+
+    librados::ObjectReadOperation op;
+    client::get_tag_start(&op, tag->tid);
+
+    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+      this, &utils::rados_state_callback<
+        C_AllocateTag, &C_AllocateTag::handle_get_tag>);
+
+    out_bl.clear();
+    int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+    ceph_assert(r == 0);
+    comp->release();
+  }
+
+  void handle_get_tag(int r) {
+    ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
+
+    if (r == 0) {
+      auto iter = out_bl.cbegin();
+
+      cls::journal::Tag journal_tag;
+      r = client::get_tag_finish(&iter, &journal_tag);
+      if (r == 0) {
+        *tag = journal_tag;
+      }
+    }
+    complete(r);
+  }
+
+  void finish(int r) override {
+    on_finish->complete(r);
+  }
+};
+
+struct C_GetTag : public Context {
+  CephContext *cct;
+  librados::IoCtx &ioctx;
+  const std::string &oid;
+  AsyncOpTracker &async_op_tracker;
+  uint64_t tag_tid;
+  JournalMetadata::Tag *tag;
+  Context *on_finish;
+
+  bufferlist out_bl;
+
+  C_GetTag(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
+           AsyncOpTracker &async_op_tracker, uint64_t tag_tid,
+           JournalMetadata::Tag *tag, Context *on_finish)
+    : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
+      tag_tid(tag_tid), tag(tag), on_finish(on_finish) {
+    async_op_tracker.start_op();
+  }
+  ~C_GetTag() override {
+    async_op_tracker.finish_op();
+  }
+
+  void send() {
+    send_get_tag();
+  }
+
+  void send_get_tag() {
+    librados::ObjectReadOperation op;
+    client::get_tag_start(&op, tag_tid);
+
+    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+      this, &utils::rados_state_callback<
+        C_GetTag, &C_GetTag::handle_get_tag>);
+
+    int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+    ceph_assert(r == 0);
+    comp->release();
+  }
+
+  void handle_get_tag(int r) {
+    if (r == 0) {
+      auto iter = out_bl.cbegin();
+      r = client::get_tag_finish(&iter, tag);
+    }
+    complete(r);
+  }
+
+  void finish(int r) override {
+    on_finish->complete(r);
+  }
+};
+
+struct C_GetTags : public Context {
+  CephContext *cct;
+  librados::IoCtx &ioctx;
+  const std::string &oid;
+  const std::string &client_id;
+  AsyncOpTracker &async_op_tracker;
+  uint64_t start_after_tag_tid;
+  boost::optional<uint64_t> tag_class;
+  JournalMetadata::Tags *tags;
+  Context *on_finish;
+
+  const uint64_t MAX_RETURN = 64;
+  bufferlist out_bl;
+
+  C_GetTags(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
+            const std::string &client_id, AsyncOpTracker &async_op_tracker,
+            uint64_t start_after_tag_tid,
+            const boost::optional<uint64_t> &tag_class,
+            JournalMetadata::Tags *tags, Context *on_finish)
+    : cct(cct), ioctx(ioctx), oid(oid), client_id(client_id),
+      async_op_tracker(async_op_tracker),
+      start_after_tag_tid(start_after_tag_tid), tag_class(tag_class),
+      tags(tags), on_finish(on_finish) {
+    async_op_tracker.start_op();
+  }
+  ~C_GetTags() override {
+    async_op_tracker.finish_op();
+  }
+
+  void send() {
+    send_tag_list();
+  }
+
+  void send_tag_list() {
+    librados::ObjectReadOperation op;
+    client::tag_list_start(&op, start_after_tag_tid, MAX_RETURN, client_id,
+                           tag_class);
+
+    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+      this, &utils::rados_state_callback<
+        C_GetTags, &C_GetTags::handle_tag_list>);
+
+    out_bl.clear();
+    int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+    ceph_assert(r == 0);
+    comp->release();
+  }
+
+  void handle_tag_list(int r) {
+    if (r == 0) {
+      std::set<cls::journal::Tag> journal_tags;
+      auto iter = out_bl.cbegin();
+      r = client::tag_list_finish(&iter, &journal_tags);
+      if (r == 0) {
+        for (auto &journal_tag : journal_tags) {
+          tags->push_back(journal_tag);
+          start_after_tag_tid = journal_tag.tid;
+        }
+
+        if (journal_tags.size() == MAX_RETURN) {
+          send_tag_list();
+          return;
+        }
+      }
+    }
+    complete(r);
+  }
+
+  void finish(int r) override {
+    on_finish->complete(r);
+  }
+};
+
+struct C_FlushCommitPosition : public Context {
+  Context *commit_position_ctx;
+  Context *on_finish;
+
+  C_FlushCommitPosition(Context *commit_position_ctx, Context *on_finish)
+    : commit_position_ctx(commit_position_ctx), on_finish(on_finish) {
+  }
+  void finish(int r) override {
+    if (commit_position_ctx != nullptr) {
+      commit_position_ctx->complete(r);
+    }
+    on_finish->complete(r);
+  }
+};
+
+struct C_AssertActiveTag : public Context {
+  CephContext *cct;
+  librados::IoCtx &ioctx;
+  const std::string &oid;
+  AsyncOpTracker &async_op_tracker;
+  std::string client_id;
+  uint64_t tag_tid;
+  Context *on_finish;
+
+  bufferlist out_bl;
+
+  C_AssertActiveTag(CephContext *cct, librados::IoCtx &ioctx,
+                    const std::string &oid, AsyncOpTracker &async_op_tracker,
+                    const std::string &client_id, uint64_t tag_tid,
+                    Context *on_finish)
+    : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
+      client_id(client_id), tag_tid(tag_tid), on_finish(on_finish) {
+    async_op_tracker.start_op();
+  }
+  ~C_AssertActiveTag() override {
+    async_op_tracker.finish_op();
+  }
+
+  void send() {
+    ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << dendl;
+
+    librados::ObjectReadOperation op;
+    client::tag_list_start(&op, tag_tid, 2, client_id, boost::none);
+
+    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+      this, &utils::rados_state_callback<
+        C_AssertActiveTag, &C_AssertActiveTag::handle_send>);
+
+    int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+    ceph_assert(r == 0);
+    comp->release();
+  }
+
+  void handle_send(int r) {
+    ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << ": r=" << r << dendl;
+
+    std::set<cls::journal::Tag> tags;
+    if (r == 0) {
+      auto it = out_bl.cbegin();
+      r = client::tag_list_finish(&it, &tags);
+    }
+
+    // NOTE: since 0 is treated as an uninitialized list filter, we need to
+    // load two entries and look at the last tid
+    if (r == 0 && !tags.empty() && tags.rbegin()->tid > tag_tid) {
+      r = -ESTALE;
+    }
+    complete(r);
+  }
+
+  void finish(int r) override {
+    on_finish->complete(r);
+  }
+};
+
+} // anonymous namespace
+
+JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
+                                 ceph::mutex *timer_lock,
+                                 librados::IoCtx &ioctx,
+                                 const std::string &oid,
+                                 const std::string &client_id,
+                                 const Settings &settings)
+  : m_oid(oid),
+    m_client_id(client_id), m_settings(settings),
+    m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
+    m_watch_ctx(this)
+{
+  m_ioctx.dup(ioctx);
+  m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
+}
+
+JournalMetadata::~JournalMetadata() {
+  std::lock_guard locker{m_lock};
+  ceph_assert(!m_initialized);
+}
+
+void JournalMetadata::init(Context *on_finish) {
+  {
+    std::lock_guard locker{m_lock};
+    ceph_assert(!m_initialized);
+    m_initialized = true;
+  }
+
+  // chain the init sequence (reverse order)
+  on_finish = utils::create_async_context_callback(
+    this, on_finish);
+  on_finish = new C_ImmutableMetadata(this, on_finish);
+  on_finish = new LambdaContext([this, on_finish](int r) {
+      if (r < 0) {
+        lderr(m_cct) << __func__ << ": failed to watch journal: "
+                     << cpp_strerror(r) << dendl;
+        std::lock_guard locker{m_lock};
+        m_watch_handle = 0;
+        on_finish->complete(r);
+        return;
+      }
+
+      get_immutable_metadata(&m_order, &m_splay_width, &m_pool_id, on_finish);
+    });
+
+  librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+    on_finish, utils::rados_ctx_callback);
+  int r = m_ioctx.aio_watch(m_oid, comp, &m_watch_handle, &m_watch_ctx);
+  ceph_assert(r == 0);
+  comp->release();
+}
+
+void JournalMetadata::shut_down(Context *on_finish) {
+
+  ldout(m_cct, 20) << __func__ << dendl;
+
+  uint64_t watch_handle = 0;
+  {
+    std::lock_guard locker{m_lock};
+    m_initialized = false;
+    std::swap(watch_handle, m_watch_handle);
+  }
+
+  // chain the shut down sequence (reverse order)
+  on_finish = utils::create_async_context_callback(
+    this, on_finish);
+  on_finish = new LambdaContext([this, on_finish](int r) {
+      ldout(m_cct, 20) << "shut_down: waiting for ops" << dendl;
+      m_async_op_tracker.wait_for_ops(on_finish);
+    });
+  on_finish = new LambdaContext([this, on_finish](int r) {
+      ldout(m_cct, 20) << "shut_down: flushing watch" << dendl;
+      librados::Rados rados(m_ioctx);
+      librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+        on_finish, utils::rados_ctx_callback);
+      r = rados.aio_watch_flush(comp);
+      ceph_assert(r == 0);
+      comp->release();
+    });
+  on_finish = new LambdaContext([this, on_finish](int r) {
+      flush_commit_position(on_finish);
+    });
+  if (watch_handle != 0) {
+    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+      on_finish, utils::rados_ctx_callback);
+    int r = m_ioctx.aio_unwatch(watch_handle, comp);
+    ceph_assert(r == 0);
+    comp->release();
+  } else {
+    on_finish->complete(0);
+  }
+}
+
+void JournalMetadata::get_immutable_metadata(uint8_t *order,
+                                             uint8_t *splay_width,
+                                             int64_t *pool_id,
+                                             Context *on_finish) {
+  client::get_immutable_metadata(m_ioctx, m_oid, order, splay_width, pool_id,
+                                 on_finish);
+}
+
+void JournalMetadata::get_mutable_metadata(uint64_t *minimum_set,
+                                           uint64_t *active_set,
+                                           RegisteredClients *clients,
+                                           Context *on_finish) {
+  client::get_mutable_metadata(m_ioctx, m_oid, minimum_set, active_set,
+                               clients, on_finish);
+}
+
+void JournalMetadata::register_client(const bufferlist &data,
+                                      Context *on_finish) {
+  ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
+  librados::ObjectWriteOperation op;
+  client::client_register(&op, m_client_id, data);
+
+  C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
+
+  librados::AioCompletion *comp =
+    librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback);
+  int r = m_ioctx.aio_operate(m_oid, comp, &op);
+  ceph_assert(r == 0);
+  comp->release();
+}
+
+void JournalMetadata::update_client(const bufferlist &data,
+                                    Context *on_finish) {
+  ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
+  librados::ObjectWriteOperation op;
+  client::client_update_data(&op, m_client_id, data);
+
+  C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
+
+  librados::AioCompletion *comp =
+    librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback);
+  int r = m_ioctx.aio_operate(m_oid, comp, &op);
+  ceph_assert(r == 0);
+  comp->release();
+}
+
+void JournalMetadata::unregister_client(Context *on_finish) {
+  ceph_assert(!m_client_id.empty());
+
+  ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
+  librados::ObjectWriteOperation op;
+  client::client_unregister(&op, m_client_id);
+
+  C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
+
+  librados::AioCompletion *comp =
+    librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback);
+  int r = m_ioctx.aio_operate(m_oid, comp, &op);
+  ceph_assert(r == 0);
+  comp->release();
+}
+
+void JournalMetadata::allocate_tag(uint64_t tag_class, const bufferlist &data,
+                                   Tag *tag, Context *on_finish) {
+  on_finish = new C_NotifyUpdate(this, on_finish);
+  C_AllocateTag *ctx = new C_AllocateTag(m_cct, m_ioctx, m_oid,
+                                         m_async_op_tracker, tag_class,
+                                         data, tag, on_finish);
+  ctx->send();
+}
+
+void JournalMetadata::get_client(const std::string &client_id,
+                                 cls::journal::Client *client,
+                                 Context *on_finish) {
+  C_GetClient *ctx = new C_GetClient(m_cct, m_ioctx, m_oid, m_async_op_tracker,
+                                     client_id, client, on_finish);
+  ctx->send();
+}
+
+void JournalMetadata::get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish) {
+  C_GetTag *ctx = new C_GetTag(m_cct, m_ioctx, m_oid, m_async_op_tracker,
+                               tag_tid, tag, on_finish);
+  ctx->send();
+}
+
+void JournalMetadata::get_tags(uint64_t start_after_tag_tid,
+                               const boost::optional<uint64_t> &tag_class,
+                               Tags *tags, Context *on_finish) {
+  C_GetTags *ctx = new C_GetTags(m_cct, m_ioctx, m_oid, m_client_id,
+                                 m_async_op_tracker, start_after_tag_tid,
+                                 tag_class, tags, on_finish);
+  ctx->send();
+}
+
+void JournalMetadata::add_listener(JournalMetadataListener *listener) {
+  std::unique_lock locker{m_lock};
+  m_update_cond.wait(locker, [this] {
+    return m_update_notifications <= 0;
+  });
+  m_listeners.push_back(listener);
+}
+
+void JournalMetadata::remove_listener(JournalMetadataListener *listener) {
+  std::unique_lock locker{m_lock};
+  m_update_cond.wait(locker, [this] {
+    return m_update_notifications <= 0;
+  });
+  m_listeners.remove(listener);
+}
+
+void JournalMetadata::set_minimum_set(uint64_t object_set) {
+  std::lock_guard locker{m_lock};
+
+  ldout(m_cct, 20) << __func__ << ": current=" << m_minimum_set
+                   << ", new=" << object_set << dendl;
+  if (m_minimum_set >= object_set) {
+    return;
+  }
+
+  librados::ObjectWriteOperation op;
+  client::set_minimum_set(&op, object_set);
+
+  C_NotifyUpdate *ctx = new C_NotifyUpdate(this);
+  librados::AioCompletion *comp =
+    librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback);
+  int r = m_ioctx.aio_operate(m_oid, comp, &op);
+  ceph_assert(r == 0);
+  comp->release();
+
+  m_minimum_set = object_set;
+}
+
+int JournalMetadata::set_active_set(uint64_t object_set) {
+  C_SaferCond ctx;
+  set_active_set(object_set, &ctx);
+  return ctx.wait();
+}
+
+void JournalMetadata::set_active_set(uint64_t object_set, Context *on_finish) {
+  std::lock_guard locker{m_lock};
+
+  ldout(m_cct, 20) << __func__ << ": current=" << m_active_set
+                   << ", new=" << object_set << dendl;
+  if (m_active_set >= object_set) {
+    m_work_queue->queue(on_finish, 0);
+    return;
+  }
+
+  librados::ObjectWriteOperation op;
+  client::set_active_set(&op, object_set);
+
+  C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
+  librados::AioCompletion *comp =
+    librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback);
+  int r = m_ioctx.aio_operate(m_oid, comp, &op);
+  ceph_assert(r == 0);
+  comp->release();
+
+  m_active_set = object_set;
+}
+
+void JournalMetadata::assert_active_tag(uint64_t tag_tid, Context *on_finish) {
+  std::lock_guard locker{m_lock};
+
+  C_AssertActiveTag *ctx = new C_AssertActiveTag(m_cct, m_ioctx, m_oid,
+                                                 m_async_op_tracker,
+                                                 m_client_id, tag_tid,
+                                                 on_finish);
+  ctx->send();
+}
+
+void JournalMetadata::flush_commit_position() {
+  ldout(m_cct, 20) << __func__ << dendl;
+
+  C_SaferCond ctx;
+  flush_commit_position(&ctx);
+  ctx.wait();
+}
+
+void JournalMetadata::flush_commit_position(Context *on_safe) {
+  ldout(m_cct, 20) << __func__ << dendl;
+
+  std::scoped_lock locker{*m_timer_lock, m_lock};
+  if (m_commit_position_ctx == nullptr && m_flush_commits_in_progress == 0) {
+    // nothing to flush
+    if (on_safe != nullptr) {
+      m_work_queue->queue(on_safe, 0);
+    }
+    return;
+  }
+
+  if (on_safe != nullptr) {
+    m_flush_commit_position_ctxs.push_back(on_safe);
+  }
+  if (m_commit_position_ctx == nullptr) {
+    return;
+  }
+
+  cancel_commit_task();
+  handle_commit_position_task();
+}
+
+void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) {
+  std::lock_guard locker{m_lock};
+  uint64_t &allocated_entry_tid = m_allocated_entry_tids[tag_tid];
+  if (allocated_entry_tid <= entry_tid) {
+    allocated_entry_tid = entry_tid + 1;
+  }
+}
+
+bool JournalMetadata::get_last_allocated_entry_tid(uint64_t tag_tid,
+                                                   uint64_t *entry_tid) const {
+  std::lock_guard locker{m_lock};
+
+  AllocatedEntryTids::const_iterator it = m_allocated_entry_tids.find(tag_tid);
+  if (it == m_allocated_entry_tids.end()) {
+    return false;
+  }
+
+  ceph_assert(it->second > 0);
+  *entry_tid = it->second - 1;
+  return true;
+}
+
+void JournalMetadata::handle_immutable_metadata(int r, Context *on_init) {
+  if (r < 0) {
+    lderr(m_cct) << "failed to initialize immutable metadata: "
+                 << cpp_strerror(r) << dendl;
+    on_init->complete(r);
+    return;
+  }
+
+  ldout(m_cct, 10) << "initialized immutable metadata" << dendl;
+  refresh(on_init);
+}
+
+void JournalMetadata::refresh(Context *on_complete) {
+  ldout(m_cct, 10) << "refreshing mutable metadata" << dendl;
+
+  {
+    std::lock_guard locker{m_lock};
+    if (on_complete != nullptr) {
+      m_refresh_ctxs.push_back(on_complete);
+    }
+    ++m_refreshes_in_progress;
+  }
+
+  auto refresh = new C_Refresh(this);
+  get_mutable_metadata(&refresh->minimum_set, &refresh->active_set,
+                       &refresh->registered_clients, refresh);
+}
+
+void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
+  ldout(m_cct, 10) << "refreshed mutable metadata: r=" << r << dendl;
+
+  m_lock.lock();
+  if (r == 0) {
+    Client client(m_client_id, bufferlist());
+    RegisteredClients::iterator it = refresh->registered_clients.find(client);
+    if (it != refresh->registered_clients.end()) {
+      if (it->state == cls::journal::CLIENT_STATE_DISCONNECTED) {
+        ldout(m_cct, 0) << "client flagged disconnected: " << m_client_id
+                        << dendl;
+      }
+      m_minimum_set = std::max(m_minimum_set, refresh->minimum_set);
+      m_active_set = std::max(m_active_set, refresh->active_set);
+      m_registered_clients = refresh->registered_clients;
+      m_client = *it;
+
+      ++m_update_notifications;
+      m_lock.unlock();
+      for (Listeners::iterator it = m_listeners.begin();
+           it != m_listeners.end(); ++it) {
+        (*it)->handle_update(this);
+      }
+      m_lock.lock();
+      if (--m_update_notifications == 0) {
+        m_update_cond.notify_all();
+      }
+    } else {
+      lderr(m_cct) << "failed to locate client: " << m_client_id << dendl;
+      r = -ENOENT;
+    }
+  }
+
+  Contexts refresh_ctxs;
+  ceph_assert(m_refreshes_in_progress > 0);
+  --m_refreshes_in_progress;
+  if (m_refreshes_in_progress == 0) {
+    std::swap(refresh_ctxs, m_refresh_ctxs);
+  }
+  m_lock.unlock();
+
+  for (auto ctx : refresh_ctxs) {
+    ctx->complete(r);
+  }
+}
+
+void JournalMetadata::cancel_commit_task() {
+  ldout(m_cct, 20) << __func__ << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+  ceph_assert(m_commit_position_ctx != nullptr);
+  ceph_assert(m_commit_position_task_ctx != nullptr);
+  m_timer->cancel_event(m_commit_position_task_ctx);
+  m_commit_position_task_ctx = NULL;
+}
+
+void JournalMetadata::schedule_commit_task() {
+  ldout(m_cct, 20) << __func__ << dendl;
+
+  ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+  ceph_assert(m_commit_position_ctx != nullptr);
+  if (m_commit_position_task_ctx == nullptr) {
+    m_commit_position_task_ctx =
+      m_timer->add_event_after(m_settings.commit_interval,
+                               new C_CommitPositionTask(this));
+  }
+}
+
+void JournalMetadata::handle_commit_position_task() {
+  ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+  ceph_assert(ceph_mutex_is_locked(m_lock));
+  ldout(m_cct, 20) << __func__ << ": "
+                   << "client_id=" << m_client_id << ", "
+                   << "commit_position=" << m_commit_position << dendl;
+
+  m_commit_position_task_ctx = nullptr;
+  Context* commit_position_ctx = nullptr;
+  std::swap(commit_position_ctx, m_commit_position_ctx);
+
+  m_async_op_tracker.start_op();
+  ++m_flush_commits_in_progress;
+
+  Context* ctx = new LambdaContext([this, commit_position_ctx](int r) {
+      Contexts flush_commit_position_ctxs;
+      m_lock.lock();
+      ceph_assert(m_flush_commits_in_progress > 0);
+      --m_flush_commits_in_progress;
+      if (m_flush_commits_in_progress == 0) {
+        std::swap(flush_commit_position_ctxs, m_flush_commit_position_ctxs);
+      }
+      m_lock.unlock();
+
+      commit_position_ctx->complete(0);
+      for (auto ctx : flush_commit_position_ctxs) {
+        ctx->complete(0);
+      }
+      m_async_op_tracker.finish_op();
+    });
+  ctx = new C_NotifyUpdate(this, ctx);
+  ctx = new LambdaContext([this, ctx](int r) {
+      // manually kick off a refresh in case the notification is missed
+      // and ignore the next notification that we are about to send
+      m_lock.lock();
+      ++m_ignore_watch_notifies;
+      m_lock.unlock();
+
+      refresh(ctx);
+    });
+  ctx = new LambdaContext([this, ctx](int r) {
+      schedule_laggy_clients_disconnect(ctx);
+    });
+
+  librados::ObjectWriteOperation op;
+  client::client_commit(&op, m_client_id, m_commit_position);
+
+  auto comp = librados::Rados::aio_create_completion(
+    ctx, utils::rados_ctx_callback);
+  int r = m_ioctx.aio_operate(m_oid, comp, &op);
+  ceph_assert(r == 0);
+  comp->release();
+}
+
+void JournalMetadata::schedule_watch_reset() {
+  ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+  m_timer->add_event_after(1, new C_WatchReset(this));
+}
+
+void JournalMetadata::handle_watch_reset() {
+  ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+  if (!m_initialized) {
+    return;
+  }
+
+  int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
+  if (r < 0) {
+    if (r == -ENOENT) {
+      ldout(m_cct, 5) << __func__ << ": journal header not found" << dendl;
+    } else if (r == -EBLOCKLISTED) {
+      ldout(m_cct, 5) << __func__ << ": client blocklisted" << dendl;
+    } else {
+      lderr(m_cct) << __func__ << ": failed to watch journal: "
+                   << cpp_strerror(r) << dendl;
+    }
+    schedule_watch_reset();
+  } else {
+    ldout(m_cct, 10) << __func__ << ": reset journal watch" << dendl;
+    refresh(NULL);
+  }
+}
+
+void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) {
+  ldout(m_cct, 10) << "journal header updated" << dendl;
+
+  bufferlist bl;
+  m_ioctx.notify_ack(m_oid, notify_id, cookie, bl);
+
+  {
+    std::lock_guard locker{m_lock};
+    if (m_ignore_watch_notifies > 0) {
+      --m_ignore_watch_notifies;
+      return;
+    }
+  }
+
+  refresh(NULL);
+}
+
+void JournalMetadata::handle_watch_error(int err) {
+  if (err == -ENOTCONN) {
+    ldout(m_cct, 5) << "journal watch error: header removed" << dendl;
+  } else if (err == -EBLOCKLISTED) {
+    lderr(m_cct) << "journal watch error: client blocklisted" << dendl;
+  } else {
+    lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl;
+  }
+
+  std::scoped_lock locker{*m_timer_lock, m_lock};
+
+  // release old watch on error
+  if (m_watch_handle != 0) {
+    m_ioctx.unwatch2(m_watch_handle);
+    m_watch_handle = 0;
+  }
+
+  if (m_initialized && err != -ENOENT) {
+    schedule_watch_reset();
+  }
+}
+
+uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num,
+                                              uint64_t tag_tid,
+                                              uint64_t entry_tid) {
+  std::lock_guard locker{m_lock};
+  uint64_t commit_tid = ++m_commit_tid;
+  m_pending_commit_tids[commit_tid] = CommitEntry(object_num, tag_tid,
+                                                  entry_tid);
+
+  ldout(m_cct, 20) << "allocated commit tid: commit_tid=" << commit_tid << " ["
+                   << "object_num=" << object_num << ", "
+                   << "tag_tid=" << tag_tid << ", "
+                   << "entry_tid=" << entry_tid << "]"
+                   << dendl;
+  return commit_tid;
+}
+
+void JournalMetadata::overflow_commit_tid(uint64_t commit_tid,
+                                          uint64_t object_num) {
+  std::lock_guard locker{m_lock};
+
+  auto it = m_pending_commit_tids.find(commit_tid);
+  ceph_assert(it != m_pending_commit_tids.end());
+  ceph_assert(it->second.object_num < object_num);
+
+  ldout(m_cct, 20) << __func__ << ": "
+                   << "commit_tid=" << commit_tid << ", "
+                   << "old_object_num=" << it->second.object_num << ", "
+                   << "new_object_num=" << object_num << dendl;
+  it->second.object_num = object_num;
+}
+
+void JournalMetadata::get_commit_entry(uint64_t commit_tid,
+                                       uint64_t *object_num,
+                                       uint64_t *tag_tid,
+                                       uint64_t *entry_tid) {
+  std::lock_guard locker{m_lock};
+
+  auto it = m_pending_commit_tids.find(commit_tid);
+  ceph_assert(it != m_pending_commit_tids.end());
+
+  *object_num = it->second.object_num;
+  *tag_tid = it->second.tag_tid;
+  *entry_tid = it->second.entry_tid;
+}
+
+void JournalMetadata::committed(uint64_t commit_tid,
+                                const CreateContext &create_context) {
+  ldout(m_cct, 20) << "committed tid=" << commit_tid << dendl;
+
+  ObjectSetPosition commit_position;
+  Context *stale_ctx = nullptr;
+  {
+    std::scoped_lock locker{*m_timer_lock, m_lock};
+    ceph_assert(commit_tid > m_commit_position_tid);
+
+    if (!m_commit_position.object_positions.empty()) {
+      // in-flight commit position update
+      commit_position = m_commit_position;
+    } else {
+      // safe commit position
+      commit_position = m_client.commit_position;
+    }
+
+    CommitTids::iterator it = m_pending_commit_tids.find(commit_tid);
+    ceph_assert(it != m_pending_commit_tids.end());
+
+    CommitEntry &commit_entry = it->second;
+    commit_entry.committed = true;
+
+    bool update_commit_position = false;
+    while (!m_pending_commit_tids.empty()) {
+      CommitTids::iterator it = m_pending_commit_tids.begin();
+      CommitEntry &commit_entry = it->second;
+      if (!commit_entry.committed) {
+        break;
+      }
+
+      commit_position.object_positions.emplace_front(
+        commit_entry.object_num, commit_entry.tag_tid,
+        commit_entry.entry_tid);
+      m_pending_commit_tids.erase(it);
+      update_commit_position = true;
+    }
+
+    if (!update_commit_position) {
+      return;
+    }
+
+    // prune the position to have one position per splay offset
+    std::set<uint8_t> in_use_splay_offsets;
+    ObjectPositions::iterator ob_it = commit_position.object_positions.begin();
+    while (ob_it != commit_position.object_positions.end()) {
+      uint8_t splay_offset = ob_it->object_number % m_splay_width;
+      if (!in_use_splay_offsets.insert(splay_offset).second) {
+        ob_it = commit_position.object_positions.erase(ob_it);
+      } else {
+        ++ob_it;
+      }
+    }
+
+    stale_ctx = m_commit_position_ctx;
+    m_commit_position_ctx = create_context();
+    m_commit_position = commit_position;
+    m_commit_position_tid = commit_tid;
+
+    ldout(m_cct, 20) << "updated commit position: " << commit_position << ", "
+                     << "on_safe=" << m_commit_position_ctx << dendl;
+    schedule_commit_task();
+  }
+
+  if (stale_ctx != nullptr) {
+    ldout(m_cct, 20) << "canceling stale commit: on_safe=" << stale_ctx
+                     << dendl;
+    stale_ctx->complete(-ESTALE);
+  }
+}
+
+void JournalMetadata::notify_update() {
+  ldout(m_cct, 10) << "notifying journal header update" << dendl;
+
+  bufferlist bl;
+  m_ioctx.notify2(m_oid, bl, 5000, NULL);
+}
+
+void JournalMetadata::async_notify_update(Context *on_safe) {
+  ldout(m_cct, 10) << "async notifying journal header update" << dendl;
+
+  C_AioNotify *ctx = new C_AioNotify(this, on_safe);
+  librados::AioCompletion *comp =
+    librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback);
+
+  bufferlist bl;
+  int r = m_ioctx.aio_notify(m_oid, comp, bl, 5000, NULL);
+  ceph_assert(r == 0);
+
+  comp->release();
+}
+
+void JournalMetadata::wait_for_ops() {
+  C_SaferCond ctx;
+  m_async_op_tracker.wait_for_ops(&ctx);
+  ctx.wait();
+}
+
+void JournalMetadata::handle_notified(int r) {
+  ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl;
+}
+
+void JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
+  ldout(m_cct, 20) << __func__ << dendl;
+  if (m_settings.max_concurrent_object_sets <= 0) {
+    on_finish->complete(0);
+    return;
+  }
+
+  Context *ctx = on_finish;
+  {
+    std::lock_guard locker{m_lock};
+    for (auto &c : m_registered_clients) {
+      if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
+          c.id == m_client_id ||
+          m_settings.ignored_laggy_clients.count(c.id) > 0) {
+        continue;
+      }
+      const std::string &client_id = c.id;
+      uint64_t object_set = 0;
+      if (!c.commit_position.object_positions.empty()) {
+        auto &position = *(c.commit_position.object_positions.begin());
+        object_set = position.object_number / m_splay_width;
+      }
+
+      if (m_active_set > object_set + m_settings.max_concurrent_object_sets) {
+        ldout(m_cct, 1) << __func__ << ": " << client_id
+                        << ": scheduling disconnect" << dendl;
+
+        ctx = new LambdaContext([this, client_id, ctx](int r1) {
+            ldout(m_cct, 10) << __func__ << ": " << client_id
+                             << ": flagging disconnected" << dendl;
+
+            librados::ObjectWriteOperation op;
+            client::client_update_state(
+              &op, client_id, cls::journal::CLIENT_STATE_DISCONNECTED);
+
+            auto comp = librados::Rados::aio_create_completion(
+              ctx, utils::rados_ctx_callback);
+            int r = m_ioctx.aio_operate(m_oid, comp, &op);
+            ceph_assert(r == 0);
+            comp->release();
+          });
+      }
+    }
+  }
+
+  if (ctx == on_finish) {
+    ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect"
+                     << dendl;
+  }
+  ctx->complete(0);
+}
+
+std::ostream &operator<<(std::ostream &os,
+                         const JournalMetadata::RegisteredClients &clients) {
+  os << "[";
+  for (JournalMetadata::RegisteredClients::const_iterator c = clients.begin();
+       c != clients.end(); ++c) {
+    os << (c == clients.begin() ? "" : ", " ) << *c;
+  }
+  os << "]";
+  return os;
+}
+
+std::ostream &operator<<(std::ostream &os,
+                         const JournalMetadata &jm) {
+  std::lock_guard locker{jm.m_lock};
+  os << "[oid=" << jm.m_oid << ", "
+     << "initialized=" << jm.m_initialized << ", "
+     << "order=" << (int)jm.m_order << ", "
+     << "splay_width=" << (int)jm.m_splay_width << ", "
+     << "pool_id=" << jm.m_pool_id << ", "
+     << "minimum_set=" << jm.m_minimum_set << ", "
+     << "active_set=" << jm.m_active_set << ", "
+     << "client_id=" << jm.m_client_id << ", "
+     << "commit_tid=" << jm.m_commit_tid << ", "
+     << "commit_interval=" << jm.m_settings.commit_interval << ", "
+     << "commit_position=" << jm.m_commit_position << ", "
+     << "registered_clients=" << jm.m_registered_clients << "]";
+  return os;
+}
+
+} // namespace journal