diff options
Diffstat (limited to '')
-rw-r--r-- | src/osdc/Objecter.cc | 5344 |
1 files changed, 5344 insertions, 0 deletions
diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc new file mode 100644 index 000000000..6fb200eb1 --- /dev/null +++ b/src/osdc/Objecter.cc @@ -0,0 +1,5344 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <algorithm> +#include <cerrno> + +#include "Objecter.h" +#include "osd/OSDMap.h" +#include "osd/error_code.h" +#include "Filer.h" + +#include "mon/MonClient.h" +#include "mon/error_code.h" + +#include "msg/Messenger.h" +#include "msg/Message.h" + +#include "messages/MPing.h" +#include "messages/MOSDOp.h" +#include "messages/MOSDOpReply.h" +#include "messages/MOSDBackoff.h" +#include "messages/MOSDMap.h" + +#include "messages/MPoolOp.h" +#include "messages/MPoolOpReply.h" + +#include "messages/MGetPoolStats.h" +#include "messages/MGetPoolStatsReply.h" +#include "messages/MStatfs.h" +#include "messages/MStatfsReply.h" + +#include "messages/MMonCommand.h" + +#include "messages/MCommand.h" +#include "messages/MCommandReply.h" + +#include "messages/MWatchNotify.h" + + +#include "common/Cond.h" +#include "common/config.h" +#include "common/perf_counters.h" +#include "common/scrub_types.h" +#include "include/str_list.h" +#include "common/errno.h" +#include "common/EventTrace.h" +#include "common/async/waiter.h" +#include "error_code.h" + + +using std::list; +using std::make_pair; +using std::map; +using std::ostream; +using std::ostringstream; +using std::pair; +using std::set; +using std::string; +using std::stringstream; +using std::vector; + +using ceph::decode; +using ceph::encode; +using ceph::Formatter; + +using std::defer_lock; + +using ceph::real_time; +using 
ceph::real_clock;

using ceph::mono_clock;
using ceph::mono_time;

using ceph::timespan;

using ceph::shunique_lock;
using ceph::acquire_shared;
using ceph::acquire_unique;

// Short aliases used throughout this file.
namespace bc = boost::container;
namespace bs = boost::system;
namespace ca = ceph::async;
namespace cb = ceph::buffer;

// Logging setup: every ldout()/lderr() line in this file is attributed to
// the objecter subsystem and prefixed with "<messenger name>.objecter ".
#define dout_subsys ceph_subsys_objecter
#undef dout_prefix
#define dout_prefix *_dout << messenger->get_myname() << ".objecter "


// Perf counter ids for the "objecter" counter set.  Objecter::init() hands
// l_osdc_first and l_osdc_last to PerfCountersBuilder as the bounds and
// registers every id in between, so the two sentinels must stay first and
// last.  Ids are assigned by position: inserting in the middle renumbers
// everything that follows.
enum {
  l_osdc_first = 123200,

  // overall op accounting
  l_osdc_op_active,
  l_osdc_op_laggy,
  l_osdc_op_send,
  l_osdc_op_send_bytes,
  l_osdc_op_resend,
  l_osdc_op_reply,
  l_osdc_oplen_avg,

  // ops by class
  l_osdc_op,
  l_osdc_op_r,
  l_osdc_op_w,
  l_osdc_op_rmw,
  l_osdc_op_pg,

  // individual OSD sub-operations
  l_osdc_osdop_stat,
  l_osdc_osdop_create,
  l_osdc_osdop_read,
  l_osdc_osdop_write,
  l_osdc_osdop_writefull,
  l_osdc_osdop_writesame,
  l_osdc_osdop_append,
  l_osdc_osdop_zero,
  l_osdc_osdop_truncate,
  l_osdc_osdop_delete,
  l_osdc_osdop_mapext,
  l_osdc_osdop_sparse_read,
  l_osdc_osdop_clonerange,
  l_osdc_osdop_getxattr,
  l_osdc_osdop_setxattr,
  l_osdc_osdop_cmpxattr,
  l_osdc_osdop_rmxattr,
  l_osdc_osdop_resetxattrs,
  l_osdc_osdop_call,
  l_osdc_osdop_watch,
  l_osdc_osdop_notify,
  l_osdc_osdop_src_cmpxattr,
  l_osdc_osdop_pgls,
  l_osdc_osdop_pgls_filter,
  l_osdc_osdop_other,

  // lingering (watch/notify) operations
  l_osdc_linger_active,
  l_osdc_linger_send,
  l_osdc_linger_resend,
  l_osdc_linger_ping,

  // pool operations
  l_osdc_poolop_active,
  l_osdc_poolop_send,
  l_osdc_poolop_resend,

  // pool stat requests
  l_osdc_poolstat_active,
  l_osdc_poolstat_send,
  l_osdc_poolstat_resend,

  // statfs requests
  l_osdc_statfs_active,
  l_osdc_statfs_send,
  l_osdc_statfs_resend,

  // commands
  l_osdc_command_active,
  l_osdc_command_send,
  l_osdc_command_resend,

  // OSD map tracking
  l_osdc_map_epoch,
  l_osdc_map_full,
  l_osdc_map_inc,

  // OSD sessions
  l_osdc_osd_sessions,
  l_osdc_osd_session_open,
  l_osdc_osd_session_close,
  l_osdc_osd_laggy,

  // omap operations
  l_osdc_osdop_omap_wr,
  l_osdc_osdop_omap_rd,
  l_osdc_osdop_omap_del,

  l_osdc_last,
};

namespace {
// Maps a negative errno-style return code to an error_code in
// osd_category(); zero or positive values become a default (success)
// error_code.
inline bs::error_code osdcode(int 
r) { + return (r < 0) ? bs::error_code(-r, osd_category()) : bs::error_code(); +} +} + +// config obs ---------------------------- + +class Objecter::RequestStateHook : public AdminSocketHook { + Objecter *m_objecter; +public: + explicit RequestStateHook(Objecter *objecter); + int call(std::string_view command, const cmdmap_t& cmdmap, + Formatter *f, + std::ostream& ss, + cb::list& out) override; +}; + +std::unique_lock<std::mutex> Objecter::OSDSession::get_lock(object_t& oid) +{ + if (oid.name.empty()) + return {}; + + static constexpr uint32_t HASH_PRIME = 1021; + uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size()) + % HASH_PRIME; + + return {completion_locks[h % num_locks], std::defer_lock}; +} + +const char** Objecter::get_tracked_conf_keys() const +{ + static const char *config_keys[] = { + "crush_location", + "rados_mon_op_timeout", + "rados_osd_op_timeout", + NULL + }; + return config_keys; +} + + +void Objecter::handle_conf_change(const ConfigProxy& conf, + const std::set <std::string> &changed) +{ + if (changed.count("crush_location")) { + update_crush_location(); + } + if (changed.count("rados_mon_op_timeout")) { + mon_timeout = conf.get_val<std::chrono::seconds>("rados_mon_op_timeout"); + } + if (changed.count("rados_osd_op_timeout")) { + osd_timeout = conf.get_val<std::chrono::seconds>("rados_osd_op_timeout"); + } +} + +void Objecter::update_crush_location() +{ + unique_lock wl(rwlock); + crush_location = cct->crush_location.get_location(); +} + +// messages ------------------------------ + +/* + * initialize only internal data structures, don't initiate cluster interaction + */ +void Objecter::init() +{ + ceph_assert(!initialized); + + if (!logger) { + PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last); + + pcb.add_u64(l_osdc_op_active, "op_active", "Operations active", "actv", + PerfCountersBuilder::PRIO_CRITICAL); + pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations"); + pcb.add_u64_counter(l_osdc_op_send, 
"op_send", "Sent operations"); + pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(UNIT_BYTES)); + pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations"); + pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply"); + pcb.add_u64_avg(l_osdc_oplen_avg, "oplen_avg", "Average length of operation vector"); + + pcb.add_u64_counter(l_osdc_op, "op", "Operations"); + pcb.add_u64_counter(l_osdc_op_r, "op_r", "Read operations", "rd", + PerfCountersBuilder::PRIO_CRITICAL); + pcb.add_u64_counter(l_osdc_op_w, "op_w", "Write operations", "wr", + PerfCountersBuilder::PRIO_CRITICAL); + pcb.add_u64_counter(l_osdc_op_rmw, "op_rmw", "Read-modify-write operations", + "rdwr", PerfCountersBuilder::PRIO_INTERESTING); + pcb.add_u64_counter(l_osdc_op_pg, "op_pg", "PG operation"); + + pcb.add_u64_counter(l_osdc_osdop_stat, "osdop_stat", "Stat operations"); + pcb.add_u64_counter(l_osdc_osdop_create, "osdop_create", + "Create object operations"); + pcb.add_u64_counter(l_osdc_osdop_read, "osdop_read", "Read operations"); + pcb.add_u64_counter(l_osdc_osdop_write, "osdop_write", "Write operations"); + pcb.add_u64_counter(l_osdc_osdop_writefull, "osdop_writefull", + "Write full object operations"); + pcb.add_u64_counter(l_osdc_osdop_writesame, "osdop_writesame", + "Write same operations"); + pcb.add_u64_counter(l_osdc_osdop_append, "osdop_append", + "Append operation"); + pcb.add_u64_counter(l_osdc_osdop_zero, "osdop_zero", + "Set object to zero operations"); + pcb.add_u64_counter(l_osdc_osdop_truncate, "osdop_truncate", + "Truncate object operations"); + pcb.add_u64_counter(l_osdc_osdop_delete, "osdop_delete", + "Delete object operations"); + pcb.add_u64_counter(l_osdc_osdop_mapext, "osdop_mapext", + "Map extent operations"); + pcb.add_u64_counter(l_osdc_osdop_sparse_read, "osdop_sparse_read", + "Sparse read operations"); + pcb.add_u64_counter(l_osdc_osdop_clonerange, "osdop_clonerange", + "Clone range operations"); + 
pcb.add_u64_counter(l_osdc_osdop_getxattr, "osdop_getxattr", + "Get xattr operations"); + pcb.add_u64_counter(l_osdc_osdop_setxattr, "osdop_setxattr", + "Set xattr operations"); + pcb.add_u64_counter(l_osdc_osdop_cmpxattr, "osdop_cmpxattr", + "Xattr comparison operations"); + pcb.add_u64_counter(l_osdc_osdop_rmxattr, "osdop_rmxattr", + "Remove xattr operations"); + pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs", + "Reset xattr operations"); + pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call", + "Call (execute) operations"); + pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch", + "Watch by object operations"); + pcb.add_u64_counter(l_osdc_osdop_notify, "osdop_notify", + "Notify about object operations"); + pcb.add_u64_counter(l_osdc_osdop_src_cmpxattr, "osdop_src_cmpxattr", + "Extended attribute comparison in multi operations"); + pcb.add_u64_counter(l_osdc_osdop_pgls, "osdop_pgls"); + pcb.add_u64_counter(l_osdc_osdop_pgls_filter, "osdop_pgls_filter"); + pcb.add_u64_counter(l_osdc_osdop_other, "osdop_other", "Other operations"); + + pcb.add_u64(l_osdc_linger_active, "linger_active", + "Active lingering operations"); + pcb.add_u64_counter(l_osdc_linger_send, "linger_send", + "Sent lingering operations"); + pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend", + "Resent lingering operations"); + pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping", + "Sent pings to lingering operations"); + + pcb.add_u64(l_osdc_poolop_active, "poolop_active", + "Active pool operations"); + pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send", + "Sent pool operations"); + pcb.add_u64_counter(l_osdc_poolop_resend, "poolop_resend", + "Resent pool operations"); + + pcb.add_u64(l_osdc_poolstat_active, "poolstat_active", + "Active get pool stat operations"); + pcb.add_u64_counter(l_osdc_poolstat_send, "poolstat_send", + "Pool stat operations sent"); + pcb.add_u64_counter(l_osdc_poolstat_resend, "poolstat_resend", + "Resent pool stats"); + + 
pcb.add_u64(l_osdc_statfs_active, "statfs_active", "Statfs operations"); + pcb.add_u64_counter(l_osdc_statfs_send, "statfs_send", "Sent FS stats"); + pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend", + "Resent FS stats"); + + pcb.add_u64(l_osdc_command_active, "command_active", "Active commands"); + pcb.add_u64_counter(l_osdc_command_send, "command_send", + "Sent commands"); + pcb.add_u64_counter(l_osdc_command_resend, "command_resend", + "Resent commands"); + + pcb.add_u64(l_osdc_map_epoch, "map_epoch", "OSD map epoch"); + pcb.add_u64_counter(l_osdc_map_full, "map_full", + "Full OSD maps received"); + pcb.add_u64_counter(l_osdc_map_inc, "map_inc", + "Incremental OSD maps received"); + + pcb.add_u64(l_osdc_osd_sessions, "osd_sessions", + "Open sessions"); // open sessions + pcb.add_u64_counter(l_osdc_osd_session_open, "osd_session_open", + "Sessions opened"); + pcb.add_u64_counter(l_osdc_osd_session_close, "osd_session_close", + "Sessions closed"); + pcb.add_u64(l_osdc_osd_laggy, "osd_laggy", "Laggy OSD sessions"); + + pcb.add_u64_counter(l_osdc_osdop_omap_wr, "omap_wr", + "OSD OMAP write operations"); + pcb.add_u64_counter(l_osdc_osdop_omap_rd, "omap_rd", + "OSD OMAP read operations"); + pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del", + "OSD OMAP delete operations"); + + logger = pcb.create_perf_counters(); + cct->get_perfcounters_collection()->add(logger); + } + + m_request_state_hook = new RequestStateHook(this); + auto admin_socket = cct->get_admin_socket(); + int ret = admin_socket->register_command("objecter_requests", + m_request_state_hook, + "show in-progress osd requests"); + + /* Don't warn on EEXIST, happens if multiple ceph clients + * are instantiated from one process */ + if (ret < 0 && ret != -EEXIST) { + lderr(cct) << "error registering admin socket command: " + << cpp_strerror(ret) << dendl; + } + + update_crush_location(); + + cct->_conf.add_observer(this); + + initialized = true; +} + +/* + * ok, cluster interaction can happen + 
 */
// Starts the tick timer, then either seeds our osdmap from the supplied
// map `o` (followed by pruning the pg mapping against its pools) or, when
// no map is supplied and we are still at epoch 0, calls
// _maybe_request_map().
// NOTE(review): the osdmap is overwritten while rwlock is only held
// shared -- presumably safe because nothing else is running yet; confirm.
void Objecter::start(const OSDMap* o)
{
  shared_lock rl(rwlock);

  start_tick();
  if (o) {
    osdmap->deepish_copy_from(*o);
    prune_pg_mapping(osdmap->get_pools());
  } else if (osdmap->get_epoch() == 0) {
    _maybe_request_map();
  }
}

