Diffstat
28 files changed, 6031 insertions, 0 deletions
diff --git a/src/journal/CMakeLists.txt b/src/journal/CMakeLists.txt new file mode 100644 index 000000000..3632c1051 --- /dev/null +++ b/src/journal/CMakeLists.txt @@ -0,0 +1,14 @@ +set(journal_srcs + Entry.cc + Future.cc + FutureImpl.cc + Journaler.cc + JournalMetadata.cc + JournalPlayer.cc + JournalRecorder.cc + JournalTrimmer.cc + ObjectPlayer.cc + ObjectRecorder.cc + Utils.cc) +add_library(journal STATIC ${journal_srcs}) +target_link_libraries(journal cls_journal_client) diff --git a/src/journal/Entry.cc b/src/journal/Entry.cc new file mode 100644 index 000000000..9879a9aad --- /dev/null +++ b/src/journal/Entry.cc @@ -0,0 +1,159 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/Entry.h" +#include "include/encoding.h" +#include "include/stringify.h" +#include "common/Formatter.h" +#include <strstream> + +#define dout_subsys ceph_subsys_journaler +#undef dout_prefix +#define dout_prefix *_dout << "Entry: " << this << " " + +namespace journal { + +namespace { + +const uint32_t HEADER_FIXED_SIZE = 25; /// preamble, version, entry tid, tag id +const uint32_t REMAINDER_FIXED_SIZE = 8; /// data size, crc + +} // anonymous namespace + +uint32_t Entry::get_fixed_size() { + return HEADER_FIXED_SIZE + REMAINDER_FIXED_SIZE; +} + +void Entry::encode(bufferlist &bl) const { + using ceph::encode; + bufferlist data_bl; + encode(preamble, data_bl); + encode(static_cast<uint8_t>(1), data_bl); + encode(m_entry_tid, data_bl); + encode(m_tag_tid, data_bl); + encode(m_data, data_bl); + + uint32_t crc = data_bl.crc32c(0); + uint32_t bl_offset = bl.length(); + bl.claim_append(data_bl); + encode(crc, bl); + ceph_assert(get_fixed_size() + m_data.length() + bl_offset == bl.length()); +} + +void Entry::decode(bufferlist::const_iterator &iter) { + using ceph::decode; + uint32_t start_offset = iter.get_off(); + uint64_t bl_preamble; + decode(bl_preamble, iter); + if (bl_preamble != preamble) { + throw buffer::malformed_input("incorrect preamble: " + + stringify(bl_preamble)); + } + + uint8_t version; + decode(version, iter); + if (version != 1) { + throw buffer::malformed_input("unknown version: " + stringify(version)); + } + + decode(m_entry_tid, iter); + decode(m_tag_tid, iter); + decode(m_data, iter); + uint32_t end_offset = iter.get_off(); + + uint32_t crc; + decode(crc, iter); + + bufferlist data_bl; + data_bl.substr_of(iter.get_bl(), start_offset, end_offset - start_offset); + uint32_t actual_crc = data_bl.crc32c(0); + if (crc != actual_crc) { + throw buffer::malformed_input("crc mismatch: " + stringify(crc) + + " != " + stringify(actual_crc)); + } +} + +void Entry::dump(Formatter *f) const { + f->dump_unsigned("tag_tid", m_tag_tid); + f->dump_unsigned("entry_tid", m_entry_tid); + + std::stringstream data; + m_data.hexdump(data); + f->dump_string("data", data.str()); +} + +bool Entry::is_readable(bufferlist::const_iterator iter, uint32_t *bytes_needed) { + using ceph::decode; + uint32_t start_off = iter.get_off(); + if (iter.get_remaining() < HEADER_FIXED_SIZE) { + bufferlist sub_bl; + sub_bl.substr_of(iter.get_bl(), iter.get_off(), iter.get_remaining()); + if (sub_bl.length() > 0 && sub_bl.is_zero()) { + // pad bytes + *bytes_needed = 0; + } else { + *bytes_needed = HEADER_FIXED_SIZE - iter.get_remaining(); + } + return false; + } + uint64_t bl_preamble; + decode(bl_preamble, iter); + if (bl_preamble != preamble) { + *bytes_needed = 0; + return false; + } + iter += HEADER_FIXED_SIZE - sizeof(bl_preamble); + + if (iter.get_remaining() < 
sizeof(uint32_t)) { + *bytes_needed = sizeof(uint32_t) - iter.get_remaining(); + return false; + } + uint32_t data_size; + decode(data_size, iter); + + if (iter.get_remaining() < data_size) { + *bytes_needed = data_size - iter.get_remaining(); + return false; + } + iter += data_size; + uint32_t end_off = iter.get_off(); + + if (iter.get_remaining() < sizeof(uint32_t)) { + *bytes_needed = sizeof(uint32_t) - iter.get_remaining(); + return false; + } + + bufferlist crc_bl; + crc_bl.substr_of(iter.get_bl(), start_off, end_off - start_off); + + *bytes_needed = 0; + uint32_t crc; + decode(crc, iter); + if (crc != crc_bl.crc32c(0)) { + return false; + } + return true; +} + +void Entry::generate_test_instances(std::list<Entry *> &o) { + o.push_back(new Entry(1, 123, bufferlist())); + + bufferlist bl; + bl.append("data"); + o.push_back(new Entry(2, 123, bl)); +} + +bool Entry::operator==(const Entry& rhs) const { + return (m_tag_tid == rhs.m_tag_tid && m_entry_tid == rhs.m_entry_tid && + const_cast<bufferlist&>(m_data).contents_equal( + const_cast<bufferlist&>(rhs.m_data))); +} + +std::ostream &operator<<(std::ostream &os, const Entry &entry) { + os << "Entry[tag_tid=" << entry.get_tag_tid() << ", " + << "entry_tid=" << entry.get_entry_tid() << ", " + << "data size=" << entry.get_data().length() << "]"; + return os; +} + +} // namespace journal diff --git a/src/journal/Entry.h b/src/journal/Entry.h new file mode 100644 index 000000000..52d9e67b4 --- /dev/null +++ b/src/journal/Entry.h @@ -0,0 +1,62 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_ENTRY_H +#define CEPH_JOURNAL_ENTRY_H + +#include "include/int_types.h" +#include "include/buffer.h" +#include "include/encoding.h" +#include <iosfwd> +#include <string> + +namespace ceph { +class Formatter; +} + +namespace journal { + +class Entry { +public: + Entry() : m_tag_tid(0), m_entry_tid() {} + Entry(uint64_t tag_tid, uint64_t entry_tid, const bufferlist &data) + : m_tag_tid(tag_tid), m_entry_tid(entry_tid), m_data(data) + { + } + + static uint32_t get_fixed_size(); + + inline uint64_t get_tag_tid() const { + return m_tag_tid; + } + inline uint64_t get_entry_tid() const { + return m_entry_tid; + } + inline const bufferlist &get_data() const { + return m_data; + } + + void encode(bufferlist &bl) const; + void decode(bufferlist::const_iterator &iter); + void dump(ceph::Formatter *f) const; + + bool operator==(const Entry& rhs) const; + + static bool is_readable(bufferlist::const_iterator iter, uint32_t *bytes_needed); + static void generate_test_instances(std::list<Entry *> &o); + +private: + static const uint64_t preamble = 0x3141592653589793; + + uint64_t m_tag_tid; + uint64_t m_entry_tid; + bufferlist m_data; +}; + +std::ostream &operator<<(std::ostream &os, const Entry &entry); +WRITE_CLASS_ENCODER(journal::Entry) + +} // namespace journal + + +#endif // CEPH_JOURNAL_ENTRY_H diff --git a/src/journal/Future.cc b/src/journal/Future.cc new file mode 100644 index 000000000..0e794d165 --- /dev/null +++ b/src/journal/Future.cc @@ -0,0 +1,40 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/Future.h" +#include "journal/FutureImpl.h" +#include "include/ceph_assert.h" + +namespace journal { + +Future::Future() = default; +Future::Future(const Future& o) = default; +Future& Future::operator=(const Future& o) = default; +Future::Future(Future&& o) = default; +Future& Future::operator=(Future&& o) = default; 
+Future::Future(ceph::ref_t<FutureImpl> future_impl) : m_future_impl(std::move(future_impl)) {} +Future::~Future() = default; + +void Future::flush(Context *on_safe) { + m_future_impl->flush(on_safe); +} + +void Future::wait(Context *on_safe) { + ceph_assert(on_safe != NULL); + m_future_impl->wait(on_safe); +} + +bool Future::is_complete() const { + return m_future_impl->is_complete(); +} + +int Future::get_return_value() const { + return m_future_impl->get_return_value(); +} + +std::ostream &operator<<(std::ostream &os, const Future &future) { + return os << *future.m_future_impl; +} + +} // namespace journal + diff --git a/src/journal/Future.h b/src/journal/Future.h new file mode 100644 index 000000000..ba835b353 --- /dev/null +++ b/src/journal/Future.h @@ -0,0 +1,57 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_FUTURE_H +#define CEPH_JOURNAL_FUTURE_H + +#include <iosfwd> +#include <string> + +#include "include/ceph_assert.h" +#include "include/int_types.h" +#include "common/ref.h" + +class Context; + +namespace journal { + +class FutureImpl; + +class Future { +public: + Future(); + Future(const Future&); + Future& operator=(const Future&); + Future(Future&&); + Future& operator=(Future&&); + Future(ceph::ref_t<FutureImpl> future_impl); + ~Future(); + + bool is_valid() const { + return bool(m_future_impl); + } + + void flush(Context *on_safe); + void wait(Context *on_safe); + + bool is_complete() const; + int get_return_value() const; + +private: + friend class Journaler; + friend std::ostream& operator<<(std::ostream&, const Future&); + + const auto& get_future_impl() const { + return m_future_impl; + } + + ceph::ref_t<FutureImpl> m_future_impl; +}; + +std::ostream &operator<<(std::ostream &os, const Future &future); + +} // namespace journal + +using journal::operator<<; + +#endif // CEPH_JOURNAL_FUTURE_H diff --git a/src/journal/FutureImpl.cc b/src/journal/FutureImpl.cc new file mode 100644 index 000000000..4e804f8dc --- /dev/null +++ b/src/journal/FutureImpl.cc @@ -0,0 +1,167 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/FutureImpl.h" +#include "journal/Utils.h" + +namespace journal { + +FutureImpl::FutureImpl(uint64_t tag_tid, uint64_t entry_tid, + uint64_t commit_tid) + : m_tag_tid(tag_tid), + m_entry_tid(entry_tid), + m_commit_tid(commit_tid), + m_consistent_ack(this) +{ +} + +void FutureImpl::init(const ceph::ref_t<FutureImpl> &prev_future) { + // chain ourself to the prior future (if any) to that we known when the + // journal is consistent + if (prev_future) { + m_prev_future = prev_future; + m_prev_future->wait(&m_consistent_ack); + } else { + m_consistent_ack.complete(0); + } +} + +void FutureImpl::flush(Context *on_safe) { + + bool complete; + FlushHandlers flush_handlers; + ceph::ref_t<FutureImpl> prev_future; + { + std::lock_guard locker{m_lock}; + complete = (m_safe && m_consistent); + if (!complete) { + if (on_safe != nullptr) { + m_contexts.push_back(on_safe); + } + + prev_future = prepare_flush(&flush_handlers, m_lock); + } + } + + // instruct prior futures to flush as well + while (prev_future) { + prev_future = prev_future->prepare_flush(&flush_handlers); + } + + if (complete && on_safe != NULL) { + on_safe->complete(m_return_value); + } else if (!flush_handlers.empty()) { + // attached to journal object -- instruct it to flush all entries through + // this one. 
possible to become detached while lock is released, so flush + // will be re-requested by the object if it doesn't own the future + for (auto &pair : flush_handlers) { + pair.first->flush(pair.second); + } + } +} + +ceph::ref_t<FutureImpl> FutureImpl::prepare_flush(FlushHandlers *flush_handlers) { + std::lock_guard locker{m_lock}; + return prepare_flush(flush_handlers, m_lock); +} + +ceph::ref_t<FutureImpl> FutureImpl::prepare_flush(FlushHandlers *flush_handlers, + ceph::mutex &lock) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + if (m_flush_state == FLUSH_STATE_NONE) { + m_flush_state = FLUSH_STATE_REQUESTED; + + auto h = m_flush_handler; + if (h) { + flush_handlers->try_emplace(std::move(h), this); + } + } + return m_prev_future; +} + +void FutureImpl::wait(Context *on_safe) { + ceph_assert(on_safe != NULL); + { + std::lock_guard locker{m_lock}; + if (!m_safe || !m_consistent) { + m_contexts.push_back(on_safe); + return; + } + } + + on_safe->complete(m_return_value); +} + +bool FutureImpl::is_complete() const { + std::lock_guard locker{m_lock}; + return m_safe && m_consistent; +} + +int FutureImpl::get_return_value() const { + std::lock_guard locker{m_lock}; + ceph_assert(m_safe && m_consistent); + return m_return_value; +} + +bool FutureImpl::attach(FlushHandler::ref flush_handler) { + std::lock_guard locker{m_lock}; + ceph_assert(!m_flush_handler); + m_flush_handler = std::move(flush_handler); + return m_flush_state != FLUSH_STATE_NONE; +} + +void FutureImpl::safe(int r) { + m_lock.lock(); + ceph_assert(!m_safe); + m_safe = true; + if (m_return_value == 0) { + m_return_value = r; + } + + m_flush_handler.reset(); + if (m_consistent) { + finish_unlock(); + } else { + m_lock.unlock(); + } +} + +void FutureImpl::consistent(int r) { + m_lock.lock(); + ceph_assert(!m_consistent); + m_consistent = true; + m_prev_future.reset(); + if (m_return_value == 0) { + m_return_value = r; + } + + if (m_safe) { + finish_unlock(); + } else { + m_lock.unlock(); + } +} + +void FutureImpl::finish_unlock() { + ceph_assert(ceph_mutex_is_locked(m_lock)); + ceph_assert(m_safe && m_consistent); + + Contexts contexts; + contexts.swap(m_contexts); + + m_lock.unlock(); + for (Contexts::iterator it = contexts.begin(); + it != contexts.end(); ++it) { + (*it)->complete(m_return_value); + } +} + +std::ostream &operator<<(std::ostream &os, const FutureImpl &future) { + os << "Future[tag_tid=" << future.m_tag_tid << ", " + << "entry_tid=" << future.m_entry_tid << ", " + << "commit_tid=" << future.m_commit_tid << "]"; + return os; +} + +} // namespace journal diff --git a/src/journal/FutureImpl.h b/src/journal/FutureImpl.h new file mode 100644 index 000000000..241a09709 --- /dev/null +++ b/src/journal/FutureImpl.h @@ -0,0 +1,122 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_FUTURE_IMPL_H +#define CEPH_JOURNAL_FUTURE_IMPL_H + +#include "include/int_types.h" +#include "common/RefCountedObj.h" +#include "include/Context.h" +#include "journal/Future.h" +#include <list> +#include <map> +#include <boost/noncopyable.hpp> +#include "include/ceph_assert.h" + +class Context; + +namespace journal { + +class FutureImpl : public RefCountedObject, boost::noncopyable { +public: + struct FlushHandler { + using ref = std::shared_ptr<FlushHandler>; + virtual void flush(const ceph::ref_t<FutureImpl> &future) = 0; + virtual ~FlushHandler() = default; + }; + + void init(const ceph::ref_t<FutureImpl> &prev_future); + + inline uint64_t get_tag_tid() const { + 
return m_tag_tid; + } + inline uint64_t get_entry_tid() const { + return m_entry_tid; + } + inline uint64_t get_commit_tid() const { + return m_commit_tid; + } + + void flush(Context *on_safe = NULL); + void wait(Context *on_safe); + + bool is_complete() const; + int get_return_value() const; + + inline bool is_flush_in_progress() const { + std::lock_guard locker{m_lock}; + return (m_flush_state == FLUSH_STATE_IN_PROGRESS); + } + inline void set_flush_in_progress() { + auto h = std::move(m_flush_handler); + ceph_assert(h); + std::lock_guard locker{m_lock}; + m_flush_state = FLUSH_STATE_IN_PROGRESS; + } + + bool attach(FlushHandler::ref flush_handler); + inline void detach() { + m_flush_handler.reset(); + } + inline FlushHandler::ref get_flush_handler() const { + return m_flush_handler; + } + + void safe(int r); + +private: + friend std::ostream &operator<<(std::ostream &, const FutureImpl &); + + typedef std::map<FlushHandler::ref, ceph::ref_t<FutureImpl>> FlushHandlers; + typedef std::list<Context *> Contexts; + + enum FlushState { + FLUSH_STATE_NONE, + FLUSH_STATE_REQUESTED, + FLUSH_STATE_IN_PROGRESS + }; + + struct C_ConsistentAck : public Context { + ceph::ref_t<FutureImpl> future; + C_ConsistentAck(ceph::ref_t<FutureImpl> _future) : future(std::move(_future)) {} + void complete(int r) override { + future->consistent(r); + future.reset(); + } + void finish(int r) override {} + }; + + FRIEND_MAKE_REF(FutureImpl); + FutureImpl(uint64_t tag_tid, uint64_t entry_tid, uint64_t commit_tid); + ~FutureImpl() override = default; + + uint64_t m_tag_tid; + uint64_t m_entry_tid; + uint64_t m_commit_tid; + + mutable ceph::mutex m_lock = ceph::make_mutex("FutureImpl::m_lock", false); + ceph::ref_t<FutureImpl> m_prev_future; + bool m_safe = false; + bool m_consistent = false; + int m_return_value = 0; + + FlushHandler::ref m_flush_handler; + FlushState m_flush_state = FLUSH_STATE_NONE; + + C_ConsistentAck m_consistent_ack; + Contexts m_contexts; + + ceph::ref_t<FutureImpl> prepare_flush(FlushHandlers *flush_handlers); + ceph::ref_t<FutureImpl> prepare_flush(FlushHandlers *flush_handlers, ceph::mutex &lock); + + void consistent(int r); + void finish_unlock(); +}; + +std::ostream &operator<<(std::ostream &os, const FutureImpl &future); + +} // namespace journal + +using journal::operator<<; + +#endif // CEPH_JOURNAL_FUTURE_IMPL_H diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc new file mode 100644 index 000000000..bf04d4e1c --- /dev/null +++ b/src/journal/JournalMetadata.cc @@ -0,0 +1,1165 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/JournalMetadata.h" +#include "journal/Utils.h" +#include "common/errno.h" +#include "common/Timer.h" +#include "cls/journal/cls_journal_client.h" +#include <functional> +#include <set> + +#define dout_subsys ceph_subsys_journaler +#undef dout_prefix +#define dout_prefix *_dout << "JournalMetadata: " << this << " " + +namespace journal { + +using namespace cls::journal; + +namespace { + +struct C_GetClient : public Context { + CephContext *cct; + librados::IoCtx &ioctx; + const std::string &oid; + AsyncOpTracker &async_op_tracker; + std::string client_id; + cls::journal::Client *client; + Context *on_finish; + + bufferlist out_bl; + + C_GetClient(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid, + AsyncOpTracker &async_op_tracker, const std::string &client_id, + cls::journal::Client *client, Context *on_finish) + : cct(cct), ioctx(ioctx), oid(oid), 
async_op_tracker(async_op_tracker), + client_id(client_id), client(client), on_finish(on_finish) { + async_op_tracker.start_op(); + } + ~C_GetClient() override { + async_op_tracker.finish_op(); + } + + virtual void send() { + send_get_client(); + } + + void send_get_client() { + ldout(cct, 20) << "C_GetClient: " << __func__ << dendl; + + librados::ObjectReadOperation op; + client::get_client_start(&op, client_id); + + librados::AioCompletion *comp = librados::Rados::aio_create_completion( + this, &utils::rados_state_callback< + C_GetClient, &C_GetClient::handle_get_client>); + + int r = ioctx.aio_operate(oid, comp, &op, &out_bl); + ceph_assert(r == 0); + comp->release(); + } + + void handle_get_client(int r) { + ldout(cct, 20) << "C_GetClient: " << __func__ << ": r=" << r << dendl; + + if (r == 0) { + auto it = out_bl.cbegin(); + r = client::get_client_finish(&it, client); + } + complete(r); + } + + void finish(int r) override { + on_finish->complete(r); + } +}; + +struct C_AllocateTag : public Context { + CephContext *cct; + librados::IoCtx &ioctx; + const std::string &oid; + AsyncOpTracker &async_op_tracker; + uint64_t tag_class; + Tag *tag; + Context *on_finish; + + bufferlist out_bl; + + C_AllocateTag(CephContext *cct, librados::IoCtx &ioctx, + const std::string &oid, AsyncOpTracker &async_op_tracker, + uint64_t tag_class, const bufferlist &data, Tag *tag, + Context *on_finish) + : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker), + tag_class(tag_class), tag(tag), on_finish(on_finish) { + async_op_tracker.start_op(); + tag->data = data; + } + ~C_AllocateTag() override { + async_op_tracker.finish_op(); + } + + void send() { + send_get_next_tag_tid(); + } + + void send_get_next_tag_tid() { + ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl; + + librados::ObjectReadOperation op; + client::get_next_tag_tid_start(&op); + + librados::AioCompletion *comp = librados::Rados::aio_create_completion( + this, &utils::rados_state_callback< + C_AllocateTag, &C_AllocateTag::handle_get_next_tag_tid>); + + out_bl.clear(); + int r = ioctx.aio_operate(oid, comp, &op, &out_bl); + ceph_assert(r == 0); + comp->release(); + } + + void handle_get_next_tag_tid(int r) { + ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl; + + if (r == 0) { + auto iter = out_bl.cbegin(); + r = client::get_next_tag_tid_finish(&iter, &tag->tid); + } + if (r < 0) { + complete(r); + return; + } + send_tag_create(); + } + + void send_tag_create() { + ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl; + + librados::ObjectWriteOperation op; + client::tag_create(&op, tag->tid, tag_class, tag->data); + + librados::AioCompletion *comp = librados::Rados::aio_create_completion( + this, &utils::rados_state_callback< + C_AllocateTag, &C_AllocateTag::handle_tag_create>); + + int r = ioctx.aio_operate(oid, comp, &op); + ceph_assert(r == 0); + comp->release(); + } + + void handle_tag_create(int r) { + ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl; + + if (r == -ESTALE) { + send_get_next_tag_tid(); + return; + } else if (r < 0) { + complete(r); + return; + } + + send_get_tag(); + } + + void send_get_tag() { + ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl; + + librados::ObjectReadOperation op; + client::get_tag_start(&op, tag->tid); + + librados::AioCompletion *comp = librados::Rados::aio_create_completion( + this, &utils::rados_state_callback< + C_AllocateTag, &C_AllocateTag::handle_get_tag>); + + out_bl.clear(); + int r = ioctx.aio_operate(oid, comp, 
&op, &out_bl); + ceph_assert(r == 0); + comp->release(); + } + + void handle_get_tag(int r) { + ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl; + + if (r == 0) { + auto iter = out_bl.cbegin(); + + cls::journal::Tag journal_tag; + r = client::get_tag_finish(&iter, &journal_tag); + if (r == 0) { + *tag = journal_tag; + } + } + complete(r); + } + + void finish(int r) override { + on_finish->complete(r); + } +}; + +struct C_GetTag : public Context { + CephContext *cct; + librados::IoCtx &ioctx; + const std::string &oid; + AsyncOpTracker &async_op_tracker; + uint64_t tag_tid; + JournalMetadata::Tag *tag; + Context *on_finish; + + bufferlist out_bl; + + C_GetTag(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid, + AsyncOpTracker &async_op_tracker, uint64_t tag_tid, + JournalMetadata::Tag *tag, Context *on_finish) + : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker), + tag_tid(tag_tid), tag(tag), on_finish(on_finish) { + async_op_tracker.start_op(); + } + ~C_GetTag() override { + async_op_tracker.finish_op(); + } + + void send() { + send_get_tag(); + } + + void send_get_tag() { + librados::ObjectReadOperation op; + client::get_tag_start(&op, tag_tid); + + librados::AioCompletion *comp = librados::Rados::aio_create_completion( + this, &utils::rados_state_callback< + C_GetTag, &C_GetTag::handle_get_tag>); + + int r = ioctx.aio_operate(oid, comp, &op, &out_bl); + ceph_assert(r == 0); + comp->release(); + } + + void handle_get_tag(int r) { + if (r == 0) { + auto iter = out_bl.cbegin(); + r = client::get_tag_finish(&iter, tag); + } + complete(r); + } + + void finish(int r) override { + on_finish->complete(r); + } +}; + +struct C_GetTags : public Context { + CephContext *cct; + librados::IoCtx &ioctx; + const std::string &oid; + const std::string &client_id; + AsyncOpTracker &async_op_tracker; + uint64_t start_after_tag_tid; + boost::optional<uint64_t> tag_class; + JournalMetadata::Tags *tags; + Context *on_finish; + + const uint64_t MAX_RETURN = 64; + bufferlist out_bl; + + C_GetTags(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid, + const std::string &client_id, AsyncOpTracker &async_op_tracker, + uint64_t start_after_tag_tid, + const boost::optional<uint64_t> &tag_class, + JournalMetadata::Tags *tags, Context *on_finish) + : cct(cct), ioctx(ioctx), oid(oid), client_id(client_id), + async_op_tracker(async_op_tracker), + start_after_tag_tid(start_after_tag_tid), tag_class(tag_class), + tags(tags), on_finish(on_finish) { + async_op_tracker.start_op(); + } + ~C_GetTags() override { + async_op_tracker.finish_op(); + } + + void send() { + send_tag_list(); + } + + void send_tag_list() { + librados::ObjectReadOperation op; + client::tag_list_start(&op, start_after_tag_tid, MAX_RETURN, client_id, + tag_class); + + librados::AioCompletion *comp = librados::Rados::aio_create_completion( + this, &utils::rados_state_callback< + C_GetTags, &C_GetTags::handle_tag_list>); + + out_bl.clear(); + int r = ioctx.aio_operate(oid, comp, &op, &out_bl); + ceph_assert(r == 0); + comp->release(); + } + + void handle_tag_list(int r) { + if (r == 0) { + std::set<cls::journal::Tag> journal_tags; + auto iter = out_bl.cbegin(); + r = client::tag_list_finish(&iter, &journal_tags); + if (r == 0) { + for (auto &journal_tag : journal_tags) { + tags->push_back(journal_tag); + start_after_tag_tid = journal_tag.tid; + } + + if (journal_tags.size() == MAX_RETURN) { + send_tag_list(); + return; + } + } + } + complete(r); + } + + void finish(int r) override { + 
on_finish->complete(r); + } +}; + +struct C_FlushCommitPosition : public Context { + Context *commit_position_ctx; + Context *on_finish; + + C_FlushCommitPosition(Context *commit_position_ctx, Context *on_finish) + : commit_position_ctx(commit_position_ctx), on_finish(on_finish) { + } + void finish(int r) override { + if (commit_position_ctx != nullptr) { + commit_position_ctx->complete(r); + } + on_finish->complete(r); + } +}; + +struct C_AssertActiveTag : public Context { + CephContext *cct; + librados::IoCtx &ioctx; + const std::string &oid; + AsyncOpTracker &async_op_tracker; + std::string client_id; + uint64_t tag_tid; + Context *on_finish; + + bufferlist out_bl; + + C_AssertActiveTag(CephContext *cct, librados::IoCtx &ioctx, + const std::string &oid, AsyncOpTracker &async_op_tracker, + const std::string &client_id, uint64_t tag_tid, + Context *on_finish) + : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker), + client_id(client_id), tag_tid(tag_tid), on_finish(on_finish) { + async_op_tracker.start_op(); + } + ~C_AssertActiveTag() override { + async_op_tracker.finish_op(); + } + + void send() { + ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << dendl; + + librados::ObjectReadOperation op; + client::tag_list_start(&op, tag_tid, 2, client_id, boost::none); + + librados::AioCompletion *comp = librados::Rados::aio_create_completion( + this, &utils::rados_state_callback< + C_AssertActiveTag, &C_AssertActiveTag::handle_send>); + + int r = ioctx.aio_operate(oid, comp, &op, &out_bl); + ceph_assert(r == 0); + comp->release(); + } + + void handle_send(int r) { + ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << ": r=" << r << dendl; + + std::set<cls::journal::Tag> tags; + if (r == 0) { + auto it = out_bl.cbegin(); + r = client::tag_list_finish(&it, &tags); + } + + // NOTE: since 0 is treated as an uninitialized list filter, we need to + // load to entries and look at the last tid + if (r == 0 && !tags.empty() && tags.rbegin()->tid > tag_tid) { + r = -ESTALE; + } + complete(r); + } + + void finish(int r) override { + on_finish->complete(r); + } +}; + +} // anonymous namespace + +JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, + ceph::mutex *timer_lock, librados::IoCtx &ioctx, + const std::string &oid, + const std::string &client_id, + const Settings &settings) + : m_oid(oid), + m_client_id(client_id), m_settings(settings), + m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock), + m_watch_ctx(this) +{ + m_ioctx.dup(ioctx); + m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); +} + +JournalMetadata::~JournalMetadata() { + std::lock_guard locker{m_lock}; + ceph_assert(!m_initialized); +} + +void JournalMetadata::init(Context *on_finish) { + { + std::lock_guard locker{m_lock}; + ceph_assert(!m_initialized); + m_initialized = true; + } + + // chain the init sequence (reverse order) + on_finish = utils::create_async_context_callback( + this, on_finish); + on_finish = new C_ImmutableMetadata(this, on_finish); + on_finish = new LambdaContext([this, on_finish](int r) { + if (r < 0) { + lderr(m_cct) << __func__ << ": failed to watch journal" + << cpp_strerror(r) << dendl; + std::lock_guard locker{m_lock}; + m_watch_handle = 0; + on_finish->complete(r); + return; + } + + get_immutable_metadata(&m_order, &m_splay_width, &m_pool_id, on_finish); + }); + + librados::AioCompletion *comp = librados::Rados::aio_create_completion( + on_finish, utils::rados_ctx_callback); + int r = m_ioctx.aio_watch(m_oid, comp, &m_watch_handle, &m_watch_ctx); 
+ ceph_assert(r == 0); + comp->release(); +} + +void JournalMetadata::shut_down(Context *on_finish) { + + ldout(m_cct, 20) << __func__ << dendl; + + uint64_t watch_handle = 0; + { + std::lock_guard locker{m_lock}; + m_initialized = false; + std::swap(watch_handle, m_watch_handle); + } + + // chain the shut down sequence (reverse order) + on_finish = utils::create_async_context_callback( + this, on_finish); + on_finish = new LambdaContext([this, on_finish](int r) { + ldout(m_cct, 20) << "shut_down: waiting for ops" << dendl; + m_async_op_tracker.wait_for_ops(on_finish); + }); + on_finish = new LambdaContext([this, on_finish](int r) { + ldout(m_cct, 20) << "shut_down: flushing watch" << dendl; + librados::Rados rados(m_ioctx); + librados::AioCompletion *comp = librados::Rados::aio_create_completion( + on_finish, utils::rados_ctx_callback); + r = rados.aio_watch_flush(comp); + ceph_assert(r == 0); + comp->release(); + }); + on_finish = new LambdaContext([this, on_finish](int r) { + flush_commit_position(on_finish); + }); + if (watch_handle != 0) { + librados::AioCompletion *comp = librados::Rados::aio_create_completion( + on_finish, utils::rados_ctx_callback); + int r = m_ioctx.aio_unwatch(watch_handle, comp); + ceph_assert(r == 0); + comp->release(); + } else { + on_finish->complete(0); + } +} + +void JournalMetadata::get_immutable_metadata(uint8_t *order, + uint8_t *splay_width, + int64_t *pool_id, + Context *on_finish) { + client::get_immutable_metadata(m_ioctx, m_oid, order, splay_width, pool_id, + on_finish); +} + +void JournalMetadata::get_mutable_metadata(uint64_t *minimum_set, + uint64_t *active_set, + RegisteredClients *clients, + Context *on_finish) { + client::get_mutable_metadata(m_ioctx, m_oid, minimum_set, active_set, clients, + on_finish); +} + +void JournalMetadata::register_client(const bufferlist &data, + Context *on_finish) { + ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl; + librados::ObjectWriteOperation op; + client::client_register(&op, m_client_id, data); + + C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish); + + librados::AioCompletion *comp = + librados::Rados::aio_create_completion(ctx, + utils::rados_ctx_callback); + int r = m_ioctx.aio_operate(m_oid, comp, &op); + ceph_assert(r == 0); + comp->release(); +} + +void JournalMetadata::update_client(const bufferlist &data, + Context *on_finish) { + ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl; + librados::ObjectWriteOperation op; + client::client_update_data(&op, m_client_id, data); + + C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish); + + librados::AioCompletion *comp = + librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback); + int r = m_ioctx.aio_operate(m_oid, comp, &op); + ceph_assert(r == 0); + comp->release(); +} + +void JournalMetadata::unregister_client(Context *on_finish) { + ceph_assert(!m_client_id.empty()); + + ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl; + librados::ObjectWriteOperation op; + client::client_unregister(&op, m_client_id); + + C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish); + + librados::AioCompletion *comp = + librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback); + int r = m_ioctx.aio_operate(m_oid, comp, &op); + ceph_assert(r == 0); + comp->release(); +} + +void JournalMetadata::allocate_tag(uint64_t tag_class, const bufferlist &data, + Tag *tag, Context *on_finish) { + on_finish = new C_NotifyUpdate(this, on_finish); + C_AllocateTag *ctx = new C_AllocateTag(m_cct, m_ioctx, m_oid, 
+ m_async_op_tracker, tag_class, + data, tag, on_finish); + ctx->send(); +} + +void JournalMetadata::get_client(const std::string &client_id, + cls::journal::Client *client, + Context *on_finish) { + C_GetClient *ctx = new C_GetClient(m_cct, m_ioctx, m_oid, m_async_op_tracker, + client_id, client, on_finish); + ctx->send(); +} + +void JournalMetadata::get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish) { + C_GetTag *ctx = new C_GetTag(m_cct, m_ioctx, m_oid, m_async_op_tracker, + tag_tid, tag, on_finish); + ctx->send(); +} + +void JournalMetadata::get_tags(uint64_t start_after_tag_tid, + const boost::optional<uint64_t> &tag_class, + Tags *tags, Context *on_finish) { + C_GetTags *ctx = new C_GetTags(m_cct, m_ioctx, m_oid, m_client_id, + m_async_op_tracker, start_after_tag_tid, + tag_class, tags, on_finish); + ctx->send(); +} + +void JournalMetadata::add_listener(JournalMetadataListener *listener) { + std::unique_lock locker{m_lock}; + m_update_cond.wait(locker, [this] { + return m_update_notifications <= 0; + }); + m_listeners.push_back(listener); +} + +void JournalMetadata::remove_listener(JournalMetadataListener *listener) { + std::unique_lock locker{m_lock}; + m_update_cond.wait(locker, [this] { + return m_update_notifications <= 0; + }); + m_listeners.remove(listener); +} + +void JournalMetadata::set_minimum_set(uint64_t object_set) { + std::lock_guard locker{m_lock}; + + ldout(m_cct, 20) << __func__ << ": current=" << m_minimum_set + << ", new=" << object_set << dendl; + if (m_minimum_set >= object_set) { + return; + } + + librados::ObjectWriteOperation op; + client::set_minimum_set(&op, object_set); + + C_NotifyUpdate *ctx = new C_NotifyUpdate(this); + librados::AioCompletion *comp = + librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback); + int r = m_ioctx.aio_operate(m_oid, comp, &op); + ceph_assert(r == 0); + comp->release(); + + m_minimum_set = object_set; +} + +int JournalMetadata::set_active_set(uint64_t object_set) { + C_SaferCond ctx; + set_active_set(object_set, &ctx); + return ctx.wait(); +} + +void JournalMetadata::set_active_set(uint64_t object_set, Context *on_finish) { + std::lock_guard locker{m_lock}; + + ldout(m_cct, 20) << __func__ << ": current=" << m_active_set + << ", new=" << object_set << dendl; + if (m_active_set >= object_set) { + m_work_queue->queue(on_finish, 0); + return; + } + + librados::ObjectWriteOperation op; + client::set_active_set(&op, object_set); + + C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish); + librados::AioCompletion *comp = + librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback); + int r = m_ioctx.aio_operate(m_oid, comp, &op); + ceph_assert(r == 0); + comp->release(); + + m_active_set = object_set; +} + +void JournalMetadata::assert_active_tag(uint64_t tag_tid, Context *on_finish) { + std::lock_guard locker{m_lock}; + + C_AssertActiveTag *ctx = new C_AssertActiveTag(m_cct, m_ioctx, m_oid, + m_async_op_tracker, + m_client_id, tag_tid, + on_finish); + ctx->send(); +} + +void JournalMetadata::flush_commit_position() { + ldout(m_cct, 20) << __func__ << dendl; + + C_SaferCond ctx; + flush_commit_position(&ctx); + ctx.wait(); +} + +void JournalMetadata::flush_commit_position(Context *on_safe) { + ldout(m_cct, 20) << __func__ << dendl; + + std::scoped_lock locker{*m_timer_lock, m_lock}; + if (m_commit_position_ctx == nullptr && m_flush_commits_in_progress == 0) { + // nothing to flush + if (on_safe != nullptr) { + m_work_queue->queue(on_safe, 0); + } + return; + } + + if (on_safe != nullptr) { + 
m_flush_commit_position_ctxs.push_back(on_safe); + } + if (m_commit_position_ctx == nullptr) { + return; + } + + cancel_commit_task(); + handle_commit_position_task(); +} + +void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) { + std::lock_guard locker{m_lock}; + uint64_t &allocated_entry_tid = m_allocated_entry_tids[tag_tid]; + if (allocated_entry_tid <= entry_tid) { + allocated_entry_tid = entry_tid + 1; + } +} + +bool JournalMetadata::get_last_allocated_entry_tid(uint64_t tag_tid, + uint64_t *entry_tid) const { + std::lock_guard locker{m_lock}; + + AllocatedEntryTids::const_iterator it = m_allocated_entry_tids.find(tag_tid); + if (it == m_allocated_entry_tids.end()) { + return false; + } + + ceph_assert(it->second > 0); + *entry_tid = it->second - 1; + return true; +} + +void JournalMetadata::handle_immutable_metadata(int r, Context *on_init) { + if (r < 0) { + lderr(m_cct) << "failed to initialize immutable metadata: " + << cpp_strerror(r) << dendl; + on_init->complete(r); + return; + } + + ldout(m_cct, 10) << "initialized immutable metadata" << dendl; + refresh(on_init); +} + +void JournalMetadata::refresh(Context *on_complete) { + ldout(m_cct, 10) << "refreshing mutable metadata" << dendl; + + { + std::lock_guard locker{m_lock}; + if (on_complete != nullptr) { + m_refresh_ctxs.push_back(on_complete); + } + ++m_refreshes_in_progress; + } + + auto refresh = new C_Refresh(this); + get_mutable_metadata(&refresh->minimum_set, &refresh->active_set, + &refresh->registered_clients, refresh); +} + +void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) { + ldout(m_cct, 10) << "refreshed mutable metadata: r=" << r << dendl; + + m_lock.lock(); + if (r == 0) { + Client client(m_client_id, bufferlist()); + RegisteredClients::iterator it = refresh->registered_clients.find(client); + if (it != refresh->registered_clients.end()) { + if (it->state == cls::journal::CLIENT_STATE_DISCONNECTED) { + ldout(m_cct, 0) << "client flagged disconnected: " << m_client_id + << dendl; + } + m_minimum_set = std::max(m_minimum_set, refresh->minimum_set); + m_active_set = std::max(m_active_set, refresh->active_set); + m_registered_clients = refresh->registered_clients; + m_client = *it; + + ++m_update_notifications; + m_lock.unlock(); + for (Listeners::iterator it = m_listeners.begin(); + it != m_listeners.end(); ++it) { + (*it)->handle_update(this); + } + m_lock.lock(); + if (--m_update_notifications == 0) { + m_update_cond.notify_all(); + } + } else { + lderr(m_cct) << "failed to locate client: " << m_client_id << dendl; + r = -ENOENT; + } + } + + Contexts refresh_ctxs; + ceph_assert(m_refreshes_in_progress > 0); + --m_refreshes_in_progress; + if (m_refreshes_in_progress == 0) { + std::swap(refresh_ctxs, m_refresh_ctxs); + } + m_lock.unlock(); + + for (auto ctx : refresh_ctxs) { + ctx->complete(r); + } +} + +void JournalMetadata::cancel_commit_task() { + ldout(m_cct, 20) << __func__ << dendl; + + ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); + ceph_assert(ceph_mutex_is_locked(m_lock)); + ceph_assert(m_commit_position_ctx != nullptr); + ceph_assert(m_commit_position_task_ctx != nullptr); + m_timer->cancel_event(m_commit_position_task_ctx); + m_commit_position_task_ctx = NULL; +} + +void JournalMetadata::schedule_commit_task() { + ldout(m_cct, 20) << __func__ << dendl; + + ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); + ceph_assert(ceph_mutex_is_locked(m_lock)); + ceph_assert(m_commit_position_ctx != nullptr); + if (m_commit_position_task_ctx == nullptr) { + 
m_commit_position_task_ctx = + m_timer->add_event_after(m_settings.commit_interval, + new C_CommitPositionTask(this)); + } +} + +void JournalMetadata::handle_commit_position_task() { + ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); + ceph_assert(ceph_mutex_is_locked(m_lock)); + ldout(m_cct, 20) << __func__ << ": " + << "client_id=" << m_client_id << ", " + << "commit_position=" << m_commit_position << dendl; + + m_commit_position_task_ctx = nullptr; + Context* commit_position_ctx = nullptr; + std::swap(commit_position_ctx, m_commit_position_ctx); + + m_async_op_tracker.start_op(); + ++m_flush_commits_in_progress; + + Context* ctx = new LambdaContext([this, commit_position_ctx](int r) { + Contexts flush_commit_position_ctxs; + m_lock.lock(); + ceph_assert(m_flush_commits_in_progress > 0); + --m_flush_commits_in_progress; + if (m_flush_commits_in_progress == 0) { + std::swap(flush_commit_position_ctxs, m_flush_commit_position_ctxs); + } + m_lock.unlock(); + + commit_position_ctx->complete(0); + for (auto ctx : flush_commit_position_ctxs) { + ctx->complete(0); + } + m_async_op_tracker.finish_op(); + }); + ctx = new C_NotifyUpdate(this, ctx); + ctx = new LambdaContext([this, ctx](int r) { + // manually kick of a refresh in case the notification is missed + // and ignore the next notification that we are about to send + m_lock.lock(); + ++m_ignore_watch_notifies; + m_lock.unlock(); + + refresh(ctx); + }); + ctx = new LambdaContext([this, ctx](int r) { + schedule_laggy_clients_disconnect(ctx); + }); + + librados::ObjectWriteOperation op; + client::client_commit(&op, m_client_id, m_commit_position); + + auto comp = librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback); + int r = m_ioctx.aio_operate(m_oid, comp, &op); + ceph_assert(r == 0); + comp->release(); +} + +void JournalMetadata::schedule_watch_reset() { + ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); + m_timer->add_event_after(1, new C_WatchReset(this)); +} + +void JournalMetadata::handle_watch_reset() { + ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); + if (!m_initialized) { + return; + } + + int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx); + if (r < 0) { + if (r == -ENOENT) { + ldout(m_cct, 5) << __func__ << ": journal header not found" << dendl; + } else if (r == -EBLOCKLISTED) { + ldout(m_cct, 5) << __func__ << ": client blocklisted" << dendl; + } else { + lderr(m_cct) << __func__ << ": failed to watch journal: " + << cpp_strerror(r) << dendl; + } + schedule_watch_reset(); + } else { + ldout(m_cct, 10) << __func__ << ": reset journal watch" << dendl; + refresh(NULL); + } +} + +void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) { + ldout(m_cct, 10) << "journal header updated" << dendl; + + bufferlist bl; + m_ioctx.notify_ack(m_oid, notify_id, cookie, bl); + + { + std::lock_guard locker{m_lock}; + if (m_ignore_watch_notifies > 0) { + --m_ignore_watch_notifies; + return; + } + } + + refresh(NULL); +} + +void JournalMetadata::handle_watch_error(int err) { + if (err == -ENOTCONN) { + ldout(m_cct, 5) << "journal watch error: header removed" << dendl; + } else if (err == -EBLOCKLISTED) { + lderr(m_cct) << "journal watch error: client blocklisted" << dendl; + } else { + lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl; + } + + std::scoped_lock locker{*m_timer_lock, m_lock}; + + // release old watch on error + if (m_watch_handle != 0) { + m_ioctx.unwatch2(m_watch_handle); + m_watch_handle = 0; + } + + if (m_initialized && err != -ENOENT) { + 
schedule_watch_reset(); + } +} + +uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num, + uint64_t tag_tid, + uint64_t entry_tid) { + std::lock_guard locker{m_lock}; + uint64_t commit_tid = ++m_commit_tid; + m_pending_commit_tids[commit_tid] = CommitEntry(object_num, tag_tid, + entry_tid); + + ldout(m_cct, 20) << "allocated commit tid: commit_tid=" << commit_tid << " [" + << "object_num=" << object_num << ", " + << "tag_tid=" << tag_tid << ", " + << "entry_tid=" << entry_tid << "]" + << dendl; + return commit_tid; +} + +void JournalMetadata::overflow_commit_tid(uint64_t commit_tid, + uint64_t object_num) { + std::lock_guard locker{m_lock}; + + auto it = m_pending_commit_tids.find(commit_tid); + ceph_assert(it != m_pending_commit_tids.end()); + ceph_assert(it->second.object_num < object_num); + + ldout(m_cct, 20) << __func__ << ": " + << "commit_tid=" << commit_tid << ", " + << "old_object_num=" << it->second.object_num << ", " + << "new_object_num=" << object_num << dendl; + it->second.object_num = object_num; +} + +void JournalMetadata::get_commit_entry(uint64_t commit_tid, + uint64_t *object_num, + uint64_t *tag_tid, uint64_t *entry_tid) { + std::lock_guard locker{m_lock}; + + auto it = m_pending_commit_tids.find(commit_tid); + ceph_assert(it != m_pending_commit_tids.end()); + + *object_num = it->second.object_num; + *tag_tid = it->second.tag_tid; + *entry_tid = it->second.entry_tid; +} + +void JournalMetadata::committed(uint64_t commit_tid, + const CreateContext &create_context) { + ldout(m_cct, 20) << "committed tid=" << commit_tid << dendl; + + ObjectSetPosition commit_position; + Context *stale_ctx = nullptr; + { + std::scoped_lock locker{*m_timer_lock, m_lock}; + ceph_assert(commit_tid > m_commit_position_tid); + + if (!m_commit_position.object_positions.empty()) { + // in-flight commit position update + commit_position = m_commit_position; + } else { + // safe commit position + commit_position = m_client.commit_position; + } + + CommitTids::iterator it = m_pending_commit_tids.find(commit_tid); + ceph_assert(it != m_pending_commit_tids.end()); + + CommitEntry &commit_entry = it->second; + commit_entry.committed = true; + + bool update_commit_position = false; + while (!m_pending_commit_tids.empty()) { + CommitTids::iterator it = m_pending_commit_tids.begin(); + CommitEntry &commit_entry = it->second; + if (!commit_entry.committed) { + break; + } + + commit_position.object_positions.emplace_front( + commit_entry.object_num, commit_entry.tag_tid, + commit_entry.entry_tid); + m_pending_commit_tids.erase(it); + update_commit_position = true; + } + + if (!update_commit_position) { + return; + } + + // prune the position to have one position per splay offset + std::set<uint8_t> in_use_splay_offsets; + ObjectPositions::iterator ob_it = commit_position.object_positions.begin(); + while (ob_it != commit_position.object_positions.end()) { + uint8_t splay_offset = ob_it->object_number % m_splay_width; + if (!in_use_splay_offsets.insert(splay_offset).second) { + ob_it = commit_position.object_positions.erase(ob_it); + } else { + ++ob_it; + } + } + + stale_ctx = m_commit_position_ctx; + m_commit_position_ctx = create_context(); + m_commit_position = commit_position; + m_commit_position_tid = commit_tid; + + ldout(m_cct, 20) << "updated commit position: " << commit_position << ", " + << "on_safe=" << m_commit_position_ctx << dendl; + schedule_commit_task(); + } + + + if (stale_ctx != nullptr) { + ldout(m_cct, 20) << "canceling stale commit: on_safe=" << stale_ctx + << dendl; + 
stale_ctx->complete(-ESTALE); + } +} + +void JournalMetadata::notify_update() { + ldout(m_cct, 10) << "notifying journal header update" << dendl; + + bufferlist bl; + m_ioctx.notify2(m_oid, bl, 5000, NULL); +} + +void JournalMetadata::async_notify_update(Context *on_safe) { + ldout(m_cct, 10) << "async notifying journal header update" << dendl; + + C_AioNotify *ctx = new C_AioNotify(this, on_safe); + librados::AioCompletion *comp = + librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback); + + bufferlist bl; + int r = m_ioctx.aio_notify(m_oid, comp, bl, 5000, NULL); + ceph_assert(r == 0); + + comp->release(); +} + +void JournalMetadata::wait_for_ops() { + C_SaferCond ctx; + m_async_op_tracker.wait_for_ops(&ctx); + ctx.wait(); +} + +void JournalMetadata::handle_notified(int r) { + ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl; +} + +void JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) { + ldout(m_cct, 20) << __func__ << dendl; + if (m_settings.max_concurrent_object_sets <= 0) { + on_finish->complete(0); + return; + } + + Context *ctx = on_finish; + { + std::lock_guard locker{m_lock}; + for (auto &c : m_registered_clients) { + if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED || + c.id == m_client_id || + m_settings.ignored_laggy_clients.count(c.id) > 0) { + continue; + } + const std::string &client_id = c.id; + uint64_t object_set = 0; + if (!c.commit_position.object_positions.empty()) { + auto &position = *(c.commit_position.object_positions.begin()); + object_set = position.object_number / m_splay_width; + } + + if (m_active_set > object_set + m_settings.max_concurrent_object_sets) { + ldout(m_cct, 1) << __func__ << ": " << client_id + << ": scheduling disconnect" << dendl; + + ctx = new LambdaContext([this, client_id, ctx](int r1) { + ldout(m_cct, 10) << __func__ << ": " << client_id + << ": flagging disconnected" << dendl; + + librados::ObjectWriteOperation op; + client::client_update_state( + &op, client_id, cls::journal::CLIENT_STATE_DISCONNECTED); + + auto comp = librados::Rados::aio_create_completion( + ctx, utils::rados_ctx_callback); + int r = m_ioctx.aio_operate(m_oid, comp, &op); + ceph_assert(r == 0); + comp->release(); + }); + } + } + } + + if (ctx == on_finish) { + ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl; + } + ctx->complete(0); +} + +std::ostream &operator<<(std::ostream &os, + const JournalMetadata::RegisteredClients &clients) { + os << "["; + for (JournalMetadata::RegisteredClients::const_iterator c = clients.begin(); + c != clients.end(); ++c) { + os << (c == clients.begin() ? 
"" : ", " ) << *c; + } + os << "]"; + return os; +} + +std::ostream &operator<<(std::ostream &os, + const JournalMetadata &jm) { + std::lock_guard locker{jm.m_lock}; + os << "[oid=" << jm.m_oid << ", " + << "initialized=" << jm.m_initialized << ", " + << "order=" << (int)jm.m_order << ", " + << "splay_width=" << (int)jm.m_splay_width << ", " + << "pool_id=" << jm.m_pool_id << ", " + << "minimum_set=" << jm.m_minimum_set << ", " + << "active_set=" << jm.m_active_set << ", " + << "client_id=" << jm.m_client_id << ", " + << "commit_tid=" << jm.m_commit_tid << ", " + << "commit_interval=" << jm.m_settings.commit_interval << ", " + << "commit_position=" << jm.m_commit_position << ", " + << "registered_clients=" << jm.m_registered_clients << "]"; + return os; +} + +} // namespace journal diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h new file mode 100644 index 000000000..071456f56 --- /dev/null +++ b/src/journal/JournalMetadata.h @@ -0,0 +1,375 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_JOURNAL_METADATA_H +#define CEPH_JOURNAL_JOURNAL_METADATA_H + +#include "include/int_types.h" +#include "include/Context.h" +#include "include/rados/librados.hpp" +#include "common/AsyncOpTracker.h" +#include "common/Cond.h" +#include "common/Timer.h" +#include "common/ceph_mutex.h" +#include "common/RefCountedObj.h" +#include "common/WorkQueue.h" +#include "cls/journal/cls_journal_types.h" +#include "journal/JournalMetadataListener.h" +#include "journal/Settings.h" +#include <boost/noncopyable.hpp> +#include <boost/optional.hpp> +#include <functional> +#include <list> +#include <map> +#include <string> +#include "include/ceph_assert.h" + +namespace journal { + +class JournalMetadata : public RefCountedObject, boost::noncopyable { +public: + typedef std::function<Context*()> CreateContext; + typedef cls::journal::ObjectPosition ObjectPosition; + typedef cls::journal::ObjectPositions ObjectPositions; + typedef cls::journal::ObjectSetPosition ObjectSetPosition; + typedef cls::journal::Client Client; + typedef cls::journal::Tag Tag; + + typedef std::set<Client> RegisteredClients; + typedef std::list<Tag> Tags; + + void init(Context *on_init); + void shut_down(Context *on_finish); + + bool is_initialized() const { return m_initialized; } + + void get_immutable_metadata(uint8_t *order, uint8_t *splay_width, + int64_t *pool_id, Context *on_finish); + + void get_mutable_metadata(uint64_t *minimum_set, uint64_t *active_set, + RegisteredClients *clients, Context *on_finish); + + void add_listener(JournalMetadataListener *listener); + void remove_listener(JournalMetadataListener *listener); + + void register_client(const bufferlist &data, Context *on_finish); + void update_client(const bufferlist &data, Context *on_finish); + void unregister_client(Context *on_finish); + void get_client(const std::string &client_id, cls::journal::Client *client, + Context *on_finish); + + void allocate_tag(uint64_t tag_class, const bufferlist &data, + Tag *tag, Context *on_finish); + void get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish); + void get_tags(uint64_t start_after_tag_tid, + const boost::optional<uint64_t> &tag_class, Tags *tags, + Context *on_finish); + + inline const Settings &get_settings() const { + return m_settings; + } + inline const std::string &get_client_id() const { + return m_client_id; + } + inline uint8_t get_order() const { + return m_order; + } + inline uint64_t get_object_size() const { + return 
1 << m_order; + } + inline uint8_t get_splay_width() const { + return m_splay_width; + } + inline int64_t get_pool_id() const { + return m_pool_id; + } + + inline void queue(Context *on_finish, int r) { + m_work_queue->queue(on_finish, r); + } + + inline ContextWQ *get_work_queue() { + return m_work_queue; + } + + inline SafeTimer &get_timer() { + return *m_timer; + } + inline ceph::mutex &get_timer_lock() { + return *m_timer_lock; + } + + void set_minimum_set(uint64_t object_set); + inline uint64_t get_minimum_set() const { + std::lock_guard locker{m_lock}; + return m_minimum_set; + } + + int set_active_set(uint64_t object_set); + void set_active_set(uint64_t object_set, Context *on_finish); + inline uint64_t get_active_set() const { + std::lock_guard locker{m_lock}; + return m_active_set; + } + + void assert_active_tag(uint64_t tag_tid, Context *on_finish); + + void flush_commit_position(); + void flush_commit_position(Context *on_safe); + void get_commit_position(ObjectSetPosition *commit_position) const { + std::lock_guard locker{m_lock}; + *commit_position = m_client.commit_position; + } + + void get_registered_clients(RegisteredClients *registered_clients) { + std::lock_guard locker{m_lock}; + *registered_clients = m_registered_clients; + } + + inline uint64_t allocate_entry_tid(uint64_t tag_tid) { + std::lock_guard locker{m_lock}; + return m_allocated_entry_tids[tag_tid]++; + } + void reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid); + bool get_last_allocated_entry_tid(uint64_t tag_tid, uint64_t *entry_tid) const; + + uint64_t allocate_commit_tid(uint64_t object_num, uint64_t tag_tid, + uint64_t entry_tid); + void overflow_commit_tid(uint64_t commit_tid, uint64_t object_num); + void get_commit_entry(uint64_t commit_tid, uint64_t *object_num, + uint64_t *tag_tid, uint64_t *entry_tid); + void committed(uint64_t commit_tid, const CreateContext &create_context); + + void notify_update(); + void async_notify_update(Context *on_safe); + + void wait_for_ops(); + +private: + FRIEND_MAKE_REF(JournalMetadata); + JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock, + librados::IoCtx &ioctx, const std::string &oid, + const std::string &client_id, const Settings &settings); + ~JournalMetadata() override; + + typedef std::map<uint64_t, uint64_t> AllocatedEntryTids; + typedef std::list<JournalMetadataListener*> Listeners; + typedef std::list<Context*> Contexts; + + struct CommitEntry { + uint64_t object_num; + uint64_t tag_tid; + uint64_t entry_tid; + bool committed; + + CommitEntry() : object_num(0), tag_tid(0), entry_tid(0), committed(false) { + } + CommitEntry(uint64_t _object_num, uint64_t _tag_tid, uint64_t _entry_tid) + : object_num(_object_num), tag_tid(_tag_tid), entry_tid(_entry_tid), + committed(false) { + } + }; + typedef std::map<uint64_t, CommitEntry> CommitTids; + + struct C_WatchCtx : public librados::WatchCtx2 { + JournalMetadata *journal_metadata; + + C_WatchCtx(JournalMetadata *_journal_metadata) + : journal_metadata(_journal_metadata) {} + + void handle_notify(uint64_t notify_id, uint64_t cookie, + uint64_t notifier_id, bufferlist& bl) override { + journal_metadata->handle_watch_notify(notify_id, cookie); + } + void handle_error(uint64_t cookie, int err) override { + journal_metadata->handle_watch_error(err); + } + }; + + struct C_WatchReset : public Context { + JournalMetadata *journal_metadata; + + C_WatchReset(JournalMetadata *_journal_metadata) + : journal_metadata(_journal_metadata) { + journal_metadata->m_async_op_tracker.start_op(); + } 
+ ~C_WatchReset() override { + journal_metadata->m_async_op_tracker.finish_op(); + } + void finish(int r) override { + journal_metadata->handle_watch_reset(); + } + }; + + struct C_CommitPositionTask : public Context { + JournalMetadata *journal_metadata; + + C_CommitPositionTask(JournalMetadata *_journal_metadata) + : journal_metadata(_journal_metadata) { + journal_metadata->m_async_op_tracker.start_op(); + } + ~C_CommitPositionTask() override { + journal_metadata->m_async_op_tracker.finish_op(); + } + void finish(int r) override { + std::lock_guard locker{journal_metadata->m_lock}; + journal_metadata->handle_commit_position_task(); + }; + }; + + struct C_AioNotify : public Context { + JournalMetadata* journal_metadata; + Context *on_safe; + + C_AioNotify(JournalMetadata *_journal_metadata, Context *_on_safe) + : journal_metadata(_journal_metadata), on_safe(_on_safe) { + journal_metadata->m_async_op_tracker.start_op(); + } + ~C_AioNotify() override { + journal_metadata->m_async_op_tracker.finish_op(); + } + void finish(int r) override { + journal_metadata->handle_notified(r); + if (on_safe != nullptr) { + on_safe->complete(0); + } + } + }; + + struct C_NotifyUpdate : public Context { + JournalMetadata* journal_metadata; + Context *on_safe; + + C_NotifyUpdate(JournalMetadata *_journal_metadata, Context *_on_safe = NULL) + : journal_metadata(_journal_metadata), on_safe(_on_safe) { + journal_metadata->m_async_op_tracker.start_op(); + } + ~C_NotifyUpdate() override { + journal_metadata->m_async_op_tracker.finish_op(); + } + void finish(int r) override { + if (r == 0) { + journal_metadata->async_notify_update(on_safe); + return; + } + if (on_safe != NULL) { + on_safe->complete(r); + } + } + }; + + struct C_ImmutableMetadata : public Context { + JournalMetadata* journal_metadata; + Context *on_finish; + + C_ImmutableMetadata(JournalMetadata *_journal_metadata, Context *_on_finish) + : journal_metadata(_journal_metadata), on_finish(_on_finish) { + std::lock_guard locker{journal_metadata->m_lock}; + journal_metadata->m_async_op_tracker.start_op(); + } + ~C_ImmutableMetadata() override { + journal_metadata->m_async_op_tracker.finish_op(); + } + void finish(int r) override { + journal_metadata->handle_immutable_metadata(r, on_finish); + } + }; + + struct C_Refresh : public Context { + JournalMetadata* journal_metadata; + uint64_t minimum_set; + uint64_t active_set; + RegisteredClients registered_clients; + + C_Refresh(JournalMetadata *_journal_metadata) + : journal_metadata(_journal_metadata), minimum_set(0), active_set(0) { + std::lock_guard locker{journal_metadata->m_lock}; + journal_metadata->m_async_op_tracker.start_op(); + } + ~C_Refresh() override { + journal_metadata->m_async_op_tracker.finish_op(); + } + void finish(int r) override { + journal_metadata->handle_refresh_complete(this, r); + } + }; + + librados::IoCtx m_ioctx; + CephContext *m_cct = nullptr; + std::string m_oid; + std::string m_client_id; + Settings m_settings; + + uint8_t m_order = 0; + uint8_t m_splay_width = 0; + int64_t m_pool_id = -1; + bool m_initialized = false; + + ContextWQ *m_work_queue; + SafeTimer *m_timer; + ceph::mutex *m_timer_lock; + + mutable ceph::mutex m_lock = ceph::make_mutex("JournalMetadata::m_lock"); + + uint64_t m_commit_tid = 0; + CommitTids m_pending_commit_tids; + + Listeners m_listeners; + + C_WatchCtx m_watch_ctx; + uint64_t m_watch_handle = 0; + + uint64_t m_minimum_set = 0; + uint64_t m_active_set = 0; + RegisteredClients m_registered_clients; + Client m_client; + + AllocatedEntryTids 
m_allocated_entry_tids; + + size_t m_update_notifications = 0; + ceph::condition_variable m_update_cond; + + size_t m_ignore_watch_notifies = 0; + size_t m_refreshes_in_progress = 0; + Contexts m_refresh_ctxs; + + uint64_t m_commit_position_tid = 0; + ObjectSetPosition m_commit_position; + Context *m_commit_position_ctx = nullptr; + Context *m_commit_position_task_ctx = nullptr; + + size_t m_flush_commits_in_progress = 0; + Contexts m_flush_commit_position_ctxs; + + AsyncOpTracker m_async_op_tracker; + + void handle_immutable_metadata(int r, Context *on_init); + + void refresh(Context *on_finish); + void handle_refresh_complete(C_Refresh *refresh, int r); + + void cancel_commit_task(); + void schedule_commit_task(); + void handle_commit_position_task(); + + void schedule_watch_reset(); + void handle_watch_reset(); + void handle_watch_notify(uint64_t notify_id, uint64_t cookie); + void handle_watch_error(int err); + void handle_notified(int r); + + void schedule_laggy_clients_disconnect(Context *on_finish); + + friend std::ostream &operator<<(std::ostream &os, + const JournalMetadata &journal_metadata); +}; + +std::ostream &operator<<(std::ostream &os, + const JournalMetadata::RegisteredClients &clients); + +std::ostream &operator<<(std::ostream &os, + const JournalMetadata &journal_metadata); + +} // namespace journal + +#endif // CEPH_JOURNAL_JOURNAL_METADATA_H diff --git a/src/journal/JournalMetadataListener.h b/src/journal/JournalMetadataListener.h new file mode 100644 index 000000000..121fe6856 --- /dev/null +++ b/src/journal/JournalMetadataListener.h @@ -0,0 +1,30 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 SUSE LINUX GmbH + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#ifndef CEPH_JOURNAL_JOURNAL_METADATA_LISTENER_H +#define CEPH_JOURNAL_JOURNAL_METADATA_LISTENER_H + +namespace journal { + +class JournalMetadata; + +struct JournalMetadataListener { + virtual ~JournalMetadataListener() {}; + virtual void handle_update(JournalMetadata *) = 0; +}; + +} // namespace journal + +#endif // CEPH_JOURNAL_JOURNAL_METADATA_LISTENER_H + diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc new file mode 100644 index 000000000..4a0912476 --- /dev/null +++ b/src/journal/JournalPlayer.cc @@ -0,0 +1,871 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/PriorityCache.h" +#include "include/stringify.h" +#include "journal/JournalPlayer.h" +#include "journal/Entry.h" +#include "journal/ReplayHandler.h" +#include "journal/Types.h" +#include "journal/Utils.h" + +#define dout_subsys ceph_subsys_journaler +#undef dout_prefix +#define dout_prefix *_dout << "JournalPlayer: " << this << " " + +namespace journal { + +namespace { + +static const uint64_t MIN_FETCH_BYTES = 32768; + +struct C_HandleComplete : public Context { + ReplayHandler* replay_handler; + + explicit C_HandleComplete(ReplayHandler* r) : replay_handler(std::move(r)) {} + ~C_HandleComplete() override {} + void finish(int r) override { + replay_handler->handle_complete(r); + } +}; + +struct C_HandleEntriesAvailable : public Context { + ReplayHandler* replay_handler; + + explicit C_HandleEntriesAvailable(ReplayHandler* r) : replay_handler(std::move(r)) {} + ~C_HandleEntriesAvailable() override {} + void finish(int r) override { + replay_handler->handle_entries_available(); + } +}; + +} // anonymous namespace + +JournalPlayer::JournalPlayer(librados::IoCtx &ioctx, + std::string_view object_oid_prefix, + ceph::ref_t<JournalMetadata> journal_metadata, + ReplayHandler* replay_handler, + CacheManagerHandler *cache_manager_handler) + : m_object_oid_prefix(object_oid_prefix), + m_journal_metadata(std::move(journal_metadata)), + m_replay_handler(std::move(replay_handler)), + m_cache_manager_handler(cache_manager_handler), + m_cache_rebalance_handler(this) +{ + m_ioctx.dup(ioctx); + m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct()); + + ObjectSetPosition commit_position; + m_journal_metadata->get_commit_position(&commit_position); + + if (!commit_position.object_positions.empty()) { + ldout(m_cct, 5) << "commit position: " << commit_position << dendl; + + // start replay after the last committed entry's object + uint8_t splay_width = m_journal_metadata->get_splay_width(); + auto &active_position = commit_position.object_positions.front(); + m_active_tag_tid = active_position.tag_tid; + m_commit_position_valid = true; + m_commit_position = active_position; + m_splay_offset = active_position.object_number % splay_width; + for (auto &position : commit_position.object_positions) { + uint8_t splay_offset = position.object_number % splay_width; + m_commit_positions[splay_offset] = position; + } + } + + if (m_cache_manager_handler != nullptr) { + m_cache_name = "JournalPlayer/" + stringify(m_ioctx.get_id()) + "/" + + m_object_oid_prefix; + auto order = m_journal_metadata->get_order(); + auto splay_width = m_journal_metadata->get_splay_width(); + uint64_t min_size = MIN_FETCH_BYTES * splay_width; + uint64_t max_size = (2 << order) * splay_width; + + m_cache_manager_handler->register_cache(m_cache_name, min_size, max_size, + &m_cache_rebalance_handler); + m_max_fetch_bytes = 0; + } else { + m_max_fetch_bytes = 2 << 
m_journal_metadata->get_order(); + } +} + +JournalPlayer::~JournalPlayer() { + ceph_assert(m_async_op_tracker.empty()); + { + std::lock_guard locker{m_lock}; + ceph_assert(m_shut_down); + ceph_assert(m_fetch_object_numbers.empty()); + ceph_assert(!m_watch_scheduled); + } + + if (m_cache_manager_handler != nullptr) { + m_cache_manager_handler->unregister_cache(m_cache_name); + } +} + +void JournalPlayer::prefetch() { + std::lock_guard locker{m_lock}; + ceph_assert(m_state == STATE_INIT); + + if (m_shut_down) { + return; + } + + if (m_cache_manager_handler != nullptr && m_max_fetch_bytes == 0) { + m_state = STATE_WAITCACHE; + return; + } + + m_state = STATE_PREFETCH; + + m_active_set = m_journal_metadata->get_active_set(); + uint8_t splay_width = m_journal_metadata->get_splay_width(); + for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { + m_prefetch_splay_offsets.insert(splay_offset); + } + + // compute active object for each splay offset (might be before + // active set) + std::map<uint8_t, uint64_t> splay_offset_to_objects; + for (auto &position : m_commit_positions) { + ceph_assert(splay_offset_to_objects.count(position.first) == 0); + splay_offset_to_objects[position.first] = position.second.object_number; + } + + // prefetch the active object for each splay offset + std::set<uint64_t> prefetch_object_numbers; + for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { + uint64_t object_number = splay_offset; + if (splay_offset_to_objects.count(splay_offset) != 0) { + object_number = splay_offset_to_objects[splay_offset]; + } + + prefetch_object_numbers.insert(object_number); + } + + ldout(m_cct, 10) << __func__ << ": prefetching " + << prefetch_object_numbers.size() << " " << "objects" + << dendl; + for (auto object_number : prefetch_object_numbers) { + fetch(object_number); + } +} + +void JournalPlayer::prefetch_and_watch(double interval) { + { + std::lock_guard locker{m_lock}; + m_watch_enabled = true; + m_watch_interval = interval; + m_watch_step = WATCH_STEP_FETCH_CURRENT; + } + prefetch(); +} + +void JournalPlayer::shut_down(Context *on_finish) { + ldout(m_cct, 20) << __func__ << dendl; + std::lock_guard locker{m_lock}; + + ceph_assert(!m_shut_down); + m_shut_down = true; + m_watch_enabled = false; + + on_finish = utils::create_async_context_callback( + m_journal_metadata, on_finish); + + if (m_watch_scheduled) { + auto object_player = get_object_player(); + switch (m_watch_step) { + case WATCH_STEP_FETCH_FIRST: + object_player = m_object_players.begin()->second; + // fallthrough + case WATCH_STEP_FETCH_CURRENT: + object_player->unwatch(); + break; + case WATCH_STEP_ASSERT_ACTIVE: + break; + } + } + + m_async_op_tracker.wait_for_ops(on_finish); +} + +bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { + ldout(m_cct, 20) << __func__ << dendl; + std::lock_guard locker{m_lock}; + + if (m_state != STATE_PLAYBACK) { + m_handler_notified = false; + return false; + } + + if (!verify_playback_ready()) { + if (!is_object_set_ready()) { + m_handler_notified = false; + } else { + refetch(true); + } + return false; + } + + auto object_player = get_object_player(); + ceph_assert(object_player && !object_player->empty()); + + object_player->front(entry); + object_player->pop_front(); + + uint64_t last_entry_tid; + if (m_journal_metadata->get_last_allocated_entry_tid( + entry->get_tag_tid(), &last_entry_tid) && + entry->get_entry_tid() != last_entry_tid + 1) { + lderr(m_cct) << "missing prior journal entry: " << *entry << dendl; + + 
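+ // entry tid does not immediately follow the last allocated tid for this tag -- abort playback with an error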
m_state = STATE_ERROR; + notify_complete(-ENOMSG); + return false; + } + + advance_splay_object(); + remove_empty_object_player(object_player); + + m_journal_metadata->reserve_entry_tid(entry->get_tag_tid(), + entry->get_entry_tid()); + *commit_tid = m_journal_metadata->allocate_commit_tid( + object_player->get_object_number(), entry->get_tag_tid(), + entry->get_entry_tid()); + return true; +} + +void JournalPlayer::process_state(uint64_t object_number, int r) { + ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << ", " + << "r=" << r << dendl; + + ceph_assert(ceph_mutex_is_locked(m_lock)); + if (r >= 0) { + switch (m_state) { + case STATE_PREFETCH: + ldout(m_cct, 10) << "PREFETCH" << dendl; + r = process_prefetch(object_number); + break; + case STATE_PLAYBACK: + ldout(m_cct, 10) << "PLAYBACK" << dendl; + r = process_playback(object_number); + break; + case STATE_ERROR: + ldout(m_cct, 10) << "ERROR" << dendl; + break; + default: + lderr(m_cct) << "UNEXPECTED STATE (" << m_state << ")" << dendl; + ceph_abort(); + break; + } + } + + if (r < 0) { + m_state = STATE_ERROR; + notify_complete(r); + } +} + +int JournalPlayer::process_prefetch(uint64_t object_number) { + ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl; + ceph_assert(ceph_mutex_is_locked(m_lock)); + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint8_t splay_offset = object_number % splay_width; + + PrefetchSplayOffsets::iterator it = m_prefetch_splay_offsets.find( + splay_offset); + if (it == m_prefetch_splay_offsets.end()) { + return 0; + } + + bool prefetch_complete = false; + ceph_assert(m_object_players.count(splay_offset) == 1); + auto object_player = m_object_players[splay_offset]; + + // prefetch in-order since a newer splay object could prefetch first + if (m_fetch_object_numbers.count(object_player->get_object_number()) == 0) { + // skip past known committed records + if (m_commit_positions.count(splay_offset) != 0 && + !object_player->empty()) { + ObjectPosition &position = m_commit_positions[splay_offset]; + + ldout(m_cct, 15) << "seeking known commit position " << position << " in " + << object_player->get_oid() << dendl; + + bool found_commit = false; + Entry entry; + while (!object_player->empty()) { + object_player->front(&entry); + + if (entry.get_tag_tid() == position.tag_tid && + entry.get_entry_tid() == position.entry_tid) { + found_commit = true; + } else if (found_commit) { + ldout(m_cct, 10) << "located next uncommitted entry: " << entry + << dendl; + break; + } + + ldout(m_cct, 20) << "skipping committed entry: " << entry << dendl; + m_journal_metadata->reserve_entry_tid(entry.get_tag_tid(), + entry.get_entry_tid()); + object_player->pop_front(); + } + + // do not search for commit position for this object + // if we've already seen it + if (found_commit) { + m_commit_positions.erase(splay_offset); + } + } + + // if the object is empty, pre-fetch the next splay object + if (object_player->empty() && object_player->refetch_required()) { + ldout(m_cct, 10) << "refetching potentially partially decoded object" + << dendl; + object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE); + fetch(object_player); + } else if (!remove_empty_object_player(object_player)) { + ldout(m_cct, 10) << "prefetch of object complete" << dendl; + prefetch_complete = true; + } + } + + if (!prefetch_complete) { + return 0; + } + + m_prefetch_splay_offsets.erase(it); + if (!m_prefetch_splay_offsets.empty()) { + return 0; + } + + ldout(m_cct, 10) << "switching to 
playback mode" << dendl; + m_state = STATE_PLAYBACK; + + // if we have a valid commit position, our read should start with + // the next consistent journal entry in the sequence + if (m_commit_position_valid) { + splay_offset = m_commit_position.object_number % splay_width; + object_player = m_object_players[splay_offset]; + + if (object_player->empty()) { + if (!object_player->refetch_required()) { + advance_splay_object(); + } + } else { + Entry entry; + object_player->front(&entry); + if (entry.get_tag_tid() == m_commit_position.tag_tid) { + advance_splay_object(); + } + } + } + + if (verify_playback_ready()) { + notify_entries_available(); + } else if (is_object_set_ready()) { + refetch(false); + } + return 0; +} + +int JournalPlayer::process_playback(uint64_t object_number) { + ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl; + ceph_assert(ceph_mutex_is_locked(m_lock)); + + if (verify_playback_ready()) { + notify_entries_available(); + } else if (is_object_set_ready()) { + refetch(false); + } + return 0; +} + +bool JournalPlayer::is_object_set_ready() const { + ceph_assert(ceph_mutex_is_locked(m_lock)); + if (m_watch_scheduled || !m_fetch_object_numbers.empty()) { + ldout(m_cct, 20) << __func__ << ": waiting for in-flight fetch" << dendl; + return false; + } + + return true; +} + +bool JournalPlayer::verify_playback_ready() { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + while (true) { + if (!is_object_set_ready()) { + ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl; + return false; + } + + auto object_player = get_object_player(); + ceph_assert(object_player); + uint64_t object_num = object_player->get_object_number(); + + // Verify is the active object player has another entry available + // in the sequence + // NOTE: replay currently does not check tag class to playback multiple tags + // from different classes (issue #14909). When a new tag is discovered, it + // is assumed that the previous tag was closed at the last replayable entry. 
+ Entry entry; + if (!object_player->empty()) { + m_watch_prune_active_tag = false; + object_player->front(&entry); + + if (!m_active_tag_tid) { + ldout(m_cct, 10) << __func__ << ": " + << "object_num=" << object_num << ", " + << "initial tag=" << entry.get_tag_tid() + << dendl; + m_active_tag_tid = entry.get_tag_tid(); + return true; + } else if (entry.get_tag_tid() < *m_active_tag_tid || + (m_prune_tag_tid && entry.get_tag_tid() <= *m_prune_tag_tid)) { + // entry occurred before the current active tag + ldout(m_cct, 10) << __func__ << ": detected stale entry: " + << "object_num=" << object_num << ", " + << "entry=" << entry << dendl; + prune_tag(entry.get_tag_tid()); + continue; + } else if (entry.get_tag_tid() > *m_active_tag_tid) { + // new tag at current playback position -- implies that previous + // tag ended abruptly without flushing out all records + // search for the start record for the next tag + ldout(m_cct, 10) << __func__ << ": new tag detected: " + << "object_num=" << object_num << ", " + << "active_tag=" << *m_active_tag_tid << ", " + << "new_tag=" << entry.get_tag_tid() << dendl; + if (entry.get_entry_tid() == 0) { + // first entry in new tag -- can promote to active + prune_active_tag(entry.get_tag_tid()); + return true; + } else { + // prune current active and wait for initial entry for new tag + prune_active_tag(boost::none); + continue; + } + } else { + ldout(m_cct, 20) << __func__ << ": " + << "object_num=" << object_num << ", " + << "entry: " << entry << dendl; + ceph_assert(entry.get_tag_tid() == *m_active_tag_tid); + return true; + } + } else { + if (!m_active_tag_tid) { + // waiting for our first entry + ldout(m_cct, 10) << __func__ << ": waiting for first entry: " + << "object_num=" << object_num << dendl; + return false; + } else if (m_prune_tag_tid && *m_prune_tag_tid == *m_active_tag_tid) { + ldout(m_cct, 10) << __func__ << ": no more entries" << dendl; + return false; + } else if (m_watch_enabled && m_watch_prune_active_tag) { + // detected current tag is now longer active and we have re-read the + // current object but it's still empty, so this tag is done + ldout(m_cct, 10) << __func__ << ": assuming no more in-sequence entries: " + << "object_num=" << object_num << ", " + << "active_tag " << *m_active_tag_tid << dendl; + prune_active_tag(boost::none); + continue; + } else if (object_player->refetch_required()) { + // if the active object requires a refetch, don't proceed looking for a + // new tag before this process completes + ldout(m_cct, 10) << __func__ << ": refetch required: " + << "object_num=" << object_num << dendl; + return false; + } else if (!m_watch_enabled) { + // current playback position is empty so this tag is done + ldout(m_cct, 10) << __func__ << ": no more in-sequence entries: " + << "object_num=" << object_num << ", " + << "active_tag=" << *m_active_tag_tid << dendl; + prune_active_tag(boost::none); + continue; + } else if (!m_watch_scheduled) { + // no more entries and we don't have an active watch in-progress + ldout(m_cct, 10) << __func__ << ": no more entries -- watch required" + << dendl; + return false; + } + } + } + return false; +} + +void JournalPlayer::prune_tag(uint64_t tag_tid) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + ldout(m_cct, 10) << __func__ << ": pruning remaining entries for tag " + << tag_tid << dendl; + + // prune records that are at or below the largest prune tag tid + if (!m_prune_tag_tid || *m_prune_tag_tid < tag_tid) { + m_prune_tag_tid = tag_tid; + } + + bool pruned = false; + for (const auto 
&player_pair : m_object_players) { + auto& object_player = player_pair.second; + ldout(m_cct, 15) << __func__ << ": checking " << object_player->get_oid() + << dendl; + while (!object_player->empty()) { + Entry entry; + object_player->front(&entry); + if (entry.get_tag_tid() == tag_tid) { + ldout(m_cct, 20) << __func__ << ": pruned " << entry << dendl; + object_player->pop_front(); + pruned = true; + } else { + break; + } + } + } + + // avoid watch delay when pruning stale tags from journal objects + if (pruned) { + ldout(m_cct, 15) << __func__ << ": resetting refetch state to immediate" + << dendl; + for (const auto &player_pair : m_object_players) { + auto& object_player = player_pair.second; + object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_IMMEDIATE); + } + } + + // trim empty player to prefetch the next available object + for (const auto &player_pair : m_object_players) { + remove_empty_object_player(player_pair.second); + } +} + +void JournalPlayer::prune_active_tag(const boost::optional<uint64_t>& tag_tid) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + ceph_assert(m_active_tag_tid); + + uint64_t active_tag_tid = *m_active_tag_tid; + if (tag_tid) { + m_active_tag_tid = tag_tid; + } + m_splay_offset = 0; + m_watch_step = WATCH_STEP_FETCH_CURRENT; + + prune_tag(active_tag_tid); +} + +ceph::ref_t<ObjectPlayer> JournalPlayer::get_object_player() const { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + SplayedObjectPlayers::const_iterator it = m_object_players.find( + m_splay_offset); + ceph_assert(it != m_object_players.end()); + return it->second; +} + +ceph::ref_t<ObjectPlayer> JournalPlayer::get_object_player(uint64_t object_number) const { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint8_t splay_offset = object_number % splay_width; + auto splay_it = m_object_players.find(splay_offset); + ceph_assert(splay_it != m_object_players.end()); + + auto object_player = splay_it->second; + ceph_assert(object_player->get_object_number() == object_number); + return object_player; +} + +void JournalPlayer::advance_splay_object() { + ceph_assert(ceph_mutex_is_locked(m_lock)); + ++m_splay_offset; + m_splay_offset %= m_journal_metadata->get_splay_width(); + m_watch_step = WATCH_STEP_FETCH_CURRENT; + ldout(m_cct, 20) << __func__ << ": new offset " + << static_cast<uint32_t>(m_splay_offset) << dendl; +} + +bool JournalPlayer::remove_empty_object_player(const ceph::ref_t<ObjectPlayer> &player) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + ceph_assert(!m_watch_scheduled); + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint64_t object_set = player->get_object_number() / splay_width; + uint64_t active_set = m_journal_metadata->get_active_set(); + if (!player->empty() || object_set == active_set) { + return false; + } else if (player->refetch_required()) { + ldout(m_cct, 20) << __func__ << ": " << player->get_oid() << " requires " + << "a refetch" << dendl; + return false; + } else if (m_active_set != active_set) { + ldout(m_cct, 20) << __func__ << ": new active set detected, all players " + << "require refetch" << dendl; + m_active_set = active_set; + for (const auto& pair : m_object_players) { + pair.second->set_refetch_state(ObjectPlayer::REFETCH_STATE_IMMEDIATE); + } + return false; + } + + ldout(m_cct, 15) << __func__ << ": " << player->get_oid() << " empty" + << dendl; + + m_watch_prune_active_tag = false; + m_watch_step = WATCH_STEP_FETCH_CURRENT; + + uint64_t next_object_num = 
player->get_object_number() + splay_width; + fetch(next_object_num); + return true; +} + +void JournalPlayer::fetch(uint64_t object_num) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + auto object_player = ceph::make_ref<ObjectPlayer>( + m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(), + m_journal_metadata->get_timer_lock(), m_journal_metadata->get_order(), + m_max_fetch_bytes); + + auto splay_width = m_journal_metadata->get_splay_width(); + m_object_players[object_num % splay_width] = object_player; + fetch(object_player); +} + +void JournalPlayer::fetch(const ceph::ref_t<ObjectPlayer> &object_player) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + uint64_t object_num = object_player->get_object_number(); + std::string oid = utils::get_object_name(m_object_oid_prefix, object_num); + ceph_assert(m_fetch_object_numbers.count(object_num) == 0); + m_fetch_object_numbers.insert(object_num); + + ldout(m_cct, 10) << __func__ << ": " << oid << dendl; + C_Fetch *fetch_ctx = new C_Fetch(this, object_num); + + object_player->fetch(fetch_ctx); +} + +void JournalPlayer::handle_fetched(uint64_t object_num, int r) { + ldout(m_cct, 10) << __func__ << ": " + << utils::get_object_name(m_object_oid_prefix, object_num) + << ": r=" << r << dendl; + + std::lock_guard locker{m_lock}; + ceph_assert(m_fetch_object_numbers.count(object_num) == 1); + m_fetch_object_numbers.erase(object_num); + + if (m_shut_down) { + return; + } + + if (r == 0) { + auto object_player = get_object_player(object_num); + remove_empty_object_player(object_player); + } + process_state(object_num, r); +} + +void JournalPlayer::refetch(bool immediate) { + ldout(m_cct, 10) << __func__ << dendl; + ceph_assert(ceph_mutex_is_locked(m_lock)); + m_handler_notified = false; + + // if watching the object, handle the periodic re-fetch + if (m_watch_enabled) { + schedule_watch(immediate); + return; + } + + auto object_player = get_object_player(); + if (object_player->refetch_required()) { + object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE); + fetch(object_player); + return; + } + + notify_complete(0); +} + +void JournalPlayer::schedule_watch(bool immediate) { + ldout(m_cct, 10) << __func__ << dendl; + ceph_assert(ceph_mutex_is_locked(m_lock)); + if (m_watch_scheduled) { + return; + } + + m_watch_scheduled = true; + + if (m_watch_step == WATCH_STEP_ASSERT_ACTIVE) { + // detect if a new tag has been created in case we are blocked + // by an incomplete tag sequence + ldout(m_cct, 20) << __func__ << ": asserting active tag=" + << *m_active_tag_tid << dendl; + + m_async_op_tracker.start_op(); + auto ctx = new LambdaContext([this](int r) { + handle_watch_assert_active(r); + }); + m_journal_metadata->assert_active_tag(*m_active_tag_tid, ctx); + return; + } + + ceph::ref_t<ObjectPlayer> object_player; + double watch_interval = m_watch_interval; + + switch (m_watch_step) { + case WATCH_STEP_FETCH_CURRENT: + { + object_player = get_object_player(); + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint64_t active_set = m_journal_metadata->get_active_set(); + uint64_t object_set = object_player->get_object_number() / splay_width; + if (immediate || + (object_player->get_refetch_state() == + ObjectPlayer::REFETCH_STATE_IMMEDIATE) || + (object_set < active_set && object_player->refetch_required())) { + ldout(m_cct, 20) << __func__ << ": immediately refetching " + << object_player->get_oid() + << dendl; + object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE); + watch_interval = 0; + } 
+ } + break; + case WATCH_STEP_FETCH_FIRST: + object_player = m_object_players.begin()->second; + watch_interval = 0; + break; + default: + ceph_abort(); + } + + ldout(m_cct, 20) << __func__ << ": scheduling watch on " + << object_player->get_oid() << dendl; + Context *ctx = utils::create_async_context_callback( + m_journal_metadata, new C_Watch(this, object_player->get_object_number())); + object_player->watch(ctx, watch_interval); +} + +void JournalPlayer::handle_watch(uint64_t object_num, int r) { + ldout(m_cct, 10) << __func__ << ": r=" << r << dendl; + std::lock_guard locker{m_lock}; + ceph_assert(m_watch_scheduled); + m_watch_scheduled = false; + + if (m_shut_down || r == -ECANCELED) { + // unwatch of object player(s) + return; + } + + auto object_player = get_object_player(object_num); + if (r == 0 && object_player->empty()) { + // possibly need to prune this empty object player if we've + // already fetched it after the active set was advanced with no + // new records + remove_empty_object_player(object_player); + } + + // determine what object to query on next watch schedule tick + uint8_t splay_width = m_journal_metadata->get_splay_width(); + if (m_watch_step == WATCH_STEP_FETCH_CURRENT && + object_player->get_object_number() % splay_width != 0) { + m_watch_step = WATCH_STEP_FETCH_FIRST; + } else if (m_active_tag_tid) { + m_watch_step = WATCH_STEP_ASSERT_ACTIVE; + } else { + m_watch_step = WATCH_STEP_FETCH_CURRENT; + } + + process_state(object_num, r); +} + +void JournalPlayer::handle_watch_assert_active(int r) { + ldout(m_cct, 10) << __func__ << ": r=" << r << dendl; + + std::lock_guard locker{m_lock}; + ceph_assert(m_watch_scheduled); + m_watch_scheduled = false; + + if (r == -ESTALE) { + // newer tag exists -- since we are at this step in the watch sequence, + // we know we can prune the active tag if watch fails again + ldout(m_cct, 10) << __func__ << ": tag " << *m_active_tag_tid << " " + << "no longer active" << dendl; + m_watch_prune_active_tag = true; + } + + m_watch_step = WATCH_STEP_FETCH_CURRENT; + if (!m_shut_down && m_watch_enabled) { + schedule_watch(false); + } + m_async_op_tracker.finish_op(); +} + +void JournalPlayer::notify_entries_available() { + ceph_assert(ceph_mutex_is_locked(m_lock)); + if (m_handler_notified) { + return; + } + m_handler_notified = true; + + ldout(m_cct, 10) << __func__ << ": entries available" << dendl; + m_journal_metadata->queue(new C_HandleEntriesAvailable(m_replay_handler), 0); +} + +void JournalPlayer::notify_complete(int r) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + m_handler_notified = true; + + ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl; + m_journal_metadata->queue(new C_HandleComplete(m_replay_handler), r); +} + +void JournalPlayer::handle_cache_rebalanced(uint64_t new_cache_bytes) { + std::lock_guard locker{m_lock}; + + if (m_state == STATE_ERROR || m_shut_down) { + return; + } + + auto splay_width = m_journal_metadata->get_splay_width(); + m_max_fetch_bytes = p2align<uint64_t>(new_cache_bytes / splay_width, 4096); + + ldout(m_cct, 10) << __func__ << ": new_cache_bytes=" << new_cache_bytes + << ", max_fetch_bytes=" << m_max_fetch_bytes << dendl; + + uint64_t min_bytes = MIN_FETCH_BYTES; + + if (m_state == STATE_WAITCACHE) { + m_state = STATE_INIT; + if (m_max_fetch_bytes >= min_bytes) { + m_async_op_tracker.start_op(); + auto ctx = new LambdaContext( + [this](int r) { + prefetch(); + m_async_op_tracker.finish_op(); + }); + m_journal_metadata->queue(ctx, 0); + return; + } + } else { + min_bytes = 
p2align<uint64_t>(min_bytes - (rand() % min_bytes) / 2, 4096); + } + + if (m_max_fetch_bytes < min_bytes) { + lderr(m_cct) << __func__ << ": can't allocate enough memory from cache" + << dendl; + m_state = STATE_ERROR; + notify_complete(-ENOMEM); + return; + } + + for (auto &pair : m_object_players) { + pair.second->set_max_fetch_bytes(m_max_fetch_bytes); + } +} + + +} // namespace journal diff --git a/src/journal/JournalPlayer.h b/src/journal/JournalPlayer.h new file mode 100644 index 000000000..a71117a83 --- /dev/null +++ b/src/journal/JournalPlayer.h @@ -0,0 +1,176 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_JOURNAL_PLAYER_H +#define CEPH_JOURNAL_JOURNAL_PLAYER_H + +#include "include/int_types.h" +#include "include/Context.h" +#include "include/rados/librados.hpp" +#include "common/AsyncOpTracker.h" +#include "common/Timer.h" +#include "journal/JournalMetadata.h" +#include "journal/ObjectPlayer.h" +#include "journal/Types.h" +#include "cls/journal/cls_journal_types.h" +#include <boost/none.hpp> +#include <boost/optional.hpp> +#include <map> + +namespace journal { + +class CacheManagerHandler; +class Entry; +class ReplayHandler; + +class JournalPlayer { +public: + typedef cls::journal::ObjectPosition ObjectPosition; + typedef cls::journal::ObjectPositions ObjectPositions; + typedef cls::journal::ObjectSetPosition ObjectSetPosition; + + JournalPlayer(librados::IoCtx &ioctx, std::string_view object_oid_prefix, + ceph::ref_t<JournalMetadata> journal_metadata, + ReplayHandler* replay_handler, + CacheManagerHandler *cache_manager_handler); + ~JournalPlayer(); + + void prefetch(); + void prefetch_and_watch(double interval); + void shut_down(Context *on_finish); + + bool try_pop_front(Entry *entry, uint64_t *commit_tid); + +private: + typedef std::set<uint8_t> PrefetchSplayOffsets; + typedef std::map<uint8_t, ceph::ref_t<ObjectPlayer>> SplayedObjectPlayers; + typedef std::map<uint8_t, ObjectPosition> SplayedObjectPositions; + typedef std::set<uint64_t> ObjectNumbers; + + enum State { + STATE_INIT, + STATE_WAITCACHE, + STATE_PREFETCH, + STATE_PLAYBACK, + STATE_ERROR + }; + + enum WatchStep { + WATCH_STEP_FETCH_CURRENT, + WATCH_STEP_FETCH_FIRST, + WATCH_STEP_ASSERT_ACTIVE + }; + + struct C_Fetch : public Context { + JournalPlayer *player; + uint64_t object_num; + C_Fetch(JournalPlayer *p, uint64_t o) : player(p), object_num(o) { + player->m_async_op_tracker.start_op(); + } + ~C_Fetch() override { + player->m_async_op_tracker.finish_op(); + } + void finish(int r) override { + player->handle_fetched(object_num, r); + } + }; + + struct C_Watch : public Context { + JournalPlayer *player; + uint64_t object_num; + C_Watch(JournalPlayer *player, uint64_t object_num) + : player(player), object_num(object_num) { + player->m_async_op_tracker.start_op(); + } + ~C_Watch() override { + player->m_async_op_tracker.finish_op(); + } + + void finish(int r) override { + player->handle_watch(object_num, r); + } + }; + + struct CacheRebalanceHandler : public journal::CacheRebalanceHandler { + JournalPlayer *player; + + CacheRebalanceHandler(JournalPlayer *player) : player(player) { + } + + void handle_cache_rebalanced(uint64_t new_cache_bytes) override { + player->handle_cache_rebalanced(new_cache_bytes); + } + }; + + librados::IoCtx m_ioctx; + CephContext *m_cct = nullptr; + std::string m_object_oid_prefix; + ceph::ref_t<JournalMetadata> m_journal_metadata; + ReplayHandler* m_replay_handler; + CacheManagerHandler 
*m_cache_manager_handler; + + std::string m_cache_name; + CacheRebalanceHandler m_cache_rebalance_handler; + uint64_t m_max_fetch_bytes; + + AsyncOpTracker m_async_op_tracker; + + mutable ceph::mutex m_lock = ceph::make_mutex("JournalPlayer::m_lock"); + State m_state = STATE_INIT; + uint8_t m_splay_offset = 0; + + bool m_watch_enabled = false; + bool m_watch_scheduled = false; + double m_watch_interval = 0; + WatchStep m_watch_step = WATCH_STEP_FETCH_CURRENT; + bool m_watch_prune_active_tag = false; + + bool m_shut_down = false; + bool m_handler_notified = false; + + ObjectNumbers m_fetch_object_numbers; + + PrefetchSplayOffsets m_prefetch_splay_offsets; + SplayedObjectPlayers m_object_players; + + bool m_commit_position_valid = false; + ObjectPosition m_commit_position; + SplayedObjectPositions m_commit_positions; + uint64_t m_active_set = 0; + + boost::optional<uint64_t> m_active_tag_tid = boost::none; + boost::optional<uint64_t> m_prune_tag_tid = boost::none; + + void advance_splay_object(); + + bool is_object_set_ready() const; + bool verify_playback_ready(); + void prune_tag(uint64_t tag_tid); + void prune_active_tag(const boost::optional<uint64_t>& tag_tid); + + ceph::ref_t<ObjectPlayer> get_object_player() const; + ceph::ref_t<ObjectPlayer> get_object_player(uint64_t object_number) const; + bool remove_empty_object_player(const ceph::ref_t<ObjectPlayer> &object_player); + + void process_state(uint64_t object_number, int r); + int process_prefetch(uint64_t object_number); + int process_playback(uint64_t object_number); + + void fetch(uint64_t object_num); + void fetch(const ceph::ref_t<ObjectPlayer> &object_player); + void handle_fetched(uint64_t object_num, int r); + void refetch(bool immediate); + + void schedule_watch(bool immediate); + void handle_watch(uint64_t object_num, int r); + void handle_watch_assert_active(int r); + + void notify_entries_available(); + void notify_complete(int r); + + void handle_cache_rebalanced(uint64_t new_cache_bytes); +}; + +} // namespace journal + +#endif // CEPH_JOURNAL_JOURNAL_PLAYER_H diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc new file mode 100644 index 000000000..0304ae777 --- /dev/null +++ b/src/journal/JournalRecorder.cc @@ -0,0 +1,434 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/JournalRecorder.h" +#include "common/errno.h" +#include "journal/Entry.h" +#include "journal/Utils.h" + +#include <atomic> + +#define dout_subsys ceph_subsys_journaler +#undef dout_prefix +#define dout_prefix *_dout << "JournalRecorder: " << this << " " << __func__ \ + << ": " + +using std::shared_ptr; + +namespace journal { + +namespace { + +struct C_Flush : public Context { + ceph::ref_t<JournalMetadata> journal_metadata; + Context *on_finish; + std::atomic<int64_t> pending_flushes{0}; + int ret_val = 0; + + C_Flush(ceph::ref_t<JournalMetadata> _journal_metadata, Context *_on_finish, + size_t _pending_flushes) + : journal_metadata(std::move(_journal_metadata)), + on_finish(_on_finish), + pending_flushes(_pending_flushes) { + } + + void complete(int r) override { + if (r < 0 && ret_val == 0) { + ret_val = r; + } + if (--pending_flushes == 0) { + // ensure all prior callback have been flushed as well + journal_metadata->queue(on_finish, ret_val); + delete this; + } + } + void finish(int r) override { + } +}; + +} // anonymous namespace + +JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, + std::string_view object_oid_prefix, + 
ceph::ref_t<JournalMetadata> journal_metadata, + uint64_t max_in_flight_appends) + : m_object_oid_prefix(object_oid_prefix), + m_journal_metadata(std::move(journal_metadata)), + m_max_in_flight_appends(max_in_flight_appends), + m_listener(this), + m_object_handler(this), + m_current_set(m_journal_metadata->get_active_set()), + m_object_locks{ceph::make_lock_container<ceph::mutex>( + m_journal_metadata->get_splay_width(), [](const size_t splay_offset) { + return ceph::make_mutex("ObjectRecorder::m_lock::" + + std::to_string(splay_offset)); + })} +{ + std::lock_guard locker{m_lock}; + m_ioctx.dup(ioctx); + m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { + uint64_t object_number = splay_offset + (m_current_set * splay_width); + std::lock_guard locker{m_object_locks[splay_offset]}; + m_object_ptrs[splay_offset] = create_object_recorder( + object_number, &m_object_locks[splay_offset]); + } + + m_journal_metadata->add_listener(&m_listener); +} + +JournalRecorder::~JournalRecorder() { + m_journal_metadata->remove_listener(&m_listener); + + std::lock_guard locker{m_lock}; + ceph_assert(m_in_flight_advance_sets == 0); + ceph_assert(m_in_flight_object_closes == 0); +} + +void JournalRecorder::shut_down(Context *on_safe) { + on_safe = new LambdaContext( + [this, on_safe](int r) { + Context *ctx = nullptr; + { + std::lock_guard locker{m_lock}; + if (m_in_flight_advance_sets != 0) { + ceph_assert(m_on_object_set_advanced == nullptr); + m_on_object_set_advanced = new LambdaContext( + [on_safe, r](int) { + on_safe->complete(r); + }); + } else { + ctx = on_safe; + } + } + if (ctx != nullptr) { + ctx->complete(r); + } + }); + flush(on_safe); +} + +void JournalRecorder::set_append_batch_options(int flush_interval, + uint64_t flush_bytes, + double flush_age) { + ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", " + << "flush_bytes=" << flush_bytes << ", " + << "flush_age=" << flush_age << dendl; + + std::lock_guard locker{m_lock}; + m_flush_interval = flush_interval; + m_flush_bytes = flush_bytes; + m_flush_age = flush_age; + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { + std::lock_guard object_locker{m_object_locks[splay_offset]}; + auto object_recorder = get_object(splay_offset); + object_recorder->set_append_batch_options(flush_interval, flush_bytes, + flush_age); + } +} + +Future JournalRecorder::append(uint64_t tag_tid, + const bufferlist &payload_bl) { + ldout(m_cct, 20) << "tag_tid=" << tag_tid << dendl; + + m_lock.lock(); + + uint64_t entry_tid = m_journal_metadata->allocate_entry_tid(tag_tid); + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint8_t splay_offset = entry_tid % splay_width; + + auto object_ptr = get_object(splay_offset); + uint64_t commit_tid = m_journal_metadata->allocate_commit_tid( + object_ptr->get_object_number(), tag_tid, entry_tid); + auto future = ceph::make_ref<FutureImpl>(tag_tid, entry_tid, commit_tid); + future->init(m_prev_future); + m_prev_future = future; + + m_object_locks[splay_offset].lock(); + m_lock.unlock(); + + bufferlist entry_bl; + encode(Entry(future->get_tag_tid(), future->get_entry_tid(), payload_bl), + entry_bl); + ceph_assert(entry_bl.length() <= m_journal_metadata->get_object_size()); + + bool object_full = object_ptr->append({{future, entry_bl}}); + 
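+ // append() reports whether this entry filled the object; if so, the object set is advanced below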
m_object_locks[splay_offset].unlock(); + + if (object_full) { + ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full" + << dendl; + std::lock_guard l{m_lock}; + close_and_advance_object_set(object_ptr->get_object_number() / splay_width); + } + return Future(future); +} + +void JournalRecorder::flush(Context *on_safe) { + ldout(m_cct, 20) << dendl; + + C_Flush *ctx; + { + std::lock_guard locker{m_lock}; + + ctx = new C_Flush(m_journal_metadata, on_safe, m_object_ptrs.size() + 1); + for (const auto& p : m_object_ptrs) { + p.second->flush(ctx); + } + + } + + // avoid holding the lock in case there is nothing to flush + ctx->complete(0); +} + +ceph::ref_t<ObjectRecorder> JournalRecorder::get_object(uint8_t splay_offset) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + const auto& object_recoder = m_object_ptrs.at(splay_offset); + ceph_assert(object_recoder); + return object_recoder; +} + +void JournalRecorder::close_and_advance_object_set(uint64_t object_set) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + // entry overflow from open object + if (m_current_set != object_set) { + ldout(m_cct, 20) << "close already in-progress" << dendl; + return; + } + + // we shouldn't overflow upon append if already closed and we + // shouldn't receive an overflowed callback if already closed + ceph_assert(m_in_flight_advance_sets == 0); + ceph_assert(m_in_flight_object_closes == 0); + + uint64_t active_set = m_journal_metadata->get_active_set(); + ceph_assert(m_current_set == active_set); + ++m_current_set; + ++m_in_flight_advance_sets; + + ldout(m_cct, 10) << "closing active object set " << object_set << dendl; + if (close_object_set(m_current_set)) { + advance_object_set(); + } +} + +void JournalRecorder::advance_object_set() { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + ceph_assert(m_in_flight_object_closes == 0); + ldout(m_cct, 10) << "advance to object set " << m_current_set << dendl; + m_journal_metadata->set_active_set(m_current_set, new C_AdvanceObjectSet( + this)); +} + +void JournalRecorder::handle_advance_object_set(int r) { + Context *on_object_set_advanced = nullptr; + { + std::lock_guard locker{m_lock}; + ldout(m_cct, 20) << __func__ << ": r=" << r << dendl; + + ceph_assert(m_in_flight_advance_sets > 0); + --m_in_flight_advance_sets; + + if (r < 0 && r != -ESTALE) { + lderr(m_cct) << "failed to advance object set: " << cpp_strerror(r) + << dendl; + } + + if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) { + open_object_set(); + std::swap(on_object_set_advanced, m_on_object_set_advanced); + } + } + if (on_object_set_advanced != nullptr) { + on_object_set_advanced->complete(0); + } +} + +void JournalRecorder::open_object_set() { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + ldout(m_cct, 10) << "opening object set " << m_current_set << dendl; + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + bool overflowed = false; + + auto lockers{lock_object_recorders()}; + for (const auto& p : m_object_ptrs) { + const auto& object_recorder = p.second; + uint64_t object_number = object_recorder->get_object_number(); + if (object_number / splay_width != m_current_set) { + ceph_assert(object_recorder->is_closed()); + + // ready to close object and open object in active set + if (create_next_object_recorder(object_recorder)) { + overflowed = true; + } + } + } + lockers.clear(); + + if (overflowed) { + ldout(m_cct, 10) << "object set " << m_current_set << " now full" << dendl; + ldout(m_cct, 10) << "" << dendl; + 
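+ // the buffers claimed from the closed objects already filled the new set, so close and advance again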
close_and_advance_object_set(m_current_set); + } +} + +bool JournalRecorder::close_object_set(uint64_t active_set) { + ldout(m_cct, 10) << "active_set=" << active_set << dendl; + ceph_assert(ceph_mutex_is_locked(m_lock)); + + // object recorders will invoke overflow handler as they complete + // closing the object to ensure correct order of future appends + uint8_t splay_width = m_journal_metadata->get_splay_width(); + auto lockers{lock_object_recorders()}; + for (const auto& p : m_object_ptrs) { + const auto& object_recorder = p.second; + if (object_recorder->get_object_number() / splay_width != active_set) { + ldout(m_cct, 10) << "closing object " << object_recorder->get_oid() + << dendl; + // flush out all queued appends and hold future appends + if (!object_recorder->close()) { + ++m_in_flight_object_closes; + ldout(m_cct, 10) << "object " << object_recorder->get_oid() << " " + << "close in-progress" << dendl; + } else { + ldout(m_cct, 10) << "object " << object_recorder->get_oid() << " closed" + << dendl; + } + } + } + return (m_in_flight_object_closes == 0); +} + +ceph::ref_t<ObjectRecorder> JournalRecorder::create_object_recorder( + uint64_t object_number, ceph::mutex* lock) { + ldout(m_cct, 10) << "object_number=" << object_number << dendl; + auto object_recorder = ceph::make_ref<ObjectRecorder>( + m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number), + object_number, lock, m_journal_metadata->get_work_queue(), + &m_object_handler, m_journal_metadata->get_order(), + m_max_in_flight_appends); + object_recorder->set_append_batch_options(m_flush_interval, m_flush_bytes, + m_flush_age); + return object_recorder; +} + +bool JournalRecorder::create_next_object_recorder( + ceph::ref_t<ObjectRecorder> object_recorder) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + uint64_t object_number = object_recorder->get_object_number(); + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint8_t splay_offset = object_number % splay_width; + ldout(m_cct, 10) << "object_number=" << object_number << dendl; + + ceph_assert(ceph_mutex_is_locked(m_object_locks[splay_offset])); + + auto new_object_recorder = create_object_recorder( + (m_current_set * splay_width) + splay_offset, &m_object_locks[splay_offset]); + + ldout(m_cct, 10) << "old oid=" << object_recorder->get_oid() << ", " + << "new oid=" << new_object_recorder->get_oid() << dendl; + AppendBuffers append_buffers; + object_recorder->claim_append_buffers(&append_buffers); + + // update the commit record to point to the correct object number + for (auto &append_buffer : append_buffers) { + m_journal_metadata->overflow_commit_tid( + append_buffer.first->get_commit_tid(), + new_object_recorder->get_object_number()); + } + + bool object_full = new_object_recorder->append(std::move(append_buffers)); + if (object_full) { + ldout(m_cct, 10) << "object " << new_object_recorder->get_oid() << " " + << "now full" << dendl; + } + + m_object_ptrs[splay_offset] = std::move(new_object_recorder); + return object_full; +} + +void JournalRecorder::handle_update() { + std::lock_guard locker{m_lock}; + + uint64_t active_set = m_journal_metadata->get_active_set(); + if (m_current_set < active_set) { + // peer journal client advanced the active set + ldout(m_cct, 10) << "current_set=" << m_current_set << ", " + << "active_set=" << active_set << dendl; + + uint64_t current_set = m_current_set; + m_current_set = active_set; + if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) { + ldout(m_cct, 10) << "closing current object 
set " << current_set << dendl; + if (close_object_set(active_set)) { + open_object_set(); + } + } + } +} + +void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) { + ldout(m_cct, 10) << object_recorder->get_oid() << dendl; + + std::lock_guard locker{m_lock}; + + uint64_t object_number = object_recorder->get_object_number(); + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint8_t splay_offset = object_number % splay_width; + auto& active_object_recorder = m_object_ptrs.at(splay_offset); + ceph_assert(active_object_recorder->get_object_number() == object_number); + + ceph_assert(m_in_flight_object_closes > 0); + --m_in_flight_object_closes; + + // object closed after advance active set committed + ldout(m_cct, 10) << "object " << active_object_recorder->get_oid() + << " closed" << dendl; + if (m_in_flight_object_closes == 0) { + if (m_in_flight_advance_sets == 0) { + // peer forced closing of object set + open_object_set(); + } else { + // local overflow advanced object set + advance_object_set(); + } + } +} + +void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) { + ldout(m_cct, 10) << object_recorder->get_oid() << dendl; + + std::lock_guard locker{m_lock}; + + uint64_t object_number = object_recorder->get_object_number(); + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint8_t splay_offset = object_number % splay_width; + auto& active_object_recorder = m_object_ptrs.at(splay_offset); + ceph_assert(active_object_recorder->get_object_number() == object_number); + + ldout(m_cct, 10) << "object " << active_object_recorder->get_oid() + << " overflowed" << dendl; + close_and_advance_object_set(object_number / splay_width); +} + +JournalRecorder::Lockers JournalRecorder::lock_object_recorders() { + Lockers lockers; + lockers.reserve(m_object_ptrs.size()); + for (auto& lock : m_object_locks) { + lockers.emplace_back(lock); + } + return lockers; +} + +} // namespace journal diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h new file mode 100644 index 000000000..9d8ea6c10 --- /dev/null +++ b/src/journal/JournalRecorder.h @@ -0,0 +1,128 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_JOURNAL_RECORDER_H +#define CEPH_JOURNAL_JOURNAL_RECORDER_H + +#include "include/int_types.h" +#include "include/Context.h" +#include "include/rados/librados.hpp" +#include "common/ceph_mutex.h" +#include "common/containers.h" +#include "common/Timer.h" +#include "journal/Future.h" +#include "journal/FutureImpl.h" +#include "journal/JournalMetadata.h" +#include "journal/ObjectRecorder.h" +#include <map> +#include <string> + +namespace journal { + +class JournalRecorder { +public: + JournalRecorder(librados::IoCtx &ioctx, std::string_view object_oid_prefix, + ceph::ref_t<JournalMetadata> journal_metadata, + uint64_t max_in_flight_appends); + ~JournalRecorder(); + + void shut_down(Context *on_safe); + + void set_append_batch_options(int flush_interval, uint64_t flush_bytes, + double flush_age); + + Future append(uint64_t tag_tid, const bufferlist &bl); + void flush(Context *on_safe); + + ceph::ref_t<ObjectRecorder> get_object(uint8_t splay_offset); + +private: + typedef std::map<uint8_t, ceph::ref_t<ObjectRecorder>> ObjectRecorderPtrs; + typedef std::vector<std::unique_lock<ceph::mutex>> Lockers; + + struct Listener : public JournalMetadataListener { + JournalRecorder *journal_recorder; + + Listener(JournalRecorder *_journal_recorder) + : 
journal_recorder(_journal_recorder) {} + + void handle_update(JournalMetadata *) override { + journal_recorder->handle_update(); + } + }; + + struct ObjectHandler : public ObjectRecorder::Handler { + JournalRecorder *journal_recorder; + + ObjectHandler(JournalRecorder *_journal_recorder) + : journal_recorder(_journal_recorder) { + } + + void closed(ObjectRecorder *object_recorder) override { + journal_recorder->handle_closed(object_recorder); + } + void overflow(ObjectRecorder *object_recorder) override { + journal_recorder->handle_overflow(object_recorder); + } + }; + + struct C_AdvanceObjectSet : public Context { + JournalRecorder *journal_recorder; + + C_AdvanceObjectSet(JournalRecorder *_journal_recorder) + : journal_recorder(_journal_recorder) { + } + void finish(int r) override { + journal_recorder->handle_advance_object_set(r); + } + }; + + librados::IoCtx m_ioctx; + CephContext *m_cct = nullptr; + std::string m_object_oid_prefix; + + ceph::ref_t<JournalMetadata> m_journal_metadata; + + uint32_t m_flush_interval = 0; + uint64_t m_flush_bytes = 0; + double m_flush_age = 0; + uint64_t m_max_in_flight_appends; + + Listener m_listener; + ObjectHandler m_object_handler; + + ceph::mutex m_lock = ceph::make_mutex("JournalerRecorder::m_lock"); + + uint32_t m_in_flight_advance_sets = 0; + uint32_t m_in_flight_object_closes = 0; + uint64_t m_current_set; + ObjectRecorderPtrs m_object_ptrs; + ceph::containers::tiny_vector<ceph::mutex> m_object_locks; + + ceph::ref_t<FutureImpl> m_prev_future; + + Context *m_on_object_set_advanced = nullptr; + + void open_object_set(); + bool close_object_set(uint64_t active_set); + + void advance_object_set(); + void handle_advance_object_set(int r); + + void close_and_advance_object_set(uint64_t object_set); + + ceph::ref_t<ObjectRecorder> create_object_recorder(uint64_t object_number, + ceph::mutex* lock); + bool create_next_object_recorder(ceph::ref_t<ObjectRecorder> object_recorder); + + void handle_update(); + + void handle_closed(ObjectRecorder *object_recorder); + void handle_overflow(ObjectRecorder *object_recorder); + + Lockers lock_object_recorders(); +}; + +} // namespace journal + +#endif // CEPH_JOURNAL_JOURNAL_RECORDER_H diff --git a/src/journal/JournalTrimmer.cc b/src/journal/JournalTrimmer.cc new file mode 100644 index 000000000..3ad4f3b09 --- /dev/null +++ b/src/journal/JournalTrimmer.cc @@ -0,0 +1,247 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/JournalTrimmer.h" +#include "journal/Utils.h" +#include "common/Cond.h" +#include "common/errno.h" +#include <limits> + +#define dout_subsys ceph_subsys_journaler +#undef dout_prefix +#define dout_prefix *_dout << "JournalTrimmer: " << this << " " + +namespace journal { + +struct JournalTrimmer::C_RemoveSet : public Context { + JournalTrimmer *journal_trimmer; + uint64_t object_set; + ceph::mutex lock = ceph::make_mutex("JournalTrimmer::m_lock"); + uint32_t refs; + int return_value; + + C_RemoveSet(JournalTrimmer *_journal_trimmer, uint64_t _object_set, + uint8_t _splay_width); + void complete(int r) override; + void finish(int r) override { + journal_trimmer->handle_set_removed(r, object_set); + journal_trimmer->m_async_op_tracker.finish_op(); + } +}; + +JournalTrimmer::JournalTrimmer(librados::IoCtx &ioctx, + const std::string &object_oid_prefix, + const ceph::ref_t<JournalMetadata>& journal_metadata) + : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), + m_journal_metadata(journal_metadata), 
m_metadata_listener(this), + m_remove_set_pending(false), + m_remove_set(0), m_remove_set_ctx(NULL) { + m_ioctx.dup(ioctx); + m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct()); + + m_journal_metadata->add_listener(&m_metadata_listener); +} + +JournalTrimmer::~JournalTrimmer() { + ceph_assert(m_shutdown); +} + +void JournalTrimmer::shut_down(Context *on_finish) { + ldout(m_cct, 20) << __func__ << dendl; + { + std::lock_guard locker{m_lock}; + ceph_assert(!m_shutdown); + m_shutdown = true; + } + + m_journal_metadata->remove_listener(&m_metadata_listener); + + // chain the shut down sequence (reverse order) + on_finish = new LambdaContext([this, on_finish](int r) { + m_async_op_tracker.wait_for_ops(on_finish); + }); + m_journal_metadata->flush_commit_position(on_finish); +} + +void JournalTrimmer::remove_objects(bool force, Context *on_finish) { + ldout(m_cct, 20) << __func__ << dendl; + + on_finish = new LambdaContext([this, force, on_finish](int r) { + std::lock_guard locker{m_lock}; + + if (m_remove_set_pending) { + on_finish->complete(-EBUSY); + } + + if (!force) { + JournalMetadata::RegisteredClients registered_clients; + m_journal_metadata->get_registered_clients(®istered_clients); + + if (registered_clients.size() == 0) { + on_finish->complete(-EINVAL); + return; + } else if (registered_clients.size() > 1) { + on_finish->complete(-EBUSY); + return; + } + } + + m_remove_set = std::numeric_limits<uint64_t>::max(); + m_remove_set_pending = true; + m_remove_set_ctx = on_finish; + + remove_set(m_journal_metadata->get_minimum_set()); + }); + + m_async_op_tracker.wait_for_ops(on_finish); +} + +void JournalTrimmer::committed(uint64_t commit_tid) { + ldout(m_cct, 20) << __func__ << ": commit_tid=" << commit_tid << dendl; + m_journal_metadata->committed(commit_tid, + m_create_commit_position_safe_context); +} + +void JournalTrimmer::trim_objects(uint64_t minimum_set) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + ldout(m_cct, 20) << __func__ << ": min_set=" << minimum_set << dendl; + if (minimum_set <= m_journal_metadata->get_minimum_set()) { + return; + } + + if (m_remove_set_pending) { + m_remove_set = std::max(m_remove_set, minimum_set); + return; + } + + m_remove_set = minimum_set; + m_remove_set_pending = true; + remove_set(m_journal_metadata->get_minimum_set()); +} + +void JournalTrimmer::remove_set(uint64_t object_set) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + m_async_op_tracker.start_op(); + uint8_t splay_width = m_journal_metadata->get_splay_width(); + C_RemoveSet *ctx = new C_RemoveSet(this, object_set, splay_width); + + ldout(m_cct, 20) << __func__ << ": removing object set " << object_set + << dendl; + for (uint64_t object_number = object_set * splay_width; + object_number < (object_set + 1) * splay_width; + ++object_number) { + std::string oid = utils::get_object_name(m_object_oid_prefix, + object_number); + + ldout(m_cct, 20) << "removing journal object " << oid << dendl; + auto comp = + librados::Rados::aio_create_completion(ctx, utils::rados_ctx_callback); + int r = m_ioctx.aio_remove(oid, comp, + CEPH_OSD_FLAG_FULL_FORCE | CEPH_OSD_FLAG_FULL_TRY); + ceph_assert(r == 0); + comp->release(); + } +} + +void JournalTrimmer::handle_metadata_updated() { + ldout(m_cct, 20) << __func__ << dendl; + + std::lock_guard locker{m_lock}; + + JournalMetadata::RegisteredClients registered_clients; + m_journal_metadata->get_registered_clients(®istered_clients); + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint64_t minimum_set = 
m_journal_metadata->get_minimum_set(); + uint64_t active_set = m_journal_metadata->get_active_set(); + uint64_t minimum_commit_set = active_set; + std::string minimum_client_id; + + for (auto &client : registered_clients) { + if (client.state == cls::journal::CLIENT_STATE_DISCONNECTED) { + continue; + } + + if (client.commit_position.object_positions.empty()) { + // client hasn't recorded any commits + minimum_commit_set = minimum_set; + minimum_client_id = client.id; + break; + } + + for (auto &position : client.commit_position.object_positions) { + uint64_t object_set = position.object_number / splay_width; + if (object_set < minimum_commit_set) { + minimum_client_id = client.id; + minimum_commit_set = object_set; + } + } + } + + if (minimum_commit_set > minimum_set) { + trim_objects(minimum_commit_set); + } else { + ldout(m_cct, 20) << "object set " << minimum_commit_set << " still " + << "in-use by client " << minimum_client_id << dendl; + } +} + +void JournalTrimmer::handle_set_removed(int r, uint64_t object_set) { + ldout(m_cct, 20) << __func__ << ": r=" << r << ", set=" << object_set << ", " + << "trim=" << m_remove_set << dendl; + + std::lock_guard locker{m_lock}; + m_remove_set_pending = false; + + if (r == -ENOENT) { + // no objects within the set existed + r = 0; + } + if (r == 0) { + // advance the minimum set to the next set + m_journal_metadata->set_minimum_set(object_set + 1); + uint64_t active_set = m_journal_metadata->get_active_set(); + uint64_t minimum_set = m_journal_metadata->get_minimum_set(); + + if (m_remove_set > minimum_set && minimum_set <= active_set) { + m_remove_set_pending = true; + remove_set(minimum_set); + } + } + + if (m_remove_set_ctx != nullptr && !m_remove_set_pending) { + ldout(m_cct, 20) << "completing remove set context" << dendl; + m_remove_set_ctx->complete(r); + m_remove_set_ctx = nullptr; + } +} + +JournalTrimmer::C_RemoveSet::C_RemoveSet(JournalTrimmer *_journal_trimmer, + uint64_t _object_set, + uint8_t _splay_width) + : journal_trimmer(_journal_trimmer), object_set(_object_set), + lock(ceph::make_mutex(utils::unique_lock_name("C_RemoveSet::lock", this))), + refs(_splay_width), return_value(-ENOENT) { +} + +void JournalTrimmer::C_RemoveSet::complete(int r) { + lock.lock(); + if (r < 0 && r != -ENOENT && + (return_value == -ENOENT || return_value == 0)) { + return_value = r; + } else if (r == 0 && return_value == -ENOENT) { + return_value = 0; + } + + if (--refs == 0) { + finish(return_value); + lock.unlock(); + delete this; + } else { + lock.unlock(); + } +} + +} // namespace journal diff --git a/src/journal/JournalTrimmer.h b/src/journal/JournalTrimmer.h new file mode 100644 index 000000000..9c74961c9 --- /dev/null +++ b/src/journal/JournalTrimmer.h @@ -0,0 +1,93 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_JOURNAL_TRIMMER_H +#define CEPH_JOURNAL_JOURNAL_TRIMMER_H + +#include "include/int_types.h" +#include "include/rados/librados.hpp" +#include "include/Context.h" +#include "common/AsyncOpTracker.h" +#include "journal/JournalMetadata.h" +#include "cls/journal/cls_journal_types.h" +#include <functional> + +struct Context; + +namespace journal { + +class JournalTrimmer { +public: + typedef cls::journal::ObjectSetPosition ObjectSetPosition; + + JournalTrimmer(librados::IoCtx &ioctx, const std::string &object_oid_prefix, + const ceph::ref_t<JournalMetadata> &journal_metadata); + ~JournalTrimmer(); + + void shut_down(Context *on_finish); + + void remove_objects(bool 
force, Context *on_finish); + void committed(uint64_t commit_tid); + +private: + typedef std::function<Context*()> CreateContext; + + struct MetadataListener : public JournalMetadataListener { + JournalTrimmer *journal_trimmer; + + MetadataListener(JournalTrimmer *journal_trimmer) + : journal_trimmer(journal_trimmer) { + } + void handle_update(JournalMetadata *) override { + journal_trimmer->handle_metadata_updated(); + } + }; + + struct C_CommitPositionSafe : public Context { + JournalTrimmer *journal_trimmer; + + C_CommitPositionSafe(JournalTrimmer *_journal_trimmer) + : journal_trimmer(_journal_trimmer) { + journal_trimmer->m_async_op_tracker.start_op(); + } + ~C_CommitPositionSafe() override { + journal_trimmer->m_async_op_tracker.finish_op(); + } + + void finish(int r) override { + } + }; + + struct C_RemoveSet; + + librados::IoCtx m_ioctx; + CephContext *m_cct; + std::string m_object_oid_prefix; + + ceph::ref_t<JournalMetadata> m_journal_metadata; + MetadataListener m_metadata_listener; + + AsyncOpTracker m_async_op_tracker; + + ceph::mutex m_lock = ceph::make_mutex("JournalTrimmer::m_lock"); + + bool m_remove_set_pending; + uint64_t m_remove_set; + Context *m_remove_set_ctx; + + bool m_shutdown = false; + + CreateContext m_create_commit_position_safe_context = [this]() { + return new C_CommitPositionSafe(this); + }; + + void trim_objects(uint64_t minimum_set); + void remove_set(uint64_t object_set); + + void handle_metadata_updated(); + void handle_set_removed(int r, uint64_t object_set); +}; + +} // namespace journal + +#endif // CEPH_JOURNAL_JOURNAL_TRIMMER_H diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc new file mode 100644 index 000000000..1838d633a --- /dev/null +++ b/src/journal/Journaler.cc @@ -0,0 +1,462 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/Journaler.h" +#include "include/stringify.h" +#include "common/errno.h" +#include "common/Timer.h" +#include "common/WorkQueue.h" +#include "journal/Entry.h" +#include "journal/FutureImpl.h" +#include "journal/JournalMetadata.h" +#include "journal/JournalPlayer.h" +#include "journal/JournalRecorder.h" +#include "journal/JournalTrimmer.h" +#include "journal/ReplayEntry.h" +#include "journal/ReplayHandler.h" +#include "cls/journal/cls_journal_client.h" +#include "cls/journal/cls_journal_types.h" +#include "Utils.h" + +#define dout_subsys ceph_subsys_journaler +#undef dout_prefix +#define dout_prefix *_dout << "Journaler: " << this << " " + +namespace journal { + +namespace { + +static const std::string JOURNAL_HEADER_PREFIX = "journal."; +static const std::string JOURNAL_OBJECT_PREFIX = "journal_data."; + +} // anonymous namespace + +using namespace cls::journal; +using utils::rados_ctx_callback; + +std::string Journaler::header_oid(const std::string &journal_id) { + return JOURNAL_HEADER_PREFIX + journal_id; +} + +std::string Journaler::object_oid_prefix(int pool_id, + const std::string &journal_id) { + return JOURNAL_OBJECT_PREFIX + stringify(pool_id) + "." 
+ journal_id + "."; +} + +Journaler::Threads::Threads(CephContext *cct) { + thread_pool = new ThreadPool(cct, "Journaler::thread_pool", "tp_journal", 1); + thread_pool->start(); + + work_queue = new ContextWQ("Journaler::work_queue", + ceph::make_timespan(60), + thread_pool); + + timer = new SafeTimer(cct, timer_lock, true); + timer->init(); +} + +Journaler::Threads::~Threads() { + { + std::lock_guard timer_locker{timer_lock}; + timer->shutdown(); + } + delete timer; + timer = nullptr; + + work_queue->drain(); + delete work_queue; + work_queue = nullptr; + + thread_pool->stop(); + delete thread_pool; + thread_pool = nullptr; +} + +Journaler::Journaler(librados::IoCtx &header_ioctx, + const std::string &journal_id, + const std::string &client_id, const Settings &settings, + CacheManagerHandler *cache_manager_handler) + : m_threads(new Threads(reinterpret_cast<CephContext*>(header_ioctx.cct()))), + m_client_id(client_id), m_cache_manager_handler(cache_manager_handler) { + set_up(m_threads->work_queue, m_threads->timer, &m_threads->timer_lock, + header_ioctx, journal_id, settings); +} + +Journaler::Journaler(ContextWQ *work_queue, SafeTimer *timer, + ceph::mutex *timer_lock, librados::IoCtx &header_ioctx, + const std::string &journal_id, + const std::string &client_id, const Settings &settings, + CacheManagerHandler *cache_manager_handler) + : m_client_id(client_id), m_cache_manager_handler(cache_manager_handler) { + set_up(work_queue, timer, timer_lock, header_ioctx, journal_id, + settings); +} + +void Journaler::set_up(ContextWQ *work_queue, SafeTimer *timer, + ceph::mutex *timer_lock, librados::IoCtx &header_ioctx, + const std::string &journal_id, + const Settings &settings) { + m_header_ioctx.dup(header_ioctx); + m_cct = reinterpret_cast<CephContext *>(m_header_ioctx.cct()); + + m_header_oid = header_oid(journal_id); + m_object_oid_prefix = object_oid_prefix(m_header_ioctx.get_id(), journal_id); + + m_metadata = ceph::make_ref<JournalMetadata>(work_queue, timer, timer_lock, + m_header_ioctx, m_header_oid, m_client_id, + settings); +} + +Journaler::~Journaler() { + if (m_metadata != nullptr) { + ceph_assert(!m_metadata->is_initialized()); + if (!m_initialized) { + // never initialized -- ensure any in-flight ops are complete + // since we wouldn't expect shut_down to be invoked + m_metadata->wait_for_ops(); + } + m_metadata.reset(); + } + ceph_assert(m_trimmer == nullptr); + ceph_assert(m_player == nullptr); + ceph_assert(m_recorder == nullptr); + + delete m_threads; + m_threads = nullptr; +} + +void Journaler::exists(Context *on_finish) const { + librados::ObjectReadOperation op; + op.stat(nullptr, nullptr, nullptr); + + librados::AioCompletion *comp = + librados::Rados::aio_create_completion(on_finish, rados_ctx_callback); + int r = m_header_ioctx.aio_operate(m_header_oid, comp, &op, nullptr); + ceph_assert(r == 0); + comp->release(); +} + +void Journaler::init(Context *on_init) { + m_initialized = true; + m_metadata->init(new C_InitJournaler(this, on_init)); +} + +int Journaler::init_complete() { + int64_t pool_id = m_metadata->get_pool_id(); + + if (pool_id < 0 || pool_id == m_header_ioctx.get_id()) { + ldout(m_cct, 20) << "using image pool for journal data" << dendl; + m_data_ioctx.dup(m_header_ioctx); + } else { + ldout(m_cct, 20) << "using pool id=" << pool_id << " for journal data" + << dendl; + librados::Rados rados(m_header_ioctx); + int r = rados.ioctx_create2(pool_id, m_data_ioctx); + if (r < 0) { + if (r == -ENOENT) { + ldout(m_cct, 1) << "pool id=" << pool_id << " no longer 
exists" + << dendl; + } + return r; + } + } + m_trimmer = new JournalTrimmer(m_data_ioctx, m_object_oid_prefix, + m_metadata); + return 0; +} + +void Journaler::shut_down() { + C_SaferCond ctx; + shut_down(&ctx); + ctx.wait(); +} + +void Journaler::shut_down(Context *on_finish) { + ceph_assert(m_player == nullptr); + ceph_assert(m_recorder == nullptr); + + auto metadata = std::move(m_metadata); + ceph_assert(metadata); + + on_finish = new LambdaContext([metadata, on_finish](int r) { + on_finish->complete(0); + }); + + JournalTrimmer *trimmer = nullptr; + std::swap(trimmer, m_trimmer); + if (!trimmer) { + metadata->shut_down(on_finish); + return; + } + + on_finish = new LambdaContext([trimmer, metadata, on_finish](int r) { + delete trimmer; + metadata->shut_down(on_finish); + }); + trimmer->shut_down(on_finish); +} + +bool Journaler::is_initialized() const { + return m_metadata->is_initialized(); +} + +void Journaler::get_immutable_metadata(uint8_t *order, uint8_t *splay_width, + int64_t *pool_id, Context *on_finish) { + m_metadata->get_immutable_metadata(order, splay_width, pool_id, on_finish); +} + +void Journaler::get_mutable_metadata(uint64_t *minimum_set, + uint64_t *active_set, + RegisteredClients *clients, + Context *on_finish) { + m_metadata->get_mutable_metadata(minimum_set, active_set, clients, on_finish); +} + +void Journaler::create(uint8_t order, uint8_t splay_width, + int64_t pool_id, Context *on_finish) { + if (order > 26 || order < 12) { + lderr(m_cct) << "order must be in the range [12, 26]" << dendl; + on_finish->complete(-EDOM); + return; + } + if (splay_width == 0) { + on_finish->complete(-EINVAL); + return; + } + + ldout(m_cct, 5) << "creating new journal: " << m_header_oid << dendl; + + librados::ObjectWriteOperation op; + client::create(&op, order, splay_width, pool_id); + + librados::AioCompletion *comp = + librados::Rados::aio_create_completion(on_finish, rados_ctx_callback); + int r = m_header_ioctx.aio_operate(m_header_oid, comp, &op); + ceph_assert(r == 0); + comp->release(); +} + +void Journaler::remove(bool force, Context *on_finish) { + // chain journal removal (reverse order) + on_finish = new LambdaContext([this, on_finish](int r) { + librados::AioCompletion *comp = librados::Rados::aio_create_completion( + on_finish, utils::rados_ctx_callback); + r = m_header_ioctx.aio_remove(m_header_oid, comp); + ceph_assert(r == 0); + comp->release(); + }); + + on_finish = new LambdaContext([this, force, on_finish](int r) { + m_trimmer->remove_objects(force, on_finish); + }); + + m_metadata->shut_down(on_finish); +} + +void Journaler::flush_commit_position(Context *on_safe) { + m_metadata->flush_commit_position(on_safe); +} + +void Journaler::add_listener(JournalMetadataListener *listener) { + m_metadata->add_listener(listener); +} + +void Journaler::remove_listener(JournalMetadataListener *listener) { + m_metadata->remove_listener(listener); +} + +int Journaler::register_client(const bufferlist &data) { + C_SaferCond cond; + register_client(data, &cond); + return cond.wait(); +} + +int Journaler::unregister_client() { + C_SaferCond cond; + unregister_client(&cond); + return cond.wait(); +} + +void Journaler::register_client(const bufferlist &data, Context *on_finish) { + return m_metadata->register_client(data, on_finish); +} + +void Journaler::update_client(const bufferlist &data, Context *on_finish) { + return m_metadata->update_client(data, on_finish); +} + +void Journaler::unregister_client(Context *on_finish) { + return m_metadata->unregister_client(on_finish); +} 
+ +void Journaler::get_client(const std::string &client_id, + cls::journal::Client *client, + Context *on_finish) { + m_metadata->get_client(client_id, client, on_finish); +} + +int Journaler::get_cached_client(const std::string &client_id, + cls::journal::Client *client) { + RegisteredClients clients; + m_metadata->get_registered_clients(&clients); + + auto it = clients.find({client_id, {}}); + if (it == clients.end()) { + return -ENOENT; + } + + *client = *it; + return 0; +} + +void Journaler::allocate_tag(const bufferlist &data, cls::journal::Tag *tag, + Context *on_finish) { + m_metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, data, tag, + on_finish); +} + +void Journaler::allocate_tag(uint64_t tag_class, const bufferlist &data, + cls::journal::Tag *tag, Context *on_finish) { + m_metadata->allocate_tag(tag_class, data, tag, on_finish); +} + +void Journaler::get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish) { + m_metadata->get_tag(tag_tid, tag, on_finish); +} + +void Journaler::get_tags(uint64_t tag_class, Tags *tags, Context *on_finish) { + m_metadata->get_tags(0, tag_class, tags, on_finish); +} + +void Journaler::get_tags(uint64_t start_after_tag_tid, uint64_t tag_class, + Tags *tags, Context *on_finish) { + m_metadata->get_tags(start_after_tag_tid, tag_class, tags, on_finish); +} + +void Journaler::start_replay(ReplayHandler* replay_handler) { + create_player(replay_handler); + m_player->prefetch(); +} + +void Journaler::start_live_replay(ReplayHandler* replay_handler, + double interval) { + create_player(replay_handler); + m_player->prefetch_and_watch(interval); +} + +bool Journaler::try_pop_front(ReplayEntry *replay_entry, + uint64_t *tag_tid) { + ceph_assert(m_player != nullptr); + + Entry entry; + uint64_t commit_tid; + if (!m_player->try_pop_front(&entry, &commit_tid)) { + return false; + } + + *replay_entry = ReplayEntry(entry.get_data(), commit_tid); + if (tag_tid != nullptr) { + *tag_tid = entry.get_tag_tid(); + } + return true; +} + +void Journaler::stop_replay() { + C_SaferCond ctx; + stop_replay(&ctx); + ctx.wait(); +} + +void Journaler::stop_replay(Context *on_finish) { + auto player = std::move(m_player); + auto* playerp = player.get(); + + auto f = [player=std::move(player), on_finish](int r) { + on_finish->complete(r); + }; + on_finish = new LambdaContext(std::move(f)); + playerp->shut_down(on_finish); +} + +void Journaler::committed(const ReplayEntry &replay_entry) { + m_trimmer->committed(replay_entry.get_commit_tid()); +} + +void Journaler::committed(const Future &future) { + auto& future_impl = future.get_future_impl(); + m_trimmer->committed(future_impl->get_commit_tid()); +} + +void Journaler::start_append(uint64_t max_in_flight_appends) { + ceph_assert(m_recorder == nullptr); + + // TODO verify active object set >= current replay object set + + m_recorder = std::make_unique<JournalRecorder>(m_data_ioctx, m_object_oid_prefix, + m_metadata, max_in_flight_appends); +} + +void Journaler::set_append_batch_options(int flush_interval, + uint64_t flush_bytes, + double flush_age) { + ceph_assert(m_recorder != nullptr); + m_recorder->set_append_batch_options(flush_interval, flush_bytes, flush_age); +} + +void Journaler::stop_append(Context *on_safe) { + auto recorder = std::move(m_recorder); + ceph_assert(recorder); + + auto* recorderp = recorder.get(); + on_safe = new LambdaContext([recorder=std::move(recorder), on_safe](int r) { + on_safe->complete(r); + }); + recorderp->shut_down(on_safe); +} + +uint64_t Journaler::get_max_append_size() const { + uint64_t 
max_payload_size = m_metadata->get_object_size() - + Entry::get_fixed_size(); + if (m_metadata->get_settings().max_payload_bytes > 0) { + max_payload_size = std::min(max_payload_size, + m_metadata->get_settings().max_payload_bytes); + } + return max_payload_size; +} + +Future Journaler::append(uint64_t tag_tid, const bufferlist &payload_bl) { + return m_recorder->append(tag_tid, payload_bl); +} + +void Journaler::flush_append(Context *on_safe) { + m_recorder->flush(on_safe); +} + +void Journaler::create_player(ReplayHandler* replay_handler) { + ceph_assert(m_player == nullptr); + m_player = std::make_unique<JournalPlayer>(m_data_ioctx, m_object_oid_prefix, m_metadata, + replay_handler, m_cache_manager_handler); +} + +void Journaler::get_metadata(uint8_t *order, uint8_t *splay_width, + int64_t *pool_id) { + ceph_assert(m_metadata != nullptr); + + *order = m_metadata->get_order(); + *splay_width = m_metadata->get_splay_width(); + *pool_id = m_metadata->get_pool_id(); +} + +std::ostream &operator<<(std::ostream &os, + const Journaler &journaler) { + os << "[metadata="; + if (journaler.m_metadata) { + os << *journaler.m_metadata; + } else { + os << "NULL"; + } + os << "]"; + return os; +} + +} // namespace journal diff --git a/src/journal/Journaler.h b/src/journal/Journaler.h new file mode 100644 index 000000000..f42513c5d --- /dev/null +++ b/src/journal/Journaler.h @@ -0,0 +1,170 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_JOURNALER_H +#define CEPH_JOURNAL_JOURNALER_H + +#include "include/int_types.h" +#include "include/buffer_fwd.h" +#include "include/Context.h" +#include "include/rados/librados.hpp" +#include "journal/Future.h" +#include "journal/JournalMetadataListener.h" +#include "cls/journal/cls_journal_types.h" +#include "common/Timer.h" +#include <list> +#include <map> +#include <string> +#include "include/ceph_assert.h" + +class ContextWQ; +class ThreadPool; + +namespace journal { + +struct CacheManagerHandler; + +class JournalTrimmer; +class ReplayEntry; +class ReplayHandler; +class Settings; + +class Journaler { +public: + struct Threads { + Threads(CephContext *cct); + ~Threads(); + + ThreadPool *thread_pool = nullptr; + ContextWQ *work_queue = nullptr; + + SafeTimer *timer; + ceph::mutex timer_lock = ceph::make_mutex("Journaler::timer_lock"); + }; + + typedef cls::journal::Tag Tag; + typedef std::list<cls::journal::Tag> Tags; + typedef std::set<cls::journal::Client> RegisteredClients; + + static std::string header_oid(const std::string &journal_id); + static std::string object_oid_prefix(int pool_id, + const std::string &journal_id); + + Journaler(librados::IoCtx &header_ioctx, const std::string &journal_id, + const std::string &client_id, const Settings &settings, + CacheManagerHandler *cache_manager_handler); + Journaler(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock, + librados::IoCtx &header_ioctx, const std::string &journal_id, + const std::string &client_id, const Settings &settings, + CacheManagerHandler *cache_manager_handler); + ~Journaler(); + + void exists(Context *on_finish) const; + void create(uint8_t order, uint8_t splay_width, int64_t pool_id, Context *ctx); + void remove(bool force, Context *on_finish); + + void init(Context *on_init); + void shut_down(); + void shut_down(Context *on_finish); + + bool is_initialized() const; + + void get_immutable_metadata(uint8_t *order, uint8_t *splay_width, + int64_t *pool_id, Context *on_finish); + void 
get_mutable_metadata(uint64_t *minimum_set, uint64_t *active_set, + RegisteredClients *clients, Context *on_finish); + + void add_listener(JournalMetadataListener *listener); + void remove_listener(JournalMetadataListener *listener); + + int register_client(const bufferlist &data); + void register_client(const bufferlist &data, Context *on_finish); + + int unregister_client(); + void unregister_client(Context *on_finish); + + void update_client(const bufferlist &data, Context *on_finish); + void get_client(const std::string &client_id, cls::journal::Client *client, + Context *on_finish); + int get_cached_client(const std::string &client_id, + cls::journal::Client *client); + + void flush_commit_position(Context *on_safe); + + void allocate_tag(const bufferlist &data, cls::journal::Tag *tag, + Context *on_finish); + void allocate_tag(uint64_t tag_class, const bufferlist &data, + cls::journal::Tag *tag, Context *on_finish); + void get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish); + void get_tags(uint64_t tag_class, Tags *tags, Context *on_finish); + void get_tags(uint64_t start_after_tag_tid, uint64_t tag_class, Tags *tags, + Context *on_finish); + + void start_replay(ReplayHandler* replay_handler); + void start_live_replay(ReplayHandler* replay_handler, double interval); + bool try_pop_front(ReplayEntry *replay_entry, uint64_t *tag_tid = nullptr); + void stop_replay(); + void stop_replay(Context *on_finish); + + uint64_t get_max_append_size() const; + void start_append(uint64_t max_in_flight_appends); + void set_append_batch_options(int flush_interval, uint64_t flush_bytes, + double flush_age); + Future append(uint64_t tag_tid, const bufferlist &bl); + void flush_append(Context *on_safe); + void stop_append(Context *on_safe); + + void committed(const ReplayEntry &replay_entry); + void committed(const Future &future); + + void get_metadata(uint8_t *order, uint8_t *splay_width, int64_t *pool_id); + +private: + struct C_InitJournaler : public Context { + Journaler *journaler; + Context *on_safe; + C_InitJournaler(Journaler *_journaler, Context *_on_safe) + : journaler(_journaler), on_safe(_on_safe) { + } + void finish(int r) override { + if (r == 0) { + r = journaler->init_complete(); + } + on_safe->complete(r); + } + }; + + Threads *m_threads = nullptr; + + mutable librados::IoCtx m_header_ioctx; + librados::IoCtx m_data_ioctx; + CephContext *m_cct; + std::string m_client_id; + CacheManagerHandler *m_cache_manager_handler; + + std::string m_header_oid; + std::string m_object_oid_prefix; + + bool m_initialized = false; + ceph::ref_t<class JournalMetadata> m_metadata; + std::unique_ptr<class JournalPlayer> m_player; + std::unique_ptr<class JournalRecorder> m_recorder; + JournalTrimmer *m_trimmer = nullptr; + + void set_up(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock, + librados::IoCtx &header_ioctx, const std::string &journal_id, + const Settings &settings); + + int init_complete(); + void create_player(ReplayHandler* replay_handler); + + friend std::ostream &operator<<(std::ostream &os, + const Journaler &journaler); +}; + +std::ostream &operator<<(std::ostream &os, + const Journaler &journaler); + +} // namespace journal + +#endif // CEPH_JOURNAL_JOURNALER_H diff --git a/src/journal/ObjectPlayer.cc b/src/journal/ObjectPlayer.cc new file mode 100644 index 000000000..56eec51f6 --- /dev/null +++ b/src/journal/ObjectPlayer.cc @@ -0,0 +1,355 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include 
"journal/ObjectPlayer.h" +#include "journal/Utils.h" +#include "common/Timer.h" +#include <limits> + +#define dout_subsys ceph_subsys_journaler +#undef dout_prefix +#define dout_prefix *_dout << "ObjectPlayer: " << this << " " + +namespace journal { + +namespace { + +bool advance_to_last_pad_byte(uint32_t off, bufferlist::const_iterator *iter, + uint32_t *pad_len, bool *partial_entry) { + const uint32_t MAX_PAD = 8; + auto pad_bytes = MAX_PAD - off % MAX_PAD; + auto next = *iter; + + ceph_assert(!next.end()); + if (*next != '\0') { + return false; + } + + for (auto i = pad_bytes - 1; i > 0; i--) { + if ((++next).end()) { + *partial_entry = true; + return false; + } + if (*next != '\0') { + return false; + } + } + + *iter = next; + *pad_len += pad_bytes; + return true; +} + +} // anonymous namespace + +ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx, + const std::string& object_oid_prefix, + uint64_t object_num, SafeTimer &timer, + ceph::mutex &timer_lock, uint8_t order, + uint64_t max_fetch_bytes) + : m_object_num(object_num), + m_oid(utils::get_object_name(object_oid_prefix, m_object_num)), + m_timer(timer), m_timer_lock(timer_lock), m_order(order), + m_max_fetch_bytes(max_fetch_bytes > 0 ? max_fetch_bytes : 2 << order), + m_lock(ceph::make_mutex(utils::unique_lock_name("ObjectPlayer::m_lock", this))) +{ + m_ioctx.dup(ioctx); + m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); +} + +ObjectPlayer::~ObjectPlayer() { + { + std::lock_guard timer_locker{m_timer_lock}; + std::lock_guard locker{m_lock}; + ceph_assert(!m_fetch_in_progress); + ceph_assert(m_watch_ctx == nullptr); + } +} + +void ObjectPlayer::fetch(Context *on_finish) { + ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl; + + std::lock_guard locker{m_lock}; + ceph_assert(!m_fetch_in_progress); + m_fetch_in_progress = true; + + C_Fetch *context = new C_Fetch(this, on_finish); + librados::ObjectReadOperation op; + op.read(m_read_off, m_max_fetch_bytes, &context->read_bl, NULL); + op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); + + auto rados_completion = + librados::Rados::aio_create_completion(context, utils::rados_ctx_callback); + int r = m_ioctx.aio_operate(m_oid, rados_completion, &op, 0, NULL); + ceph_assert(r == 0); + rados_completion->release(); +} + +void ObjectPlayer::watch(Context *on_fetch, double interval) { + ldout(m_cct, 20) << __func__ << ": " << m_oid << " watch" << dendl; + + std::lock_guard timer_locker{m_timer_lock}; + m_watch_interval = interval; + + ceph_assert(m_watch_ctx == nullptr); + m_watch_ctx = on_fetch; + + schedule_watch(); +} + +void ObjectPlayer::unwatch() { + ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl; + Context *watch_ctx = nullptr; + { + std::lock_guard timer_locker{m_timer_lock}; + ceph_assert(!m_unwatched); + m_unwatched = true; + + if (!cancel_watch()) { + return; + } + + std::swap(watch_ctx, m_watch_ctx); + } + + if (watch_ctx != nullptr) { + watch_ctx->complete(-ECANCELED); + } +} + +void ObjectPlayer::front(Entry *entry) const { + std::lock_guard locker{m_lock}; + ceph_assert(!m_entries.empty()); + *entry = m_entries.front(); +} + +void ObjectPlayer::pop_front() { + std::lock_guard locker{m_lock}; + ceph_assert(!m_entries.empty()); + + auto &entry = m_entries.front(); + m_entry_keys.erase({entry.get_tag_tid(), entry.get_entry_tid()}); + m_entries.pop_front(); +} + +int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl, + bool *refetch) { + ldout(m_cct, 10) << __func__ << ": " << m_oid << ", r=" << r << ", len=" + << bl.length() << dendl; 
+ + *refetch = false; + if (r == -ENOENT) { + return 0; + } else if (r < 0) { + return r; + } else if (bl.length() == 0) { + return 0; + } + + std::lock_guard locker{m_lock}; + ceph_assert(m_fetch_in_progress); + m_read_off += bl.length(); + m_read_bl.append(bl); + m_refetch_state = REFETCH_STATE_REQUIRED; + + bool full_fetch = (m_max_fetch_bytes == 2U << m_order); + bool partial_entry = false; + bool invalid = false; + uint32_t invalid_start_off = 0; + + clear_invalid_range(m_read_bl_off, m_read_bl.length()); + bufferlist::const_iterator iter{&m_read_bl, 0}; + uint32_t pad_len = 0; + while (!iter.end()) { + uint32_t bytes_needed; + uint32_t bl_off = iter.get_off(); + if (!Entry::is_readable(iter, &bytes_needed)) { + if (bytes_needed != 0) { + invalid_start_off = m_read_bl_off + bl_off; + invalid = true; + partial_entry = true; + if (full_fetch) { + lderr(m_cct) << ": partial record at offset " << invalid_start_off + << dendl; + } else { + ldout(m_cct, 20) << ": partial record detected, will re-fetch" + << dendl; + } + break; + } + + if (!advance_to_last_pad_byte(m_read_bl_off + iter.get_off(), &iter, + &pad_len, &partial_entry)) { + invalid_start_off = m_read_bl_off + bl_off; + invalid = true; + if (partial_entry) { + if (full_fetch) { + lderr(m_cct) << ": partial pad at offset " << invalid_start_off + << dendl; + } else { + ldout(m_cct, 20) << ": partial pad detected, will re-fetch" + << dendl; + } + } else { + lderr(m_cct) << ": detected corrupt journal entry at offset " + << invalid_start_off << dendl; + } + break; + } + ++iter; + continue; + } + + Entry entry; + decode(entry, iter); + ldout(m_cct, 20) << ": " << entry << " decoded" << dendl; + + uint32_t entry_len = iter.get_off() - bl_off; + if (invalid) { + // new corrupt region detected + uint32_t invalid_end_off = m_read_bl_off + bl_off; + lderr(m_cct) << ": corruption range [" << invalid_start_off + << ", " << invalid_end_off << ")" << dendl; + m_invalid_ranges.insert(invalid_start_off, + invalid_end_off - invalid_start_off); + invalid = false; + + m_read_bl_off = invalid_end_off; + } + + EntryKey entry_key(std::make_pair(entry.get_tag_tid(), + entry.get_entry_tid())); + if (m_entry_keys.find(entry_key) == m_entry_keys.end()) { + m_entry_keys[entry_key] = m_entries.insert(m_entries.end(), entry); + } else { + ldout(m_cct, 10) << ": " << entry << " is duplicate, replacing" << dendl; + *m_entry_keys[entry_key] = entry; + } + + // prune decoded / corrupted journal entries from front of bl + bufferlist sub_bl; + sub_bl.substr_of(m_read_bl, iter.get_off(), + m_read_bl.length() - iter.get_off()); + sub_bl.swap(m_read_bl); + iter = bufferlist::iterator(&m_read_bl, 0); + + // advance the decoded entry offset + m_read_bl_off += entry_len + pad_len; + pad_len = 0; + } + + if (invalid) { + uint32_t invalid_end_off = m_read_bl_off + m_read_bl.length(); + if (!partial_entry) { + lderr(m_cct) << ": corruption range [" << invalid_start_off + << ", " << invalid_end_off << ")" << dendl; + } + m_invalid_ranges.insert(invalid_start_off, + invalid_end_off - invalid_start_off); + } + + if (!m_invalid_ranges.empty() && !partial_entry) { + return -EBADMSG; + } else if (partial_entry && (full_fetch || m_entries.empty())) { + *refetch = true; + return -EAGAIN; + } + + return 0; +} + +void ObjectPlayer::clear_invalid_range(uint32_t off, uint32_t len) { + // possibly remove previously partial record region + InvalidRanges decode_range; + decode_range.insert(off, len); + InvalidRanges intersect_range; + intersect_range.intersection_of(m_invalid_ranges, 
decode_range); + if (!intersect_range.empty()) { + ldout(m_cct, 20) << ": clearing invalid range: " << intersect_range + << dendl; + m_invalid_ranges.subtract(intersect_range); + } +} + +void ObjectPlayer::schedule_watch() { + ceph_assert(ceph_mutex_is_locked(m_timer_lock)); + if (m_watch_ctx == NULL) { + return; + } + + ldout(m_cct, 20) << __func__ << ": " << m_oid << " scheduling watch" << dendl; + ceph_assert(m_watch_task == nullptr); + m_watch_task = m_timer.add_event_after( + m_watch_interval, + new LambdaContext([this](int) { + handle_watch_task(); + })); +} + +bool ObjectPlayer::cancel_watch() { + ceph_assert(ceph_mutex_is_locked(m_timer_lock)); + ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl; + if (m_watch_task != nullptr) { + bool canceled = m_timer.cancel_event(m_watch_task); + ceph_assert(canceled); + + m_watch_task = nullptr; + return true; + } + return false; +} + +void ObjectPlayer::handle_watch_task() { + ceph_assert(ceph_mutex_is_locked(m_timer_lock)); + + ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl; + ceph_assert(m_watch_ctx != nullptr); + ceph_assert(m_watch_task != nullptr); + + m_watch_task = nullptr; + fetch(new C_WatchFetch(this)); +} + +void ObjectPlayer::handle_watch_fetched(int r) { + ldout(m_cct, 10) << __func__ << ": " << m_oid << " poll complete, r=" << r + << dendl; + + Context *watch_ctx = nullptr; + { + std::lock_guard timer_locker{m_timer_lock}; + std::swap(watch_ctx, m_watch_ctx); + + if (m_unwatched) { + m_unwatched = false; + r = -ECANCELED; + } + } + + if (watch_ctx != nullptr) { + watch_ctx->complete(r); + } +} + +void ObjectPlayer::C_Fetch::finish(int r) { + bool refetch = false; + r = object_player->handle_fetch_complete(r, read_bl, &refetch); + + { + std::lock_guard locker{object_player->m_lock}; + object_player->m_fetch_in_progress = false; + } + + if (refetch) { + object_player->fetch(on_finish); + return; + } + + object_player.reset(); + on_finish->complete(r); +} + +void ObjectPlayer::C_WatchFetch::finish(int r) { + object_player->handle_watch_fetched(r); +} + +} // namespace journal diff --git a/src/journal/ObjectPlayer.h b/src/journal/ObjectPlayer.h new file mode 100644 index 000000000..b9446252a --- /dev/null +++ b/src/journal/ObjectPlayer.h @@ -0,0 +1,141 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_OBJECT_PLAYER_H +#define CEPH_JOURNAL_OBJECT_PLAYER_H + +#include "include/Context.h" +#include "include/interval_set.h" +#include "include/rados/librados.hpp" +#include "common/ceph_mutex.h" +#include "common/Timer.h" +#include "common/RefCountedObj.h" +#include "journal/Entry.h" +#include <list> +#include <string> +#include <boost/noncopyable.hpp> +#include <boost/unordered_map.hpp> +#include "include/ceph_assert.h" + +namespace journal { + +class ObjectPlayer : public RefCountedObject { +public: + typedef std::list<Entry> Entries; + typedef interval_set<uint64_t> InvalidRanges; + + enum RefetchState { + REFETCH_STATE_NONE, + REFETCH_STATE_REQUIRED, + REFETCH_STATE_IMMEDIATE + }; + + inline const std::string &get_oid() const { + return m_oid; + } + inline uint64_t get_object_number() const { + return m_object_num; + } + + void fetch(Context *on_finish); + void watch(Context *on_fetch, double interval); + void unwatch(); + + void front(Entry *entry) const; + void pop_front(); + inline bool empty() const { + std::lock_guard locker{m_lock}; + return m_entries.empty(); + } + + inline void get_entries(Entries 
*entries) { + std::lock_guard locker{m_lock}; + *entries = m_entries; + } + inline void get_invalid_ranges(InvalidRanges *invalid_ranges) { + std::lock_guard locker{m_lock}; + *invalid_ranges = m_invalid_ranges; + } + + inline bool refetch_required() const { + return (get_refetch_state() != REFETCH_STATE_NONE); + } + inline RefetchState get_refetch_state() const { + return m_refetch_state; + } + inline void set_refetch_state(RefetchState refetch_state) { + m_refetch_state = refetch_state; + } + + inline void set_max_fetch_bytes(uint64_t max_fetch_bytes) { + std::lock_guard locker{m_lock}; + m_max_fetch_bytes = max_fetch_bytes; + } + +private: + FRIEND_MAKE_REF(ObjectPlayer); + ObjectPlayer(librados::IoCtx &ioctx, const std::string& object_oid_prefix, + uint64_t object_num, SafeTimer &timer, ceph::mutex &timer_lock, + uint8_t order, uint64_t max_fetch_bytes); + ~ObjectPlayer() override; + + typedef std::pair<uint64_t, uint64_t> EntryKey; + typedef boost::unordered_map<EntryKey, Entries::iterator> EntryKeys; + + struct C_Fetch : public Context { + ceph::ref_t<ObjectPlayer> object_player; + Context *on_finish; + bufferlist read_bl; + C_Fetch(ObjectPlayer *o, Context *ctx) : object_player(o), on_finish(ctx) { + } + void finish(int r) override; + }; + struct C_WatchFetch : public Context { + ceph::ref_t<ObjectPlayer> object_player; + C_WatchFetch(ObjectPlayer *o) : object_player(o) { + } + void finish(int r) override; + }; + + librados::IoCtx m_ioctx; + uint64_t m_object_num; + std::string m_oid; + CephContext *m_cct = nullptr; + + SafeTimer &m_timer; + ceph::mutex &m_timer_lock; + + uint8_t m_order; + uint64_t m_max_fetch_bytes; + + double m_watch_interval = 0; + Context *m_watch_task = nullptr; + + mutable ceph::mutex m_lock; + bool m_fetch_in_progress = false; + bufferlist m_read_bl; + uint32_t m_read_off = 0; + uint32_t m_read_bl_off = 0; + + Entries m_entries; + EntryKeys m_entry_keys; + InvalidRanges m_invalid_ranges; + + Context *m_watch_ctx = nullptr; + + bool m_unwatched = false; + RefetchState m_refetch_state = REFETCH_STATE_IMMEDIATE; + + int handle_fetch_complete(int r, const bufferlist &bl, bool *refetch); + + void clear_invalid_range(uint32_t off, uint32_t len); + + void schedule_watch(); + bool cancel_watch(); + void handle_watch_task(); + void handle_watch_fetched(int r); +}; + +} // namespace journal + +#endif // CEPH_JOURNAL_OBJECT_PLAYER_H diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc new file mode 100644 index 000000000..2c77f85b0 --- /dev/null +++ b/src/journal/ObjectRecorder.cc @@ -0,0 +1,424 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/ObjectRecorder.h" +#include "journal/Future.h" +#include "journal/Utils.h" +#include "include/ceph_assert.h" +#include "common/Timer.h" +#include "common/errno.h" +#include "cls/journal/cls_journal_client.h" + +#define dout_subsys ceph_subsys_journaler +#undef dout_prefix +#define dout_prefix *_dout << "ObjectRecorder: " << this << " " \ + << __func__ << " (" << m_oid << "): " + +using namespace cls::journal; +using std::shared_ptr; + +namespace journal { + +ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, std::string_view oid, + uint64_t object_number, ceph::mutex* lock, + ContextWQ *work_queue, Handler *handler, + uint8_t order, int32_t max_in_flight_appends) + : m_oid(oid), m_object_number(object_number), + m_op_work_queue(work_queue), m_handler(handler), + m_order(order), m_soft_max_size(1 << m_order), + 
m_max_in_flight_appends(max_in_flight_appends), + m_lock(lock) +{ + m_ioctx.dup(ioctx); + m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); + ceph_assert(m_handler != NULL); + + librados::Rados rados(m_ioctx); + int8_t require_osd_release = 0; + int r = rados.get_min_compatible_osd(&require_osd_release); + if (r < 0) { + ldout(m_cct, 0) << "failed to retrieve min OSD release: " + << cpp_strerror(r) << dendl; + } + m_compat_mode = require_osd_release < CEPH_RELEASE_OCTOPUS; + + ldout(m_cct, 20) << dendl; +} + +ObjectRecorder::~ObjectRecorder() { + ldout(m_cct, 20) << dendl; + ceph_assert(m_pending_buffers.empty()); + ceph_assert(m_in_flight_tids.empty()); + ceph_assert(m_in_flight_appends.empty()); +} + +void ObjectRecorder::set_append_batch_options(int flush_interval, + uint64_t flush_bytes, + double flush_age) { + ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", " + << "flush_bytes=" << flush_bytes << ", " + << "flush_age=" << flush_age << dendl; + + ceph_assert(ceph_mutex_is_locked(*m_lock)); + m_flush_interval = flush_interval; + m_flush_bytes = flush_bytes; + m_flush_age = flush_age; +} + +bool ObjectRecorder::append(AppendBuffers &&append_buffers) { + ldout(m_cct, 20) << "count=" << append_buffers.size() << dendl; + + ceph_assert(ceph_mutex_is_locked(*m_lock)); + + ceph::ref_t<FutureImpl> last_flushed_future; + auto flush_handler = get_flush_handler(); + for (auto& append_buffer : append_buffers) { + ldout(m_cct, 20) << *append_buffer.first << ", " + << "size=" << append_buffer.second.length() << dendl; + bool flush_requested = append_buffer.first->attach(flush_handler); + if (flush_requested) { + last_flushed_future = append_buffer.first; + } + + m_pending_buffers.push_back(append_buffer); + m_pending_bytes += append_buffer.second.length(); + } + + return send_appends(!!last_flushed_future, last_flushed_future); +} + +void ObjectRecorder::flush(Context *on_safe) { + ldout(m_cct, 20) << dendl; + + Future future; + { + std::unique_lock locker{*m_lock}; + + // if currently handling flush notifications, wait so that + // we notify in the correct order (since lock is dropped on + // callback) + while (m_in_flight_callbacks > 0) { + m_in_flight_callbacks_cond.wait(locker); + } + + // attach the flush to the most recent append + if (!m_pending_buffers.empty()) { + future = Future(m_pending_buffers.rbegin()->first); + } else if (!m_in_flight_appends.empty()) { + AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second; + ceph_assert(!append_buffers.empty()); + future = Future(append_buffers.rbegin()->first); + } + } + + if (future.is_valid()) { + // cannot be invoked while the same lock context + m_op_work_queue->queue(new LambdaContext( + [future, on_safe] (int r) mutable { + future.flush(on_safe); + })); + } else { + on_safe->complete(0); + } +} + +void ObjectRecorder::flush(const ceph::ref_t<FutureImpl>& future) { + ldout(m_cct, 20) << "flushing " << *future << dendl; + + std::unique_lock locker{*m_lock}; + auto flush_handler = future->get_flush_handler(); + auto my_handler = get_flush_handler(); + if (flush_handler != my_handler) { + // if we don't own this future, re-issue the flush so that it hits the + // correct journal object owner + future->flush(); + return; + } else if (future->is_flush_in_progress()) { + return; + } + + if (!m_object_closed && !m_overflowed && send_appends(true, future)) { + ++m_in_flight_callbacks; + notify_handler_unlock(locker, true); + } +} + +void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) { + 
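+ // Detach the futures of all still-pending buffers and splice the buffers
+ // onto the caller's list; only valid once this object has been closed or
+ // has overflowed (asserted below).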
ldout(m_cct, 20) << dendl; + + ceph_assert(ceph_mutex_is_locked(*m_lock)); + ceph_assert(m_in_flight_tids.empty()); + ceph_assert(m_in_flight_appends.empty()); + ceph_assert(m_object_closed || m_overflowed); + + for (auto& append_buffer : m_pending_buffers) { + ldout(m_cct, 20) << "detached " << *append_buffer.first << dendl; + append_buffer.first->detach(); + } + append_buffers->splice(append_buffers->end(), m_pending_buffers, + m_pending_buffers.begin(), m_pending_buffers.end()); +} + +bool ObjectRecorder::close() { + ceph_assert(ceph_mutex_is_locked(*m_lock)); + + ldout(m_cct, 20) << dendl; + + send_appends(true, {}); + + ceph_assert(!m_object_closed); + m_object_closed = true; + + if (!m_in_flight_tids.empty() || m_in_flight_callbacks > 0) { + m_object_closed_notify = true; + return false; + } + + return true; +} + +void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { + ldout(m_cct, 20) << "tid=" << tid << ", r=" << r << dendl; + + std::unique_lock locker{*m_lock}; + ++m_in_flight_callbacks; + + auto tid_iter = m_in_flight_tids.find(tid); + ceph_assert(tid_iter != m_in_flight_tids.end()); + m_in_flight_tids.erase(tid_iter); + + InFlightAppends::iterator iter = m_in_flight_appends.find(tid); + ceph_assert(iter != m_in_flight_appends.end()); + + bool notify_overflowed = false; + AppendBuffers append_buffers; + if (r == -EOVERFLOW) { + ldout(m_cct, 10) << "append overflowed: " + << "idle=" << m_in_flight_tids.empty() << ", " + << "previous_overflow=" << m_overflowed << dendl; + if (m_in_flight_tids.empty()) { + append_overflowed(); + } + + if (!m_object_closed && !m_overflowed) { + notify_overflowed = true; + } + m_overflowed = true; + } else { + append_buffers.swap(iter->second); + ceph_assert(!append_buffers.empty()); + + for (auto& append_buffer : append_buffers) { + auto length = append_buffer.second.length(); + m_object_bytes += length; + + ceph_assert(m_in_flight_bytes >= length); + m_in_flight_bytes -= length; + } + ldout(m_cct, 20) << "object_bytes=" << m_object_bytes << dendl; + + m_in_flight_appends.erase(iter); + } + locker.unlock(); + + // Flag the associated futures as complete. + for (auto& append_buffer : append_buffers) { + ldout(m_cct, 20) << *append_buffer.first << " marked safe" << dendl; + append_buffer.first->safe(r); + } + + // attempt to kick off more appends to the object + locker.lock(); + if (!m_object_closed && !m_overflowed && send_appends(false, {})) { + notify_overflowed = true; + } + + ldout(m_cct, 20) << "pending tids=" << m_in_flight_tids << dendl; + + // notify of overflow if one just occurred or indicate that all in-flight + // appends have completed on a closed object (or wake up stalled flush + // requests that was waiting for this strand to complete). 
+ notify_handler_unlock(locker, notify_overflowed); +} + +void ObjectRecorder::append_overflowed() { + ldout(m_cct, 10) << dendl; + + ceph_assert(ceph_mutex_is_locked(*m_lock)); + ceph_assert(!m_in_flight_appends.empty()); + + InFlightAppends in_flight_appends; + in_flight_appends.swap(m_in_flight_appends); + + AppendBuffers restart_append_buffers; + for (InFlightAppends::iterator it = in_flight_appends.begin(); + it != in_flight_appends.end(); ++it) { + restart_append_buffers.insert(restart_append_buffers.end(), + it->second.begin(), it->second.end()); + } + + restart_append_buffers.splice(restart_append_buffers.end(), + m_pending_buffers, + m_pending_buffers.begin(), + m_pending_buffers.end()); + restart_append_buffers.swap(m_pending_buffers); +} + +bool ObjectRecorder::send_appends(bool force, ceph::ref_t<FutureImpl> flush_future) { + ldout(m_cct, 20) << dendl; + + ceph_assert(ceph_mutex_is_locked(*m_lock)); + if (m_object_closed || m_overflowed) { + ldout(m_cct, 20) << "already closed or overflowed" << dendl; + return false; + } + + if (m_pending_buffers.empty()) { + ldout(m_cct, 20) << "append buffers empty" << dendl; + return false; + } + + if (!force && + ((m_flush_interval > 0 && m_pending_buffers.size() >= m_flush_interval) || + (m_flush_bytes > 0 && m_pending_bytes >= m_flush_bytes) || + (m_flush_age > 0 && !m_last_flush_time.is_zero() && + m_last_flush_time + m_flush_age <= ceph_clock_now()))) { + ldout(m_cct, 20) << "forcing batch flush" << dendl; + force = true; + } + + // start tracking flush time after the first append event + if (m_last_flush_time.is_zero()) { + m_last_flush_time = ceph_clock_now(); + } + + auto max_in_flight_appends = m_max_in_flight_appends; + if (m_flush_interval > 0 || m_flush_bytes > 0 || m_flush_age > 0) { + if (!force && max_in_flight_appends == 0) { + ldout(m_cct, 20) << "attempting to batch AIO appends" << dendl; + max_in_flight_appends = 1; + } + } else if (max_in_flight_appends < 0) { + max_in_flight_appends = 0; + } + + if (!force && max_in_flight_appends != 0 && + static_cast<int32_t>(m_in_flight_tids.size()) >= max_in_flight_appends) { + ldout(m_cct, 10) << "max in flight appends reached" << dendl; + return false; + } + + librados::ObjectWriteOperation op; + if (m_compat_mode) { + client::guard_append(&op, m_soft_max_size); + } + + size_t append_bytes = 0; + AppendBuffers append_buffers; + bufferlist append_bl; + for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) { + auto& future = it->first; + auto& bl = it->second; + auto size = m_object_bytes + m_in_flight_bytes + append_bytes + bl.length(); + if (size == m_soft_max_size) { + ldout(m_cct, 10) << "object at capacity (" << size << ") " << *future << dendl; + m_overflowed = true; + } else if (size > m_soft_max_size) { + ldout(m_cct, 10) << "object beyond capacity (" << size << ") " << *future << dendl; + m_overflowed = true; + break; + } + + bool flush_break = (force && flush_future && flush_future == future); + ldout(m_cct, 20) << "flushing " << *future << dendl; + future->set_flush_in_progress(); + + if (m_compat_mode) { + op.append(bl); + op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); + } else { + append_bl.append(bl); + } + + append_bytes += bl.length(); + append_buffers.push_back(*it); + it = m_pending_buffers.erase(it); + + if (flush_break) { + ldout(m_cct, 20) << "stopping at requested flush future" << dendl; + break; + } + } + + if (append_bytes > 0) { + m_last_flush_time = ceph_clock_now(); + + uint64_t append_tid = m_append_tid++; + 
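+ // Track this append by tid so handle_append_flushed() can match the AIO
+ // completion back to its buffers and in-flight byte count.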
m_in_flight_tids.insert(append_tid); + m_in_flight_appends[append_tid].swap(append_buffers); + m_in_flight_bytes += append_bytes; + + ceph_assert(m_pending_bytes >= append_bytes); + m_pending_bytes -= append_bytes; + + if (!m_compat_mode) { + client::append(&op, m_soft_max_size, append_bl); + } + + auto rados_completion = librados::Rados::aio_create_completion( + new C_AppendFlush(this, append_tid), utils::rados_ctx_callback); + int r = m_ioctx.aio_operate(m_oid, rados_completion, &op); + ceph_assert(r == 0); + rados_completion->release(); + ldout(m_cct, 20) << "flushing journal tid=" << append_tid << ", " + << "append_bytes=" << append_bytes << ", " + << "in_flight_bytes=" << m_in_flight_bytes << ", " + << "pending_bytes=" << m_pending_bytes << dendl; + } + + return m_overflowed; +} + +void ObjectRecorder::wake_up_flushes() { + ceph_assert(ceph_mutex_is_locked(*m_lock)); + --m_in_flight_callbacks; + if (m_in_flight_callbacks == 0) { + m_in_flight_callbacks_cond.notify_all(); + } +} + +void ObjectRecorder::notify_handler_unlock( + std::unique_lock<ceph::mutex>& locker, bool notify_overflowed) { + ceph_assert(ceph_mutex_is_locked(*m_lock)); + ceph_assert(m_in_flight_callbacks > 0); + + if (!m_object_closed && notify_overflowed) { + // TODO need to delay completion until after aio_notify completes + ldout(m_cct, 10) << "overflow" << dendl; + ceph_assert(m_overflowed); + + locker.unlock(); + m_handler->overflow(this); + locker.lock(); + } + + // wake up blocked flush requests + wake_up_flushes(); + + // An overflow notification might have blocked a close. A close + // notification could lead to the immediate destruction of this object + // so the object shouldn't be referenced anymore + bool object_closed_notify = false; + if (m_in_flight_tids.empty()) { + std::swap(object_closed_notify, m_object_closed_notify); + } + ceph_assert(m_object_closed || !object_closed_notify); + locker.unlock(); + + if (object_closed_notify) { + ldout(m_cct, 10) << "closed" << dendl; + m_handler->closed(this); + } +} + +} // namespace journal diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h new file mode 100644 index 000000000..5c5f88c86 --- /dev/null +++ b/src/journal/ObjectRecorder.h @@ -0,0 +1,160 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_OBJECT_RECORDER_H +#define CEPH_JOURNAL_OBJECT_RECORDER_H + +#include "include/utime.h" +#include "include/Context.h" +#include "include/rados/librados.hpp" +#include "common/ceph_mutex.h" +#include "common/RefCountedObj.h" +#include "common/WorkQueue.h" +#include "common/Timer.h" +#include "journal/FutureImpl.h" +#include <list> +#include <map> +#include <set> +#include <boost/noncopyable.hpp> +#include "include/ceph_assert.h" + +namespace journal { + +class ObjectRecorder; + +typedef std::pair<ceph::ref_t<FutureImpl>, bufferlist> AppendBuffer; +typedef std::list<AppendBuffer> AppendBuffers; + +class ObjectRecorder : public RefCountedObject, boost::noncopyable { +public: + struct Handler { + virtual ~Handler() { + } + virtual void closed(ObjectRecorder *object_recorder) = 0; + virtual void overflow(ObjectRecorder *object_recorder) = 0; + }; + + void set_append_batch_options(int flush_interval, uint64_t flush_bytes, + double flush_age); + + inline uint64_t get_object_number() const { + return m_object_number; + } + inline const std::string &get_oid() const { + return m_oid; + } + + bool append(AppendBuffers &&append_buffers); + void flush(Context *on_safe); + void 
flush(const ceph::ref_t<FutureImpl> &future); + + void claim_append_buffers(AppendBuffers *append_buffers); + + bool is_closed() const { + ceph_assert(ceph_mutex_is_locked(*m_lock)); + return (m_object_closed && m_in_flight_appends.empty()); + } + bool close(); + + inline CephContext *cct() const { + return m_cct; + } + + inline size_t get_pending_appends() const { + std::lock_guard locker{*m_lock}; + return m_pending_buffers.size(); + } + +private: + FRIEND_MAKE_REF(ObjectRecorder); + ObjectRecorder(librados::IoCtx &ioctx, std::string_view oid, + uint64_t object_number, ceph::mutex* lock, + ContextWQ *work_queue, Handler *handler, uint8_t order, + int32_t max_in_flight_appends); + ~ObjectRecorder() override; + + typedef std::set<uint64_t> InFlightTids; + typedef std::map<uint64_t, AppendBuffers> InFlightAppends; + + struct FlushHandler : public FutureImpl::FlushHandler { + ceph::ref_t<ObjectRecorder> object_recorder; + virtual void flush(const ceph::ref_t<FutureImpl> &future) override { + object_recorder->flush(future); + } + FlushHandler(ceph::ref_t<ObjectRecorder> o) : object_recorder(std::move(o)) {} + }; + struct C_AppendFlush : public Context { + ceph::ref_t<ObjectRecorder> object_recorder; + uint64_t tid; + C_AppendFlush(ceph::ref_t<ObjectRecorder> o, uint64_t _tid) + : object_recorder(std::move(o)), tid(_tid) { + } + void finish(int r) override { + object_recorder->handle_append_flushed(tid, r); + } + }; + + librados::IoCtx m_ioctx; + std::string m_oid; + uint64_t m_object_number; + CephContext *m_cct = nullptr; + + ContextWQ *m_op_work_queue; + + Handler *m_handler; + + uint8_t m_order; + uint64_t m_soft_max_size; + + uint32_t m_flush_interval = 0; + uint64_t m_flush_bytes = 0; + double m_flush_age = 0; + int32_t m_max_in_flight_appends; + + bool m_compat_mode; + + /* So that ObjectRecorder::FlushHandler doesn't create a circular reference: */ + std::weak_ptr<FlushHandler> m_flush_handler; + auto get_flush_handler() { + auto h = m_flush_handler.lock(); + if (!h) { + h = std::make_shared<FlushHandler>(this); + m_flush_handler = h; + } + return h; + } + + mutable ceph::mutex* m_lock; + AppendBuffers m_pending_buffers; + uint64_t m_pending_bytes = 0; + utime_t m_last_flush_time; + + uint64_t m_append_tid = 0; + + InFlightTids m_in_flight_tids; + InFlightAppends m_in_flight_appends; + uint64_t m_object_bytes = 0; + + bool m_overflowed = false; + + bool m_object_closed = false; + bool m_object_closed_notify = false; + + bufferlist m_prefetch_bl; + + uint32_t m_in_flight_callbacks = 0; + ceph::condition_variable m_in_flight_callbacks_cond; + uint64_t m_in_flight_bytes = 0; + + bool send_appends(bool force, ceph::ref_t<FutureImpl> flush_sentinal); + void handle_append_flushed(uint64_t tid, int r); + void append_overflowed(); + + void wake_up_flushes(); + void notify_handler_unlock(std::unique_lock<ceph::mutex>& locker, + bool notify_overflowed); +}; + +} // namespace journal + +#endif // CEPH_JOURNAL_OBJECT_RECORDER_H diff --git a/src/journal/ReplayEntry.h b/src/journal/ReplayEntry.h new file mode 100644 index 000000000..4dd3ba475 --- /dev/null +++ b/src/journal/ReplayEntry.h @@ -0,0 +1,34 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_REPLAY_ENTRY_H +#define CEPH_JOURNAL_REPLAY_ENTRY_H + +#include "include/int_types.h" +#include "include/buffer.h" + +namespace journal { + +class ReplayEntry { +public: + ReplayEntry() : m_commit_tid(0) { + } + ReplayEntry(const bufferlist &data, uint64_t commit_tid) + : 
m_data(data), m_commit_tid(commit_tid) { + } + + inline const bufferlist &get_data() const { + return m_data; + } + inline uint64_t get_commit_tid() const { + return m_commit_tid; + } + +private: + bufferlist m_data; + uint64_t m_commit_tid; +}; + +} // namespace journal + +#endif // CEPH_JOURNAL_REPLAY_ENTRY_H diff --git a/src/journal/ReplayHandler.h b/src/journal/ReplayHandler.h new file mode 100644 index 000000000..d0967c68d --- /dev/null +++ b/src/journal/ReplayHandler.h @@ -0,0 +1,17 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_REPLAY_HANDLER_H +#define CEPH_JOURNAL_REPLAY_HANDLER_H + +namespace journal { + +struct ReplayHandler { + virtual void handle_entries_available() = 0; + virtual void handle_complete(int r) = 0; + virtual ~ReplayHandler() {} +}; + +} // namespace journal + +#endif // CEPH_JOURNAL_REPLAY_HANDLER_H diff --git a/src/journal/Settings.h b/src/journal/Settings.h new file mode 100644 index 000000000..538225284 --- /dev/null +++ b/src/journal/Settings.h @@ -0,0 +1,21 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_SETTINGS_H +#define CEPH_JOURNAL_SETTINGS_H + +#include "include/int_types.h" + +namespace journal { + +struct Settings { + double commit_interval = 5; ///< commit position throttle (in secs) + uint64_t max_payload_bytes = 0; ///< 0 implies object size limit + int max_concurrent_object_sets = 0; ///< 0 implies no limit + std::set<std::string> ignored_laggy_clients; + ///< clients that mustn't be disconnected +}; + +} // namespace journal + +#endif // # CEPH_JOURNAL_SETTINGS_H diff --git a/src/journal/Types.h b/src/journal/Types.h new file mode 100644 index 000000000..8f8ce7800 --- /dev/null +++ b/src/journal/Types.h @@ -0,0 +1,28 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_TYPES_H +#define CEPH_JOURNAL_TYPES_H + +namespace journal { + +struct CacheRebalanceHandler { + virtual ~CacheRebalanceHandler() { + } + + virtual void handle_cache_rebalanced(uint64_t new_cache_bytes) = 0; +}; + +struct CacheManagerHandler { + virtual ~CacheManagerHandler() { + } + + virtual void register_cache(const std::string &cache_name, + uint64_t min_size, uint64_t max_size, + CacheRebalanceHandler* handler) = 0; + virtual void unregister_cache(const std::string &cache_name) = 0; +}; + +} // namespace journal + +#endif // # CEPH_JOURNAL_TYPES_H diff --git a/src/journal/Utils.cc b/src/journal/Utils.cc new file mode 100644 index 000000000..2a8d945a3 --- /dev/null +++ b/src/journal/Utils.cc @@ -0,0 +1,25 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/Utils.h" +#include "include/Context.h" +#include "include/stringify.h" + +namespace journal { +namespace utils { + +std::string get_object_name(const std::string &prefix, uint64_t number) { + return prefix + stringify(number); +} + +std::string unique_lock_name(const std::string &name, void *address) { + return name + " (" + stringify(address) + ")"; +} + +void rados_ctx_callback(rados_completion_t c, void *arg) { + Context *comp = reinterpret_cast<Context *>(arg); + comp->complete(rados_aio_get_return_value(c)); +} + +} // namespace utils +} // namespace journal diff --git a/src/journal/Utils.h b/src/journal/Utils.h new file mode 100644 index 000000000..c5695e583 --- /dev/null +++ b/src/journal/Utils.h @@ -0,0 +1,54 @@ +// 
-*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_UTILS_H +#define CEPH_JOURNAL_UTILS_H + +#include "include/int_types.h" +#include "include/Context.h" +#include "include/rados/librados.hpp" +#include <string> + +namespace journal { +namespace utils { + +namespace detail { + +template <typename M> +struct C_AsyncCallback : public Context { + M journal_metadata; + Context *on_finish; + + C_AsyncCallback(M journal_metadata, Context *on_finish) + : journal_metadata(journal_metadata), on_finish(on_finish) { + } + void finish(int r) override { + journal_metadata->queue(on_finish, r); + } +}; + +} // namespace detail + +template <typename T, void(T::*MF)(int)> +void rados_state_callback(rados_completion_t c, void *arg) { + T *obj = reinterpret_cast<T*>(arg); + int r = rados_aio_get_return_value(c); + (obj->*MF)(r); +} + +std::string get_object_name(const std::string &prefix, uint64_t number); + +std::string unique_lock_name(const std::string &name, void *address); + +void rados_ctx_callback(rados_completion_t c, void *arg); + +template <typename M> +Context *create_async_context_callback(M journal_metadata, Context *on_finish) { + // use async callback to acquire a clean lock context + return new detail::C_AsyncCallback<M>(journal_metadata, on_finish); +} + +} // namespace utils +} // namespace journal + +#endif // CEPH_JOURNAL_UTILS_H |
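
For orientation only (not part of the patch above): a minimal caller-side sketch of the append path exposed by journal/Journaler.h, using only the calls shown in this diff. The journal id, client id, tag tid and payload below are placeholders, tag allocation and error handling are elided, and the synchronous register_client()/shut_down() variants are used for brevity.

// Illustrative sketch only -- not part of the change above. Identifiers such
// as example_append, "journal-id" and "client-id" are placeholders.
#include "journal/Journaler.h"
#include "journal/Settings.h"
#include "common/Cond.h"

void example_append(librados::IoCtx& ioctx) {
  journal::Settings settings;   // defaults: 5s commit interval, no payload cap
  journal::Journaler journaler(ioctx, "journal-id", "client-id", settings,
                               nullptr /* no CacheManagerHandler */);

  C_SaferCond init_ctx;
  journaler.init(&init_ctx);
  if (init_ctx.wait() < 0) {
    return;
  }

  bufferlist client_data;
  journaler.register_client(client_data);   // synchronous variant

  journaler.start_append(0 /* max_in_flight_appends */);

  bufferlist payload;
  payload.append("example payload", 15);
  // The returned Future tracks when this specific entry is safe;
  // tag_tid 0 stands in for a tag allocated via allocate_tag().
  journal::Future future = journaler.append(0, payload);

  C_SaferCond flush_ctx;
  journaler.flush_append(&flush_ctx);
  flush_ctx.wait();

  C_SaferCond stop_ctx;
  journaler.stop_append(&stop_ctx);
  stop_ctx.wait();

  journaler.shut_down();   // synchronous variant
}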