diff options
Diffstat (limited to 'src/journal/JournalPlayer.cc')
-rw-r--r-- | src/journal/JournalPlayer.cc | 871 |
1 files changed, 871 insertions, 0 deletions
diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc new file mode 100644 index 000000000..4a0912476 --- /dev/null +++ b/src/journal/JournalPlayer.cc @@ -0,0 +1,871 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/PriorityCache.h" +#include "include/stringify.h" +#include "journal/JournalPlayer.h" +#include "journal/Entry.h" +#include "journal/ReplayHandler.h" +#include "journal/Types.h" +#include "journal/Utils.h" + +#define dout_subsys ceph_subsys_journaler +#undef dout_prefix +#define dout_prefix *_dout << "JournalPlayer: " << this << " " + +namespace journal { + +namespace { + +static const uint64_t MIN_FETCH_BYTES = 32768; + +struct C_HandleComplete : public Context { + ReplayHandler* replay_handler; + + explicit C_HandleComplete(ReplayHandler* r) : replay_handler(std::move(r)) {} + ~C_HandleComplete() override {} + void finish(int r) override { + replay_handler->handle_complete(r); + } +}; + +struct C_HandleEntriesAvailable : public Context { + ReplayHandler* replay_handler; + + explicit C_HandleEntriesAvailable(ReplayHandler* r) : replay_handler(std::move(r)) {} + ~C_HandleEntriesAvailable() override {} + void finish(int r) override { + replay_handler->handle_entries_available(); + } +}; + +} // anonymous namespace + +JournalPlayer::JournalPlayer(librados::IoCtx &ioctx, + std::string_view object_oid_prefix, + ceph::ref_t<JournalMetadata> journal_metadata, + ReplayHandler* replay_handler, + CacheManagerHandler *cache_manager_handler) + : m_object_oid_prefix(object_oid_prefix), + m_journal_metadata(std::move(journal_metadata)), + m_replay_handler(std::move(replay_handler)), + m_cache_manager_handler(cache_manager_handler), + m_cache_rebalance_handler(this) +{ + m_ioctx.dup(ioctx); + m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct()); + + ObjectSetPosition commit_position; + m_journal_metadata->get_commit_position(&commit_position); + + if (!commit_position.object_positions.empty()) { + ldout(m_cct, 5) << "commit position: " << commit_position << dendl; + + // start replay after the last committed entry's object + uint8_t splay_width = m_journal_metadata->get_splay_width(); + auto &active_position = commit_position.object_positions.front(); + m_active_tag_tid = active_position.tag_tid; + m_commit_position_valid = true; + m_commit_position = active_position; + m_splay_offset = active_position.object_number % splay_width; + for (auto &position : commit_position.object_positions) { + uint8_t splay_offset = position.object_number % splay_width; + m_commit_positions[splay_offset] = position; + } + } + + if (m_cache_manager_handler != nullptr) { + m_cache_name = "JournalPlayer/" + stringify(m_ioctx.get_id()) + "/" + + m_object_oid_prefix; + auto order = m_journal_metadata->get_order(); + auto splay_width = m_journal_metadata->get_splay_width(); + uint64_t min_size = MIN_FETCH_BYTES * splay_width; + uint64_t max_size = (2 << order) * splay_width; + + m_cache_manager_handler->register_cache(m_cache_name, min_size, max_size, + &m_cache_rebalance_handler); + m_max_fetch_bytes = 0; + } else { + m_max_fetch_bytes = 2 << m_journal_metadata->get_order(); + } +} + +JournalPlayer::~JournalPlayer() { + ceph_assert(m_async_op_tracker.empty()); + { + std::lock_guard locker{m_lock}; + ceph_assert(m_shut_down); + ceph_assert(m_fetch_object_numbers.empty()); + ceph_assert(!m_watch_scheduled); + } + + if (m_cache_manager_handler != nullptr) { + m_cache_manager_handler->unregister_cache(m_cache_name); + } +} + +void JournalPlayer::prefetch() { + std::lock_guard locker{m_lock}; + ceph_assert(m_state == STATE_INIT); + + if (m_shut_down) { + return; + } + + if (m_cache_manager_handler != nullptr && m_max_fetch_bytes == 0) { + m_state = STATE_WAITCACHE; + return; + } + + m_state = STATE_PREFETCH; + + m_active_set = m_journal_metadata->get_active_set(); + uint8_t splay_width = m_journal_metadata->get_splay_width(); + for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { + m_prefetch_splay_offsets.insert(splay_offset); + } + + // compute active object for each splay offset (might be before + // active set) + std::map<uint8_t, uint64_t> splay_offset_to_objects; + for (auto &position : m_commit_positions) { + ceph_assert(splay_offset_to_objects.count(position.first) == 0); + splay_offset_to_objects[position.first] = position.second.object_number; + } + + // prefetch the active object for each splay offset + std::set<uint64_t> prefetch_object_numbers; + for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { + uint64_t object_number = splay_offset; + if (splay_offset_to_objects.count(splay_offset) != 0) { + object_number = splay_offset_to_objects[splay_offset]; + } + + prefetch_object_numbers.insert(object_number); + } + + ldout(m_cct, 10) << __func__ << ": prefetching " + << prefetch_object_numbers.size() << " " << "objects" + << dendl; + for (auto object_number : prefetch_object_numbers) { + fetch(object_number); + } +} + +void JournalPlayer::prefetch_and_watch(double interval) { + { + std::lock_guard locker{m_lock}; + m_watch_enabled = true; + m_watch_interval = interval; + m_watch_step = WATCH_STEP_FETCH_CURRENT; + } + prefetch(); +} + +void JournalPlayer::shut_down(Context *on_finish) { + ldout(m_cct, 20) << __func__ << dendl; + std::lock_guard locker{m_lock}; + + ceph_assert(!m_shut_down); + m_shut_down = true; + m_watch_enabled = false; + + on_finish = utils::create_async_context_callback( + m_journal_metadata, on_finish); + + if (m_watch_scheduled) { + auto object_player = get_object_player(); + switch (m_watch_step) { + case WATCH_STEP_FETCH_FIRST: + object_player = m_object_players.begin()->second; + // fallthrough + case WATCH_STEP_FETCH_CURRENT: + object_player->unwatch(); + break; + case WATCH_STEP_ASSERT_ACTIVE: + break; + } + } + + m_async_op_tracker.wait_for_ops(on_finish); +} + +bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { + ldout(m_cct, 20) << __func__ << dendl; + std::lock_guard locker{m_lock}; + + if (m_state != STATE_PLAYBACK) { + m_handler_notified = false; + return false; + } + + if (!verify_playback_ready()) { + if (!is_object_set_ready()) { + m_handler_notified = false; + } else { + refetch(true); + } + return false; + } + + auto object_player = get_object_player(); + ceph_assert(object_player && !object_player->empty()); + + object_player->front(entry); + object_player->pop_front(); + + uint64_t last_entry_tid; + if (m_journal_metadata->get_last_allocated_entry_tid( + entry->get_tag_tid(), &last_entry_tid) && + entry->get_entry_tid() != last_entry_tid + 1) { + lderr(m_cct) << "missing prior journal entry: " << *entry << dendl; + + m_state = STATE_ERROR; + notify_complete(-ENOMSG); + return false; + } + + advance_splay_object(); + remove_empty_object_player(object_player); + + m_journal_metadata->reserve_entry_tid(entry->get_tag_tid(), + entry->get_entry_tid()); + *commit_tid = m_journal_metadata->allocate_commit_tid( + object_player->get_object_number(), entry->get_tag_tid(), + entry->get_entry_tid()); + return true; +} + +void JournalPlayer::process_state(uint64_t object_number, int r) { + ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << ", " + << "r=" << r << dendl; + + ceph_assert(ceph_mutex_is_locked(m_lock)); + if (r >= 0) { + switch (m_state) { + case STATE_PREFETCH: + ldout(m_cct, 10) << "PREFETCH" << dendl; + r = process_prefetch(object_number); + break; + case STATE_PLAYBACK: + ldout(m_cct, 10) << "PLAYBACK" << dendl; + r = process_playback(object_number); + break; + case STATE_ERROR: + ldout(m_cct, 10) << "ERROR" << dendl; + break; + default: + lderr(m_cct) << "UNEXPECTED STATE (" << m_state << ")" << dendl; + ceph_abort(); + break; + } + } + + if (r < 0) { + m_state = STATE_ERROR; + notify_complete(r); + } +} + +int JournalPlayer::process_prefetch(uint64_t object_number) { + ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl; + ceph_assert(ceph_mutex_is_locked(m_lock)); + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint8_t splay_offset = object_number % splay_width; + + PrefetchSplayOffsets::iterator it = m_prefetch_splay_offsets.find( + splay_offset); + if (it == m_prefetch_splay_offsets.end()) { + return 0; + } + + bool prefetch_complete = false; + ceph_assert(m_object_players.count(splay_offset) == 1); + auto object_player = m_object_players[splay_offset]; + + // prefetch in-order since a newer splay object could prefetch first + if (m_fetch_object_numbers.count(object_player->get_object_number()) == 0) { + // skip past known committed records + if (m_commit_positions.count(splay_offset) != 0 && + !object_player->empty()) { + ObjectPosition &position = m_commit_positions[splay_offset]; + + ldout(m_cct, 15) << "seeking known commit position " << position << " in " + << object_player->get_oid() << dendl; + + bool found_commit = false; + Entry entry; + while (!object_player->empty()) { + object_player->front(&entry); + + if (entry.get_tag_tid() == position.tag_tid && + entry.get_entry_tid() == position.entry_tid) { + found_commit = true; + } else if (found_commit) { + ldout(m_cct, 10) << "located next uncommitted entry: " << entry + << dendl; + break; + } + + ldout(m_cct, 20) << "skipping committed entry: " << entry << dendl; + m_journal_metadata->reserve_entry_tid(entry.get_tag_tid(), + entry.get_entry_tid()); + object_player->pop_front(); + } + + // do not search for commit position for this object + // if we've already seen it + if (found_commit) { + m_commit_positions.erase(splay_offset); + } + } + + // if the object is empty, pre-fetch the next splay object + if (object_player->empty() && object_player->refetch_required()) { + ldout(m_cct, 10) << "refetching potentially partially decoded object" + << dendl; + object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE); + fetch(object_player); + } else if (!remove_empty_object_player(object_player)) { + ldout(m_cct, 10) << "prefetch of object complete" << dendl; + prefetch_complete = true; + } + } + + if (!prefetch_complete) { + return 0; + } + + m_prefetch_splay_offsets.erase(it); + if (!m_prefetch_splay_offsets.empty()) { + return 0; + } + + ldout(m_cct, 10) << "switching to playback mode" << dendl; + m_state = STATE_PLAYBACK; + + // if we have a valid commit position, our read should start with + // the next consistent journal entry in the sequence + if (m_commit_position_valid) { + splay_offset = m_commit_position.object_number % splay_width; + object_player = m_object_players[splay_offset]; + + if (object_player->empty()) { + if (!object_player->refetch_required()) { + advance_splay_object(); + } + } else { + Entry entry; + object_player->front(&entry); + if (entry.get_tag_tid() == m_commit_position.tag_tid) { + advance_splay_object(); + } + } + } + + if (verify_playback_ready()) { + notify_entries_available(); + } else if (is_object_set_ready()) { + refetch(false); + } + return 0; +} + +int JournalPlayer::process_playback(uint64_t object_number) { + ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl; + ceph_assert(ceph_mutex_is_locked(m_lock)); + + if (verify_playback_ready()) { + notify_entries_available(); + } else if (is_object_set_ready()) { + refetch(false); + } + return 0; +} + +bool JournalPlayer::is_object_set_ready() const { + ceph_assert(ceph_mutex_is_locked(m_lock)); + if (m_watch_scheduled || !m_fetch_object_numbers.empty()) { + ldout(m_cct, 20) << __func__ << ": waiting for in-flight fetch" << dendl; + return false; + } + + return true; +} + +bool JournalPlayer::verify_playback_ready() { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + while (true) { + if (!is_object_set_ready()) { + ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl; + return false; + } + + auto object_player = get_object_player(); + ceph_assert(object_player); + uint64_t object_num = object_player->get_object_number(); + + // Verify is the active object player has another entry available + // in the sequence + // NOTE: replay currently does not check tag class to playback multiple tags + // from different classes (issue #14909). When a new tag is discovered, it + // is assumed that the previous tag was closed at the last replayable entry. + Entry entry; + if (!object_player->empty()) { + m_watch_prune_active_tag = false; + object_player->front(&entry); + + if (!m_active_tag_tid) { + ldout(m_cct, 10) << __func__ << ": " + << "object_num=" << object_num << ", " + << "initial tag=" << entry.get_tag_tid() + << dendl; + m_active_tag_tid = entry.get_tag_tid(); + return true; + } else if (entry.get_tag_tid() < *m_active_tag_tid || + (m_prune_tag_tid && entry.get_tag_tid() <= *m_prune_tag_tid)) { + // entry occurred before the current active tag + ldout(m_cct, 10) << __func__ << ": detected stale entry: " + << "object_num=" << object_num << ", " + << "entry=" << entry << dendl; + prune_tag(entry.get_tag_tid()); + continue; + } else if (entry.get_tag_tid() > *m_active_tag_tid) { + // new tag at current playback position -- implies that previous + // tag ended abruptly without flushing out all records + // search for the start record for the next tag + ldout(m_cct, 10) << __func__ << ": new tag detected: " + << "object_num=" << object_num << ", " + << "active_tag=" << *m_active_tag_tid << ", " + << "new_tag=" << entry.get_tag_tid() << dendl; + if (entry.get_entry_tid() == 0) { + // first entry in new tag -- can promote to active + prune_active_tag(entry.get_tag_tid()); + return true; + } else { + // prune current active and wait for initial entry for new tag + prune_active_tag(boost::none); + continue; + } + } else { + ldout(m_cct, 20) << __func__ << ": " + << "object_num=" << object_num << ", " + << "entry: " << entry << dendl; + ceph_assert(entry.get_tag_tid() == *m_active_tag_tid); + return true; + } + } else { + if (!m_active_tag_tid) { + // waiting for our first entry + ldout(m_cct, 10) << __func__ << ": waiting for first entry: " + << "object_num=" << object_num << dendl; + return false; + } else if (m_prune_tag_tid && *m_prune_tag_tid == *m_active_tag_tid) { + ldout(m_cct, 10) << __func__ << ": no more entries" << dendl; + return false; + } else if (m_watch_enabled && m_watch_prune_active_tag) { + // detected current tag is now longer active and we have re-read the + // current object but it's still empty, so this tag is done + ldout(m_cct, 10) << __func__ << ": assuming no more in-sequence entries: " + << "object_num=" << object_num << ", " + << "active_tag " << *m_active_tag_tid << dendl; + prune_active_tag(boost::none); + continue; + } else if (object_player->refetch_required()) { + // if the active object requires a refetch, don't proceed looking for a + // new tag before this process completes + ldout(m_cct, 10) << __func__ << ": refetch required: " + << "object_num=" << object_num << dendl; + return false; + } else if (!m_watch_enabled) { + // current playback position is empty so this tag is done + ldout(m_cct, 10) << __func__ << ": no more in-sequence entries: " + << "object_num=" << object_num << ", " + << "active_tag=" << *m_active_tag_tid << dendl; + prune_active_tag(boost::none); + continue; + } else if (!m_watch_scheduled) { + // no more entries and we don't have an active watch in-progress + ldout(m_cct, 10) << __func__ << ": no more entries -- watch required" + << dendl; + return false; + } + } + } + return false; +} + +void JournalPlayer::prune_tag(uint64_t tag_tid) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + ldout(m_cct, 10) << __func__ << ": pruning remaining entries for tag " + << tag_tid << dendl; + + // prune records that are at or below the largest prune tag tid + if (!m_prune_tag_tid || *m_prune_tag_tid < tag_tid) { + m_prune_tag_tid = tag_tid; + } + + bool pruned = false; + for (const auto &player_pair : m_object_players) { + auto& object_player = player_pair.second; + ldout(m_cct, 15) << __func__ << ": checking " << object_player->get_oid() + << dendl; + while (!object_player->empty()) { + Entry entry; + object_player->front(&entry); + if (entry.get_tag_tid() == tag_tid) { + ldout(m_cct, 20) << __func__ << ": pruned " << entry << dendl; + object_player->pop_front(); + pruned = true; + } else { + break; + } + } + } + + // avoid watch delay when pruning stale tags from journal objects + if (pruned) { + ldout(m_cct, 15) << __func__ << ": resetting refetch state to immediate" + << dendl; + for (const auto &player_pair : m_object_players) { + auto& object_player = player_pair.second; + object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_IMMEDIATE); + } + } + + // trim empty player to prefetch the next available object + for (const auto &player_pair : m_object_players) { + remove_empty_object_player(player_pair.second); + } +} + +void JournalPlayer::prune_active_tag(const boost::optional<uint64_t>& tag_tid) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + ceph_assert(m_active_tag_tid); + + uint64_t active_tag_tid = *m_active_tag_tid; + if (tag_tid) { + m_active_tag_tid = tag_tid; + } + m_splay_offset = 0; + m_watch_step = WATCH_STEP_FETCH_CURRENT; + + prune_tag(active_tag_tid); +} + +ceph::ref_t<ObjectPlayer> JournalPlayer::get_object_player() const { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + SplayedObjectPlayers::const_iterator it = m_object_players.find( + m_splay_offset); + ceph_assert(it != m_object_players.end()); + return it->second; +} + +ceph::ref_t<ObjectPlayer> JournalPlayer::get_object_player(uint64_t object_number) const { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint8_t splay_offset = object_number % splay_width; + auto splay_it = m_object_players.find(splay_offset); + ceph_assert(splay_it != m_object_players.end()); + + auto object_player = splay_it->second; + ceph_assert(object_player->get_object_number() == object_number); + return object_player; +} + +void JournalPlayer::advance_splay_object() { + ceph_assert(ceph_mutex_is_locked(m_lock)); + ++m_splay_offset; + m_splay_offset %= m_journal_metadata->get_splay_width(); + m_watch_step = WATCH_STEP_FETCH_CURRENT; + ldout(m_cct, 20) << __func__ << ": new offset " + << static_cast<uint32_t>(m_splay_offset) << dendl; +} + +bool JournalPlayer::remove_empty_object_player(const ceph::ref_t<ObjectPlayer> &player) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + ceph_assert(!m_watch_scheduled); + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint64_t object_set = player->get_object_number() / splay_width; + uint64_t active_set = m_journal_metadata->get_active_set(); + if (!player->empty() || object_set == active_set) { + return false; + } else if (player->refetch_required()) { + ldout(m_cct, 20) << __func__ << ": " << player->get_oid() << " requires " + << "a refetch" << dendl; + return false; + } else if (m_active_set != active_set) { + ldout(m_cct, 20) << __func__ << ": new active set detected, all players " + << "require refetch" << dendl; + m_active_set = active_set; + for (const auto& pair : m_object_players) { + pair.second->set_refetch_state(ObjectPlayer::REFETCH_STATE_IMMEDIATE); + } + return false; + } + + ldout(m_cct, 15) << __func__ << ": " << player->get_oid() << " empty" + << dendl; + + m_watch_prune_active_tag = false; + m_watch_step = WATCH_STEP_FETCH_CURRENT; + + uint64_t next_object_num = player->get_object_number() + splay_width; + fetch(next_object_num); + return true; +} + +void JournalPlayer::fetch(uint64_t object_num) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + auto object_player = ceph::make_ref<ObjectPlayer>( + m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(), + m_journal_metadata->get_timer_lock(), m_journal_metadata->get_order(), + m_max_fetch_bytes); + + auto splay_width = m_journal_metadata->get_splay_width(); + m_object_players[object_num % splay_width] = object_player; + fetch(object_player); +} + +void JournalPlayer::fetch(const ceph::ref_t<ObjectPlayer> &object_player) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + uint64_t object_num = object_player->get_object_number(); + std::string oid = utils::get_object_name(m_object_oid_prefix, object_num); + ceph_assert(m_fetch_object_numbers.count(object_num) == 0); + m_fetch_object_numbers.insert(object_num); + + ldout(m_cct, 10) << __func__ << ": " << oid << dendl; + C_Fetch *fetch_ctx = new C_Fetch(this, object_num); + + object_player->fetch(fetch_ctx); +} + +void JournalPlayer::handle_fetched(uint64_t object_num, int r) { + ldout(m_cct, 10) << __func__ << ": " + << utils::get_object_name(m_object_oid_prefix, object_num) + << ": r=" << r << dendl; + + std::lock_guard locker{m_lock}; + ceph_assert(m_fetch_object_numbers.count(object_num) == 1); + m_fetch_object_numbers.erase(object_num); + + if (m_shut_down) { + return; + } + + if (r == 0) { + auto object_player = get_object_player(object_num); + remove_empty_object_player(object_player); + } + process_state(object_num, r); +} + +void JournalPlayer::refetch(bool immediate) { + ldout(m_cct, 10) << __func__ << dendl; + ceph_assert(ceph_mutex_is_locked(m_lock)); + m_handler_notified = false; + + // if watching the object, handle the periodic re-fetch + if (m_watch_enabled) { + schedule_watch(immediate); + return; + } + + auto object_player = get_object_player(); + if (object_player->refetch_required()) { + object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE); + fetch(object_player); + return; + } + + notify_complete(0); +} + +void JournalPlayer::schedule_watch(bool immediate) { + ldout(m_cct, 10) << __func__ << dendl; + ceph_assert(ceph_mutex_is_locked(m_lock)); + if (m_watch_scheduled) { + return; + } + + m_watch_scheduled = true; + + if (m_watch_step == WATCH_STEP_ASSERT_ACTIVE) { + // detect if a new tag has been created in case we are blocked + // by an incomplete tag sequence + ldout(m_cct, 20) << __func__ << ": asserting active tag=" + << *m_active_tag_tid << dendl; + + m_async_op_tracker.start_op(); + auto ctx = new LambdaContext([this](int r) { + handle_watch_assert_active(r); + }); + m_journal_metadata->assert_active_tag(*m_active_tag_tid, ctx); + return; + } + + ceph::ref_t<ObjectPlayer> object_player; + double watch_interval = m_watch_interval; + + switch (m_watch_step) { + case WATCH_STEP_FETCH_CURRENT: + { + object_player = get_object_player(); + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint64_t active_set = m_journal_metadata->get_active_set(); + uint64_t object_set = object_player->get_object_number() / splay_width; + if (immediate || + (object_player->get_refetch_state() == + ObjectPlayer::REFETCH_STATE_IMMEDIATE) || + (object_set < active_set && object_player->refetch_required())) { + ldout(m_cct, 20) << __func__ << ": immediately refetching " + << object_player->get_oid() + << dendl; + object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE); + watch_interval = 0; + } + } + break; + case WATCH_STEP_FETCH_FIRST: + object_player = m_object_players.begin()->second; + watch_interval = 0; + break; + default: + ceph_abort(); + } + + ldout(m_cct, 20) << __func__ << ": scheduling watch on " + << object_player->get_oid() << dendl; + Context *ctx = utils::create_async_context_callback( + m_journal_metadata, new C_Watch(this, object_player->get_object_number())); + object_player->watch(ctx, watch_interval); +} + +void JournalPlayer::handle_watch(uint64_t object_num, int r) { + ldout(m_cct, 10) << __func__ << ": r=" << r << dendl; + std::lock_guard locker{m_lock}; + ceph_assert(m_watch_scheduled); + m_watch_scheduled = false; + + if (m_shut_down || r == -ECANCELED) { + // unwatch of object player(s) + return; + } + + auto object_player = get_object_player(object_num); + if (r == 0 && object_player->empty()) { + // possibly need to prune this empty object player if we've + // already fetched it after the active set was advanced with no + // new records + remove_empty_object_player(object_player); + } + + // determine what object to query on next watch schedule tick + uint8_t splay_width = m_journal_metadata->get_splay_width(); + if (m_watch_step == WATCH_STEP_FETCH_CURRENT && + object_player->get_object_number() % splay_width != 0) { + m_watch_step = WATCH_STEP_FETCH_FIRST; + } else if (m_active_tag_tid) { + m_watch_step = WATCH_STEP_ASSERT_ACTIVE; + } else { + m_watch_step = WATCH_STEP_FETCH_CURRENT; + } + + process_state(object_num, r); +} + +void JournalPlayer::handle_watch_assert_active(int r) { + ldout(m_cct, 10) << __func__ << ": r=" << r << dendl; + + std::lock_guard locker{m_lock}; + ceph_assert(m_watch_scheduled); + m_watch_scheduled = false; + + if (r == -ESTALE) { + // newer tag exists -- since we are at this step in the watch sequence, + // we know we can prune the active tag if watch fails again + ldout(m_cct, 10) << __func__ << ": tag " << *m_active_tag_tid << " " + << "no longer active" << dendl; + m_watch_prune_active_tag = true; + } + + m_watch_step = WATCH_STEP_FETCH_CURRENT; + if (!m_shut_down && m_watch_enabled) { + schedule_watch(false); + } + m_async_op_tracker.finish_op(); +} + +void JournalPlayer::notify_entries_available() { + ceph_assert(ceph_mutex_is_locked(m_lock)); + if (m_handler_notified) { + return; + } + m_handler_notified = true; + + ldout(m_cct, 10) << __func__ << ": entries available" << dendl; + m_journal_metadata->queue(new C_HandleEntriesAvailable(m_replay_handler), 0); +} + +void JournalPlayer::notify_complete(int r) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + m_handler_notified = true; + + ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl; + m_journal_metadata->queue(new C_HandleComplete(m_replay_handler), r); +} + +void JournalPlayer::handle_cache_rebalanced(uint64_t new_cache_bytes) { + std::lock_guard locker{m_lock}; + + if (m_state == STATE_ERROR || m_shut_down) { + return; + } + + auto splay_width = m_journal_metadata->get_splay_width(); + m_max_fetch_bytes = p2align<uint64_t>(new_cache_bytes / splay_width, 4096); + + ldout(m_cct, 10) << __func__ << ": new_cache_bytes=" << new_cache_bytes + << ", max_fetch_bytes=" << m_max_fetch_bytes << dendl; + + uint64_t min_bytes = MIN_FETCH_BYTES; + + if (m_state == STATE_WAITCACHE) { + m_state = STATE_INIT; + if (m_max_fetch_bytes >= min_bytes) { + m_async_op_tracker.start_op(); + auto ctx = new LambdaContext( + [this](int r) { + prefetch(); + m_async_op_tracker.finish_op(); + }); + m_journal_metadata->queue(ctx, 0); + return; + } + } else { + min_bytes = p2align<uint64_t>(min_bytes - (rand() % min_bytes) / 2, 4096); + } + + if (m_max_fetch_bytes < min_bytes) { + lderr(m_cct) << __func__ << ": can't allocate enough memory from cache" + << dendl; + m_state = STATE_ERROR; + notify_complete(-ENOMEM); + return; + } + + for (auto &pair : m_object_players) { + pair.second->set_max_fetch_bytes(m_max_fetch_bytes); + } +} + + +} // namespace journal |