// Tears the Objecter down: marks it uninitialized, removes the config
// observer, closes all OSD sessions and drops every outstanding
// map-check, poolstat, statfs and pool op before cleaning up the
// homeless session.
void Objecter::shutdown()
{
  ceph_assert(initialized);

  unique_lock wl(rwlock);

  initialized = false;

  // rwlock is released around the observer removal and re-taken afterwards
  wl.unlock();
  cct->_conf.remove_observer(this);
  wl.lock();

  // each pass handles the current first session; the loop relies on
  // close_session() removing it from osd_sessions
  while (!osd_sessions.empty()) {
    auto p = osd_sessions.begin();
    close_session(p->second);
  }

  // pending map checks hold a reference on their op; drop it
  while(!check_latest_map_lingers.empty()) {
    auto i = check_latest_map_lingers.begin();
    i->second->put();
    check_latest_map_lingers.erase(i->first);
  }

  while(!check_latest_map_ops.empty()) {
    auto i = check_latest_map_ops.begin();
    i->second->put();
    check_latest_map_ops.erase(i->first);
  }

  while(!check_latest_map_commands.empty()) {
    auto i = check_latest_map_commands.begin();
    i->second->put();
    check_latest_map_commands.erase(i->first);
  }

  // these ops are plain heap objects owned by their map; delete them
  while(!poolstat_ops.empty()) {
    auto i = poolstat_ops.begin();
    delete i->second;
    poolstat_ops.erase(i->first);
  }

  while(!statfs_ops.empty()) {
    auto i = statfs_ops.begin();
    delete i->second;
    statfs_ops.erase(i->first);
  }

  while(!pool_ops.empty()) {
    auto i = pool_ops.begin();
    delete i->second;
    pool_ops.erase(i->first);
  }

  ldout(cct, 20) << __func__ << " clearing up homeless session..." 
<< dendl; + while(!homeless_session->linger_ops.empty()) { + auto i = homeless_session->linger_ops.begin(); + ldout(cct, 10) << " linger_op " << i->first << dendl; + LingerOp *lop = i->second; + { + std::unique_lock swl(homeless_session->lock); + _session_linger_op_remove(homeless_session, lop); + } + linger_ops.erase(lop->linger_id); + linger_ops_set.erase(lop); + lop->put(); + } + + while(!homeless_session->ops.empty()) { + auto i = homeless_session->ops.begin(); + ldout(cct, 10) << " op " << i->first << dendl; + auto op = i->second; + { + std::unique_lock swl(homeless_session->lock); + _session_op_remove(homeless_session, op); + } + op->put(); + } + + while(!homeless_session->command_ops.empty()) { + auto i = homeless_session->command_ops.begin(); + ldout(cct, 10) << " command_op " << i->first << dendl; + auto cop = i->second; + { + std::unique_lock swl(homeless_session->lock); + _session_command_op_remove(homeless_session, cop); + } + cop->put(); + } + + if (tick_event) { + if (timer.cancel_event(tick_event)) { + ldout(cct, 10) << " successfully canceled tick" << dendl; + } + tick_event = 0; + } + + if (logger) { + cct->get_perfcounters_collection()->remove(logger); + delete logger; + logger = NULL; + } + + // Let go of Objecter write lock so timer thread can shutdown + wl.unlock(); + + // Outside of lock to avoid cycle WRT calls to RequestStateHook + // This is safe because we guarantee no concurrent calls to + // shutdown() with the ::initialized check at start. 
+ if (m_request_state_hook) { + auto admin_socket = cct->get_admin_socket(); + admin_socket->unregister_commands(m_request_state_hook); + delete m_request_state_hook; + m_request_state_hook = NULL; + } +} + +void Objecter::_send_linger(LingerOp *info, + ceph::shunique_lock<ceph::shared_mutex>& sul) +{ + ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock); + + fu2::unique_function<Op::OpSig> oncommit; + osdc_opvec opv; + std::shared_lock watchl(info->watch_lock); + cb::list *poutbl = nullptr; + if (info->registered && info->is_watch) { + ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect" + << dendl; + opv.push_back(OSDOp()); + opv.back().op.op = CEPH_OSD_OP_WATCH; + opv.back().op.watch.cookie = info->get_cookie(); + opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT; + opv.back().op.watch.gen = ++info->register_gen; + oncommit = CB_Linger_Reconnect(this, info); + } else { + ldout(cct, 15) << "send_linger " << info->linger_id << " register" + << dendl; + opv = info->ops; + // TODO Augment ca::Completion with an equivalent of + // target so we can handle these cases better. + auto c = std::make_unique<CB_Linger_Commit>(this, info); + if (!info->is_watch) { + info->notify_id = 0; + poutbl = &c->outbl; + } + oncommit = [c = std::move(c)](bs::error_code ec) mutable { + std::move(*c)(ec); + }; + } + watchl.unlock(); + auto o = new Op(info->target.base_oid, info->target.base_oloc, + std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ, + std::move(oncommit), info->pobjver); + o->outbl = poutbl; + o->snapid = info->snap; + o->snapc = info->snapc; + o->mtime = info->mtime; + + o->target = info->target; + o->tid = ++last_tid; + + // do not resend this; we will send a new op to reregister + o->should_resend = false; + o->ctx_budgeted = true; + + if (info->register_tid) { + // repeat send. cancel old registration op, if any. 
+ std::unique_lock sl(info->session->lock); + if (info->session->ops.count(info->register_tid)) { + auto o = info->session->ops[info->register_tid]; + _op_cancel_map_check(o); + _cancel_linger_op(o); + } + sl.unlock(); + } + + _op_submit_with_budget(o, sul, &info->register_tid, &info->ctx_budget); + + logger->inc(l_osdc_linger_send); +} + +void Objecter::_linger_commit(LingerOp *info, bs::error_code ec, + cb::list& outbl) +{ + std::unique_lock wl(info->watch_lock); + ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl; + if (info->on_reg_commit) { + info->on_reg_commit->defer(std::move(info->on_reg_commit), + ec, cb::list{}); + info->on_reg_commit.reset(); + } + if (ec && info->on_notify_finish) { + info->on_notify_finish->defer(std::move(info->on_notify_finish), + ec, cb::list{}); + info->on_notify_finish.reset(); + } + + // only tell the user the first time we do this + info->registered = true; + info->pobjver = NULL; + + if (!info->is_watch) { + // make note of the notify_id + auto p = outbl.cbegin(); + try { + decode(info->notify_id, p); + ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id + << dendl; + } + catch (cb::error& e) { + } + } +} + +class CB_DoWatchError { + Objecter *objecter; + boost::intrusive_ptr<Objecter::LingerOp> info; + bs::error_code ec; +public: + CB_DoWatchError(Objecter *o, Objecter::LingerOp *i, + bs::error_code ec) + : objecter(o), info(i), ec(ec) { + info->_queued_async(); + } + void operator()() { + std::unique_lock wl(objecter->rwlock); + bool canceled = info->canceled; + wl.unlock(); + + if (!canceled) { + info->handle(ec, 0, info->get_cookie(), 0, {}); + } + + info->finished_async(); + } +}; + +bs::error_code Objecter::_normalize_watch_error(bs::error_code ec) +{ + // translate ENOENT -> ENOTCONN so that a delete->disconnection + // notification and a failure to reconnect because we raced with + // the delete appear the same to the user. 
+ if (ec == bs::errc::no_such_file_or_directory) + ec = bs::error_code(ENOTCONN, osd_category()); + return ec; +} + +void Objecter::_linger_reconnect(LingerOp *info, bs::error_code ec) +{ + ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << ec + << " (last_error " << info->last_error << ")" << dendl; + std::unique_lock wl(info->watch_lock); + if (ec) { + if (!info->last_error) { + ec = _normalize_watch_error(ec); + if (info->handle) { + boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec)); + } + } + } + + info->last_error = ec; +} + +void Objecter::_send_linger_ping(LingerOp *info) +{ + // rwlock is locked unique + // info->session->lock is locked + + if (cct->_conf->objecter_inject_no_watch_ping) { + ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING" + << dendl; + return; + } + if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) { + ldout(cct, 10) << __func__ << " PAUSERD" << dendl; + return; + } + + ceph::coarse_mono_time now = ceph::coarse_mono_clock::now(); + ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now + << dendl; + + osdc_opvec opv(1); + opv[0].op.op = CEPH_OSD_OP_WATCH; + opv[0].op.watch.cookie = info->get_cookie(); + opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING; + opv[0].op.watch.gen = info->register_gen; + + Op *o = new Op(info->target.base_oid, info->target.base_oloc, + std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ, + CB_Linger_Ping(this, info, now), + nullptr, nullptr); + o->target = info->target; + o->should_resend = false; + _send_op_account(o); + o->tid = ++last_tid; + _session_op_assign(info->session, o); + _send_op(o); + info->ping_tid = o->tid; + + logger->inc(l_osdc_linger_ping); +} + +void Objecter::_linger_ping(LingerOp *info, bs::error_code ec, ceph::coarse_mono_time sent, + uint32_t register_gen) +{ + std::unique_lock l(info->watch_lock); + ldout(cct, 10) << __func__ << " " << info->linger_id + << " sent " << sent << " gen " << register_gen << " = " << ec + << " 
(last_error " << info->last_error + << " register_gen " << info->register_gen << ")" << dendl; + if (info->register_gen == register_gen) { + if (!ec) { + info->watch_valid_thru = sent; + } else if (ec && !info->last_error) { + ec = _normalize_watch_error(ec); + info->last_error = ec; + if (info->handle) { + boost::asio::defer(finish_strand, CB_DoWatchError(this, info, ec)); + } + } + } else { + ldout(cct, 20) << " ignoring old gen" << dendl; + } +} + +tl::expected<ceph::timespan, + bs::error_code> Objecter::linger_check(LingerOp *info) +{ + std::shared_lock l(info->watch_lock); + + ceph::coarse_mono_time stamp = info->watch_valid_thru; + if (!info->watch_pending_async.empty()) + stamp = std::min(info->watch_valid_thru, info->watch_pending_async.front()); + auto age = ceph::coarse_mono_clock::now() - stamp; + + ldout(cct, 10) << __func__ << " " << info->linger_id + << " err " << info->last_error + << " age " << age << dendl; + if (info->last_error) + return tl::unexpected(info->last_error); + // return a safe upper bound (we are truncating to ms) + return age; +} + +void Objecter::linger_cancel(LingerOp *info) +{ + unique_lock wl(rwlock); + _linger_cancel(info); + info->put(); +} + +void Objecter::_linger_cancel(LingerOp *info) +{ + // rwlock is locked unique + ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl; + if (!info->canceled) { + OSDSession *s = info->session; + std::unique_lock sl(s->lock); + _session_linger_op_remove(s, info); + sl.unlock(); + + linger_ops.erase(info->linger_id); + linger_ops_set.erase(info); + ceph_assert(linger_ops.size() == linger_ops_set.size()); + + info->canceled = true; + info->put(); + + logger->dec(l_osdc_linger_active); + } +} + + + +Objecter::LingerOp *Objecter::linger_register(const object_t& oid, + const object_locator_t& oloc, + int flags) +{ + unique_lock l(rwlock); + // Acquire linger ID + auto info = new LingerOp(this, ++max_linger_id); + info->target.base_oid = oid; + info->target.base_oloc = oloc; + 
if (info->target.base_oloc.key == oid) + info->target.base_oloc.key.clear(); + info->target.flags = flags; + info->watch_valid_thru = ceph::coarse_mono_clock::now(); + ldout(cct, 10) << __func__ << " info " << info + << " linger_id " << info->linger_id + << " cookie " << info->get_cookie() + << dendl; + linger_ops[info->linger_id] = info; + linger_ops_set.insert(info); + ceph_assert(linger_ops.size() == linger_ops_set.size()); + + info->get(); // for the caller + return info; +} + +ceph_tid_t Objecter::linger_watch(LingerOp *info, + ObjectOperation& op, + const SnapContext& snapc, + real_time mtime, + cb::list& inbl, + decltype(info->on_reg_commit)&& oncommit, + version_t *objver) +{ + info->is_watch = true; + info->snapc = snapc; + info->mtime = mtime; + info->target.flags |= CEPH_OSD_FLAG_WRITE; + info->ops = op.ops; + info->inbl = inbl; + info->pobjver = objver; + info->on_reg_commit = std::move(oncommit); + + info->ctx_budget = take_linger_budget(info); + + shunique_lock sul(rwlock, ceph::acquire_unique); + _linger_submit(info, sul); + logger->inc(l_osdc_linger_active); + + op.clear(); + return info->linger_id; +} + +ceph_tid_t Objecter::linger_notify(LingerOp *info, + ObjectOperation& op, + snapid_t snap, cb::list& inbl, + decltype(LingerOp::on_reg_commit)&& onfinish, + version_t *objver) +{ + info->snap = snap; + info->target.flags |= CEPH_OSD_FLAG_READ; + info->ops = op.ops; + info->inbl = inbl; + info->pobjver = objver; + info->on_reg_commit = std::move(onfinish); + info->ctx_budget = take_linger_budget(info); + + shunique_lock sul(rwlock, ceph::acquire_unique); + _linger_submit(info, sul); + logger->inc(l_osdc_linger_active); + + op.clear(); + return info->linger_id; +} + +void Objecter::_linger_submit(LingerOp *info, + ceph::shunique_lock<ceph::shared_mutex>& sul) +{ + ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock); + ceph_assert(info->linger_id); + ceph_assert(info->ctx_budget != -1); // caller needs to have taken budget already! 

  // Populate Op::target
  OSDSession *s = NULL;
  _calc_target(&info->target, nullptr);

  // Create LingerOp<->OSDSession relation
  int r = _get_session(info->target.osd, &s, sul);
  ceph_assert(r == 0);
  unique_lock sl(s->lock);
  _session_linger_op_assign(s, info);
  sl.unlock();
  put_session(s);

  _send_linger(info, sul);
}

// Deferred delivery of a watch notification.  Keeps the linger op and the
// message alive via intrusive_ptr while queued, marks the op as having an
// async callback in flight on construction, and hands both to
// Objecter::_do_watch_notify() when run.
struct CB_DoWatchNotify {
  Objecter *objecter;
  boost::intrusive_ptr<Objecter::LingerOp> info;
  boost::intrusive_ptr<MWatchNotify> msg;
  CB_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
    : objecter(o), info(i), msg(m) {
    info->_queued_async();
  }
  void operator()() {
    // ownership of both references moves into the callee
    objecter->_do_watch_notify(std::move(info), std::move(msg));
  }
};

