diff options
Diffstat (limited to '')
-rw-r--r-- | src/journal/Journaler.cc | 462 |
1 files changed, 462 insertions, 0 deletions
diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc new file mode 100644 index 000000000..1838d633a --- /dev/null +++ b/src/journal/Journaler.cc @@ -0,0 +1,462 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/Journaler.h" +#include "include/stringify.h" +#include "common/errno.h" +#include "common/Timer.h" +#include "common/WorkQueue.h" +#include "journal/Entry.h" +#include "journal/FutureImpl.h" +#include "journal/JournalMetadata.h" +#include "journal/JournalPlayer.h" +#include "journal/JournalRecorder.h" +#include "journal/JournalTrimmer.h" +#include "journal/ReplayEntry.h" +#include "journal/ReplayHandler.h" +#include "cls/journal/cls_journal_client.h" +#include "cls/journal/cls_journal_types.h" +#include "Utils.h" + +#define dout_subsys ceph_subsys_journaler +#undef dout_prefix +#define dout_prefix *_dout << "Journaler: " << this << " " + +namespace journal { + +namespace { + +static const std::string JOURNAL_HEADER_PREFIX = "journal."; +static const std::string JOURNAL_OBJECT_PREFIX = "journal_data."; + +} // anonymous namespace + +using namespace cls::journal; +using utils::rados_ctx_callback; + +std::string Journaler::header_oid(const std::string &journal_id) { + return JOURNAL_HEADER_PREFIX + journal_id; +} + +std::string Journaler::object_oid_prefix(int pool_id, + const std::string &journal_id) { + return JOURNAL_OBJECT_PREFIX + stringify(pool_id) + "." + journal_id + "."; +} + +Journaler::Threads::Threads(CephContext *cct) { + thread_pool = new ThreadPool(cct, "Journaler::thread_pool", "tp_journal", 1); + thread_pool->start(); + + work_queue = new ContextWQ("Journaler::work_queue", + ceph::make_timespan(60), + thread_pool); + + timer = new SafeTimer(cct, timer_lock, true); + timer->init(); +} + +Journaler::Threads::~Threads() { + { + std::lock_guard timer_locker{timer_lock}; + timer->shutdown(); + } + delete timer; + timer = nullptr; + + work_queue->drain(); + delete work_queue; + work_queue = nullptr; + + thread_pool->stop(); + delete thread_pool; + thread_pool = nullptr; +} + +Journaler::Journaler(librados::IoCtx &header_ioctx, + const std::string &journal_id, + const std::string &client_id, const Settings &settings, + CacheManagerHandler *cache_manager_handler) + : m_threads(new Threads(reinterpret_cast<CephContext*>(header_ioctx.cct()))), + m_client_id(client_id), m_cache_manager_handler(cache_manager_handler) { + set_up(m_threads->work_queue, m_threads->timer, &m_threads->timer_lock, + header_ioctx, journal_id, settings); +} + +Journaler::Journaler(ContextWQ *work_queue, SafeTimer *timer, + ceph::mutex *timer_lock, librados::IoCtx &header_ioctx, + const std::string &journal_id, + const std::string &client_id, const Settings &settings, + CacheManagerHandler *cache_manager_handler) + : m_client_id(client_id), m_cache_manager_handler(cache_manager_handler) { + set_up(work_queue, timer, timer_lock, header_ioctx, journal_id, + settings); +} + +void Journaler::set_up(ContextWQ *work_queue, SafeTimer *timer, + ceph::mutex *timer_lock, librados::IoCtx &header_ioctx, + const std::string &journal_id, + const Settings &settings) { + m_header_ioctx.dup(header_ioctx); + m_cct = reinterpret_cast<CephContext *>(m_header_ioctx.cct()); + + m_header_oid = header_oid(journal_id); + m_object_oid_prefix = object_oid_prefix(m_header_ioctx.get_id(), journal_id); + + m_metadata = ceph::make_ref<JournalMetadata>(work_queue, timer, timer_lock, + m_header_ioctx, m_header_oid, m_client_id, + settings); +} + +Journaler::~Journaler() { + if (m_metadata != nullptr) { + ceph_assert(!m_metadata->is_initialized()); + if (!m_initialized) { + // never initialized -- ensure any in-flight ops are complete + // since we wouldn't expect shut_down to be invoked + m_metadata->wait_for_ops(); + } + m_metadata.reset(); + } + ceph_assert(m_trimmer == nullptr); + ceph_assert(m_player == nullptr); + ceph_assert(m_recorder == nullptr); + + delete m_threads; + m_threads = nullptr; +} + +void Journaler::exists(Context *on_finish) const { + librados::ObjectReadOperation op; + op.stat(nullptr, nullptr, nullptr); + + librados::AioCompletion *comp = + librados::Rados::aio_create_completion(on_finish, rados_ctx_callback); + int r = m_header_ioctx.aio_operate(m_header_oid, comp, &op, nullptr); + ceph_assert(r == 0); + comp->release(); +} + +void Journaler::init(Context *on_init) { + m_initialized = true; + m_metadata->init(new C_InitJournaler(this, on_init)); +} + +int Journaler::init_complete() { + int64_t pool_id = m_metadata->get_pool_id(); + + if (pool_id < 0 || pool_id == m_header_ioctx.get_id()) { + ldout(m_cct, 20) << "using image pool for journal data" << dendl; + m_data_ioctx.dup(m_header_ioctx); + } else { + ldout(m_cct, 20) << "using pool id=" << pool_id << " for journal data" + << dendl; + librados::Rados rados(m_header_ioctx); + int r = rados.ioctx_create2(pool_id, m_data_ioctx); + if (r < 0) { + if (r == -ENOENT) { + ldout(m_cct, 1) << "pool id=" << pool_id << " no longer exists" + << dendl; + } + return r; + } + } + m_trimmer = new JournalTrimmer(m_data_ioctx, m_object_oid_prefix, + m_metadata); + return 0; +} + +void Journaler::shut_down() { + C_SaferCond ctx; + shut_down(&ctx); + ctx.wait(); +} + +void Journaler::shut_down(Context *on_finish) { + ceph_assert(m_player == nullptr); + ceph_assert(m_recorder == nullptr); + + auto metadata = std::move(m_metadata); + ceph_assert(metadata); + + on_finish = new LambdaContext([metadata, on_finish](int r) { + on_finish->complete(0); + }); + + JournalTrimmer *trimmer = nullptr; + std::swap(trimmer, m_trimmer); + if (!trimmer) { + metadata->shut_down(on_finish); + return; + } + + on_finish = new LambdaContext([trimmer, metadata, on_finish](int r) { + delete trimmer; + metadata->shut_down(on_finish); + }); + trimmer->shut_down(on_finish); +} + +bool Journaler::is_initialized() const { + return m_metadata->is_initialized(); +} + +void Journaler::get_immutable_metadata(uint8_t *order, uint8_t *splay_width, + int64_t *pool_id, Context *on_finish) { + m_metadata->get_immutable_metadata(order, splay_width, pool_id, on_finish); +} + +void Journaler::get_mutable_metadata(uint64_t *minimum_set, + uint64_t *active_set, + RegisteredClients *clients, + Context *on_finish) { + m_metadata->get_mutable_metadata(minimum_set, active_set, clients, on_finish); +} + +void Journaler::create(uint8_t order, uint8_t splay_width, + int64_t pool_id, Context *on_finish) { + if (order > 26 || order < 12) { + lderr(m_cct) << "order must be in the range [12, 26]" << dendl; + on_finish->complete(-EDOM); + return; + } + if (splay_width == 0) { + on_finish->complete(-EINVAL); + return; + } + + ldout(m_cct, 5) << "creating new journal: " << m_header_oid << dendl; + + librados::ObjectWriteOperation op; + client::create(&op, order, splay_width, pool_id); + + librados::AioCompletion *comp = + librados::Rados::aio_create_completion(on_finish, rados_ctx_callback); + int r = m_header_ioctx.aio_operate(m_header_oid, comp, &op); + ceph_assert(r == 0); + comp->release(); +} + +void Journaler::remove(bool force, Context *on_finish) { + // chain journal removal (reverse order) + on_finish = new LambdaContext([this, on_finish](int r) { + librados::AioCompletion *comp = librados::Rados::aio_create_completion( + on_finish, utils::rados_ctx_callback); + r = m_header_ioctx.aio_remove(m_header_oid, comp); + ceph_assert(r == 0); + comp->release(); + }); + + on_finish = new LambdaContext([this, force, on_finish](int r) { + m_trimmer->remove_objects(force, on_finish); + }); + + m_metadata->shut_down(on_finish); +} + +void Journaler::flush_commit_position(Context *on_safe) { + m_metadata->flush_commit_position(on_safe); +} + +void Journaler::add_listener(JournalMetadataListener *listener) { + m_metadata->add_listener(listener); +} + +void Journaler::remove_listener(JournalMetadataListener *listener) { + m_metadata->remove_listener(listener); +} + +int Journaler::register_client(const bufferlist &data) { + C_SaferCond cond; + register_client(data, &cond); + return cond.wait(); +} + +int Journaler::unregister_client() { + C_SaferCond cond; + unregister_client(&cond); + return cond.wait(); +} + +void Journaler::register_client(const bufferlist &data, Context *on_finish) { + return m_metadata->register_client(data, on_finish); +} + +void Journaler::update_client(const bufferlist &data, Context *on_finish) { + return m_metadata->update_client(data, on_finish); +} + +void Journaler::unregister_client(Context *on_finish) { + return m_metadata->unregister_client(on_finish); +} + +void Journaler::get_client(const std::string &client_id, + cls::journal::Client *client, + Context *on_finish) { + m_metadata->get_client(client_id, client, on_finish); +} + +int Journaler::get_cached_client(const std::string &client_id, + cls::journal::Client *client) { + RegisteredClients clients; + m_metadata->get_registered_clients(&clients); + + auto it = clients.find({client_id, {}}); + if (it == clients.end()) { + return -ENOENT; + } + + *client = *it; + return 0; +} + +void Journaler::allocate_tag(const bufferlist &data, cls::journal::Tag *tag, + Context *on_finish) { + m_metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, data, tag, + on_finish); +} + +void Journaler::allocate_tag(uint64_t tag_class, const bufferlist &data, + cls::journal::Tag *tag, Context *on_finish) { + m_metadata->allocate_tag(tag_class, data, tag, on_finish); +} + +void Journaler::get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish) { + m_metadata->get_tag(tag_tid, tag, on_finish); +} + +void Journaler::get_tags(uint64_t tag_class, Tags *tags, Context *on_finish) { + m_metadata->get_tags(0, tag_class, tags, on_finish); +} + +void Journaler::get_tags(uint64_t start_after_tag_tid, uint64_t tag_class, + Tags *tags, Context *on_finish) { + m_metadata->get_tags(start_after_tag_tid, tag_class, tags, on_finish); +} + +void Journaler::start_replay(ReplayHandler* replay_handler) { + create_player(replay_handler); + m_player->prefetch(); +} + +void Journaler::start_live_replay(ReplayHandler* replay_handler, + double interval) { + create_player(replay_handler); + m_player->prefetch_and_watch(interval); +} + +bool Journaler::try_pop_front(ReplayEntry *replay_entry, + uint64_t *tag_tid) { + ceph_assert(m_player != nullptr); + + Entry entry; + uint64_t commit_tid; + if (!m_player->try_pop_front(&entry, &commit_tid)) { + return false; + } + + *replay_entry = ReplayEntry(entry.get_data(), commit_tid); + if (tag_tid != nullptr) { + *tag_tid = entry.get_tag_tid(); + } + return true; +} + +void Journaler::stop_replay() { + C_SaferCond ctx; + stop_replay(&ctx); + ctx.wait(); +} + +void Journaler::stop_replay(Context *on_finish) { + auto player = std::move(m_player); + auto* playerp = player.get(); + + auto f = [player=std::move(player), on_finish](int r) { + on_finish->complete(r); + }; + on_finish = new LambdaContext(std::move(f)); + playerp->shut_down(on_finish); +} + +void Journaler::committed(const ReplayEntry &replay_entry) { + m_trimmer->committed(replay_entry.get_commit_tid()); +} + +void Journaler::committed(const Future &future) { + auto& future_impl = future.get_future_impl(); + m_trimmer->committed(future_impl->get_commit_tid()); +} + +void Journaler::start_append(uint64_t max_in_flight_appends) { + ceph_assert(m_recorder == nullptr); + + // TODO verify active object set >= current replay object set + + m_recorder = std::make_unique<JournalRecorder>(m_data_ioctx, m_object_oid_prefix, + m_metadata, max_in_flight_appends); +} + +void Journaler::set_append_batch_options(int flush_interval, + uint64_t flush_bytes, + double flush_age) { + ceph_assert(m_recorder != nullptr); + m_recorder->set_append_batch_options(flush_interval, flush_bytes, flush_age); +} + +void Journaler::stop_append(Context *on_safe) { + auto recorder = std::move(m_recorder); + ceph_assert(recorder); + + auto* recorderp = recorder.get(); + on_safe = new LambdaContext([recorder=std::move(recorder), on_safe](int r) { + on_safe->complete(r); + }); + recorderp->shut_down(on_safe); +} + +uint64_t Journaler::get_max_append_size() const { + uint64_t max_payload_size = m_metadata->get_object_size() - + Entry::get_fixed_size(); + if (m_metadata->get_settings().max_payload_bytes > 0) { + max_payload_size = std::min(max_payload_size, + m_metadata->get_settings().max_payload_bytes); + } + return max_payload_size; +} + +Future Journaler::append(uint64_t tag_tid, const bufferlist &payload_bl) { + return m_recorder->append(tag_tid, payload_bl); +} + +void Journaler::flush_append(Context *on_safe) { + m_recorder->flush(on_safe); +} + +void Journaler::create_player(ReplayHandler* replay_handler) { + ceph_assert(m_player == nullptr); + m_player = std::make_unique<JournalPlayer>(m_data_ioctx, m_object_oid_prefix, m_metadata, + replay_handler, m_cache_manager_handler); +} + +void Journaler::get_metadata(uint8_t *order, uint8_t *splay_width, + int64_t *pool_id) { + ceph_assert(m_metadata != nullptr); + + *order = m_metadata->get_order(); + *splay_width = m_metadata->get_splay_width(); + *pool_id = m_metadata->get_pool_id(); +} + +std::ostream &operator<<(std::ostream &os, + const Journaler &journaler) { + os << "[metadata="; + if (journaler.m_metadata) { + os << *journaler.m_metadata; + } else { + os << "NULL"; + } + os << "]"; + return os; +} + +} // namespace journal |