summaryrefslogtreecommitdiffstats
path: root/src/crimson/osd/osd_operations/client_request.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/crimson/osd/osd_operations/client_request.cc')
-rw-r--r--src/crimson/osd/osd_operations/client_request.cc201
1 files changed, 201 insertions, 0 deletions
diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc
new file mode 100644
index 000000000..87b8fc788
--- /dev/null
+++ b/src/crimson/osd/osd_operations/client_request.cc
@@ -0,0 +1,201 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <seastar/core/future.hh>
+
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
+
+#include "crimson/common/exception.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd.h"
+#include "common/Formatter.h"
+#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/osd_connection_priv.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+ClientRequest::ClientRequest(
+ OSD &osd, crimson::net::ConnectionRef conn, Ref<MOSDOp> &&m)
+ : osd(osd), conn(conn), m(m)
+{}
+
+void ClientRequest::print(std::ostream &lhs) const
+{
+ lhs << *m;
+}
+
+void ClientRequest::dump_detail(Formatter *f) const
+{
+}
+
+ClientRequest::ConnectionPipeline &ClientRequest::cp()
+{
+ return get_osd_priv(conn.get()).client_request_conn_pipeline;
+}
+
+ClientRequest::PGPipeline &ClientRequest::pp(PG &pg)
+{
+ return pg.client_request_pg_pipeline;
+}
+
+bool ClientRequest::is_pg_op() const
+{
+ return std::any_of(
+ begin(m->ops), end(m->ops),
+ [](auto& op) { return ceph_osd_op_type_pg(op.op.op); });
+}
+
+seastar::future<> ClientRequest::start()
+{
+ logger().debug("{}: start", *this);
+
+ IRef opref = this;
+ return crimson::common::handle_system_shutdown(
+ [this, opref=std::move(opref)]() mutable {
+ return seastar::repeat([this, opref]() mutable {
+ return with_blocking_future(handle.enter(cp().await_map))
+ .then([this]() {
+ return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch()));
+ }).then([this](epoch_t epoch) {
+ return with_blocking_future(handle.enter(cp().get_pg));
+ }).then([this] {
+ return with_blocking_future(osd.wait_for_pg(m->get_spg()));
+ }).then([this, opref](Ref<PG> pgref) {
+ PG &pg = *pgref;
+ if (pg.can_discard_op(*m)) {
+ return osd.send_incremental_map(conn, m->get_map_epoch());
+ }
+ return with_blocking_future(
+ handle.enter(pp(pg).await_map)
+ ).then([this, &pg]() mutable {
+ return with_blocking_future(
+ pg.osdmap_gate.wait_for_map(m->get_min_epoch()));
+ }).then([this, &pg](auto map) mutable {
+ return with_blocking_future(
+ handle.enter(pp(pg).wait_for_active));
+ }).then([this, &pg]() mutable {
+ return with_blocking_future(pg.wait_for_active_blocker.wait());
+ }).then([this, pgref=std::move(pgref)]() mutable {
+ if (m->finish_decode()) {
+ m->clear_payload();
+ }
+ if (is_pg_op()) {
+ return process_pg_op(pgref);
+ } else {
+ return process_op(pgref);
+ }
+ });
+ }).then([] {
+ return seastar::stop_iteration::yes;
+ }).handle_exception_type([](crimson::common::actingset_changed& e) {
+ if (e.is_primary()) {
+ logger().debug("operation restart, acting set changed");
+ return seastar::stop_iteration::no;
+ } else {
+ logger().debug("operation abort, up primary changed");
+ return seastar::stop_iteration::yes;
+ }
+ });
+ });
+ });
+}
+
+seastar::future<> ClientRequest::process_pg_op(
+ Ref<PG> &pg)
+{
+ return pg->do_pg_ops(m)
+ .then([this, pg=std::move(pg)](Ref<MOSDOpReply> reply) {
+ return conn->send(reply);
+ });
+}
+
+seastar::future<> ClientRequest::process_op(
+ Ref<PG> &pgref)
+{
+ PG& pg = *pgref;
+ return with_blocking_future(
+ handle.enter(pp(pg).recover_missing)
+ ).then([this, &pg, pgref] {
+ eversion_t ver;
+ const hobject_t& soid = m->get_hobj();
+ logger().debug("{} check for recovery, {}", *this, soid);
+ if (pg.is_unreadable_object(soid, &ver) ||
+ pg.is_degraded_or_backfilling_object(soid)) {
+ logger().debug("{} need to wait for recovery, {}", *this, soid);
+ if (pg.get_recovery_backend()->is_recovering(soid)) {
+ return pg.get_recovery_backend()->get_recovering(soid).wait_for_recovered();
+ } else {
+ auto [op, fut] = osd.get_shard_services().start_operation<UrgentRecovery>(
+ soid, ver, pgref, osd.get_shard_services(), pg.get_osdmap_epoch());
+ return std::move(fut);
+ }
+ }
+ return seastar::now();
+ }).then([this, &pg] {
+ return with_blocking_future(handle.enter(pp(pg).get_obc));
+ }).then([this, &pg]() -> PG::load_obc_ertr::future<> {
+ op_info.set_from_op(&*m, *pg.get_osdmap());
+ return pg.with_locked_obc(m, op_info, this, [this, &pg](auto obc) {
+ return with_blocking_future(
+ handle.enter(pp(pg).process)
+ ).then([this, &pg, obc] {
+ if (!pg.is_primary()) {
+ // primary can handle both normal ops and balanced reads
+ if (is_misdirected(pg)) {
+ logger().trace("process_op: dropping misdirected op");
+ return seastar::make_ready_future<Ref<MOSDOpReply>>();
+ } else if (const hobject_t& hoid = m->get_hobj();
+ !pg.get_peering_state().can_serve_replica_read(hoid)) {
+ auto reply = make_message<MOSDOpReply>(
+ m.get(), -EAGAIN, pg.get_osdmap_epoch(),
+ m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK),
+ !m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
+ return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+ }
+ }
+ return pg.do_osd_ops(m, obc, op_info);
+ }).then([this](Ref<MOSDOpReply> reply) {
+ if (reply) {
+ return conn->send(std::move(reply));
+ } else {
+ return seastar::now();
+ }
+ });
+ });
+ }).safe_then([pgref=std::move(pgref)] {
+ return seastar::now();
+ }, PG::load_obc_ertr::all_same_way([](auto &code) {
+ logger().error("ClientRequest saw error code {}", code);
+ return seastar::now();
+ }));
+}
+
+bool ClientRequest::is_misdirected(const PG& pg) const
+{
+ // otherwise take a closer look
+ if (const int flags = m->get_flags();
+ flags & CEPH_OSD_FLAG_BALANCE_READS ||
+ flags & CEPH_OSD_FLAG_LOCALIZE_READS) {
+ if (!op_info.may_read()) {
+ // no read found, so it can't be balanced read
+ return true;
+ }
+ if (op_info.may_write() || op_info.may_cache()) {
+ // write op, but i am not primary
+ return true;
+ }
+ // balanced reads; any replica will do
+ return pg.is_nonprimary();
+ }
+ // neither balanced nor localize reads
+ return true;
+}
+
+}