 src/journal/ObjectPlayer.cc | 355 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 355 insertions(+), 0 deletions(-)
diff --git a/src/journal/ObjectPlayer.cc b/src/journal/ObjectPlayer.cc
new file mode 100644
index 000000000..56eec51f6
--- /dev/null
+++ b/src/journal/ObjectPlayer.cc
@@ -0,0 +1,355 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/ObjectPlayer.h"
+#include "journal/Utils.h"
+#include "common/Timer.h"
+#include <limits>
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "ObjectPlayer: " << this << " "
+
+namespace journal {
+
+namespace {
+
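+// Treat the bytes starting at 'off' as zero padding out to the next 8-byte
+// boundary.  Verify that every pad byte is NUL and leave *iter positioned on
+// the last pad byte.  A non-zero byte means the region is corrupt (return
+// false); running out of buffer mid-pad sets *partial_entry so the caller
+// can re-fetch instead of flagging corruption.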
+bool advance_to_last_pad_byte(uint32_t off, bufferlist::const_iterator *iter,
+ uint32_t *pad_len, bool *partial_entry) {
+ const uint32_t MAX_PAD = 8;
+ auto pad_bytes = MAX_PAD - off % MAX_PAD;
+ auto next = *iter;
+
+ ceph_assert(!next.end());
+ if (*next != '\0') {
+ return false;
+ }
+
+ for (auto i = pad_bytes - 1; i > 0; i--) {
+ if ((++next).end()) {
+ *partial_entry = true;
+ return false;
+ }
+ if (*next != '\0') {
+ return false;
+ }
+ }
+
+ *iter = next;
+ *pad_len += pad_bytes;
+ return true;
+}
+
+} // anonymous namespace
+
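+// A max_fetch_bytes of zero selects a full-object read (2 << order), which
+// handle_fetch_complete() uses to distinguish full fetches from partial,
+// size-limited fetches.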
+ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
+ const std::string& object_oid_prefix,
+ uint64_t object_num, SafeTimer &timer,
+ ceph::mutex &timer_lock, uint8_t order,
+ uint64_t max_fetch_bytes)
+ : m_object_num(object_num),
+ m_oid(utils::get_object_name(object_oid_prefix, m_object_num)),
+ m_timer(timer), m_timer_lock(timer_lock), m_order(order),
+ m_max_fetch_bytes(max_fetch_bytes > 0 ? max_fetch_bytes : 2 << order),
+ m_lock(ceph::make_mutex(utils::unique_lock_name("ObjectPlayer::m_lock", this)))
+{
+ m_ioctx.dup(ioctx);
+ m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
+}
+
+ObjectPlayer::~ObjectPlayer() {
+ {
+ std::lock_guard timer_locker{m_timer_lock};
+ std::lock_guard locker{m_lock};
+ ceph_assert(!m_fetch_in_progress);
+ ceph_assert(m_watch_ctx == nullptr);
+ }
+}
+
+void ObjectPlayer::fetch(Context *on_finish) {
+ ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl;
+
+ std::lock_guard locker{m_lock};
+ ceph_assert(!m_fetch_in_progress);
+ m_fetch_in_progress = true;
+
+ C_Fetch *context = new C_Fetch(this, on_finish);
+ librados::ObjectReadOperation op;
+ op.read(m_read_off, m_max_fetch_bytes, &context->read_bl, NULL);
+ op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+
+ auto rados_completion =
+ librados::Rados::aio_create_completion(context, utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, rados_completion, &op, 0, NULL);
+ ceph_assert(r == 0);
+ rados_completion->release();
+}
+
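+// Arm a one-shot poll of the object: after 'interval' seconds a fetch is
+// issued and 'on_fetch' is completed with its result, or with -ECANCELED if
+// unwatch() wins the race.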
+void ObjectPlayer::watch(Context *on_fetch, double interval) {
+ ldout(m_cct, 20) << __func__ << ": " << m_oid << " watch" << dendl;
+
+ std::lock_guard timer_locker{m_timer_lock};
+ m_watch_interval = interval;
+
+ ceph_assert(m_watch_ctx == nullptr);
+ m_watch_ctx = on_fetch;
+
+ schedule_watch();
+}
+
+void ObjectPlayer::unwatch() {
+ ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl;
+ Context *watch_ctx = nullptr;
+ {
+ std::lock_guard timer_locker{m_timer_lock};
+ ceph_assert(!m_unwatched);
+ m_unwatched = true;
+
+ if (!cancel_watch()) {
+ return;
+ }
+
+ std::swap(watch_ctx, m_watch_ctx);
+ }
+
+ if (watch_ctx != nullptr) {
+ watch_ctx->complete(-ECANCELED);
+ }
+}
+
+void ObjectPlayer::front(Entry *entry) const {
+ std::lock_guard locker{m_lock};
+ ceph_assert(!m_entries.empty());
+ *entry = m_entries.front();
+}
+
+void ObjectPlayer::pop_front() {
+ std::lock_guard locker{m_lock};
+ ceph_assert(!m_entries.empty());
+
+ auto &entry = m_entries.front();
+ m_entry_keys.erase({entry.get_tag_tid(), entry.get_entry_tid()});
+ m_entries.pop_front();
+}
+
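+// Append the newly fetched data, then decode as many complete entries as
+// possible.  Undecodable regions are recorded in m_invalid_ranges; the
+// result distinguishes hard corruption (-EBADMSG) from a short read that
+// should simply be retried (-EAGAIN with *refetch set).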
+int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
+ bool *refetch) {
+ ldout(m_cct, 10) << __func__ << ": " << m_oid << ", r=" << r << ", len="
+ << bl.length() << dendl;
+
+ *refetch = false;
+ if (r == -ENOENT) {
+ return 0;
+ } else if (r < 0) {
+ return r;
+ } else if (bl.length() == 0) {
+ return 0;
+ }
+
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_fetch_in_progress);
+ m_read_off += bl.length();
+ m_read_bl.append(bl);
+ m_refetch_state = REFETCH_STATE_REQUIRED;
+
+ bool full_fetch = (m_max_fetch_bytes == 2U << m_order);
+ bool partial_entry = false;
+ bool invalid = false;
+ uint32_t invalid_start_off = 0;
+
+ clear_invalid_range(m_read_bl_off, m_read_bl.length());
+ bufferlist::const_iterator iter{&m_read_bl, 0};
+ uint32_t pad_len = 0;
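+  // Walk the accumulated buffer entry by entry.  If the next record is not
+  // readable it is either a partial entry (more data needed), zero padding
+  // (skipped via advance_to_last_pad_byte), or corruption.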
+ while (!iter.end()) {
+ uint32_t bytes_needed;
+ uint32_t bl_off = iter.get_off();
+ if (!Entry::is_readable(iter, &bytes_needed)) {
+ if (bytes_needed != 0) {
+ invalid_start_off = m_read_bl_off + bl_off;
+ invalid = true;
+ partial_entry = true;
+ if (full_fetch) {
+ lderr(m_cct) << ": partial record at offset " << invalid_start_off
+ << dendl;
+ } else {
+ ldout(m_cct, 20) << ": partial record detected, will re-fetch"
+ << dendl;
+ }
+ break;
+ }
+
+ if (!advance_to_last_pad_byte(m_read_bl_off + iter.get_off(), &iter,
+ &pad_len, &partial_entry)) {
+ invalid_start_off = m_read_bl_off + bl_off;
+ invalid = true;
+ if (partial_entry) {
+ if (full_fetch) {
+ lderr(m_cct) << ": partial pad at offset " << invalid_start_off
+ << dendl;
+ } else {
+ ldout(m_cct, 20) << ": partial pad detected, will re-fetch"
+ << dendl;
+ }
+ } else {
+ lderr(m_cct) << ": detected corrupt journal entry at offset "
+ << invalid_start_off << dendl;
+ }
+ break;
+ }
+ ++iter;
+ continue;
+ }
+
+ Entry entry;
+ decode(entry, iter);
+ ldout(m_cct, 20) << ": " << entry << " decoded" << dendl;
+
+ uint32_t entry_len = iter.get_off() - bl_off;
+ if (invalid) {
+ // new corrupt region detected
+ uint32_t invalid_end_off = m_read_bl_off + bl_off;
+ lderr(m_cct) << ": corruption range [" << invalid_start_off
+ << ", " << invalid_end_off << ")" << dendl;
+ m_invalid_ranges.insert(invalid_start_off,
+ invalid_end_off - invalid_start_off);
+ invalid = false;
+
+ m_read_bl_off = invalid_end_off;
+ }
+
+ EntryKey entry_key(std::make_pair(entry.get_tag_tid(),
+ entry.get_entry_tid()));
+ if (m_entry_keys.find(entry_key) == m_entry_keys.end()) {
+ m_entry_keys[entry_key] = m_entries.insert(m_entries.end(), entry);
+ } else {
+ ldout(m_cct, 10) << ": " << entry << " is duplicate, replacing" << dendl;
+ *m_entry_keys[entry_key] = entry;
+ }
+
+ // prune decoded / corrupted journal entries from front of bl
+ bufferlist sub_bl;
+ sub_bl.substr_of(m_read_bl, iter.get_off(),
+ m_read_bl.length() - iter.get_off());
+ sub_bl.swap(m_read_bl);
+ iter = bufferlist::iterator(&m_read_bl, 0);
+
+ // advance the decoded entry offset
+ m_read_bl_off += entry_len + pad_len;
+ pad_len = 0;
+ }
+
+ if (invalid) {
+ uint32_t invalid_end_off = m_read_bl_off + m_read_bl.length();
+ if (!partial_entry) {
+ lderr(m_cct) << ": corruption range [" << invalid_start_off
+ << ", " << invalid_end_off << ")" << dendl;
+ }
+ m_invalid_ranges.insert(invalid_start_off,
+ invalid_end_off - invalid_start_off);
+ }
+
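+  // Corruption of otherwise complete data is fatal (-EBADMSG).  A trailing
+  // partial entry only warrants a retry: refetch immediately when this was
+  // a full-object read or nothing was decoded, otherwise let the caller
+  // consume the decoded entries first.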
+ if (!m_invalid_ranges.empty() && !partial_entry) {
+ return -EBADMSG;
+ } else if (partial_entry && (full_fetch || m_entries.empty())) {
+ *refetch = true;
+ return -EAGAIN;
+ }
+
+ return 0;
+}
+
+void ObjectPlayer::clear_invalid_range(uint32_t off, uint32_t len) {
+ // possibly remove previously partial record region
+ InvalidRanges decode_range;
+ decode_range.insert(off, len);
+ InvalidRanges intersect_range;
+ intersect_range.intersection_of(m_invalid_ranges, decode_range);
+ if (!intersect_range.empty()) {
+ ldout(m_cct, 20) << ": clearing invalid range: " << intersect_range
+ << dendl;
+ m_invalid_ranges.subtract(intersect_range);
+ }
+}
+
+void ObjectPlayer::schedule_watch() {
+ ceph_assert(ceph_mutex_is_locked(m_timer_lock));
+  if (m_watch_ctx == nullptr) {
+ return;
+ }
+
+ ldout(m_cct, 20) << __func__ << ": " << m_oid << " scheduling watch" << dendl;
+ ceph_assert(m_watch_task == nullptr);
+ m_watch_task = m_timer.add_event_after(
+ m_watch_interval,
+ new LambdaContext([this](int) {
+ handle_watch_task();
+ }));
+}
+
+bool ObjectPlayer::cancel_watch() {
+ ceph_assert(ceph_mutex_is_locked(m_timer_lock));
+ ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
+ if (m_watch_task != nullptr) {
+ bool canceled = m_timer.cancel_event(m_watch_task);
+ ceph_assert(canceled);
+
+ m_watch_task = nullptr;
+ return true;
+ }
+ return false;
+}
+
+void ObjectPlayer::handle_watch_task() {
+ ceph_assert(ceph_mutex_is_locked(m_timer_lock));
+
+ ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
+ ceph_assert(m_watch_ctx != nullptr);
+ ceph_assert(m_watch_task != nullptr);
+
+ m_watch_task = nullptr;
+ fetch(new C_WatchFetch(this));
+}
+
+void ObjectPlayer::handle_watch_fetched(int r) {
+ ldout(m_cct, 10) << __func__ << ": " << m_oid << " poll complete, r=" << r
+ << dendl;
+
+ Context *watch_ctx = nullptr;
+ {
+ std::lock_guard timer_locker{m_timer_lock};
+ std::swap(watch_ctx, m_watch_ctx);
+
+ if (m_unwatched) {
+ m_unwatched = false;
+ r = -ECANCELED;
+ }
+ }
+
+ if (watch_ctx != nullptr) {
+ watch_ctx->complete(r);
+ }
+}
+
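+// Completion for fetch(): process the data, clear the in-progress flag,
+// then either immediately issue the follow-up fetch or hand the result to
+// the caller and drop the ObjectPlayer reference.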
+void ObjectPlayer::C_Fetch::finish(int r) {
+ bool refetch = false;
+ r = object_player->handle_fetch_complete(r, read_bl, &refetch);
+
+ {
+ std::lock_guard locker{object_player->m_lock};
+ object_player->m_fetch_in_progress = false;
+ }
+
+ if (refetch) {
+ object_player->fetch(on_finish);
+ return;
+ }
+
+ object_player.reset();
+ on_finish->complete(r);
+}
+
+void ObjectPlayer::C_WatchFetch::finish(int r) {
+ object_player->handle_watch_fetched(r);
+}
+
+} // namespace journal