// Entry point for MWatchNotify messages: disconnect events and watch
// notifications are deferred onto finish_strand, while notify completions
// are delivered inline.
void Objecter::handle_watch_notify(MWatchNotify *m)
{
  shared_lock l(rwlock);
  if (!initialized) {
    return;
  }

  // the cookie is the LingerOp pointer; only trust it if it is still in
  // our set of registered linger ops
  LingerOp *info = reinterpret_cast<LingerOp*>(m->cookie);
  if (linger_ops_set.count(info) == 0) {
    ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
    return;
  }
  std::unique_lock wl(info->watch_lock);
  if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
    // only the first error is recorded and reported
    if (!info->last_error) {
      info->last_error = bs::error_code(ENOTCONN, osd_category());
      if (info->handle) {
        boost::asio::defer(finish_strand, CB_DoWatchError(this, info,
                                                          info->last_error));
      }
    }
  } else if (!info->is_watch) {
    // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
    // since we know the only user (librados) is safe to call in
    // fast-dispatch context
    if (info->notify_id &&
        info->notify_id != m->notify_id) {
      ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
                     << " != " << info->notify_id << ", ignoring" << dendl;
    } else if (info->on_notify_finish) {
      info->on_notify_finish->defer(
        std::move(info->on_notify_finish),
        osdcode(m->return_code), std::move(m->get_data()));

      // if we race with reconnect we might get a second notify; only
      // notify the caller once!
+ info->on_notify_finish = nullptr; + } + } else { + boost::asio::defer(finish_strand, CB_DoWatchNotify(this, info, m)); + } +} + +void Objecter::_do_watch_notify(boost::intrusive_ptr<LingerOp> info, + boost::intrusive_ptr<MWatchNotify> m) +{ + ldout(cct, 10) << __func__ << " " << *m << dendl; + + shared_lock l(rwlock); + ceph_assert(initialized); + + if (info->canceled) { + l.unlock(); + goto out; + } + + // notify completion? + ceph_assert(info->is_watch); + ceph_assert(info->handle); + ceph_assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT); + + l.unlock(); + + switch (m->opcode) { + case CEPH_WATCH_EVENT_NOTIFY: + info->handle({}, m->notify_id, m->cookie, m->notifier_gid, std::move(m->bl)); + break; + } + + out: + info->finished_async(); +} + +bool Objecter::ms_dispatch(Message *m) +{ + ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl; + switch (m->get_type()) { + // these we exlusively handle + case CEPH_MSG_OSD_OPREPLY: + handle_osd_op_reply(static_cast<MOSDOpReply*>(m)); + return true; + + case CEPH_MSG_OSD_BACKOFF: + handle_osd_backoff(static_cast<MOSDBackoff*>(m)); + return true; + + case CEPH_MSG_WATCH_NOTIFY: + handle_watch_notify(static_cast<MWatchNotify*>(m)); + m->put(); + return true; + + case MSG_COMMAND_REPLY: + if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) { + handle_command_reply(static_cast<MCommandReply*>(m)); + return true; + } else { + return false; + } + + case MSG_GETPOOLSTATSREPLY: + handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply*>(m)); + return true; + + case CEPH_MSG_POOLOP_REPLY: + handle_pool_op_reply(static_cast<MPoolOpReply*>(m)); + return true; + + case CEPH_MSG_STATFS_REPLY: + handle_fs_stats_reply(static_cast<MStatfsReply*>(m)); + return true; + + // these we give others a chance to inspect + + // MDS, OSD + case CEPH_MSG_OSD_MAP: + handle_osd_map(static_cast<MOSDMap*>(m)); + return false; + } + return false; +} + +void Objecter::_scan_requests( + OSDSession *s, + bool skipped_map, + bool 
cluster_full, + map<int64_t, bool> *pool_full_map, + map<ceph_tid_t, Op*>& need_resend, + list<LingerOp*>& need_resend_linger, + map<ceph_tid_t, CommandOp*>& need_resend_command, + ceph::shunique_lock<ceph::shared_mutex>& sul) +{ + ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock); + + list<LingerOp*> unregister_lingers; + + std::unique_lock sl(s->lock); + + // check for changed linger mappings (_before_ regular ops) + auto lp = s->linger_ops.begin(); + while (lp != s->linger_ops.end()) { + auto op = lp->second; + ceph_assert(op->session == s); + // check_linger_pool_dne() may touch linger_ops; prevent iterator + // invalidation + ++lp; + ldout(cct, 10) << " checking linger op " << op->linger_id << dendl; + bool unregister, force_resend_writes = cluster_full; + int r = _recalc_linger_op_target(op, sul); + if (pool_full_map) + force_resend_writes = force_resend_writes || + (*pool_full_map)[op->target.base_oloc.pool]; + switch (r) { + case RECALC_OP_TARGET_NO_ACTION: + if (!skipped_map && !force_resend_writes) + break; + // -- fall-thru -- + case RECALC_OP_TARGET_NEED_RESEND: + need_resend_linger.push_back(op); + _linger_cancel_map_check(op); + break; + case RECALC_OP_TARGET_POOL_DNE: + _check_linger_pool_dne(op, &unregister); + if (unregister) { + ldout(cct, 10) << " need to unregister linger op " + << op->linger_id << dendl; + op->get(); + unregister_lingers.push_back(op); + } + break; + } + } + + // check for changed request mappings + auto p = s->ops.begin(); + while (p != s->ops.end()) { + Op *op = p->second; + ++p; // check_op_pool_dne() may touch ops; prevent iterator invalidation + ldout(cct, 10) << " checking op " << op->tid << dendl; + _prune_snapc(osdmap->get_new_removed_snaps(), op); + bool force_resend_writes = cluster_full; + if (pool_full_map) + force_resend_writes = force_resend_writes || + (*pool_full_map)[op->target.base_oloc.pool]; + int r = _calc_target(&op->target, + op->session ? 
op->session->con.get() : nullptr); + switch (r) { + case RECALC_OP_TARGET_NO_ACTION: + if (!skipped_map && !(force_resend_writes && op->target.respects_full())) + break; + // -- fall-thru -- + case RECALC_OP_TARGET_NEED_RESEND: + _session_op_remove(op->session, op); + need_resend[op->tid] = op; + _op_cancel_map_check(op); + break; + case RECALC_OP_TARGET_POOL_DNE: + _check_op_pool_dne(op, &sl); + break; + } + } + + // commands + auto cp = s->command_ops.begin(); + while (cp != s->command_ops.end()) { + auto c = cp->second; + ++cp; + ldout(cct, 10) << " checking command " << c->tid << dendl; + bool force_resend_writes = cluster_full; + if (pool_full_map) + force_resend_writes = force_resend_writes || + (*pool_full_map)[c->target_pg.pool()]; + int r = _calc_command_target(c, sul); + switch (r) { + case RECALC_OP_TARGET_NO_ACTION: + // resend if skipped map; otherwise do nothing. + if (!skipped_map && !force_resend_writes) + break; + // -- fall-thru -- + case RECALC_OP_TARGET_NEED_RESEND: + need_resend_command[c->tid] = c; + _session_command_op_remove(c->session, c); + _command_cancel_map_check(c); + break; + case RECALC_OP_TARGET_POOL_DNE: + case RECALC_OP_TARGET_OSD_DNE: + case RECALC_OP_TARGET_OSD_DOWN: + _check_command_map_dne(c); + break; + } + } + + sl.unlock(); + + for (auto iter = unregister_lingers.begin(); + iter != unregister_lingers.end(); + ++iter) { + _linger_cancel(*iter); + (*iter)->put(); + } +} + +void Objecter::handle_osd_map(MOSDMap *m) +{ + ceph::shunique_lock sul(rwlock, acquire_unique); + if (!initialized) + return; + + ceph_assert(osdmap); + + if (m->fsid != monc->get_fsid()) { + ldout(cct, 0) << "handle_osd_map fsid " << m->fsid + << " != " << monc->get_fsid() << dendl; + return; + } + + bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD); + bool cluster_full = _osdmap_full_flag(); + bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full || + _osdmap_has_pool_full(); + map<int64_t, bool> pool_full_map; + for (auto it = 
osdmap->get_pools().begin(); + it != osdmap->get_pools().end(); ++it) + pool_full_map[it->first] = _osdmap_pool_full(it->second); + + + list<LingerOp*> need_resend_linger; + map<ceph_tid_t, Op*> need_resend; + map<ceph_tid_t, CommandOp*> need_resend_command; + + if (m->get_last() <= osdmap->get_epoch()) { + ldout(cct, 3) << "handle_osd_map ignoring epochs [" + << m->get_first() << "," << m->get_last() + << "] <= " << osdmap->get_epoch() << dendl; + } else { + ldout(cct, 3) << "handle_osd_map got epochs [" + << m->get_first() << "," << m->get_last() + << "] > " << osdmap->get_epoch() << dendl; + + if (osdmap->get_epoch()) { + bool skipped_map = false; + // we want incrementals + for (epoch_t e = osdmap->get_epoch() + 1; + e <= m->get_last(); + e++) { + + if (osdmap->get_epoch() == e-1 && + m->incremental_maps.count(e)) { + ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e + << dendl; + OSDMap::Incremental inc(m->incremental_maps[e]); + osdmap->apply_incremental(inc); + + emit_blocklist_events(inc); + + logger->inc(l_osdc_map_inc); + } + else if (m->maps.count(e)) { + ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl; + auto new_osdmap = std::make_unique<OSDMap>(); + new_osdmap->decode(m->maps[e]); + + emit_blocklist_events(*osdmap, *new_osdmap); + osdmap = std::move(new_osdmap); + + logger->inc(l_osdc_map_full); + } + else { + if (e >= m->get_oldest()) { + ldout(cct, 3) << "handle_osd_map requesting missing epoch " + << osdmap->get_epoch()+1 << dendl; + _maybe_request_map(); + break; + } + ldout(cct, 3) << "handle_osd_map missing epoch " + << osdmap->get_epoch()+1 + << ", jumping to " << m->get_oldest() << dendl; + e = m->get_oldest() - 1; + skipped_map = true; + continue; + } + logger->set(l_osdc_map_epoch, osdmap->get_epoch()); + + prune_pg_mapping(osdmap->get_pools()); + cluster_full = cluster_full || _osdmap_full_flag(); + update_pool_full_map(pool_full_map); + + // check all outstanding requests on every epoch + for (auto& i 
: need_resend) { + _prune_snapc(osdmap->get_new_removed_snaps(), i.second); + } + _scan_requests(homeless_session, skipped_map, cluster_full, + &pool_full_map, need_resend, + need_resend_linger, need_resend_command, sul); + for (auto p = osd_sessions.begin(); + p != osd_sessions.end(); ) { + auto s = p->second; + _scan_requests(s, skipped_map, cluster_full, + &pool_full_map, need_resend, + need_resend_linger, need_resend_command, sul); + ++p; + // osd down or addr change? + if (!osdmap->is_up(s->osd) || + (s->con && + s->con->get_peer_addrs() != osdmap->get_addrs(s->osd))) { + close_session(s); + } + } + + ceph_assert(e == osdmap->get_epoch()); + } + + } else { + // first map. we want the full thing. + if (m->maps.count(m->get_last())) { + for (auto p = osd_sessions.begin(); + p != osd_sessions.end(); ++p) { + OSDSession *s = p->second; + _scan_requests(s, false, false, NULL, need_resend, + need_resend_linger, need_resend_command, sul); + } + ldout(cct, 3) << "handle_osd_map decoding full epoch " + << m->get_last() << dendl; + osdmap->decode(m->maps[m->get_last()]); + prune_pg_mapping(osdmap->get_pools()); + + _scan_requests(homeless_session, false, false, NULL, + need_resend, need_resend_linger, + need_resend_command, sul); + } else { + ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting" + << dendl; + monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME); + monc->renew_subs(); + } + } + } + + // make sure need_resend targets reflect latest map + for (auto p = need_resend.begin(); p != need_resend.end(); ) { + Op *op = p->second; + if (op->target.epoch < osdmap->get_epoch()) { + ldout(cct, 10) << __func__ << " checking op " << p->first << dendl; + int r = _calc_target(&op->target, nullptr); + if (r == RECALC_OP_TARGET_POOL_DNE) { + p = need_resend.erase(p); + _check_op_pool_dne(op, nullptr); + } else { + ++p; + } + } else { + ++p; + } + } + + bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD); + bool pausewr = 
osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag() + || _osdmap_has_pool_full(); + + // was/is paused? + if (was_pauserd || was_pausewr || pauserd || pausewr || + osdmap->get_epoch() < epoch_barrier) { + _maybe_request_map(); + } + + // resend requests + for (auto p = need_resend.begin(); + p != need_resend.end(); ++p) { + auto op = p->second; + auto s = op->session; + bool mapped_session = false; + if (!s) { + int r = _map_session(&op->target, &s, sul); + ceph_assert(r == 0); + mapped_session = true; + } else { + get_session(s); + } + std::unique_lock sl(s->lock); + if (mapped_session) { + _session_op_assign(s, op); + } + if (op->should_resend) { + if (!op->session->is_homeless() && !op->target.paused) { + logger->inc(l_osdc_op_resend); + _send_op(op); + } + } else { + _op_cancel_map_check(op); + _cancel_linger_op(op); + } + sl.unlock(); + put_session(s); + } + for (auto p = need_resend_linger.begin(); + p != need_resend_linger.end(); ++p) { + LingerOp *op = *p; + ceph_assert(op->session); + if (!op->session->is_homeless()) { + logger->inc(l_osdc_linger_resend); + _send_linger(op, sul); + } + } + for (auto p = need_resend_command.begin(); + p != need_resend_command.end(); ++p) { + auto c = p->second; + if (c->target.osd >= 0) { + _assign_command_session(c, sul); + if (c->session && !c->session->is_homeless()) { + _send_command(c); + } + } + } + + _dump_active(); + + // finish any Contexts that were waiting on a map update + auto p = waiting_for_map.begin(); + while (p != waiting_for_map.end() && + p->first <= osdmap->get_epoch()) { + //go through the list and call the onfinish methods + for (auto& [c, ec] : p->second) { + ca::post(std::move(c), ec); + } + waiting_for_map.erase(p++); + } + + monc->sub_got("osdmap", osdmap->get_epoch()); + + if (!waiting_for_map.empty()) { + _maybe_request_map(); + } +} + +void Objecter::enable_blocklist_events() +{ + unique_lock wl(rwlock); + + blocklist_events_enabled = true; +} + +void 
Objecter::consume_blocklist_events(std::set<entity_addr_t> *events) +{ + unique_lock wl(rwlock); + + if (events->empty()) { + events->swap(blocklist_events); + } else { + for (const auto &i : blocklist_events) { + events->insert(i); + } + blocklist_events.clear(); + } +} + +void Objecter::emit_blocklist_events(const OSDMap::Incremental &inc) +{ + if (!blocklist_events_enabled) { + return; + } + + for (const auto &i : inc.new_blocklist) { + blocklist_events.insert(i.first); + } +} + +void Objecter::emit_blocklist_events(const OSDMap &old_osd_map, + const OSDMap &new_osd_map) +{ + if (!blocklist_events_enabled) { + return; + } + + std::set<entity_addr_t> old_set; + std::set<entity_addr_t> new_set; + std::set<entity_addr_t> old_range_set; + std::set<entity_addr_t> new_range_set; + + old_osd_map.get_blocklist(&old_set, &old_range_set); + new_osd_map.get_blocklist(&new_set, &new_range_set); + + std::set<entity_addr_t> delta_set; + std::set_difference( + new_set.begin(), new_set.end(), old_set.begin(), old_set.end(), + std::inserter(delta_set, delta_set.begin())); + std::set_difference( + new_range_set.begin(), new_range_set.end(), + old_range_set.begin(), old_range_set.end(), + std::inserter(delta_set, delta_set.begin())); + blocklist_events.insert(delta_set.begin(), delta_set.end()); +} + +// op pool check + +void Objecter::CB_Op_Map_Latest::operator()(bs::error_code e, + version_t latest, version_t) +{ + if (e == bs::errc::resource_unavailable_try_again || + e == bs::errc::operation_canceled) + return; + + lgeneric_subdout(objecter->cct, objecter, 10) + << "op_map_latest r=" << e << " tid=" << tid + << " latest " << latest << dendl; + + unique_lock wl(objecter->rwlock); + + auto iter = objecter->check_latest_map_ops.find(tid); + if (iter == objecter->check_latest_map_ops.end()) { + lgeneric_subdout(objecter->cct, objecter, 10) + << "op_map_latest op "<< tid << " not found" << dendl; + return; + } + + Op *op = iter->second; + objecter->check_latest_map_ops.erase(iter); 
+ + lgeneric_subdout(objecter->cct, objecter, 20) + << "op_map_latest op "<< op << dendl; + + if (op->map_dne_bound == 0) + op->map_dne_bound = latest; + + unique_lock sl(op->session->lock, defer_lock); + objecter->_check_op_pool_dne(op, &sl); + + op->put(); +} + +int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name, + snapid_t *snap) const +{ + shared_lock rl(rwlock); + + auto& pools = osdmap->get_pools(); + auto iter = pools.find(poolid); + if (iter == pools.end()) { + return -ENOENT; + } + const pg_pool_t& pg_pool = iter->second; + for (auto p = pg_pool.snaps.begin(); + p != pg_pool.snaps.end(); + ++p) { + if (p->second.name == snap_name) { + *snap = p->first; + return 0; + } + } + return -ENOENT; +} + +int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap, + pool_snap_info_t *info) const +{ + shared_lock rl(rwlock); + + auto& pools = osdmap->get_pools(); + auto iter = pools.find(poolid); + if (iter == pools.end()) { + return -ENOENT; + } + const pg_pool_t& pg_pool = iter->second; + auto p = pg_pool.snaps.find(snap); + if (p == pg_pool.snaps.end()) + return -ENOENT; + *info = p->second; + + return 0; +} + +int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps) +{ + shared_lock rl(rwlock); + + const pg_pool_t *pi = osdmap->get_pg_pool(poolid); + if (!pi) + return -ENOENT; + for (auto p = pi->snaps.begin(); + p != pi->snaps.end(); + ++p) { + snaps->push_back(p->first); + } + return 0; +} + +// sl may be unlocked. +void Objecter::_check_op_pool_dne(Op *op, std::unique_lock<std::shared_mutex> *sl) +{ + // rwlock is locked unique + + if (op->target.pool_ever_existed) { + // the pool previously existed and now it does not, which means it + // was deleted. 
+ op->map_dne_bound = osdmap->get_epoch(); + ldout(cct, 10) << "check_op_pool_dne tid " << op->tid + << " pool previously exists but now does not" + << dendl; + } else { + ldout(cct, 10) << "check_op_pool_dne tid " << op->tid + << " current " << osdmap->get_epoch() + << " map_dne_bound " << op->map_dne_bound + << dendl; + } + if (op->map_dne_bound > 0) { + if (osdmap->get_epoch() >= op->map_dne_bound) { + // we had a new enough map + ldout(cct, 10) << "check_op_pool_dne tid " << op->tid + << " concluding pool " << op->target.base_pgid.pool() + << " dne" << dendl; + if (op->has_completion()) { + num_in_flight--; + op->complete(osdc_errc::pool_dne, -ENOENT); + } + + OSDSession *s = op->session; + if (s) { + ceph_assert(s != NULL); + ceph_assert(sl->mutex() == &s->lock); + bool session_locked = sl->owns_lock(); + if (!session_locked) { + sl->lock(); + } + _finish_op(op, 0); + if (!session_locked) { + sl->unlock(); + } + } else { + _finish_op(op, 0); // no session + } + } + } else { + _send_op_map_check(op); + } +} + +void Objecter::_send_op_map_check(Op *op) +{ + // rwlock is locked unique + // ask the monitor + if (check_latest_map_ops.count(op->tid) == 0) { + op->get(); + check_latest_map_ops[op->tid] = op; + monc->get_version("osdmap", CB_Op_Map_Latest(this, op->tid)); + } +} + +void Objecter::_op_cancel_map_check(Op *op) +{ + // rwlock is locked unique + auto iter = check_latest_map_ops.find(op->tid); + if (iter != check_latest_map_ops.end()) { + Op *op = iter->second; + op->put(); + check_latest_map_ops.erase(iter); + } +} + +// linger pool check + +void Objecter::CB_Linger_Map_Latest::operator()(bs::error_code e, + version_t latest, + version_t) +{ + if (e == bs::errc::resource_unavailable_try_again || + e == bs::errc::operation_canceled) { + // ignore callback; we will retry in resend_mon_ops() + return; + } + + unique_lock wl(objecter->rwlock); + + auto iter = objecter->check_latest_map_lingers.find(linger_id); + if (iter == 
objecter->check_latest_map_lingers.end()) { + return; + } + + auto op = iter->second; + objecter->check_latest_map_lingers.erase(iter); + + if (op->map_dne_bound == 0) + op->map_dne_bound = latest; + + bool unregister; + objecter->_check_linger_pool_dne(op, &unregister); + + if (unregister) { + objecter->_linger_cancel(op); + } + + op->put(); +} + +void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister) +{ + // rwlock is locked unique + + *need_unregister = false; + + if (op->register_gen > 0) { + ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id + << " pool previously existed but now does not" + << dendl; + op->map_dne_bound = osdmap->get_epoch(); + } else { + ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id + << " current " << osdmap->get_epoch() + << " map_dne_bound " << op->map_dne_bound + << dendl; + } + if (op->map_dne_bound > 0) { + if (osdmap->get_epoch() >= op->map_dne_bound) { + std::unique_lock wl{op->watch_lock}; + if (op->on_reg_commit) { + op->on_reg_commit->defer(std::move(op->on_reg_commit), + osdc_errc::pool_dne, cb::list{}); + op->on_reg_commit = nullptr; + } + if (op->on_notify_finish) { + op->on_notify_finish->defer(std::move(op->on_notify_finish), + osdc_errc::pool_dne, cb::list{}); + op->on_notify_finish = nullptr; + } + *need_unregister = true; + } + } else { + _send_linger_map_check(op); + } +} + +void Objecter::_send_linger_map_check(LingerOp *op) +{ + // ask the monitor + if (check_latest_map_lingers.count(op->linger_id) == 0) { + op->get(); + check_latest_map_lingers[op->linger_id] = op; + monc->get_version("osdmap", CB_Linger_Map_Latest(this, op->linger_id)); + } +} + +void Objecter::_linger_cancel_map_check(LingerOp *op) +{ + // rwlock is locked unique + + auto iter = check_latest_map_lingers.find(op->linger_id); + if (iter != check_latest_map_lingers.end()) { + LingerOp *op = iter->second; + op->put(); + check_latest_map_lingers.erase(iter); + } +} + +// command pool check + 
+void Objecter::CB_Command_Map_Latest::operator()(bs::error_code e, + version_t latest, version_t) +{ + if (e == bs::errc::resource_unavailable_try_again || + e == bs::errc::operation_canceled) { + // ignore callback; we will retry in resend_mon_ops() + return; + } + + unique_lock wl(objecter->rwlock); + + auto iter = objecter->check_latest_map_commands.find(tid); + if (iter == objecter->check_latest_map_commands.end()) { + return; + } + + auto c = iter->second; + objecter->check_latest_map_commands.erase(iter); + + if (c->map_dne_bound == 0) + c->map_dne_bound = latest; + + unique_lock sul(c->session->lock); + objecter->_check_command_map_dne(c); + sul.unlock(); + + c->put(); +} + +void Objecter::_check_command_map_dne(CommandOp *c) +{ + // rwlock is locked unique + // session is locked unique + + ldout(cct, 10) << "_check_command_map_dne tid " << c->tid + << " current " << osdmap->get_epoch() + << " map_dne_bound " << c->map_dne_bound + << dendl; + if (c->map_dne_bound > 0) { + if (osdmap->get_epoch() >= c->map_dne_bound) { + _finish_command(c, osdcode(c->map_check_error), + std::move(c->map_check_error_str), {}); + } + } else { + _send_command_map_check(c); + } +} + +void Objecter::_send_command_map_check(CommandOp *c) +{ + // rwlock is locked unique + // session is locked unique + + // ask the monitor + if (check_latest_map_commands.count(c->tid) == 0) { + c->get(); + check_latest_map_commands[c->tid] = c; + monc->get_version("osdmap", CB_Command_Map_Latest(this, c->tid)); + } +} + +void Objecter::_command_cancel_map_check(CommandOp *c) +{ + // rwlock is locked uniqe + + auto iter = check_latest_map_commands.find(c->tid); + if (iter != check_latest_map_commands.end()) { + auto c = iter->second; + c->put(); + check_latest_map_commands.erase(iter); + } +} + + +/** + * Look up OSDSession by OSD id. + * + * @returns 0 on success, or -EAGAIN if the lock context requires + * promotion to write. 
+ */ +int Objecter::_get_session(int osd, OSDSession **session, + shunique_lock<ceph::shared_mutex>& sul) +{ + ceph_assert(sul && sul.mutex() == &rwlock); + + if (osd < 0) { + *session = homeless_session; + ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless" + << dendl; + return 0; + } + + auto p = osd_sessions.find(osd); + if (p != osd_sessions.end()) { + auto s = p->second; + s->get(); + *session = s; + ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " " + << s->get_nref() << dendl; + return 0; + } + if (!sul.owns_lock()) { + return -EAGAIN; + } + auto s = new OSDSession(cct, osd); + osd_sessions[osd] = s; + s->con = messenger->connect_to_osd(osdmap->get_addrs(osd)); + s->con->set_priv(RefCountedPtr{s}); + logger->inc(l_osdc_osd_session_open); + logger->set(l_osdc_osd_sessions, osd_sessions.size()); + s->get(); + *session = s; + ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " " + << s->get_nref() << dendl; + return 0; +} + +void Objecter::put_session(Objecter::OSDSession *s) +{ + if (s && !s->is_homeless()) { + ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " " + << s->get_nref() << dendl; + s->put(); + } +} + +void Objecter::get_session(Objecter::OSDSession *s) +{ + ceph_assert(s != NULL); + + if (!s->is_homeless()) { + ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " " + << s->get_nref() << dendl; + s->get(); + } +} + +void Objecter::_reopen_session(OSDSession *s) +{ + // rwlock is locked unique + // s->lock is locked + + auto addrs = osdmap->get_addrs(s->osd); + ldout(cct, 10) << "reopen_session osd." 
<< s->osd << " session, addr now " + << addrs << dendl; + if (s->con) { + s->con->set_priv(NULL); + s->con->mark_down(); + logger->inc(l_osdc_osd_session_close); + } + s->con = messenger->connect_to_osd(addrs); + s->con->set_priv(RefCountedPtr{s}); + s->incarnation++; + logger->inc(l_osdc_osd_session_open); +} + +void Objecter::close_session(OSDSession *s) +{ + // rwlock is locked unique + + ldout(cct, 10) << "close_session for osd." << s->osd << dendl; + if (s->con) { + s->con->set_priv(NULL); + s->con->mark_down(); + logger->inc(l_osdc_osd_session_close); + } + unique_lock sl(s->lock); + + std::list<LingerOp*> homeless_lingers; + std::list<CommandOp*> homeless_commands; + std::list<Op*> homeless_ops; + + while (!s->linger_ops.empty()) { + auto i = s->linger_ops.begin(); + ldout(cct, 10) << " linger_op " << i->first << dendl; + homeless_lingers.push_back(i->second); + _session_linger_op_remove(s, i->second); + } + + while (!s->ops.empty()) { + auto i = s->ops.begin(); + ldout(cct, 10) << " op " << i->first << dendl; + homeless_ops.push_back(i->second); + _session_op_remove(s, i->second); + } + + while (!s->command_ops.empty()) { + auto i = s->command_ops.begin(); + ldout(cct, 10) << " command_op " << i->first << dendl; + homeless_commands.push_back(i->second); + _session_command_op_remove(s, i->second); + } + + osd_sessions.erase(s->osd); + sl.unlock(); + put_session(s); + + // Assign any leftover ops to the homeless session + { + unique_lock hsl(homeless_session->lock); + for (auto i = homeless_lingers.begin(); + i != homeless_lingers.end(); ++i) { + _session_linger_op_assign(homeless_session, *i); + } + for (auto i = homeless_ops.begin(); + i != homeless_ops.end(); ++i) { + _session_op_assign(homeless_session, *i); + } + for (auto i = homeless_commands.begin(); + i != homeless_commands.end(); ++i) { + _session_command_op_assign(homeless_session, *i); + } + } + + logger->set(l_osdc_osd_sessions, osd_sessions.size()); +} + +void Objecter::wait_for_osd_map(epoch_t 
e) +{ + unique_lock l(rwlock); + if (osdmap->get_epoch() >= e) { + l.unlock(); + return; + } + + ca::waiter<bs::error_code> w; + waiting_for_map[e].emplace_back(OpCompletion::create( + service.get_executor(), + w.ref()), + bs::error_code{}); + l.unlock(); + w.wait(); +} + +void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest, + std::unique_ptr<OpCompletion> fin, + std::unique_lock<ceph::shared_mutex>&& l) +{ + ceph_assert(fin); + if (osdmap->get_epoch() >= newest) { + ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl; + l.unlock(); + ca::defer(std::move(fin), bs::error_code{}); + } else { + ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl; + _wait_for_new_map(std::move(fin), newest, bs::error_code{}); + l.unlock(); + } +} + +void Objecter::maybe_request_map() +{ + shared_lock rl(rwlock); + _maybe_request_map(); +} + +void Objecter::_maybe_request_map() +{ + // rwlock is locked + int flag = 0; + if (_osdmap_full_flag() + || osdmap->test_flag(CEPH_OSDMAP_PAUSERD) + || osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) { + ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next " + "osd map (FULL flag is set)" << dendl; + } else { + ldout(cct, 10) + << "_maybe_request_map subscribing (onetime) to next osd map" << dendl; + flag = CEPH_SUBSCRIBE_ONETIME; + } + epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0; + if (monc->sub_want("osdmap", epoch, flag)) { + monc->renew_subs(); + } +} + +void Objecter::_wait_for_new_map(std::unique_ptr<OpCompletion> c, epoch_t epoch, + bs::error_code ec) +{ + // rwlock is locked unique + waiting_for_map[epoch].emplace_back(std::move(c), ec); + _maybe_request_map(); +} + + +/** + * Use this together with wait_for_map: this is a pre-check to avoid + * allocating a Context for wait_for_map if we can see that we + * definitely already have the epoch. 
+ * + * This does *not* replace the need to handle the return value of + * wait_for_map: just because we don't have it in this pre-check + * doesn't mean we won't have it when calling back into wait_for_map, + * since the objecter lock is dropped in between. + */ +bool Objecter::have_map(const epoch_t epoch) +{ + shared_lock rl(rwlock); + if (osdmap->get_epoch() >= epoch) { + return true; + } else { + return false; + } +} + +void Objecter::_kick_requests(OSDSession *session, + map<uint64_t, LingerOp *>& lresend) +{ + // rwlock is locked unique + + // clear backoffs + session->backoffs.clear(); + session->backoffs_by_id.clear(); + + // resend ops + map<ceph_tid_t,Op*> resend; // resend in tid order + for (auto p = session->ops.begin(); p != session->ops.end();) { + Op *op = p->second; + ++p; + if (op->should_resend) { + if (!op->target.paused) + resend[op->tid] = op; + } else { + _op_cancel_map_check(op); + _cancel_linger_op(op); + } + } + + logger->inc(l_osdc_op_resend, resend.size()); + while (!resend.empty()) { + _send_op(resend.begin()->second); + resend.erase(resend.begin()); + } + + // resend lingers + logger->inc(l_osdc_linger_resend, session->linger_ops.size()); + for (auto j = session->linger_ops.begin(); + j != session->linger_ops.end(); ++j) { + LingerOp *op = j->second; + op->get(); + ceph_assert(lresend.count(j->first) == 0); + lresend[j->first] = op; + } + + // resend commands + logger->inc(l_osdc_command_resend, session->command_ops.size()); + map<uint64_t,CommandOp*> cresend; // resend in order + for (auto k = session->command_ops.begin(); + k != session->command_ops.end(); ++k) { + cresend[k->first] = k->second; + } + while (!cresend.empty()) { + _send_command(cresend.begin()->second); + cresend.erase(cresend.begin()); + } +} + +void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend, + unique_lock<ceph::shared_mutex>& ul) +{ + ceph_assert(ul.owns_lock()); + shunique_lock sul(std::move(ul)); + while (!lresend.empty()) { + LingerOp *op = 
lresend.begin()->second; + if (!op->canceled) { + _send_linger(op, sul); + } + op->put(); + lresend.erase(lresend.begin()); + } + ul = sul.release_to_unique(); +} + +void Objecter::start_tick() +{ + ceph_assert(tick_event == 0); + tick_event = + timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval), + &Objecter::tick, this); +} + +void Objecter::tick() +{ + shared_lock rl(rwlock); + + ldout(cct, 10) << "tick" << dendl; + + // we are only called by C_Tick + tick_event = 0; + + if (!initialized) { + // we raced with shutdown + ldout(cct, 10) << __func__ << " raced with shutdown" << dendl; + return; + } + + set<OSDSession*> toping; + + + // look for laggy requests + auto cutoff = ceph::coarse_mono_clock::now(); + cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout); // timeout + + unsigned laggy_ops = 0; + + for (auto siter = osd_sessions.begin(); + siter != osd_sessions.end(); ++siter) { + auto s = siter->second; + scoped_lock l(s->lock); + bool found = false; + for (auto p = s->ops.begin(); p != s->ops.end(); ++p) { + auto op = p->second; + ceph_assert(op->session); + if (op->stamp < cutoff) { + ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd + << " is laggy" << dendl; + found = true; + ++laggy_ops; + } + } + for (auto p = s->linger_ops.begin(); + p != s->linger_ops.end(); + ++p) { + auto op = p->second; + std::unique_lock wl(op->watch_lock); + ceph_assert(op->session); + ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first + << " (osd." << op->session->osd << ")" << dendl; + found = true; + if (op->is_watch && op->registered && !op->last_error) + _send_linger_ping(op); + } + for (auto p = s->command_ops.begin(); + p != s->command_ops.end(); + ++p) { + auto op = p->second; + ceph_assert(op->session); + ldout(cct, 10) << " pinging osd that serves command tid " << p->first + << " (osd." 
<< op->session->osd << ")" << dendl; + found = true; + } + if (found) + toping.insert(s); + } + if (num_homeless_ops || !toping.empty()) { + _maybe_request_map(); + } + + logger->set(l_osdc_op_laggy, laggy_ops); + logger->set(l_osdc_osd_laggy, toping.size()); + + if (!toping.empty()) { + // send a ping to these osds, to ensure we detect any session resets + // (osd reply message policy is lossy) + for (auto i = toping.begin(); i != toping.end(); ++i) { + (*i)->con->send_message(new MPing); + } + } + + // Make sure we don't reschedule if we wake up after shutdown + if (initialized) { + tick_event = timer.reschedule_me(ceph::make_timespan( + cct->_conf->objecter_tick_interval)); + } +} + +void Objecter::resend_mon_ops() +{ + unique_lock wl(rwlock); + + ldout(cct, 10) << "resend_mon_ops" << dendl; + + for (auto p = poolstat_ops.begin(); p != poolstat_ops.end(); ++p) { + _poolstat_submit(p->second); + logger->inc(l_osdc_poolstat_resend); + } + + for (auto p = statfs_ops.begin(); p != statfs_ops.end(); ++p) { + _fs_stats_submit(p->second); + logger->inc(l_osdc_statfs_resend); + } + + for (auto p = pool_ops.begin(); p != pool_ops.end(); ++p) { + _pool_op_submit(p->second); + logger->inc(l_osdc_poolop_resend); + } + + for (auto p = check_latest_map_ops.begin(); + p != check_latest_map_ops.end(); + ++p) { + monc->get_version("osdmap", CB_Op_Map_Latest(this, p->second->tid)); + } + + for (auto p = check_latest_map_lingers.begin(); + p != check_latest_map_lingers.end(); + ++p) { + monc->get_version("osdmap", CB_Linger_Map_Latest(this, p->second->linger_id)); + } + + for (auto p = check_latest_map_commands.begin(); + p != check_latest_map_commands.end(); + ++p) { + monc->get_version("osdmap", CB_Command_Map_Latest(this, p->second->tid)); + } +} + +// read | write --------------------------- + +void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget) +{ + shunique_lock rl(rwlock, ceph::acquire_shared); + ceph_tid_t tid = 0; + if (!ptid) + ptid = &tid; + 
op->trace.event("op submit"); + _op_submit_with_budget(op, rl, ptid, ctx_budget); +} + +void Objecter::_op_submit_with_budget(Op *op, + shunique_lock<ceph::shared_mutex>& sul, + ceph_tid_t *ptid, + int *ctx_budget) +{ + ceph_assert(initialized); + + ceph_assert(op->ops.size() == op->out_bl.size()); + ceph_assert(op->ops.size() == op->out_rval.size()); + ceph_assert(op->ops.size() == op->out_handler.size()); + + // throttle. before we look at any state, because + // _take_op_budget() may drop our lock while it blocks. + if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) { + int op_budget = _take_op_budget(op, sul); + // take and pass out the budget for the first OP + // in the context session + if (ctx_budget && (*ctx_budget == -1)) { + *ctx_budget = op_budget; + } + } + + if (osd_timeout > timespan(0)) { + if (op->tid == 0) + op->tid = ++last_tid; + auto tid = op->tid; + op->ontimeout = timer.add_event(osd_timeout, + [this, tid]() { + op_cancel(tid, -ETIMEDOUT); }); + } + + _op_submit(op, sul, ptid); +} + +void Objecter::_send_op_account(Op *op) +{ + inflight_ops++; + + // add to gather set(s) + if (op->has_completion()) { + num_in_flight++; + } else { + ldout(cct, 20) << " note: not requesting reply" << dendl; + } + + logger->inc(l_osdc_op_active); + logger->inc(l_osdc_op); + logger->inc(l_osdc_oplen_avg, op->ops.size()); + + if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) == + (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) + logger->inc(l_osdc_op_rmw); + else if (op->target.flags & CEPH_OSD_FLAG_WRITE) + logger->inc(l_osdc_op_w); + else if (op->target.flags & CEPH_OSD_FLAG_READ) + logger->inc(l_osdc_op_r); + + if (op->target.flags & CEPH_OSD_FLAG_PGOP) + logger->inc(l_osdc_op_pg); + + for (auto p = op->ops.begin(); p != op->ops.end(); ++p) { + int code = l_osdc_osdop_other; + switch (p->op.op) { + case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break; + case CEPH_OSD_OP_CREATE: code = l_osdc_osdop_create; break; + case CEPH_OSD_OP_READ: 
code = l_osdc_osdop_read; break; + case CEPH_OSD_OP_WRITE: code = l_osdc_osdop_write; break; + case CEPH_OSD_OP_WRITEFULL: code = l_osdc_osdop_writefull; break; + case CEPH_OSD_OP_WRITESAME: code = l_osdc_osdop_writesame; break; + case CEPH_OSD_OP_APPEND: code = l_osdc_osdop_append; break; + case CEPH_OSD_OP_ZERO: code = l_osdc_osdop_zero; break; + case CEPH_OSD_OP_TRUNCATE: code = l_osdc_osdop_truncate; break; + case CEPH_OSD_OP_DELETE: code = l_osdc_osdop_delete; break; + case CEPH_OSD_OP_MAPEXT: code = l_osdc_osdop_mapext; break; + case CEPH_OSD_OP_SPARSE_READ: code = l_osdc_osdop_sparse_read; break; + case CEPH_OSD_OP_GETXATTR: code = l_osdc_osdop_getxattr; break; + case CEPH_OSD_OP_SETXATTR: code = l_osdc_osdop_setxattr; break; + case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break; + case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break; + case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break; + + // OMAP read operations + case CEPH_OSD_OP_OMAPGETVALS: + case CEPH_OSD_OP_OMAPGETKEYS: + case CEPH_OSD_OP_OMAPGETHEADER: + case CEPH_OSD_OP_OMAPGETVALSBYKEYS: + case CEPH_OSD_OP_OMAP_CMP: code = l_osdc_osdop_omap_rd; break; + + // OMAP write operations + case CEPH_OSD_OP_OMAPSETVALS: + case CEPH_OSD_OP_OMAPSETHEADER: code = l_osdc_osdop_omap_wr; break; + + // OMAP del operations + case CEPH_OSD_OP_OMAPCLEAR: + case CEPH_OSD_OP_OMAPRMKEYS: code = l_osdc_osdop_omap_del; break; + + case CEPH_OSD_OP_CALL: code = l_osdc_osdop_call; break; + case CEPH_OSD_OP_WATCH: code = l_osdc_osdop_watch; break; + case CEPH_OSD_OP_NOTIFY: code = l_osdc_osdop_notify; break; + } + if (code) + logger->inc(code); + } +} + +void Objecter::_op_submit(Op *op, shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid) +{ + // rwlock is locked + + ldout(cct, 10) << __func__ << " op " << op << dendl; + + // pick target + ceph_assert(op->session == NULL); + OSDSession *s = NULL; + + bool check_for_latest_map = _calc_target(&op->target, nullptr) + == 
RECALC_OP_TARGET_POOL_DNE; + + // Try to get a session, including a retry if we need to take write lock + int r = _get_session(op->target.osd, &s, sul); + if (r == -EAGAIN || + (check_for_latest_map && sul.owns_lock_shared()) || + cct->_conf->objecter_debug_inject_relock_delay) { + epoch_t orig_epoch = osdmap->get_epoch(); + sul.unlock(); + if (cct->_conf->objecter_debug_inject_relock_delay) { + sleep(1); + } + sul.lock(); + if (orig_epoch != osdmap->get_epoch()) { + // map changed; recalculate mapping + ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target" + << dendl; + check_for_latest_map = _calc_target(&op->target, nullptr) + == RECALC_OP_TARGET_POOL_DNE; + if (s) { + put_session(s); + s = NULL; + r = -EAGAIN; + } + } + } + if (r == -EAGAIN) { + ceph_assert(s == NULL); + r = _get_session(op->target.osd, &s, sul); + } + ceph_assert(r == 0); + ceph_assert(s); // may be homeless + + _send_op_account(op); + + // send? + + ceph_assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)); + + bool need_send = false; + if (op->target.paused) { + ldout(cct, 10) << " tid " << op->tid << " op " << op << " is paused" + << dendl; + _maybe_request_map(); + } else if (!s->is_homeless()) { + need_send = true; + } else { + _maybe_request_map(); + } + + unique_lock sl(s->lock); + if (op->tid == 0) + op->tid = ++last_tid; + + ldout(cct, 10) << "_op_submit oid " << op->target.base_oid + << " '" << op->target.base_oloc << "' '" + << op->target.target_oloc << "' " << op->ops << " tid " + << op->tid << " osd." << (!s->is_homeless() ? s->osd : -1) + << dendl; + + _session_op_assign(s, op); + + if (need_send) { + _send_op(op); + } + + // Last chance to touch Op here, after giving up session lock it can + // be freed at any time by response handler. 
+ ceph_tid_t tid = op->tid; + if (check_for_latest_map) { + _send_op_map_check(op); + } + if (ptid) + *ptid = tid; + op = NULL; + + sl.unlock(); + put_session(s); + + ldout(cct, 5) << num_in_flight << " in flight" << dendl; +} + +int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r) +{ + ceph_assert(initialized); + + unique_lock sl(s->lock); + + auto p = s->ops.find(tid); + if (p == s->ops.end()) { + ldout(cct, 10) << __func__ << " tid " << tid << " dne in session " + << s->osd << dendl; + return -ENOENT; + } + +#if 0 + if (s->con) { + ldout(cct, 20) << " revoking rx ceph::buffer for " << tid + << " on " << s->con << dendl; + s->con->revoke_rx_buffer(tid); + } +#endif + + ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd + << dendl; + Op *op = p->second; + if (op->has_completion()) { + num_in_flight--; + op->complete(osdcode(r), r); + } + _op_cancel_map_check(op); + _finish_op(op, r); + sl.unlock(); + + return 0; +} + +int Objecter::op_cancel(ceph_tid_t tid, int r) +{ + int ret = 0; + + unique_lock wl(rwlock); + ret = _op_cancel(tid, r); + + return ret; +} + +int Objecter::op_cancel(const vector<ceph_tid_t>& tids, int r) +{ + unique_lock wl(rwlock); + ldout(cct,10) << __func__ << " " << tids << dendl; + for (auto tid : tids) { + _op_cancel(tid, r); + } + return 0; +} + +int Objecter::_op_cancel(ceph_tid_t tid, int r) +{ + int ret = 0; + + ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r + << dendl; + +start: + + for (auto siter = osd_sessions.begin(); + siter != osd_sessions.end(); ++siter) { + OSDSession *s = siter->second; + shared_lock sl(s->lock); + if (s->ops.find(tid) != s->ops.end()) { + sl.unlock(); + ret = op_cancel(s, tid, r); + if (ret == -ENOENT) { + /* oh no! 
raced, maybe tid moved to another session, restarting */ + goto start; + } + return ret; + } + } + + ldout(cct, 5) << __func__ << ": tid " << tid + << " not found in live sessions" << dendl; + + // Handle case where the op is in homeless session + shared_lock sl(homeless_session->lock); + if (homeless_session->ops.find(tid) != homeless_session->ops.end()) { + sl.unlock(); + ret = op_cancel(homeless_session, tid, r); + if (ret == -ENOENT) { + /* oh no! raced, maybe tid moved to another session, restarting */ + goto start; + } else { + return ret; + } + } else { + sl.unlock(); + } + + ldout(cct, 5) << __func__ << ": tid " << tid + << " not found in homeless session" << dendl; + + return ret; +} + + +epoch_t Objecter::op_cancel_writes(int r, int64_t pool) +{ + unique_lock wl(rwlock); + + std::vector<ceph_tid_t> to_cancel; + bool found = false; + + for (auto siter = osd_sessions.begin(); + siter != osd_sessions.end(); ++siter) { + OSDSession *s = siter->second; + shared_lock sl(s->lock); + for (auto op_i = s->ops.begin(); + op_i != s->ops.end(); ++op_i) { + if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE + && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) { + to_cancel.push_back(op_i->first); + } + } + sl.unlock(); + + for (auto titer = to_cancel.begin(); titer != to_cancel.end(); ++titer) { + int cancel_result = op_cancel(s, *titer, r); + // We hold rwlock across search and cancellation, so cancels + // should always succeed + ceph_assert(cancel_result == 0); + } + if (!found && to_cancel.size()) + found = true; + to_cancel.clear(); + } + + const epoch_t epoch = osdmap->get_epoch(); + + wl.unlock(); + + if (found) { + return epoch; + } else { + return -1; + } +} + +bool Objecter::is_pg_changed( + int oldprimary, + const vector<int>& oldacting, + int newprimary, + const vector<int>& newacting, + bool any_change) +{ + if (OSDMap::primary_changed_broken( // https://tracker.ceph.com/issues/43213 + oldprimary, + oldacting, + newprimary, + newacting)) + 
return true; + if (any_change && oldacting != newacting) + return true; + return false; // same primary (tho replicas may have changed) +} + +bool Objecter::target_should_be_paused(op_target_t *t) +{ + const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool); + bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD); + bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || + (t->respects_full() && (_osdmap_full_flag() || _osdmap_pool_full(*pi))); + + return (t->flags & CEPH_OSD_FLAG_READ && pauserd) || + (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) || + (osdmap->get_epoch() < epoch_barrier); +} + +/** + * Locking public accessor for _osdmap_full_flag + */ +bool Objecter::osdmap_full_flag() const +{ + shared_lock rl(rwlock); + + return _osdmap_full_flag(); +} + +bool Objecter::osdmap_pool_full(const int64_t pool_id) const +{ + shared_lock rl(rwlock); + + if (_osdmap_full_flag()) { + return true; + } + + return _osdmap_pool_full(pool_id); +} + +bool Objecter::_osdmap_pool_full(const int64_t pool_id) const +{ + const pg_pool_t *pool = osdmap->get_pg_pool(pool_id); + if (pool == NULL) { + ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl; + return false; + } + + return _osdmap_pool_full(*pool); +} + +bool Objecter::_osdmap_has_pool_full() const +{ + for (auto it = osdmap->get_pools().begin(); + it != osdmap->get_pools().end(); ++it) { + if (_osdmap_pool_full(it->second)) + return true; + } + return false; +} + +/** + * Wrapper around osdmap->test_flag for special handling of the FULL flag. 
+ */ +bool Objecter::_osdmap_full_flag() const +{ + // Ignore the FULL flag if the caller does not have honor_osdmap_full + return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_pool_full; +} + +void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map) +{ + for (map<int64_t, pg_pool_t>::const_iterator it + = osdmap->get_pools().begin(); + it != osdmap->get_pools().end(); ++it) { + if (pool_full_map.find(it->first) == pool_full_map.end()) { + pool_full_map[it->first] = _osdmap_pool_full(it->second); + } else { + pool_full_map[it->first] = _osdmap_pool_full(it->second) || + pool_full_map[it->first]; + } + } +} + +int64_t Objecter::get_object_hash_position(int64_t pool, const string& key, + const string& ns) +{ + shared_lock rl(rwlock); + const pg_pool_t *p = osdmap->get_pg_pool(pool); + if (!p) + return -ENOENT; + return p->hash_key(key, ns); +} + +int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key, + const string& ns) +{ + shared_lock rl(rwlock); + const pg_pool_t *p = osdmap->get_pg_pool(pool); + if (!p) + return -ENOENT; + return p->raw_hash_to_pg(p->hash_key(key, ns)); +} + +void Objecter::_prune_snapc( + const mempool::osdmap::map<int64_t, + snap_interval_set_t>& new_removed_snaps, + Op *op) +{ + bool match = false; + auto i = new_removed_snaps.find(op->target.base_pgid.pool()); + if (i != new_removed_snaps.end()) { + for (auto s : op->snapc.snaps) { + if (i->second.contains(s)) { + match = true; + break; + } + } + if (match) { + vector<snapid_t> new_snaps; + for (auto s : op->snapc.snaps) { + if (!i->second.contains(s)) { + new_snaps.push_back(s); + } + } + op->snapc.snaps.swap(new_snaps); + ldout(cct,10) << __func__ << " op " << op->tid << " snapc " << op->snapc + << " (was " << new_snaps << ")" << dendl; + } + } +} + +int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change) +{ + // rwlock is locked + bool is_read = t->flags & CEPH_OSD_FLAG_READ; + bool is_write = t->flags & CEPH_OSD_FLAG_WRITE; + 
t->epoch = osdmap->get_epoch(); + ldout(cct,20) << __func__ << " epoch " << t->epoch + << " base " << t->base_oid << " " << t->base_oloc + << " precalc_pgid " << (int)t->precalc_pgid + << " pgid " << t->base_pgid + << (is_read ? " is_read" : "") + << (is_write ? " is_write" : "") + << dendl; + + const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool); + if (!pi) { + t->osd = -1; + return RECALC_OP_TARGET_POOL_DNE; + } + ldout(cct,30) << __func__ << " base pi " << pi + << " pg_num " << pi->get_pg_num() << dendl; + + bool force_resend = false; + if (osdmap->get_epoch() == pi->last_force_op_resend) { + if (t->last_force_resend < pi->last_force_op_resend) { + t->last_force_resend = pi->last_force_op_resend; + force_resend = true; + } else if (t->last_force_resend == 0) { + force_resend = true; + } + } + + // apply tiering + t->target_oid = t->base_oid; + t->target_oloc = t->base_oloc; + if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { + if (is_read && pi->has_read_tier()) + t->target_oloc.pool = pi->read_tier; + if (is_write && pi->has_write_tier()) + t->target_oloc.pool = pi->write_tier; + pi = osdmap->get_pg_pool(t->target_oloc.pool); + if (!pi) { + t->osd = -1; + return RECALC_OP_TARGET_POOL_DNE; + } + } + + pg_t pgid; + if (t->precalc_pgid) { + ceph_assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY); + ceph_assert(t->base_oid.name.empty()); // make sure this is a pg op + ceph_assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool()); + pgid = t->base_pgid; + } else { + int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc, + pgid); + if (ret == -ENOENT) { + t->osd = -1; + return RECALC_OP_TARGET_POOL_DNE; + } + } + ldout(cct,20) << __func__ << " target " << t->target_oid << " " + << t->target_oloc << " -> pgid " << pgid << dendl; + ldout(cct,30) << __func__ << " target pi " << pi + << " pg_num " << pi->get_pg_num() << dendl; + t->pool_ever_existed = true; + + int size = pi->size; + int min_size = pi->min_size; + unsigned pg_num = 
pi->get_pg_num(); + unsigned pg_num_mask = pi->get_pg_num_mask(); + unsigned pg_num_pending = pi->get_pg_num_pending(); + int up_primary, acting_primary; + vector<int> up, acting; + ps_t actual_ps = ceph_stable_mod(pgid.ps(), pg_num, pg_num_mask); + pg_t actual_pgid(actual_ps, pgid.pool()); + pg_mapping_t pg_mapping; + pg_mapping.epoch = osdmap->get_epoch(); + if (lookup_pg_mapping(actual_pgid, &pg_mapping)) { + up = pg_mapping.up; + up_primary = pg_mapping.up_primary; + acting = pg_mapping.acting; + acting_primary = pg_mapping.acting_primary; + } else { + osdmap->pg_to_up_acting_osds(actual_pgid, &up, &up_primary, + &acting, &acting_primary); + pg_mapping_t pg_mapping(osdmap->get_epoch(), + up, up_primary, acting, acting_primary); + update_pg_mapping(actual_pgid, std::move(pg_mapping)); + } + bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE); + bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES); + unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask); + pg_t prev_pgid(prev_seed, pgid.pool()); + if (any_change && PastIntervals::is_new_interval( + t->acting_primary, + acting_primary, + t->acting, + acting, + t->up_primary, + up_primary, + t->up, + up, + t->size, + size, + t->min_size, + min_size, + t->pg_num, + pg_num, + t->pg_num_pending, + pg_num_pending, + t->sort_bitwise, + sort_bitwise, + t->recovery_deletes, + recovery_deletes, + t->peering_crush_bucket_count, + pi->peering_crush_bucket_count, + t->peering_crush_bucket_target, + pi->peering_crush_bucket_target, + t->peering_crush_bucket_barrier, + pi->peering_crush_bucket_barrier, + t->peering_crush_mandatory_member, + pi->peering_crush_mandatory_member, + prev_pgid)) { + force_resend = true; + } + + bool unpaused = false; + bool should_be_paused = target_should_be_paused(t); + if (t->paused && !should_be_paused) { + unpaused = true; + } + if (t->paused != should_be_paused) { + ldout(cct, 10) << __func__ << " paused " << t->paused + << " -> " << 
should_be_paused << dendl; + t->paused = should_be_paused; + } + + bool legacy_change = + t->pgid != pgid || + is_pg_changed( + t->acting_primary, t->acting, acting_primary, acting, + t->used_replica || any_change); + bool split_or_merge = false; + if (t->pg_num) { + split_or_merge = + prev_pgid.is_split(t->pg_num, pg_num, nullptr) || + prev_pgid.is_merge_source(t->pg_num, pg_num, nullptr) || + prev_pgid.is_merge_target(t->pg_num, pg_num); + } + + if (legacy_change || split_or_merge || force_resend) { + t->pgid = pgid; + t->acting = acting; + t->acting_primary = acting_primary; + t->up_primary = up_primary; + t->up = up; + t->size = size; + t->min_size = min_size; + t->pg_num = pg_num; + t->pg_num_mask = pg_num_mask; + t->pg_num_pending = pg_num_pending; + spg_t spgid(actual_pgid); + if (pi->is_erasure()) { + for (uint8_t i = 0; i < acting.size(); ++i) { + if (acting[i] == acting_primary) { + spgid.reset_shard(shard_id_t(i)); + break; + } + } + } + t->actual_pgid = spgid; + t->sort_bitwise = sort_bitwise; + t->recovery_deletes = recovery_deletes; + t->peering_crush_bucket_count = pi->peering_crush_bucket_count; + t->peering_crush_bucket_target = pi->peering_crush_bucket_target; + t->peering_crush_bucket_barrier = pi->peering_crush_bucket_barrier; + t->peering_crush_mandatory_member = pi->peering_crush_mandatory_member; + ldout(cct, 10) << __func__ << " " + << " raw pgid " << pgid << " -> actual " << t->actual_pgid + << " acting " << acting + << " primary " << acting_primary << dendl; + t->used_replica = false; + if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS | + CEPH_OSD_FLAG_LOCALIZE_READS)) && + !is_write && pi->is_replicated() && acting.size() > 1) { + int osd; + ceph_assert(is_read && acting[0] == acting_primary); + if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) { + int p = rand() % acting.size(); + if (p) + t->used_replica = true; + osd = acting[p]; + ldout(cct, 10) << " chose random osd." 
<< osd << " of " << acting + << dendl; + } else { + // look for a local replica. prefer the primary if the + // distance is the same. + int best = -1; + int best_locality = 0; + for (unsigned i = 0; i < acting.size(); ++i) { + int locality = osdmap->crush->get_common_ancestor_distance( + cct, acting[i], crush_location); + ldout(cct, 20) << __func__ << " localize: rank " << i + << " osd." << acting[i] + << " locality " << locality << dendl; + if (i == 0 || + (locality >= 0 && best_locality >= 0 && + locality < best_locality) || + (best_locality < 0 && locality >= 0)) { + best = i; + best_locality = locality; + if (i) + t->used_replica = true; + } + } + ceph_assert(best >= 0); + osd = acting[best]; + } + t->osd = osd; + } else { + t->osd = acting_primary; + } + } + if (legacy_change || unpaused || force_resend) { + return RECALC_OP_TARGET_NEED_RESEND; + } + if (split_or_merge && + (osdmap->require_osd_release >= ceph_release_t::luminous || + HAVE_FEATURE(osdmap->get_xinfo(acting_primary).features, + RESEND_ON_SPLIT))) { + return RECALC_OP_TARGET_NEED_RESEND; + } + return RECALC_OP_TARGET_NO_ACTION; +} + +int Objecter::_map_session(op_target_t *target, OSDSession **s, + shunique_lock<ceph::shared_mutex>& sul) +{ + _calc_target(target, nullptr); + return _get_session(target->osd, s, sul); +} + +void Objecter::_session_op_assign(OSDSession *to, Op *op) +{ + // to->lock is locked + ceph_assert(op->session == NULL); + ceph_assert(op->tid); + + get_session(to); + op->session = to; + to->ops[op->tid] = op; + + if (to->is_homeless()) { + num_homeless_ops++; + } + + ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl; +} + +void Objecter::_session_op_remove(OSDSession *from, Op *op) +{ + ceph_assert(op->session == from); + // from->lock is locked + + if (from->is_homeless()) { + num_homeless_ops--; + } + + from->ops.erase(op->tid); + put_session(from); + op->session = NULL; + + ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl; +} + 
+void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op) +{ + // to lock is locked unique + ceph_assert(op->session == NULL); + + if (to->is_homeless()) { + num_homeless_ops++; + } + + get_session(to); + op->session = to; + to->linger_ops[op->linger_id] = op; + + ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id + << dendl; +} + +void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op) +{ + ceph_assert(from == op->session); + // from->lock is locked unique + + if (from->is_homeless()) { + num_homeless_ops--; + } + + from->linger_ops.erase(op->linger_id); + put_session(from); + op->session = NULL; + + ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id + << dendl; +} + +void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op) +{ + ceph_assert(from == op->session); + // from->lock is locked + + if (from->is_homeless()) { + num_homeless_ops--; + } + + from->command_ops.erase(op->tid); + put_session(from); + op->session = NULL; + + ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl; +} + +void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op) +{ + // to->lock is locked + ceph_assert(op->session == NULL); + ceph_assert(op->tid); + + if (to->is_homeless()) { + num_homeless_ops++; + } + + get_session(to); + op->session = to; + to->command_ops[op->tid] = op; + + ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl; +} + +int Objecter::_recalc_linger_op_target(LingerOp *linger_op, + shunique_lock<ceph::shared_mutex>& sul) +{ + // rwlock is locked unique + + int r = _calc_target(&linger_op->target, nullptr, true); + if (r == RECALC_OP_TARGET_NEED_RESEND) { + ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id + << " pgid " << linger_op->target.pgid + << " acting " << linger_op->target.acting << dendl; + + OSDSession *s = NULL; + r = _get_session(linger_op->target.osd, &s, sul); + ceph_assert(r == 0); + + if 
(linger_op->session != s) { + // NB locking two sessions (s and linger_op->session) at the + // same time here is only safe because we are the only one that + // takes two, and we are holding rwlock for write. We use + // std::shared_mutex in OSDSession because lockdep doesn't know + // that. + unique_lock sl(s->lock); + _session_linger_op_remove(linger_op->session, linger_op); + _session_linger_op_assign(s, linger_op); + } + + put_session(s); + return RECALC_OP_TARGET_NEED_RESEND; + } + return r; +} + +void Objecter::_cancel_linger_op(Op *op) +{ + ldout(cct, 15) << "cancel_op " << op->tid << dendl; + + ceph_assert(!op->should_resend); + if (op->has_completion()) { + op->onfinish = nullptr; + num_in_flight--; + } + + _finish_op(op, 0); +} + +void Objecter::_finish_op(Op *op, int r) +{ + ldout(cct, 15) << __func__ << " " << op->tid << dendl; + + // op->session->lock is locked unique or op->session is null + + if (!op->ctx_budgeted && op->budget >= 0) { + put_op_budget_bytes(op->budget); + op->budget = -1; + } + + if (op->ontimeout && r != -ETIMEDOUT) + timer.cancel_event(op->ontimeout); + + if (op->session) { + _session_op_remove(op->session, op); + } + + logger->dec(l_osdc_op_active); + + ceph_assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end()); + + inflight_ops--; + + op->put(); +} + +Objecter::MOSDOp *Objecter::_prepare_osd_op(Op *op) +{ + // rwlock is locked + + int flags = op->target.flags; + flags |= CEPH_OSD_FLAG_KNOWN_REDIR; + + // Nothing checks this any longer, but needed for compatibility with + // pre-luminous osds + flags |= CEPH_OSD_FLAG_ONDISK; + + if (!honor_pool_full) + flags |= CEPH_OSD_FLAG_FULL_FORCE; + + op->target.paused = false; + op->stamp = ceph::coarse_mono_clock::now(); + + hobject_t hobj = op->target.get_hobj(); + auto m = new MOSDOp(client_inc, op->tid, + hobj, op->target.actual_pgid, + osdmap->get_epoch(), + flags, op->features); + + m->set_snapid(op->snapid); + m->set_snap_seq(op->snapc.seq); + 
m->set_snaps(op->snapc.snaps); + + m->ops = op->ops; + m->set_mtime(op->mtime); + m->set_retry_attempt(op->attempts++); + + if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) { + op->trace.init("op", &trace_endpoint); + } + + if (op->priority) + m->set_priority(op->priority); + else + m->set_priority(cct->_conf->osd_client_op_priority); + + if (op->reqid != osd_reqid_t()) { + m->set_reqid(op->reqid); + } + + logger->inc(l_osdc_op_send); + ssize_t sum = 0; + for (unsigned i = 0; i < m->ops.size(); i++) { + sum += m->ops[i].indata.length(); + } + logger->inc(l_osdc_op_send_bytes, sum); + + return m; +} + +void Objecter::_send_op(Op *op) +{ + // rwlock is locked + // op->session->lock is locked + + // backoff? + auto p = op->session->backoffs.find(op->target.actual_pgid); + if (p != op->session->backoffs.end()) { + hobject_t hoid = op->target.get_hobj(); + auto q = p->second.lower_bound(hoid); + if (q != p->second.begin()) { + --q; + if (hoid >= q->second.end) { + ++q; + } + } + if (q != p->second.end()) { + ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin + << "," << q->second.end << ")" << dendl; + int r = cmp(hoid, q->second.begin); + if (r == 0 || (r > 0 && hoid < q->second.end)) { + ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid + << " id " << q->second.id << " on " << hoid + << ", queuing " << op << " tid " << op->tid << dendl; + return; + } + } + } + + ceph_assert(op->tid > 0); + MOSDOp *m = _prepare_osd_op(op); + + if (op->target.actual_pgid != m->get_spg()) { + ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from " + << m->get_spg() << " to " << op->target.actual_pgid + << ", updating and reencoding" << dendl; + m->set_spg(op->target.actual_pgid); + m->clear_payload(); // reencode + } + + ldout(cct, 15) << "_send_op " << op->tid << " to " + << op->target.actual_pgid << " on osd." 
<< op->session->osd + << dendl; + + ConnectionRef con = op->session->con; + ceph_assert(con); + +#if 0 + // preallocated rx ceph::buffer? + if (op->con) { + ldout(cct, 20) << " revoking rx ceph::buffer for " << op->tid << " on " + << op->con << dendl; + op->con->revoke_rx_buffer(op->tid); + } + if (op->outbl && + op->ontimeout == 0 && // only post rx_buffer if no timeout; see #9582 + op->outbl->length()) { + op->outbl->invalidate_crc(); // messenger writes through c_str() + ldout(cct, 20) << " posting rx ceph::buffer for " << op->tid << " on " << con + << dendl; + op->con = con; + op->con->post_rx_buffer(op->tid, *op->outbl); + } +#endif + + op->incarnation = op->session->incarnation; + + if (op->trace.valid()) { + m->trace.init("op msg", nullptr, &op->trace); + } + op->session->con->send_message(m); +} + +int Objecter::calc_op_budget(const bc::small_vector_base<OSDOp>& ops) +{ + int op_budget = 0; + for (auto i = ops.begin(); i != ops.end(); ++i) { + if (i->op.op & CEPH_OSD_OP_MODE_WR) { + op_budget += i->indata.length(); + } else if (ceph_osd_op_mode_read(i->op.op)) { + if (ceph_osd_op_uses_extent(i->op.op)) { + if ((int64_t)i->op.extent.length > 0) + op_budget += (int64_t)i->op.extent.length; + } else if (ceph_osd_op_type_attr(i->op.op)) { + op_budget += i->op.xattr.name_len + i->op.xattr.value_len; + } + } + } + return op_budget; +} + +void Objecter::_throttle_op(Op *op, + shunique_lock<ceph::shared_mutex>& sul, + int op_budget) +{ + ceph_assert(sul && sul.mutex() == &rwlock); + bool locked_for_write = sul.owns_lock(); + + if (!op_budget) + op_budget = calc_op_budget(op->ops); + if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now + sul.unlock(); + op_throttle_bytes.get(op_budget); + if (locked_for_write) + sul.lock(); + else + sul.lock_shared(); + } + if (!op_throttle_ops.get_or_fail(1)) { //couldn't take right now + sul.unlock(); + op_throttle_ops.get(1); + if (locked_for_write) + sul.lock(); + else + sul.lock_shared(); + } +} + +int 
Objecter::take_linger_budget(LingerOp *info) +{ + return 1; +} + +/* This function DOES put the passed message before returning */ +void Objecter::handle_osd_op_reply(MOSDOpReply *m) +{ + ldout(cct, 10) << "in handle_osd_op_reply" << dendl; + + // get pio + ceph_tid_t tid = m->get_tid(); + + shunique_lock sul(rwlock, ceph::acquire_shared); + if (!initialized) { + m->put(); + return; + } + + ConnectionRef con = m->get_connection(); + auto priv = con->get_priv(); + auto s = static_cast<OSDSession*>(priv.get()); + if (!s || s->con != con) { + ldout(cct, 7) << __func__ << " no session on con " << con << dendl; + m->put(); + return; + } + + unique_lock sl(s->lock); + + map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid); + if (iter == s->ops.end()) { + ldout(cct, 7) << "handle_osd_op_reply " << tid + << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ? + " onnvram" : " ack")) + << " ... stray" << dendl; + sl.unlock(); + m->put(); + return; + } + + ldout(cct, 7) << "handle_osd_op_reply " << tid + << (m->is_ondisk() ? " ondisk" : + (m->is_onnvram() ? 
" onnvram" : " ack")) + << " uv " << m->get_user_version() + << " in " << m->get_pg() + << " attempt " << m->get_retry_attempt() + << dendl; + Op *op = iter->second; + op->trace.event("osd op reply"); + + if (retry_writes_after_first_reply && op->attempts == 1 && + (op->target.flags & CEPH_OSD_FLAG_WRITE)) { + ldout(cct, 7) << "retrying write after first reply: " << tid << dendl; + if (op->has_completion()) { + num_in_flight--; + } + _session_op_remove(s, op); + sl.unlock(); + + _op_submit(op, sul, NULL); + m->put(); + return; + } + + if (m->get_retry_attempt() >= 0) { + if (m->get_retry_attempt() != (op->attempts - 1)) { + ldout(cct, 7) << " ignoring reply from attempt " + << m->get_retry_attempt() + << " from " << m->get_source_inst() + << "; last attempt " << (op->attempts - 1) << " sent to " + << op->session->con->get_peer_addr() << dendl; + m->put(); + sl.unlock(); + return; + } + } else { + // we don't know the request attempt because the server is old, so + // just accept this one. we may do ACK callbacks we shouldn't + // have, but that is better than doing callbacks out of order. 
+ } + + decltype(op->onfinish) onfinish; + + int rc = m->get_result(); + + if (m->is_redirect_reply()) { + ldout(cct, 5) << " got redirect reply; redirecting" << dendl; + if (op->has_completion()) + num_in_flight--; + _session_op_remove(s, op); + sl.unlock(); + + // FIXME: two redirects could race and reorder + + op->tid = 0; + m->get_redirect().combine_with_locator(op->target.target_oloc, + op->target.target_oid.name); + op->target.flags |= (CEPH_OSD_FLAG_REDIRECTED | + CEPH_OSD_FLAG_IGNORE_CACHE | + CEPH_OSD_FLAG_IGNORE_OVERLAY); + _op_submit(op, sul, NULL); + m->put(); + return; + } + + if (rc == -EAGAIN) { + ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl; + if (op->has_completion()) + num_in_flight--; + _session_op_remove(s, op); + sl.unlock(); + + op->tid = 0; + op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS | + CEPH_OSD_FLAG_LOCALIZE_READS); + op->target.pgid = pg_t(); + _op_submit(op, sul, NULL); + m->put(); + return; + } + + sul.unlock(); + + if (op->objver) + *op->objver = m->get_user_version(); + if (op->reply_epoch) + *op->reply_epoch = m->get_map_epoch(); + if (op->data_offset) + *op->data_offset = m->get_header().data_off; + + // got data? + if (op->outbl) { +#if 0 + if (op->con) + op->con->revoke_rx_buffer(op->tid); +#endif + auto& bl = m->get_data(); + if (op->outbl->length() == bl.length() && + bl.get_num_buffers() <= 1) { + // this is here to keep previous users to *relied* on getting data + // read into existing buffers happy. Notably, + // libradosstriper::RadosStriperImpl::aio_read(). 
+ ldout(cct,10) << __func__ << " copying resulting " << bl.length() + << " into existing ceph::buffer of length " << op->outbl->length() + << dendl; + cb::list t; + t = std::move(*op->outbl); + t.invalidate_crc(); // we're overwriting the raw buffers via c_str() + bl.begin().copy(bl.length(), t.c_str()); + op->outbl->substr_of(t, 0, bl.length()); + } else { + m->claim_data(*op->outbl); + } + op->outbl = 0; + } + + // per-op result demuxing + vector<OSDOp> out_ops; + m->claim_ops(out_ops); + + if (out_ops.size() != op->ops.size()) + ldout(cct, 0) << "WARNING: tid " << op->tid << " reply ops " << out_ops + << " != request ops " << op->ops + << " from " << m->get_source_inst() << dendl; + + ceph_assert(op->ops.size() == op->out_bl.size()); + ceph_assert(op->ops.size() == op->out_rval.size()); + ceph_assert(op->ops.size() == op->out_ec.size()); + ceph_assert(op->ops.size() == op->out_handler.size()); + auto pb = op->out_bl.begin(); + auto pr = op->out_rval.begin(); + auto pe = op->out_ec.begin(); + auto ph = op->out_handler.begin(); + ceph_assert(op->out_bl.size() == op->out_rval.size()); + ceph_assert(op->out_bl.size() == op->out_handler.size()); + auto p = out_ops.begin(); + for (unsigned i = 0; + p != out_ops.end() && pb != op->out_bl.end(); + ++i, ++p, ++pb, ++pr, ++pe, ++ph) { + ldout(cct, 10) << " op " << i << " rval " << p->rval + << " len " << p->outdata.length() << dendl; + if (*pb) + **pb = p->outdata; + // set rval before running handlers so that handlers + // can change it if e.g. decoding fails + if (*pr) + **pr = ceph_to_hostos_errno(p->rval); + if (*pe) + **pe = p->rval < 0 ? bs::error_code(-p->rval, osd_category()) : + bs::error_code(); + if (*ph) { + std::move((*ph))(p->rval < 0 ? + bs::error_code(-p->rval, osd_category()) : + bs::error_code(), + p->rval, p->outdata); + } + } + + // NOTE: we assume that since we only request ONDISK ever we will + // only ever get back one (type of) ack ever. 
+ + if (op->has_completion()) { + num_in_flight--; + onfinish = std::move(op->onfinish); + op->onfinish = nullptr; + } + logger->inc(l_osdc_op_reply); + + /* get it before we call _finish_op() */ + auto completion_lock = s->get_lock(op->target.base_oid); + + ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl; + _finish_op(op, 0); + + ldout(cct, 5) << num_in_flight << " in flight" << dendl; + + // serialize completions + if (completion_lock.mutex()) { + completion_lock.lock(); + } + sl.unlock(); + + // do callbacks + if (Op::has_completion(onfinish)) { + Op::complete(std::move(onfinish), osdcode(rc), rc); + } + if (completion_lock.mutex()) { + completion_lock.unlock(); + } + + m->put(); +} + +void Objecter::handle_osd_backoff(MOSDBackoff *m) +{ + ldout(cct, 10) << __func__ << " " << *m << dendl; + shunique_lock sul(rwlock, ceph::acquire_shared); + if (!initialized) { + m->put(); + return; + } + + ConnectionRef con = m->get_connection(); + auto priv = con->get_priv(); + auto s = static_cast<OSDSession*>(priv.get()); + if (!s || s->con != con) { + ldout(cct, 7) << __func__ << " no session on con " << con << dendl; + m->put(); + return; + } + + get_session(s); + + unique_lock sl(s->lock); + + switch (m->op) { + case CEPH_OSD_BACKOFF_OP_BLOCK: + { + // register + OSDBackoff& b = s->backoffs[m->pgid][m->begin]; + s->backoffs_by_id.insert(make_pair(m->id, &b)); + b.pgid = m->pgid; + b.id = m->id; + b.begin = m->begin; + b.end = m->end; + + // ack with original backoff's epoch so that the osd can discard this if + // there was a pg split. 
+ auto r = new MOSDBackoff(m->pgid, m->map_epoch, + CEPH_OSD_BACKOFF_OP_ACK_BLOCK, + m->id, m->begin, m->end); + // this priority must match the MOSDOps from _prepare_osd_op + r->set_priority(cct->_conf->osd_client_op_priority); + con->send_message(r); + } + break; + + case CEPH_OSD_BACKOFF_OP_UNBLOCK: + { + auto p = s->backoffs_by_id.find(m->id); + if (p != s->backoffs_by_id.end()) { + OSDBackoff *b = p->second; + if (b->begin != m->begin && + b->end != m->end) { + lderr(cct) << __func__ << " got " << m->pgid << " id " << m->id + << " unblock on [" + << m->begin << "," << m->end << ") but backoff is [" + << b->begin << "," << b->end << ")" << dendl; + // hrmpf, unblock it anyway. + } + ldout(cct, 10) << __func__ << " unblock backoff " << b->pgid + << " id " << b->id + << " [" << b->begin << "," << b->end + << ")" << dendl; + auto spgp = s->backoffs.find(b->pgid); + ceph_assert(spgp != s->backoffs.end()); + spgp->second.erase(b->begin); + if (spgp->second.empty()) { + s->backoffs.erase(spgp); + } + s->backoffs_by_id.erase(p); + + // check for any ops to resend + for (auto& q : s->ops) { + if (q.second->target.actual_pgid == m->pgid) { + int r = q.second->target.contained_by(m->begin, m->end); + ldout(cct, 20) << __func__ << " contained_by " << r << " on " + << q.second->target.get_hobj() << dendl; + if (r) { + _send_op(q.second); + } + } + } + } else { + lderr(cct) << __func__ << " " << m->pgid << " id " << m->id + << " unblock on [" + << m->begin << "," << m->end << ") but backoff dne" << dendl; + } + } + break; + + default: + ldout(cct, 10) << __func__ << " unrecognized op " << (int)m->op << dendl; + } + + sul.unlock(); + sl.unlock(); + + m->put(); + put_session(s); +} + +uint32_t Objecter::list_nobjects_seek(NListContext *list_context, + uint32_t pos) +{ + shared_lock rl(rwlock); + list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP, + pos, list_context->pool_id, string()); + ldout(cct, 10) << __func__ << " " << list_context + << " pos " << pos << " 
-> " << list_context->pos << dendl; + pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id)); + list_context->current_pg = actual.ps(); + list_context->at_end_of_pool = false; + return pos; +} + +uint32_t Objecter::list_nobjects_seek(NListContext *list_context, + const hobject_t& cursor) +{ + shared_lock rl(rwlock); + ldout(cct, 10) << "list_nobjects_seek " << list_context << dendl; + list_context->pos = cursor; + list_context->at_end_of_pool = false; + pg_t actual = osdmap->raw_pg_to_pg(pg_t(cursor.get_hash(), list_context->pool_id)); + list_context->current_pg = actual.ps(); + list_context->sort_bitwise = true; + return list_context->current_pg; +} + +void Objecter::list_nobjects_get_cursor(NListContext *list_context, + hobject_t *cursor) +{ + shared_lock rl(rwlock); + if (list_context->list.empty()) { + *cursor = list_context->pos; + } else { + const librados::ListObjectImpl& entry = list_context->list.front(); + const string *key = (entry.locator.empty() ? &entry.oid : &entry.locator); + uint32_t h = osdmap->get_pg_pool(list_context->pool_id)->hash_key(*key, entry.nspace); + *cursor = hobject_t(entry.oid, entry.locator, list_context->pool_snap_seq, h, list_context->pool_id, entry.nspace); + } +} + +void Objecter::list_nobjects(NListContext *list_context, Context *onfinish) +{ + ldout(cct, 10) << __func__ << " pool_id " << list_context->pool_id + << " pool_snap_seq " << list_context->pool_snap_seq + << " max_entries " << list_context->max_entries + << " list_context " << list_context + << " onfinish " << onfinish + << " current_pg " << list_context->current_pg + << " pos " << list_context->pos << dendl; + + shared_lock rl(rwlock); + const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id); + if (!pool) { // pool is gone + rl.unlock(); + put_nlist_context_budget(list_context); + onfinish->complete(-ENOENT); + return; + } + int pg_num = pool->get_pg_num(); + bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE); + + if 
(list_context->pos.is_min()) { + list_context->starting_pg_num = 0; + list_context->sort_bitwise = sort_bitwise; + list_context->starting_pg_num = pg_num; + } + if (list_context->sort_bitwise != sort_bitwise) { + list_context->pos = hobject_t( + object_t(), string(), CEPH_NOSNAP, + list_context->current_pg, list_context->pool_id, string()); + list_context->sort_bitwise = sort_bitwise; + ldout(cct, 10) << " hobject sort order changed, restarting this pg at " + << list_context->pos << dendl; + } + if (list_context->starting_pg_num != pg_num) { + if (!sort_bitwise) { + // start reading from the beginning; the pgs have changed + ldout(cct, 10) << " pg_num changed; restarting with " << pg_num << dendl; + list_context->pos = collection_list_handle_t(); + } + list_context->starting_pg_num = pg_num; + } + + if (list_context->pos.is_max()) { + ldout(cct, 20) << __func__ << " end of pool, list " + << list_context->list << dendl; + if (list_context->list.empty()) { + list_context->at_end_of_pool = true; + } + // release the listing context's budget once all + // OPs (in the session) are finished + put_nlist_context_budget(list_context); + onfinish->complete(0); + return; + } + + ObjectOperation op; + op.pg_nls(list_context->max_entries, list_context->filter, + list_context->pos, osdmap->get_epoch()); + list_context->bl.clear(); + auto onack = new C_NList(list_context, onfinish, this); + object_locator_t oloc(list_context->pool_id, list_context->nspace); + + // note current_pg in case we don't have (or lose) SORTBITWISE + list_context->current_pg = pool->raw_hash_to_pg(list_context->pos.get_hash()); + rl.unlock(); + + pg_read(list_context->current_pg, oloc, op, + &list_context->bl, 0, onack, &onack->epoch, + &list_context->ctx_budget); +} + +void Objecter::_nlist_reply(NListContext *list_context, int r, + Context *final_finish, epoch_t reply_epoch) +{ + ldout(cct, 10) << __func__ << " " << list_context << dendl; + + auto iter = list_context->bl.cbegin(); + pg_nls_response_t 
response; + decode(response, iter); + if (!iter.end()) { + // we do this as legacy. + cb::list legacy_extra_info; + decode(legacy_extra_info, iter); + } + + // if the osd returns 1 (newer code), or handle MAX, it means we + // hit the end of the pg. + if ((response.handle.is_max() || r == 1) && + !list_context->sort_bitwise) { + // legacy OSD and !sortbitwise, figure out the next PG on our own + ++list_context->current_pg; + if (list_context->current_pg == list_context->starting_pg_num) { + // end of pool + list_context->pos = hobject_t::get_max(); + } else { + // next pg + list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP, + list_context->current_pg, + list_context->pool_id, string()); + } + } else { + list_context->pos = response.handle; + } + + int response_size = response.entries.size(); + ldout(cct, 20) << " response.entries.size " << response_size + << ", response.entries " << response.entries + << ", handle " << response.handle + << ", tentative new pos " << list_context->pos << dendl; + if (response_size) { + std::move(response.entries.begin(), response.entries.end(), + std::back_inserter(list_context->list)); + response.entries.clear(); + } + + if (list_context->list.size() >= list_context->max_entries) { + ldout(cct, 20) << " hit max, returning results so far, " + << list_context->list << dendl; + // release the listing context's budget once all + // OPs (in the session) are finished + put_nlist_context_budget(list_context); + final_finish->complete(0); + return; + } + + // continue! 
+ list_nobjects(list_context, final_finish); +} + +void Objecter::put_nlist_context_budget(NListContext *list_context) +{ + if (list_context->ctx_budget >= 0) { + ldout(cct, 10) << " release listing context's budget " << + list_context->ctx_budget << dendl; + put_op_budget_bytes(list_context->ctx_budget); + list_context->ctx_budget = -1; + } +} + +// snapshots + +void Objecter::create_pool_snap(int64_t pool, std::string_view snap_name, + decltype(PoolOp::onfinish)&& onfinish) +{ + unique_lock wl(rwlock); + ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: " + << snap_name << dendl; + + const pg_pool_t *p = osdmap->get_pg_pool(pool); + if (!p) { + onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{}); + return; + } + if (p->snap_exists(snap_name)) { + onfinish->defer(std::move(onfinish), osdc_errc::snapshot_exists, + cb::list{}); + return; + } + + auto op = new PoolOp; + op->tid = ++last_tid; + op->pool = pool; + op->name = snap_name; + op->onfinish = std::move(onfinish); + op->pool_op = POOL_OP_CREATE_SNAP; + pool_ops[op->tid] = op; + + pool_op_submit(op); +} + +struct CB_SelfmanagedSnap { + std::unique_ptr<ca::Completion<void(bs::error_code, snapid_t)>> fin; + CB_SelfmanagedSnap(decltype(fin)&& fin) + : fin(std::move(fin)) {} + void operator()(bs::error_code ec, const cb::list& bl) { + snapid_t snapid = 0; + if (!ec) { + try { + auto p = bl.cbegin(); + decode(snapid, p); + } catch (const cb::error& e) { + ec = e.code(); + } + } + fin->defer(std::move(fin), ec, snapid); + } +}; + +void Objecter::allocate_selfmanaged_snap( + int64_t pool, + std::unique_ptr<ca::Completion<void(bs::error_code, snapid_t)>> onfinish) +{ + unique_lock wl(rwlock); + ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl; + auto op = new PoolOp; + op->tid = ++last_tid; + op->pool = pool; + op->onfinish = PoolOp::OpComp::create( + service.get_executor(), + CB_SelfmanagedSnap(std::move(onfinish))); + op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP; 
+ pool_ops[op->tid] = op; + + pool_op_submit(op); +} + +void Objecter::delete_pool_snap( + int64_t pool, std::string_view snap_name, + decltype(PoolOp::onfinish)&& onfinish) +{ + unique_lock wl(rwlock); + ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: " + << snap_name << dendl; + + const pg_pool_t *p = osdmap->get_pg_pool(pool); + if (!p) { + onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{}); + return; + } + + if (!p->snap_exists(snap_name)) { + onfinish->defer(std::move(onfinish), osdc_errc::snapshot_dne, cb::list{}); + return; + } + + auto op = new PoolOp; + op->tid = ++last_tid; + op->pool = pool; + op->name = snap_name; + op->onfinish = std::move(onfinish); + op->pool_op = POOL_OP_DELETE_SNAP; + pool_ops[op->tid] = op; + + pool_op_submit(op); +} + +void Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap, + decltype(PoolOp::onfinish)&& onfinish) +{ + unique_lock wl(rwlock); + ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: " + << snap << dendl; + auto op = new PoolOp; + op->tid = ++last_tid; + op->pool = pool; + op->onfinish = std::move(onfinish); + op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP; + op->snapid = snap; + pool_ops[op->tid] = op; + + pool_op_submit(op); +} + +void Objecter::create_pool(std::string_view name, + decltype(PoolOp::onfinish)&& onfinish, + int crush_rule) +{ + unique_lock wl(rwlock); + ldout(cct, 10) << "create_pool name=" << name << dendl; + + if (osdmap->lookup_pg_pool_name(name) >= 0) { + onfinish->defer(std::move(onfinish), osdc_errc::pool_exists, cb::list{}); + return; + } + + auto op = new PoolOp; + op->tid = ++last_tid; + op->pool = 0; + op->name = name; + op->onfinish = std::move(onfinish); + op->pool_op = POOL_OP_CREATE; + pool_ops[op->tid] = op; + op->crush_rule = crush_rule; + + pool_op_submit(op); +} + +void Objecter::delete_pool(int64_t pool, + decltype(PoolOp::onfinish)&& onfinish) +{ + unique_lock wl(rwlock); + ldout(cct, 10) << "delete_pool " << pool << 
dendl; + + if (!osdmap->have_pg_pool(pool)) + onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{}); + else + _do_delete_pool(pool, std::move(onfinish)); +} + +void Objecter::delete_pool(std::string_view pool_name, + decltype(PoolOp::onfinish)&& onfinish) +{ + unique_lock wl(rwlock); + ldout(cct, 10) << "delete_pool " << pool_name << dendl; + + int64_t pool = osdmap->lookup_pg_pool_name(pool_name); + if (pool < 0) + // This only returns one error: -ENOENT. + onfinish->defer(std::move(onfinish), osdc_errc::pool_dne, cb::list{}); + else + _do_delete_pool(pool, std::move(onfinish)); +} + +void Objecter::_do_delete_pool(int64_t pool, + decltype(PoolOp::onfinish)&& onfinish) + +{ + auto op = new PoolOp; + op->tid = ++last_tid; + op->pool = pool; + op->name = "delete"; + op->onfinish = std::move(onfinish); + op->pool_op = POOL_OP_DELETE; + pool_ops[op->tid] = op; + pool_op_submit(op); +} + +void Objecter::pool_op_submit(PoolOp *op) +{ + // rwlock is locked + if (mon_timeout > timespan(0)) { + op->ontimeout = timer.add_event(mon_timeout, + [this, op]() { + pool_op_cancel(op->tid, -ETIMEDOUT); }); + } + _pool_op_submit(op); +} + +void Objecter::_pool_op_submit(PoolOp *op) +{ + // rwlock is locked unique + + ldout(cct, 10) << "pool_op_submit " << op->tid << dendl; + auto m = new MPoolOp(monc->get_fsid(), op->tid, op->pool, + op->name, op->pool_op, + last_seen_osdmap_version); + if (op->snapid) m->snapid = op->snapid; + if (op->crush_rule) m->crush_rule = op->crush_rule; + monc->send_mon_message(m); + op->last_submit = ceph::coarse_mono_clock::now(); + + logger->inc(l_osdc_poolop_send); +} + +/** + * Handle a reply to a PoolOp message. Check that we sent the message + * and give the caller responsibility for the returned cb::list. + * Then either call the finisher or stash the PoolOp, depending on if we + * have a new enough map. + * Lastly, clean up the message and PoolOp. 
+ */ +void Objecter::handle_pool_op_reply(MPoolOpReply *m) +{ + int rc = m->replyCode; + auto ec = rc < 0 ? bs::error_code(-rc, mon_category()) : bs::error_code(); + FUNCTRACE(cct); + shunique_lock sul(rwlock, acquire_shared); + if (!initialized) { + sul.unlock(); + m->put(); + return; + } + + ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl; + ceph_tid_t tid = m->get_tid(); + auto iter = pool_ops.find(tid); + if (iter != pool_ops.end()) { + PoolOp *op = iter->second; + ldout(cct, 10) << "have request " << tid << " at " << op << " Op: " + << ceph_pool_op_name(op->pool_op) << dendl; + cb::list bl{std::move(m->response_data)}; + if (m->version > last_seen_osdmap_version) + last_seen_osdmap_version = m->version; + if (osdmap->get_epoch() < m->epoch) { + sul.unlock(); + sul.lock(); + // recheck op existence since we have let go of rwlock + // (for promotion) above. + iter = pool_ops.find(tid); + if (iter == pool_ops.end()) + goto done; // op is gone. + if (osdmap->get_epoch() < m->epoch) { + ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch + << " before calling back" << dendl; + _wait_for_new_map(OpCompletion::create( + service.get_executor(), + [o = std::move(op->onfinish), + bl = std::move(bl)]( + bs::error_code ec) mutable { + o->defer(std::move(o), ec, bl); + }), + m->epoch, + ec); + } else { + // map epoch changed, probably because a MOSDMap message + // sneaked in. Do caller-specified callback now or else + // we lose it forever. 
+ ceph_assert(op->onfinish); + op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl)); + } + } else { + ceph_assert(op->onfinish); + op->onfinish->defer(std::move(op->onfinish), ec, std::move(bl)); + } + op->onfinish = nullptr; + if (!sul.owns_lock()) { + sul.unlock(); + sul.lock(); + } + iter = pool_ops.find(tid); + if (iter != pool_ops.end()) { + _finish_pool_op(op, 0); + } + } else { + ldout(cct, 10) << "unknown request " << tid << dendl; + } + +done: + // Not strictly necessary, since we'll release it on return. + sul.unlock(); + + ldout(cct, 10) << "done" << dendl; + m->put(); +} + +int Objecter::pool_op_cancel(ceph_tid_t tid, int r) +{ + ceph_assert(initialized); + + unique_lock wl(rwlock); + + auto it = pool_ops.find(tid); + if (it == pool_ops.end()) { + ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; + return -ENOENT; + } + + ldout(cct, 10) << __func__ << " tid " << tid << dendl; + + PoolOp *op = it->second; + if (op->onfinish) + op->onfinish->defer(std::move(op->onfinish), osdcode(r), cb::list{}); + + _finish_pool_op(op, r); + return 0; +} + +void Objecter::_finish_pool_op(PoolOp *op, int r) +{ + // rwlock is locked unique + pool_ops.erase(op->tid); + logger->set(l_osdc_poolop_active, pool_ops.size()); + + if (op->ontimeout && r != -ETIMEDOUT) { + timer.cancel_event(op->ontimeout); + } + + delete op; +} + +// pool stats + +void Objecter::get_pool_stats( + const std::vector<std::string>& pools, + decltype(PoolStatOp::onfinish)&& onfinish) +{ + ldout(cct, 10) << "get_pool_stats " << pools << dendl; + + auto op = new PoolStatOp; + op->tid = ++last_tid; + op->pools = pools; + op->onfinish = std::move(onfinish); + if (mon_timeout > timespan(0)) { + op->ontimeout = timer.add_event(mon_timeout, + [this, op]() { + pool_stat_op_cancel(op->tid, + -ETIMEDOUT); }); + } else { + op->ontimeout = 0; + } + + unique_lock wl(rwlock); + + poolstat_ops[op->tid] = op; + + logger->set(l_osdc_poolstat_active, poolstat_ops.size()); + + 
_poolstat_submit(op); +} + +void Objecter::_poolstat_submit(PoolStatOp *op) +{ + ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl; + monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid, + op->pools, + last_seen_pgmap_version)); + op->last_submit = ceph::coarse_mono_clock::now(); + + logger->inc(l_osdc_poolstat_send); +} + +void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m) +{ + ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl; + ceph_tid_t tid = m->get_tid(); + + unique_lock wl(rwlock); + if (!initialized) { + m->put(); + return; + } + + auto iter = poolstat_ops.find(tid); + if (iter != poolstat_ops.end()) { + PoolStatOp *op = poolstat_ops[tid]; + ldout(cct, 10) << "have request " << tid << " at " << op << dendl; + if (m->version > last_seen_pgmap_version) { + last_seen_pgmap_version = m->version; + } + op->onfinish->defer(std::move(op->onfinish), bs::error_code{}, + std::move(m->pool_stats), m->per_pool); + _finish_pool_stat_op(op, 0); + } else { + ldout(cct, 10) << "unknown request " << tid << dendl; + } + ldout(cct, 10) << "done" << dendl; + m->put(); +} + +int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r) +{ + ceph_assert(initialized); + + unique_lock wl(rwlock); + + auto it = poolstat_ops.find(tid); + if (it == poolstat_ops.end()) { + ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; + return -ENOENT; + } + + ldout(cct, 10) << __func__ << " tid " << tid << dendl; + + auto op = it->second; + if (op->onfinish) + op->onfinish->defer(std::move(op->onfinish), osdcode(r), + bc::flat_map<std::string, pool_stat_t>{}, false); + _finish_pool_stat_op(op, r); + return 0; +} + +void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r) +{ + // rwlock is locked unique + + poolstat_ops.erase(op->tid); + logger->set(l_osdc_poolstat_active, poolstat_ops.size()); + + if (op->ontimeout && r != -ETIMEDOUT) + timer.cancel_event(op->ontimeout); + + delete op; +} + +void 
Objecter::get_fs_stats(boost::optional<int64_t> poolid, + decltype(StatfsOp::onfinish)&& onfinish) +{ + ldout(cct, 10) << "get_fs_stats" << dendl; + unique_lock l(rwlock); + + auto op = new StatfsOp; + op->tid = ++last_tid; + op->data_pool = poolid; + op->onfinish = std::move(onfinish); + if (mon_timeout > timespan(0)) { + op->ontimeout = timer.add_event(mon_timeout, + [this, op]() { + statfs_op_cancel(op->tid, + -ETIMEDOUT); }); + } else { + op->ontimeout = 0; + } + statfs_ops[op->tid] = op; + + logger->set(l_osdc_statfs_active, statfs_ops.size()); + + _fs_stats_submit(op); +} + +void Objecter::_fs_stats_submit(StatfsOp *op) +{ + // rwlock is locked unique + + ldout(cct, 10) << "fs_stats_submit" << op->tid << dendl; + monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid, + op->data_pool, + last_seen_pgmap_version)); + op->last_submit = ceph::coarse_mono_clock::now(); + + logger->inc(l_osdc_statfs_send); +} + +void Objecter::handle_fs_stats_reply(MStatfsReply *m) +{ + unique_lock wl(rwlock); + if (!initialized) { + m->put(); + return; + } + + ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl; + ceph_tid_t tid = m->get_tid(); + + if (statfs_ops.count(tid)) { + StatfsOp *op = statfs_ops[tid]; + ldout(cct, 10) << "have request " << tid << " at " << op << dendl; + if (m->h.version > last_seen_pgmap_version) + last_seen_pgmap_version = m->h.version; + op->onfinish->defer(std::move(op->onfinish), bs::error_code{}, m->h.st); + _finish_statfs_op(op, 0); + } else { + ldout(cct, 10) << "unknown request " << tid << dendl; + } + m->put(); + ldout(cct, 10) << "done" << dendl; +} + +int Objecter::statfs_op_cancel(ceph_tid_t tid, int r) +{ + ceph_assert(initialized); + + unique_lock wl(rwlock); + + auto it = statfs_ops.find(tid); + if (it == statfs_ops.end()) { + ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; + return -ENOENT; + } + + ldout(cct, 10) << __func__ << " tid " << tid << dendl; + + auto op = it->second; + if (op->onfinish) + 
op->onfinish->defer(std::move(op->onfinish), osdcode(r), ceph_statfs{}); + _finish_statfs_op(op, r); + return 0; +} + +void Objecter::_finish_statfs_op(StatfsOp *op, int r) +{ + // rwlock is locked unique + + statfs_ops.erase(op->tid); + logger->set(l_osdc_statfs_active, statfs_ops.size()); + + if (op->ontimeout && r != -ETIMEDOUT) + timer.cancel_event(op->ontimeout); + + delete op; +} + +// scatter/gather + +void Objecter::_sg_read_finish(vector<ObjectExtent>& extents, + vector<cb::list>& resultbl, + cb::list *bl, Context *onfinish) +{ + // all done + ldout(cct, 15) << "_sg_read_finish" << dendl; + + if (extents.size() > 1) { + Striper::StripedReadResult r; + auto bit = resultbl.begin(); + for (auto eit = extents.begin(); + eit != extents.end(); + ++eit, ++bit) { + r.add_partial_result(cct, *bit, eit->buffer_extents); + } + bl->clear(); + r.assemble_result(cct, *bl, false); + } else { + ldout(cct, 15) << " only one frag" << dendl; + *bl = std::move(resultbl[0]); + } + + // done + uint64_t bytes_read = bl->length(); + ldout(cct, 7) << "_sg_read_finish " << bytes_read << " bytes" << dendl; + + if (onfinish) { + onfinish->complete(bytes_read);// > 0 ? bytes_read:m->get_result()); + } +} + + +void Objecter::ms_handle_connect(Connection *con) +{ + ldout(cct, 10) << "ms_handle_connect " << con << dendl; + if (!initialized) + return; + + if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) + resend_mon_ops(); +} + +bool Objecter::ms_handle_reset(Connection *con) +{ + if (!initialized) + return false; + if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) { + unique_lock wl(rwlock); + + auto priv = con->get_priv(); + auto session = static_cast<OSDSession*>(priv.get()); + if (session) { + ldout(cct, 1) << "ms_handle_reset " << con << " session " << session + << " osd." 
<< session->osd << dendl; + // the session maybe had been closed if new osdmap just handled + // says the osd down + if (!(initialized && osdmap->is_up(session->osd))) { + ldout(cct, 1) << "ms_handle_reset aborted,initialized=" << initialized << dendl; + wl.unlock(); + return false; + } + map<uint64_t, LingerOp *> lresend; + unique_lock sl(session->lock); + _reopen_session(session); + _kick_requests(session, lresend); + sl.unlock(); + _linger_ops_resend(lresend, wl); + wl.unlock(); + maybe_request_map(); + } + return true; + } + return false; +} + +void Objecter::ms_handle_remote_reset(Connection *con) +{ + /* + * treat these the same. + */ + ms_handle_reset(con); +} + +bool Objecter::ms_handle_refused(Connection *con) +{ + // just log for now + if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) { + int osd = osdmap->identify_osd(con->get_peer_addr()); + if (osd >= 0) { + ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl; + } + } + return false; +} + +void Objecter::op_target_t::dump(Formatter *f) const +{ + f->dump_stream("pg") << pgid; + f->dump_int("osd", osd); + f->dump_stream("object_id") << base_oid; + f->dump_stream("object_locator") << base_oloc; + f->dump_stream("target_object_id") << target_oid; + f->dump_stream("target_object_locator") << target_oloc; + f->dump_int("paused", (int)paused); + f->dump_int("used_replica", (int)used_replica); + f->dump_int("precalc_pgid", (int)precalc_pgid); +} + +void Objecter::_dump_active(OSDSession *s) +{ + for (auto p = s->ops.begin(); p != s->ops.end(); ++p) { + Op *op = p->second; + ldout(cct, 20) << op->tid << "\t" << op->target.pgid + << "\tosd." << (op->session ? op->session->osd : -1) + << "\t" << op->target.base_oid + << "\t" << op->ops << dendl; + } +} + +void Objecter::_dump_active() +{ + ldout(cct, 20) << "dump_active .. 
" << num_homeless_ops << " homeless" + << dendl; + for (auto siter = osd_sessions.begin(); + siter != osd_sessions.end(); ++siter) { + auto s = siter->second; + shared_lock sl(s->lock); + _dump_active(s); + sl.unlock(); + } + _dump_active(homeless_session); +} + +void Objecter::dump_active() +{ + shared_lock rl(rwlock); + _dump_active(); + rl.unlock(); +} + +void Objecter::dump_requests(Formatter *fmt) +{ + // Read-lock on Objecter held here + fmt->open_object_section("requests"); + dump_ops(fmt); + dump_linger_ops(fmt); + dump_pool_ops(fmt); + dump_pool_stat_ops(fmt); + dump_statfs_ops(fmt); + dump_command_ops(fmt); + fmt->close_section(); // requests object +} + +void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt) +{ + for (auto p = s->ops.begin(); p != s->ops.end(); ++p) { + Op *op = p->second; + auto age = std::chrono::duration<double>(ceph::coarse_mono_clock::now() - op->stamp); + fmt->open_object_section("op"); + fmt->dump_unsigned("tid", op->tid); + op->target.dump(fmt); + fmt->dump_stream("last_sent") << op->stamp; + fmt->dump_float("age", age.count()); + fmt->dump_int("attempts", op->attempts); + fmt->dump_stream("snapid") << op->snapid; + fmt->dump_stream("snap_context") << op->snapc; + fmt->dump_stream("mtime") << op->mtime; + + fmt->open_array_section("osd_ops"); + for (auto it = op->ops.begin(); it != op->ops.end(); ++it) { + fmt->dump_stream("osd_op") << *it; + } + fmt->close_section(); // osd_ops array + + fmt->close_section(); // op object + } +} + +void Objecter::dump_ops(Formatter *fmt) +{ + // Read-lock on Objecter held + fmt->open_array_section("ops"); + for (auto siter = osd_sessions.begin(); + siter != osd_sessions.end(); ++siter) { + OSDSession *s = siter->second; + shared_lock sl(s->lock); + _dump_ops(s, fmt); + sl.unlock(); + } + _dump_ops(homeless_session, fmt); + fmt->close_section(); // ops array +} + +void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt) +{ + for (auto p = s->linger_ops.begin(); p != 
s->linger_ops.end(); ++p) { + auto op = p->second; + fmt->open_object_section("linger_op"); + fmt->dump_unsigned("linger_id", op->linger_id); + op->target.dump(fmt); + fmt->dump_stream("snapid") << op->snap; + fmt->dump_stream("registered") << op->registered; + fmt->close_section(); // linger_op object + } +} + +void Objecter::dump_linger_ops(Formatter *fmt) +{ + // We have a read-lock on the objecter + fmt->open_array_section("linger_ops"); + for (auto siter = osd_sessions.begin(); + siter != osd_sessions.end(); ++siter) { + auto s = siter->second; + shared_lock sl(s->lock); + _dump_linger_ops(s, fmt); + sl.unlock(); + } + _dump_linger_ops(homeless_session, fmt); + fmt->close_section(); // linger_ops array +} + +void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt) +{ + for (auto p = s->command_ops.begin(); p != s->command_ops.end(); ++p) { + auto op = p->second; + fmt->open_object_section("command_op"); + fmt->dump_unsigned("command_id", op->tid); + fmt->dump_int("osd", op->session ? 
op->session->osd : -1); + fmt->open_array_section("command"); + for (auto q = op->cmd.begin(); q != op->cmd.end(); ++q) + fmt->dump_string("word", *q); + fmt->close_section(); + if (op->target_osd >= 0) + fmt->dump_int("target_osd", op->target_osd); + else + fmt->dump_stream("target_pg") << op->target_pg; + fmt->close_section(); // command_op object + } +} + +void Objecter::dump_command_ops(Formatter *fmt) +{ + // We have a read-lock on the Objecter here + fmt->open_array_section("command_ops"); + for (auto siter = osd_sessions.begin(); + siter != osd_sessions.end(); ++siter) { + auto s = siter->second; + shared_lock sl(s->lock); + _dump_command_ops(s, fmt); + sl.unlock(); + } + _dump_command_ops(homeless_session, fmt); + fmt->close_section(); // command_ops array +} + +void Objecter::dump_pool_ops(Formatter *fmt) const +{ + fmt->open_array_section("pool_ops"); + for (auto p = pool_ops.begin(); p != pool_ops.end(); ++p) { + auto op = p->second; + fmt->open_object_section("pool_op"); + fmt->dump_unsigned("tid", op->tid); + fmt->dump_int("pool", op->pool); + fmt->dump_string("name", op->name); + fmt->dump_int("operation_type", op->pool_op); + fmt->dump_unsigned("crush_rule", op->crush_rule); + fmt->dump_stream("snapid") << op->snapid; + fmt->dump_stream("last_sent") << op->last_submit; + fmt->close_section(); // pool_op object + } + fmt->close_section(); // pool_ops array +} + +void Objecter::dump_pool_stat_ops(Formatter *fmt) const +{ + fmt->open_array_section("pool_stat_ops"); + for (auto p = poolstat_ops.begin(); + p != poolstat_ops.end(); + ++p) { + PoolStatOp *op = p->second; + fmt->open_object_section("pool_stat_op"); + fmt->dump_unsigned("tid", op->tid); + fmt->dump_stream("last_sent") << op->last_submit; + + fmt->open_array_section("pools"); + for (const auto& it : op->pools) { + fmt->dump_string("pool", it); + } + fmt->close_section(); // pools array + + fmt->close_section(); // pool_stat_op object + } + fmt->close_section(); // pool_stat_ops array +} + 
+void Objecter::dump_statfs_ops(Formatter *fmt) const +{ + fmt->open_array_section("statfs_ops"); + for (auto p = statfs_ops.begin(); p != statfs_ops.end(); ++p) { + auto op = p->second; + fmt->open_object_section("statfs_op"); + fmt->dump_unsigned("tid", op->tid); + fmt->dump_stream("last_sent") << op->last_submit; + fmt->close_section(); // statfs_op object + } + fmt->close_section(); // statfs_ops array +} + +Objecter::RequestStateHook::RequestStateHook(Objecter *objecter) : + m_objecter(objecter) +{ +} + +int Objecter::RequestStateHook::call(std::string_view command, + const cmdmap_t& cmdmap, + Formatter *f, + std::ostream& ss, + cb::list& out) +{ + shared_lock rl(m_objecter->rwlock); + m_objecter->dump_requests(f); + return 0; +} + +void Objecter::blocklist_self(bool set) +{ + ldout(cct, 10) << "blocklist_self " << (set ? "add" : "rm") << dendl; + + vector<string> cmd; + cmd.push_back("{\"prefix\":\"osd blocklist\", "); + if (set) + cmd.push_back("\"blocklistop\":\"add\","); + else + cmd.push_back("\"blocklistop\":\"rm\","); + stringstream ss; + // this is somewhat imprecise in that we are blocklisting our first addr only + ss << messenger->get_myaddrs().front().get_legacy_str(); + cmd.push_back("\"addr\":\"" + ss.str() + "\""); + + auto m = new MMonCommand(monc->get_fsid()); + m->cmd = cmd; + + // NOTE: no fallback to legacy blacklist command implemented here + // since this is only used for test code. 
+ + monc->send_mon_message(m); +} + +// commands + +void Objecter::handle_command_reply(MCommandReply *m) +{ + unique_lock wl(rwlock); + if (!initialized) { + m->put(); + return; + } + + ConnectionRef con = m->get_connection(); + auto priv = con->get_priv(); + auto s = static_cast<OSDSession*>(priv.get()); + if (!s || s->con != con) { + ldout(cct, 7) << __func__ << " no session on con " << con << dendl; + m->put(); + return; + } + + shared_lock sl(s->lock); + auto p = s->command_ops.find(m->get_tid()); + if (p == s->command_ops.end()) { + ldout(cct, 10) << "handle_command_reply tid " << m->get_tid() + << " not found" << dendl; + m->put(); + sl.unlock(); + return; + } + + CommandOp *c = p->second; + if (!c->session || + m->get_connection() != c->session->con) { + ldout(cct, 10) << "handle_command_reply tid " << m->get_tid() + << " got reply from wrong connection " + << m->get_connection() << " " << m->get_source_inst() + << dendl; + m->put(); + sl.unlock(); + return; + } + + if (m->r == -EAGAIN) { + ldout(cct,10) << __func__ << " tid " << m->get_tid() + << " got EAGAIN, requesting map and resending" << dendl; + // NOTE: This might resend twice... once now, and once again when + // we get an updated osdmap and the PG is found to have moved. + _maybe_request_map(); + _send_command(c); + m->put(); + sl.unlock(); + return; + } + + sl.unlock(); + + unique_lock sul(s->lock); + _finish_command(c, m->r < 0 ? 
bs::error_code(-m->r, osd_category()) : + bs::error_code(), std::move(m->rs), + std::move(m->get_data())); + sul.unlock(); + + m->put(); +} + +Objecter::LingerOp::LingerOp(Objecter *o, uint64_t linger_id) + : objecter(o), + linger_id(linger_id), + watch_lock(ceph::make_shared_mutex( + fmt::format("LingerOp::watch_lock #{}", linger_id))) +{} + +void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid) +{ + shunique_lock sul(rwlock, ceph::acquire_unique); + + ceph_tid_t tid = ++last_tid; + ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl; + c->tid = tid; + + { + unique_lock hs_wl(homeless_session->lock); + _session_command_op_assign(homeless_session, c); + } + + _calc_command_target(c, sul); + _assign_command_session(c, sul); + if (osd_timeout > timespan(0)) { + c->ontimeout = timer.add_event(osd_timeout, + [this, c, tid]() { + command_op_cancel( + c->session, tid, + osdc_errc::timed_out); }); + } + + if (!c->session->is_homeless()) { + _send_command(c); + } else { + _maybe_request_map(); + } + if (c->map_check_error) + _send_command_map_check(c); + if (ptid) + *ptid = tid; + + logger->inc(l_osdc_command_active); +} + +int Objecter::_calc_command_target(CommandOp *c, + shunique_lock<ceph::shared_mutex>& sul) +{ + ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock); + + c->map_check_error = 0; + + // ignore overlays, just like we do with pg ops + c->target.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY; + + if (c->target_osd >= 0) { + if (!osdmap->exists(c->target_osd)) { + c->map_check_error = -ENOENT; + c->map_check_error_str = "osd dne"; + c->target.osd = -1; + return RECALC_OP_TARGET_OSD_DNE; + } + if (osdmap->is_down(c->target_osd)) { + c->map_check_error = -ENXIO; + c->map_check_error_str = "osd down"; + c->target.osd = -1; + return RECALC_OP_TARGET_OSD_DOWN; + } + c->target.osd = c->target_osd; + } else { + int ret = _calc_target(&(c->target), nullptr, true); + if (ret == RECALC_OP_TARGET_POOL_DNE) { + c->map_check_error = -ENOENT; + 
c->map_check_error_str = "pool dne"; + c->target.osd = -1; + return ret; + } else if (ret == RECALC_OP_TARGET_OSD_DOWN) { + c->map_check_error = -ENXIO; + c->map_check_error_str = "osd down"; + c->target.osd = -1; + return ret; + } + } + + OSDSession *s; + int r = _get_session(c->target.osd, &s, sul); + ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */ + + if (c->session != s) { + put_session(s); + return RECALC_OP_TARGET_NEED_RESEND; + } + + put_session(s); + + ldout(cct, 20) << "_recalc_command_target " << c->tid << " no change, " + << c->session << dendl; + + return RECALC_OP_TARGET_NO_ACTION; +} + +void Objecter::_assign_command_session(CommandOp *c, + shunique_lock<ceph::shared_mutex>& sul) +{ + ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock); + + OSDSession *s; + int r = _get_session(c->target.osd, &s, sul); + ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */ + + if (c->session != s) { + if (c->session) { + OSDSession *cs = c->session; + unique_lock csl(cs->lock); + _session_command_op_remove(c->session, c); + csl.unlock(); + } + unique_lock sl(s->lock); + _session_command_op_assign(s, c); + } + + put_session(s); +} + +void Objecter::_send_command(CommandOp *c) +{ + ldout(cct, 10) << "_send_command " << c->tid << dendl; + ceph_assert(c->session); + ceph_assert(c->session->con); + auto m = new MCommand(monc->monmap.fsid); + m->cmd = c->cmd; + m->set_data(c->inbl); + m->set_tid(c->tid); + c->session->con->send_message(m); + logger->inc(l_osdc_command_send); +} + +int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, + bs::error_code ec) +{ + ceph_assert(initialized); + + unique_lock wl(rwlock); + + auto it = s->command_ops.find(tid); + if (it == s->command_ops.end()) { + ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; + return -ENOENT; + } + + ldout(cct, 10) << __func__ << " tid " << tid << dendl; + + CommandOp *op = it->second; + _command_cancel_map_check(op); + 
unique_lock sl(op->session->lock); + _finish_command(op, ec, {}, {}); + sl.unlock(); + return 0; +} + +void Objecter::_finish_command(CommandOp *c, bs::error_code ec, + string&& rs, cb::list&& bl) +{ + // rwlock is locked unique + // session lock is locked + + ldout(cct, 10) << "_finish_command " << c->tid << " = " << ec << " " + << rs << dendl; + + if (c->onfinish) + c->onfinish->defer(std::move(c->onfinish), ec, std::move(rs), std::move(bl)); + + if (c->ontimeout && ec != bs::errc::timed_out) + timer.cancel_event(c->ontimeout); + + _session_command_op_remove(c->session, c); + + c->put(); + + logger->dec(l_osdc_command_active); +} + +Objecter::OSDSession::~OSDSession() +{ + // Caller is responsible for re-assigning or + // destroying any ops that were assigned to us + ceph_assert(ops.empty()); + ceph_assert(linger_ops.empty()); + ceph_assert(command_ops.empty()); +} + +Objecter::Objecter(CephContext *cct, + Messenger *m, MonClient *mc, + boost::asio::io_context& service) : + Dispatcher(cct), messenger(m), monc(mc), service(service) +{ + mon_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout"); + osd_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_osd_op_timeout"); +} + +Objecter::~Objecter() +{ + ceph_assert(homeless_session->get_nref() == 1); + ceph_assert(num_homeless_ops == 0); + homeless_session->put(); + + ceph_assert(osd_sessions.empty()); + ceph_assert(poolstat_ops.empty()); + ceph_assert(statfs_ops.empty()); + ceph_assert(pool_ops.empty()); + ceph_assert(waiting_for_map.empty()); + ceph_assert(linger_ops.empty()); + ceph_assert(check_latest_map_lingers.empty()); + ceph_assert(check_latest_map_ops.empty()); + ceph_assert(check_latest_map_commands.empty()); + + ceph_assert(!m_request_state_hook); + ceph_assert(!logger); +} + +/** + * Wait until this OSD map epoch is received before + * sending any more operations to OSDs. Use this + * when it is known that the client can't trust + * anything from before this epoch (e.g. 
due to + * client blocklist at this epoch). + */ +void Objecter::set_epoch_barrier(epoch_t epoch) +{ + unique_lock wl(rwlock); + + ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was " + << epoch_barrier << ") current epoch " << osdmap->get_epoch() + << dendl; + if (epoch > epoch_barrier) { + epoch_barrier = epoch; + _maybe_request_map(); + } +} + + + +hobject_t Objecter::enumerate_objects_begin() +{ + return hobject_t(); +} + +hobject_t Objecter::enumerate_objects_end() +{ + return hobject_t::get_max(); +} + +template<typename T> +struct EnumerationContext { + Objecter* objecter; + const hobject_t end; + const cb::list filter; + uint32_t max; + const object_locator_t oloc; + std::vector<T> ls; +private: + fu2::unique_function<void(bs::error_code, + std::vector<T>, + hobject_t) &&> on_finish; +public: + epoch_t epoch = 0; + int budget = -1; + + EnumerationContext(Objecter* objecter, + hobject_t end, cb::list filter, + uint32_t max, object_locator_t oloc, + decltype(on_finish) on_finish) + : objecter(objecter), end(std::move(end)), filter(std::move(filter)), + max(max), oloc(std::move(oloc)), on_finish(std::move(on_finish)) {} + + void operator()(bs::error_code ec, + std::vector<T> v, + hobject_t h) && { + if (budget >= 0) { + objecter->put_op_budget_bytes(budget); + budget = -1; + } + + std::move(on_finish)(ec, std::move(v), std::move(h)); + } +}; + +template<typename T> +struct CB_EnumerateReply { + cb::list bl; + + Objecter* objecter; + std::unique_ptr<EnumerationContext<T>> ctx; + + CB_EnumerateReply(Objecter* objecter, + std::unique_ptr<EnumerationContext<T>>&& ctx) : + objecter(objecter), ctx(std::move(ctx)) {} + + void operator()(bs::error_code ec) { + objecter->_enumerate_reply(std::move(bl), ec, std::move(ctx)); + } +}; + +template<typename T> +void Objecter::enumerate_objects( + int64_t pool_id, + std::string_view ns, + hobject_t start, + hobject_t end, + const uint32_t max, + const cb::list& filter_bl, + fu2::unique_function<void(bs::error_code, 
+ std::vector<T>, + hobject_t) &&> on_finish) { + if (!end.is_max() && start > end) { + lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl; + std::move(on_finish)(osdc_errc::precondition_violated, {}, {}); + return; + } + + if (max < 1) { + lderr(cct) << __func__ << ": result size may not be zero" << dendl; + std::move(on_finish)(osdc_errc::precondition_violated, {}, {}); + return; + } + + if (start.is_max()) { + std::move(on_finish)({}, {}, {}); + return; + } + + shared_lock rl(rwlock); + ceph_assert(osdmap->get_epoch()); + if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) { + rl.unlock(); + lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl; + std::move(on_finish)(osdc_errc::not_supported, {}, {}); + return; + } + const pg_pool_t* p = osdmap->get_pg_pool(pool_id); + if (!p) { + lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch " + << osdmap->get_epoch() << dendl; + rl.unlock(); + std::move(on_finish)(osdc_errc::pool_dne, {}, {}); + return; + } else { + rl.unlock(); + } + + _issue_enumerate(start, + std::make_unique<EnumerationContext<T>>( + this, std::move(end), filter_bl, + max, object_locator_t{pool_id, ns}, + std::move(on_finish))); +} + +template +void Objecter::enumerate_objects<librados::ListObjectImpl>( + int64_t pool_id, + std::string_view ns, + hobject_t start, + hobject_t end, + const uint32_t max, + const cb::list& filter_bl, + fu2::unique_function<void(bs::error_code, + std::vector<librados::ListObjectImpl>, + hobject_t) &&> on_finish); + +template +void Objecter::enumerate_objects<neorados::Entry>( + int64_t pool_id, + std::string_view ns, + hobject_t start, + hobject_t end, + const uint32_t max, + const cb::list& filter_bl, + fu2::unique_function<void(bs::error_code, + std::vector<neorados::Entry>, + hobject_t) &&> on_finish); + + + +template<typename T> +void Objecter::_issue_enumerate(hobject_t start, + std::unique_ptr<EnumerationContext<T>> ctx) { + ObjectOperation op; + auto c = 
ctx.get(); + op.pg_nls(c->max, c->filter, start, osdmap->get_epoch()); + auto on_ack = std::make_unique<CB_EnumerateReply<T>>(this, std::move(ctx)); + // I hate having to do this. Try to find a cleaner way + // later. + auto epoch = &c->epoch; + auto budget = &c->budget; + auto pbl = &on_ack->bl; + + // Issue. See you later in _enumerate_reply + pg_read(start.get_hash(), + c->oloc, op, pbl, 0, + Op::OpComp::create(service.get_executor(), + [c = std::move(on_ack)] + (bs::error_code ec) mutable { + (*c)(ec); + }), epoch, budget); +} + +template +void Objecter::_issue_enumerate<librados::ListObjectImpl>( + hobject_t start, + std::unique_ptr<EnumerationContext<librados::ListObjectImpl>> ctx); +template +void Objecter::_issue_enumerate<neorados::Entry>( + hobject_t start, std::unique_ptr<EnumerationContext<neorados::Entry>> ctx); + +template<typename T> +void Objecter::_enumerate_reply( + cb::list&& bl, + bs::error_code ec, + std::unique_ptr<EnumerationContext<T>>&& ctx) +{ + if (ec) { + std::move(*ctx)(ec, {}, {}); + return; + } + + // Decode the results + auto iter = bl.cbegin(); + pg_nls_response_template<T> response; + + try { + response.decode(iter); + if (!iter.end()) { + // extra_info isn't used anywhere. We do this solely to preserve + // backward compatibility + cb::list legacy_extra_info; + decode(legacy_extra_info, iter); + } + } catch (const bs::system_error& e) { + std::move(*ctx)(e.code(), {}, {}); + return; + } + + shared_lock rl(rwlock); + auto pool = osdmap->get_pg_pool(ctx->oloc.get_pool()); + rl.unlock(); + if (!pool) { + // pool is gone, drop any results which are now meaningless. + std::move(*ctx)(osdc_errc::pool_dne, {}, {}); + return; + } + + hobject_t next; + if ((response.handle <= ctx->end)) { + next = response.handle; + } else { + next = ctx->end; + + // drop anything after 'end' + while (!response.entries.empty()) { + uint32_t hash = response.entries.back().locator.empty() ? 
+ pool->hash_key(response.entries.back().oid, + response.entries.back().nspace) : + pool->hash_key(response.entries.back().locator, + response.entries.back().nspace); + hobject_t last(response.entries.back().oid, + response.entries.back().locator, + CEPH_NOSNAP, + hash, + ctx->oloc.get_pool(), + response.entries.back().nspace); + if (last < ctx->end) + break; + response.entries.pop_back(); + } + } + + if (response.entries.size() <= ctx->max) { + ctx->max -= response.entries.size(); + std::move(response.entries.begin(), response.entries.end(), + std::back_inserter(ctx->ls)); + } else { + auto i = response.entries.begin(); + while (ctx->max > 0) { + ctx->ls.push_back(std::move(*i)); + --(ctx->max); + ++i; + } + uint32_t hash = + i->locator.empty() ? + pool->hash_key(i->oid, i->nspace) : + pool->hash_key(i->locator, i->nspace); + + next = hobject_t{i->oid, i->locator, + CEPH_NOSNAP, + hash, + ctx->oloc.get_pool(), + i->nspace}; + } + + if (next == ctx->end || ctx->max == 0) { + std::move(*ctx)(ec, std::move(ctx->ls), std::move(next)); + } else { + _issue_enumerate(next, std::move(ctx)); + } +} + +template +void Objecter::_enumerate_reply<librados::ListObjectImpl>( + cb::list&& bl, + bs::error_code ec, + std::unique_ptr<EnumerationContext<librados::ListObjectImpl>>&& ctx); + +template +void Objecter::_enumerate_reply<neorados::Entry>( + cb::list&& bl, + bs::error_code ec, + std::unique_ptr<EnumerationContext<neorados::Entry>>&& ctx); + +namespace { + using namespace librados; + + template <typename T> + void do_decode(std::vector<T>& items, std::vector<cb::list>& bls) + { + for (auto bl : bls) { + auto p = bl.cbegin(); + T t; + decode(t, p); + items.push_back(t); + } + } + + struct C_ObjectOperation_scrub_ls : public Context { + cb::list bl; + uint32_t* interval; + std::vector<inconsistent_obj_t> *objects = nullptr; + std::vector<inconsistent_snapset_t> *snapsets = nullptr; + int* rval; + + C_ObjectOperation_scrub_ls(uint32_t* interval, + 
std::vector<inconsistent_obj_t>* objects, + int* rval) + : interval(interval), objects(objects), rval(rval) {} + C_ObjectOperation_scrub_ls(uint32_t* interval, + std::vector<inconsistent_snapset_t>* snapsets, + int* rval) + : interval(interval), snapsets(snapsets), rval(rval) {} + void finish(int r) override { + if (r < 0 && r != -EAGAIN) { + if (rval) + *rval = r; + return; + } + + if (rval) + *rval = 0; + + try { + decode(); + } catch (cb::error&) { + if (rval) + *rval = -EIO; + } + } + private: + void decode() { + scrub_ls_result_t result; + auto p = bl.cbegin(); + result.decode(p); + *interval = result.interval; + if (objects) { + do_decode(*objects, result.vals); + } else { + do_decode(*snapsets, result.vals); + } + } + }; + + template <typename T> + void do_scrub_ls(::ObjectOperation* op, + const scrub_ls_arg_t& arg, + std::vector<T> *items, + uint32_t* interval, + int* rval) + { + OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS); + op->flags |= CEPH_OSD_FLAG_PGOP; + ceph_assert(interval); + arg.encode(osd_op.indata); + unsigned p = op->ops.size() - 1; + auto h = new C_ObjectOperation_scrub_ls{interval, items, rval}; + op->set_handler(h); + op->out_bl[p] = &h->bl; + op->out_rval[p] = rval; + } +} + +void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after, + uint64_t max_to_get, + std::vector<librados::inconsistent_obj_t>* objects, + uint32_t* interval, + int* rval) +{ + scrub_ls_arg_t arg = {*interval, 0, start_after, max_to_get}; + do_scrub_ls(this, arg, objects, interval, rval); +} + +void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after, + uint64_t max_to_get, + std::vector<librados::inconsistent_snapset_t> *snapsets, + uint32_t *interval, + int *rval) +{ + scrub_ls_arg_t arg = {*interval, 1, start_after, max_to_get}; + do_scrub_ls(this, arg, snapsets, interval, rval); +} |