summaryrefslogtreecommitdiffstats
path: root/src/osdc/Objecter.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/osdc/Objecter.h
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/osdc/Objecter.h')
-rw-r--r--src/osdc/Objecter.h3924
1 files changed, 3924 insertions, 0 deletions
diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h
new file mode 100644
index 000000000..d9d723dca
--- /dev/null
+++ b/src/osdc/Objecter.h
@@ -0,0 +1,3924 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_OBJECTER_H
+#define CEPH_OBJECTER_H
+
+#include <condition_variable>
+#include <list>
+#include <map>
+#include <mutex>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <string_view>
+#include <type_traits>
+#include <variant>
+
+#include <boost/container/small_vector.hpp>
+#include <boost/asio.hpp>
+
+#include <fmt/format.h>
+
+#include "include/buffer.h"
+#include "include/ceph_assert.h"
+#include "include/ceph_fs.h"
+#include "include/common_fwd.h"
+#include "include/expected.hpp"
+#include "include/types.h"
+#include "include/rados/rados_types.hpp"
+#include "include/function2.hpp"
+#include "include/neorados/RADOS_Decodable.hpp"
+
+#include "common/admin_socket.h"
+#include "common/async/completion.h"
+#include "common/ceph_time.h"
+#include "common/ceph_mutex.h"
+#include "common/ceph_timer.h"
+#include "common/config_obs.h"
+#include "common/shunique_lock.h"
+#include "common/zipkin_trace.h"
+#include "common/Throttle.h"
+
+#include "mon/MonClient.h"
+
+#include "messages/MOSDOp.h"
+#include "msg/Dispatcher.h"
+
+#include "osd/OSDMap.h"
+
+class Context;
+class Messenger;
+class MonClient;
+class Message;
+
+class MPoolOpReply;
+
+class MGetPoolStatsReply;
+class MStatfsReply;
+class MCommandReply;
+class MWatchNotify;
+struct ObjectOperation;
+template<typename T>
+struct EnumerationContext;
+template<typename t>
+struct CB_EnumerateReply;
+
+inline constexpr std::size_t osdc_opvec_len = 2;
+using osdc_opvec = boost::container::small_vector<OSDOp, osdc_opvec_len>;
+
+// -----------------------------------------
+
+struct ObjectOperation {
+ osdc_opvec ops;
+ int flags = 0;
+ int priority = 0;
+
+ boost::container::small_vector<ceph::buffer::list*, osdc_opvec_len> out_bl;
+ boost::container::small_vector<
+ fu2::unique_function<void(boost::system::error_code, int,
+ const ceph::buffer::list& bl) &&>,
+ osdc_opvec_len> out_handler;
+ boost::container::small_vector<int*, osdc_opvec_len> out_rval;
+ boost::container::small_vector<boost::system::error_code*,
+ osdc_opvec_len> out_ec;
+
+ ObjectOperation() = default;
+ ObjectOperation(const ObjectOperation&) = delete;
+ ObjectOperation& operator =(const ObjectOperation&) = delete;
+ ObjectOperation(ObjectOperation&&) = default;
+ ObjectOperation& operator =(ObjectOperation&&) = default;
+ ~ObjectOperation() = default;
+
+ size_t size() const {
+ return ops.size();
+ }
+
+ void clear() {
+ ops.clear();
+ flags = 0;
+ priority = 0;
+ out_bl.clear();
+ out_handler.clear();
+ out_rval.clear();
+ out_ec.clear();
+ }
+
+ void set_last_op_flags(int flags) {
+ ceph_assert(!ops.empty());
+ ops.rbegin()->op.flags = flags;
+ }
+
+
+ void set_handler(fu2::unique_function<void(boost::system::error_code, int,
+ const ceph::buffer::list&) &&> f) {
+ if (f) {
+ if (out_handler.back()) {
+ // This happens seldom enough that we may as well keep folding
+ // functions together when we get another one rather than
+ // using a container.
+ out_handler.back() =
+ [f = std::move(f),
+ g = std::move(std::move(out_handler.back()))]
+ (boost::system::error_code ec, int r,
+ const ceph::buffer::list& bl) mutable {
+ std::move(g)(ec, r, bl);
+ std::move(f)(ec, r, bl);
+ };
+ } else {
+ out_handler.back() = std::move(f);
+ }
+ }
+ ceph_assert(ops.size() == out_handler.size());
+ }
+
+ void set_handler(Context *c) {
+ if (c)
+ set_handler([c = std::unique_ptr<Context>(c)](boost::system::error_code,
+ int r,
+ const ceph::buffer::list&) mutable {
+ c.release()->complete(r);
+ });
+
+ }
+
+ OSDOp& add_op(int op) {
+ ops.emplace_back();
+ ops.back().op.op = op;
+ out_bl.push_back(nullptr);
+ ceph_assert(ops.size() == out_bl.size());
+ out_handler.emplace_back();
+ ceph_assert(ops.size() == out_handler.size());
+ out_rval.push_back(nullptr);
+ ceph_assert(ops.size() == out_rval.size());
+ out_ec.push_back(nullptr);
+ ceph_assert(ops.size() == out_ec.size());
+ return ops.back();
+ }
+ void add_data(int op, uint64_t off, uint64_t len, ceph::buffer::list& bl) {
+ OSDOp& osd_op = add_op(op);
+ osd_op.op.extent.offset = off;
+ osd_op.op.extent.length = len;
+ osd_op.indata.claim_append(bl);
+ }
+ void add_writesame(int op, uint64_t off, uint64_t write_len,
+ ceph::buffer::list& bl) {
+ OSDOp& osd_op = add_op(op);
+ osd_op.op.writesame.offset = off;
+ osd_op.op.writesame.length = write_len;
+ osd_op.op.writesame.data_length = bl.length();
+ osd_op.indata.claim_append(bl);
+ }
+ void add_xattr(int op, const char *name, const ceph::buffer::list& data) {
+ OSDOp& osd_op = add_op(op);
+ osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
+ osd_op.op.xattr.value_len = data.length();
+ if (name)
+ osd_op.indata.append(name, osd_op.op.xattr.name_len);
+ osd_op.indata.append(data);
+ }
+ void add_xattr_cmp(int op, const char *name, uint8_t cmp_op,
+ uint8_t cmp_mode, const ceph::buffer::list& data) {
+ OSDOp& osd_op = add_op(op);
+ osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
+ osd_op.op.xattr.value_len = data.length();
+ osd_op.op.xattr.cmp_op = cmp_op;
+ osd_op.op.xattr.cmp_mode = cmp_mode;
+ if (name)
+ osd_op.indata.append(name, osd_op.op.xattr.name_len);
+ osd_op.indata.append(data);
+ }
+ void add_xattr(int op, std::string_view name, const ceph::buffer::list& data) {
+ OSDOp& osd_op = add_op(op);
+ osd_op.op.xattr.name_len = name.size();
+ osd_op.op.xattr.value_len = data.length();
+ osd_op.indata.append(name.data(), osd_op.op.xattr.name_len);
+ osd_op.indata.append(data);
+ }
+ void add_xattr_cmp(int op, std::string_view name, uint8_t cmp_op,
+ uint8_t cmp_mode, const ceph::buffer::list& data) {
+ OSDOp& osd_op = add_op(op);
+ osd_op.op.xattr.name_len = name.size();
+ osd_op.op.xattr.value_len = data.length();
+ osd_op.op.xattr.cmp_op = cmp_op;
+ osd_op.op.xattr.cmp_mode = cmp_mode;
+ if (!name.empty())
+ osd_op.indata.append(name.data(), osd_op.op.xattr.name_len);
+ osd_op.indata.append(data);
+ }
+ void add_call(int op, std::string_view cname, std::string_view method,
+ const ceph::buffer::list &indata,
+ ceph::buffer::list *outbl, Context *ctx, int *prval) {
+ OSDOp& osd_op = add_op(op);
+
+ unsigned p = ops.size() - 1;
+ set_handler(ctx);
+ out_bl[p] = outbl;
+ out_rval[p] = prval;
+
+ osd_op.op.cls.class_len = cname.size();
+ osd_op.op.cls.method_len = method.size();
+ osd_op.op.cls.indata_len = indata.length();
+ osd_op.indata.append(cname.data(), osd_op.op.cls.class_len);
+ osd_op.indata.append(method.data(), osd_op.op.cls.method_len);
+ osd_op.indata.append(indata);
+ }
+ void add_call(int op, std::string_view cname, std::string_view method,
+ const ceph::buffer::list &indata,
+ fu2::unique_function<void(boost::system::error_code,
+ const ceph::buffer::list&) &&> f) {
+ OSDOp& osd_op = add_op(op);
+
+ set_handler([f = std::move(f)](boost::system::error_code ec,
+ int,
+ const ceph::buffer::list& bl) mutable {
+ std::move(f)(ec, bl);
+ });
+
+ osd_op.op.cls.class_len = cname.size();
+ osd_op.op.cls.method_len = method.size();
+ osd_op.op.cls.indata_len = indata.length();
+ osd_op.indata.append(cname.data(), osd_op.op.cls.class_len);
+ osd_op.indata.append(method.data(), osd_op.op.cls.method_len);
+ osd_op.indata.append(indata);
+ }
+ void add_call(int op, std::string_view cname, std::string_view method,
+ const ceph::buffer::list &indata,
+ fu2::unique_function<void(boost::system::error_code, int,
+ const ceph::buffer::list&) &&> f) {
+ OSDOp& osd_op = add_op(op);
+
+ set_handler([f = std::move(f)](boost::system::error_code ec,
+ int r,
+ const ceph::buffer::list& bl) mutable {
+ std::move(f)(ec, r, bl);
+ });
+
+ osd_op.op.cls.class_len = cname.size();
+ osd_op.op.cls.method_len = method.size();
+ osd_op.op.cls.indata_len = indata.length();
+ osd_op.indata.append(cname.data(), osd_op.op.cls.class_len);
+ osd_op.indata.append(method.data(), osd_op.op.cls.method_len);
+ osd_op.indata.append(indata);
+ }
+ void add_pgls(int op, uint64_t count, collection_list_handle_t cookie,
+ epoch_t start_epoch) {
+ using ceph::encode;
+ OSDOp& osd_op = add_op(op);
+ osd_op.op.pgls.count = count;
+ osd_op.op.pgls.start_epoch = start_epoch;
+ encode(cookie, osd_op.indata);
+ }
+ void add_pgls_filter(int op, uint64_t count, const ceph::buffer::list& filter,
+ collection_list_handle_t cookie, epoch_t start_epoch) {
+ using ceph::encode;
+ OSDOp& osd_op = add_op(op);
+ osd_op.op.pgls.count = count;
+ osd_op.op.pgls.start_epoch = start_epoch;
+ std::string cname = "pg";
+ std::string mname = "filter";
+ encode(cname, osd_op.indata);
+ encode(mname, osd_op.indata);
+ osd_op.indata.append(filter);
+ encode(cookie, osd_op.indata);
+ }
+ void add_alloc_hint(int op, uint64_t expected_object_size,
+ uint64_t expected_write_size,
+ uint32_t flags) {
+ OSDOp& osd_op = add_op(op);
+ osd_op.op.alloc_hint.expected_object_size = expected_object_size;
+ osd_op.op.alloc_hint.expected_write_size = expected_write_size;
+ osd_op.op.alloc_hint.flags = flags;
+ }
+
+ // ------
+
+ // pg
+ void pg_ls(uint64_t count, ceph::buffer::list& filter,
+ collection_list_handle_t cookie, epoch_t start_epoch) {
+ if (filter.length() == 0)
+ add_pgls(CEPH_OSD_OP_PGLS, count, cookie, start_epoch);
+ else
+ add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER, count, filter, cookie,
+ start_epoch);
+ flags |= CEPH_OSD_FLAG_PGOP;
+ }
+
+ void pg_nls(uint64_t count, const ceph::buffer::list& filter,
+ collection_list_handle_t cookie, epoch_t start_epoch) {
+ if (filter.length() == 0)
+ add_pgls(CEPH_OSD_OP_PGNLS, count, cookie, start_epoch);
+ else
+ add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER, count, filter, cookie,
+ start_epoch);
+ flags |= CEPH_OSD_FLAG_PGOP;
+ }
+
+ void scrub_ls(const librados::object_id_t& start_after,
+ uint64_t max_to_get,
+ std::vector<librados::inconsistent_obj_t> *objects,
+ uint32_t *interval,
+ int *rval);
+ void scrub_ls(const librados::object_id_t& start_after,
+ uint64_t max_to_get,
+ std::vector<librados::inconsistent_snapset_t> *objects,
+ uint32_t *interval,
+ int *rval);
+
+ void create(bool excl) {
+ OSDOp& o = add_op(CEPH_OSD_OP_CREATE);
+ o.op.flags = (excl ? CEPH_OSD_OP_FLAG_EXCL : 0);
+ }
+
+ struct CB_ObjectOperation_stat {
+ ceph::buffer::list bl;
+ uint64_t *psize;
+ ceph::real_time *pmtime;
+ time_t *ptime;
+ struct timespec *pts;
+ int *prval;
+ boost::system::error_code* pec;
+ CB_ObjectOperation_stat(uint64_t *ps, ceph::real_time *pm, time_t *pt, struct timespec *_pts,
+ int *prval, boost::system::error_code* pec)
+ : psize(ps), pmtime(pm), ptime(pt), pts(_pts), prval(prval), pec(pec) {}
+ void operator()(boost::system::error_code ec, int r, const ceph::buffer::list& bl) {
+ using ceph::decode;
+ if (r >= 0) {
+ auto p = bl.cbegin();
+ try {
+ uint64_t size;
+ ceph::real_time mtime;
+ decode(size, p);
+ decode(mtime, p);
+ if (psize)
+ *psize = size;
+ if (pmtime)
+ *pmtime = mtime;
+ if (ptime)
+ *ptime = ceph::real_clock::to_time_t(mtime);
+ if (pts)
+ *pts = ceph::real_clock::to_timespec(mtime);
+ } catch (const ceph::buffer::error& e) {
+ if (prval)
+ *prval = -EIO;
+ if (pec)
+ *pec = e.code();
+ }
+ }
+ }
+ };
+ void stat(uint64_t *psize, ceph::real_time *pmtime, int *prval) {
+ add_op(CEPH_OSD_OP_STAT);
+ set_handler(CB_ObjectOperation_stat(psize, pmtime, nullptr, nullptr, prval,
+ nullptr));
+ out_rval.back() = prval;
+ }
+ void stat(uint64_t *psize, ceph::real_time *pmtime,
+ boost::system::error_code* ec) {
+ add_op(CEPH_OSD_OP_STAT);
+ set_handler(CB_ObjectOperation_stat(psize, pmtime, nullptr, nullptr,
+ nullptr, ec));
+ out_ec.back() = ec;
+ }
+ void stat(uint64_t *psize, time_t *ptime, int *prval) {
+ add_op(CEPH_OSD_OP_STAT);
+ set_handler(CB_ObjectOperation_stat(psize, nullptr, ptime, nullptr, prval,
+ nullptr));
+ out_rval.back() = prval;
+ }
+ void stat(uint64_t *psize, struct timespec *pts, int *prval) {
+ add_op(CEPH_OSD_OP_STAT);
+ set_handler(CB_ObjectOperation_stat(psize, nullptr, nullptr, pts, prval, nullptr));
+ out_rval.back() = prval;
+ }
+ void stat(uint64_t *psize, ceph::real_time *pmtime, std::nullptr_t) {
+ add_op(CEPH_OSD_OP_STAT);
+ set_handler(CB_ObjectOperation_stat(psize, pmtime, nullptr, nullptr, nullptr,
+ nullptr));
+ }
+ void stat(uint64_t *psize, time_t *ptime, std::nullptr_t) {
+ add_op(CEPH_OSD_OP_STAT);
+ set_handler(CB_ObjectOperation_stat(psize, nullptr, ptime, nullptr, nullptr,
+ nullptr));
+ }
+ void stat(uint64_t *psize, struct timespec *pts, std::nullptr_t) {
+ add_op(CEPH_OSD_OP_STAT);
+ set_handler(CB_ObjectOperation_stat(psize, nullptr, nullptr, pts, nullptr,
+ nullptr));
+ }
+ void stat(uint64_t *psize, std::nullptr_t, std::nullptr_t) {
+ add_op(CEPH_OSD_OP_STAT);
+ set_handler(CB_ObjectOperation_stat(psize, nullptr, nullptr, nullptr,
+ nullptr, nullptr));
+ }
+
+ // object cmpext
+ struct CB_ObjectOperation_cmpext {
+ int* prval = nullptr;
+ boost::system::error_code* ec = nullptr;
+ std::size_t* s = nullptr;
+ explicit CB_ObjectOperation_cmpext(int *prval)
+ : prval(prval) {}
+ CB_ObjectOperation_cmpext(boost::system::error_code* ec, std::size_t* s)
+ : ec(ec), s(s) {}
+
+ void operator()(boost::system::error_code ec, int r, const ceph::buffer::list&) {
+ if (prval)
+ *prval = r;
+ if (this->ec)
+ *this->ec = ec;
+ if (s)
+ *s = static_cast<std::size_t>(-(MAX_ERRNO - r));
+ }
+ };
+
+ void cmpext(uint64_t off, ceph::buffer::list& cmp_bl, int *prval) {
+ add_data(CEPH_OSD_OP_CMPEXT, off, cmp_bl.length(), cmp_bl);
+ set_handler(CB_ObjectOperation_cmpext(prval));
+ out_rval.back() = prval;
+ }
+
+ void cmpext(uint64_t off, ceph::buffer::list&& cmp_bl, boost::system::error_code* ec,
+ std::size_t* s) {
+ add_data(CEPH_OSD_OP_CMPEXT, off, cmp_bl.length(), cmp_bl);
+ set_handler(CB_ObjectOperation_cmpext(ec, s));
+ out_ec.back() = ec;
+ }
+
+ // Used by C API
+ void cmpext(uint64_t off, uint64_t cmp_len, const char *cmp_buf, int *prval) {
+ ceph::buffer::list cmp_bl;
+ cmp_bl.append(cmp_buf, cmp_len);
+ add_data(CEPH_OSD_OP_CMPEXT, off, cmp_len, cmp_bl);
+ set_handler(CB_ObjectOperation_cmpext(prval));
+ out_rval.back() = prval;
+ }
+
+ void read(uint64_t off, uint64_t len, ceph::buffer::list *pbl, int *prval,
+ Context* ctx) {
+ ceph::buffer::list bl;
+ add_data(CEPH_OSD_OP_READ, off, len, bl);
+ unsigned p = ops.size() - 1;
+ out_bl[p] = pbl;
+ out_rval[p] = prval;
+ set_handler(ctx);
+ }
+
+ void read(uint64_t off, uint64_t len, boost::system::error_code* ec,
+ ceph::buffer::list* pbl) {
+ ceph::buffer::list bl;
+ add_data(CEPH_OSD_OP_READ, off, len, bl);
+ out_ec.back() = ec;
+ out_bl.back() = pbl;
+ }
+
+ template<typename Ex>
+ struct CB_ObjectOperation_sparse_read {
+ ceph::buffer::list* data_bl;
+ Ex* extents;
+ int* prval;
+ boost::system::error_code* pec;
+ CB_ObjectOperation_sparse_read(ceph::buffer::list* data_bl,
+ Ex* extents,
+ int* prval,
+ boost::system::error_code* pec)
+ : data_bl(data_bl), extents(extents), prval(prval), pec(pec) {}
+ void operator()(boost::system::error_code ec, int r, const ceph::buffer::list& bl) {
+ auto iter = bl.cbegin();
+ if (r >= 0) {
+ // NOTE: it's possible the sub-op has not been executed but the result
+ // code remains zeroed. Avoid the costly exception handling on a
+ // potential IO path.
+ if (bl.length() > 0) {
+ try {
+ decode(*extents, iter);
+ decode(*data_bl, iter);
+ } catch (const ceph::buffer::error& e) {
+ if (prval)
+ *prval = -EIO;
+ if (pec)
+ *pec = e.code();
+ }
+ } else if (prval) {
+ *prval = -EIO;
+ if (pec)
+ *pec = buffer::errc::end_of_buffer;
+ }
+ }
+ }
+ };
+ void sparse_read(uint64_t off, uint64_t len, std::map<uint64_t, uint64_t>* m,
+ ceph::buffer::list* data_bl, int* prval,
+ uint64_t truncate_size = 0, uint32_t truncate_seq = 0) {
+ ceph::buffer::list bl;
+ add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
+ OSDOp& o = *ops.rbegin();
+ o.op.extent.truncate_size = truncate_size;
+ o.op.extent.truncate_seq = truncate_seq;
+ set_handler(CB_ObjectOperation_sparse_read(data_bl, m, prval, nullptr));
+ out_rval.back() = prval;
+ }
+ void sparse_read(uint64_t off, uint64_t len,
+ boost::system::error_code* ec,
+ std::vector<std::pair<uint64_t, uint64_t>>* m,
+ ceph::buffer::list* data_bl) {
+ ceph::buffer::list bl;
+ add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
+ set_handler(CB_ObjectOperation_sparse_read(data_bl, m, nullptr, ec));
+ out_ec.back() = ec;
+ }
+ void write(uint64_t off, ceph::buffer::list& bl,
+ uint64_t truncate_size,
+ uint32_t truncate_seq) {
+ add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl);
+ OSDOp& o = *ops.rbegin();
+ o.op.extent.truncate_size = truncate_size;
+ o.op.extent.truncate_seq = truncate_seq;
+ }
+ void write(uint64_t off, ceph::buffer::list& bl) {
+ write(off, bl, 0, 0);
+ }
+ void write_full(ceph::buffer::list& bl) {
+ add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl);
+ }
+ void writesame(uint64_t off, uint64_t write_len, ceph::buffer::list& bl) {
+ add_writesame(CEPH_OSD_OP_WRITESAME, off, write_len, bl);
+ }
+ void append(ceph::buffer::list& bl) {
+ add_data(CEPH_OSD_OP_APPEND, 0, bl.length(), bl);
+ }
+ void zero(uint64_t off, uint64_t len) {
+ ceph::buffer::list bl;
+ add_data(CEPH_OSD_OP_ZERO, off, len, bl);
+ }
+ void truncate(uint64_t off) {
+ ceph::buffer::list bl;
+ add_data(CEPH_OSD_OP_TRUNCATE, off, 0, bl);
+ }
+ void remove() {
+ ceph::buffer::list bl;
+ add_data(CEPH_OSD_OP_DELETE, 0, 0, bl);
+ }
+ void mapext(uint64_t off, uint64_t len) {
+ ceph::buffer::list bl;
+ add_data(CEPH_OSD_OP_MAPEXT, off, len, bl);
+ }
+ void sparse_read(uint64_t off, uint64_t len) {
+ ceph::buffer::list bl;
+ add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
+ }
+
+ void checksum(uint8_t type, const ceph::buffer::list &init_value_bl,
+ uint64_t off, uint64_t len, size_t chunk_size,
+ ceph::buffer::list *pbl, int *prval, Context *ctx) {
+ OSDOp& osd_op = add_op(CEPH_OSD_OP_CHECKSUM);
+ osd_op.op.checksum.offset = off;
+ osd_op.op.checksum.length = len;
+ osd_op.op.checksum.type = type;
+ osd_op.op.checksum.chunk_size = chunk_size;
+ osd_op.indata.append(init_value_bl);
+
+ unsigned p = ops.size() - 1;
+ out_bl[p] = pbl;
+ out_rval[p] = prval;
+ set_handler(ctx);
+ }
+
+ // object attrs
+ void getxattr(const char *name, ceph::buffer::list *pbl, int *prval) {
+ ceph::buffer::list bl;
+ add_xattr(CEPH_OSD_OP_GETXATTR, name, bl);
+ unsigned p = ops.size() - 1;
+ out_bl[p] = pbl;
+ out_rval[p] = prval;
+ }
+ void getxattr(std::string_view name, boost::system::error_code* ec,
+ buffer::list *pbl) {
+ ceph::buffer::list bl;
+ add_xattr(CEPH_OSD_OP_GETXATTR, name, bl);
+ out_bl.back() = pbl;
+ out_ec.back() = ec;
+ }
+
+ template<typename Vals>
+ struct CB_ObjectOperation_decodevals {
+ uint64_t max_entries;
+ Vals* pattrs;
+ bool* ptruncated;
+ int* prval;
+ boost::system::error_code* pec;
+ CB_ObjectOperation_decodevals(uint64_t m, Vals* pa,
+ bool *pt, int *pr,
+ boost::system::error_code* pec)
+ : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr), pec(pec) {
+ if (ptruncated) {
+ *ptruncated = false;
+ }
+ }
+ void operator()(boost::system::error_code ec, int r, const ceph::buffer::list& bl) {
+ if (r >= 0) {
+ auto p = bl.cbegin();
+ try {
+ if (pattrs)
+ decode(*pattrs, p);
+ if (ptruncated) {
+ Vals ignore;
+ if (!pattrs) {
+ decode(ignore, p);
+ pattrs = &ignore;
+ }
+ if (!p.end()) {
+ decode(*ptruncated, p);
+ } else {
+ // The OSD did not provide this. Since old OSDs do not
+ // enfoce omap result limits either, we can infer it from
+ // the size of the result
+ *ptruncated = (pattrs->size() == max_entries);
+ }
+ }
+ } catch (const ceph::buffer::error& e) {
+ if (prval)
+ *prval = -EIO;
+ if (pec)
+ *pec = e.code();
+ }
+ }
+ }
+ };
+ template<typename Keys>
+ struct CB_ObjectOperation_decodekeys {
+ uint64_t max_entries;
+ Keys* pattrs;
+ bool *ptruncated;
+ int *prval;
+ boost::system::error_code* pec;
+ CB_ObjectOperation_decodekeys(uint64_t m, Keys* pa, bool *pt,
+ int *pr, boost::system::error_code* pec)
+ : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr), pec(pec) {
+ if (ptruncated) {
+ *ptruncated = false;
+ }
+ }
+ void operator()(boost::system::error_code ec, int r, const ceph::buffer::list& bl) {
+ if (r >= 0) {
+ using ceph::decode;
+ auto p = bl.cbegin();
+ try {
+ if (pattrs)
+ decode(*pattrs, p);
+ if (ptruncated) {
+ Keys ignore;
+ if (!pattrs) {
+ decode(ignore, p);
+ pattrs = &ignore;
+ }
+ if (!p.end()) {
+ decode(*ptruncated, p);
+ } else {
+ // the OSD did not provide this. since old OSDs do not
+ // enforce omap result limits either, we can infer it from
+ // the size of the result
+ *ptruncated = (pattrs->size() == max_entries);
+ }
+ }
+ } catch (const ceph::buffer::error& e) {
+ if (prval)
+ *prval = -EIO;
+ if (pec)
+ *pec = e.code();
+ }
+ }
+ }
+ };
+ struct CB_ObjectOperation_decodewatchers {
+ std::list<obj_watch_t>* pwatchers;
+ int* prval;
+ boost::system::error_code* pec;
+ CB_ObjectOperation_decodewatchers(std::list<obj_watch_t>* pw, int* pr,
+ boost::system::error_code* pec)
+ : pwatchers(pw), prval(pr), pec(pec) {}
+ void operator()(boost::system::error_code ec, int r,
+ const ceph::buffer::list& bl) {
+ if (r >= 0) {
+ auto p = bl.cbegin();
+ try {
+ obj_list_watch_response_t resp;
+ decode(resp, p);
+ if (pwatchers) {
+ for (const auto& watch_item : resp.entries) {
+ obj_watch_t ow;
+ std::string sa = watch_item.addr.get_legacy_str();
+ strncpy(ow.addr, sa.c_str(), sizeof(ow.addr) - 1);
+ ow.addr[sizeof(ow.addr) - 1] = '\0';
+ ow.watcher_id = watch_item.name.num();
+ ow.cookie = watch_item.cookie;
+ ow.timeout_seconds = watch_item.timeout_seconds;
+ pwatchers->push_back(std::move(ow));
+ }
+ }
+ } catch (const ceph::buffer::error& e) {
+ if (prval)
+ *prval = -EIO;
+ if (pec)
+ *pec = e.code();
+ }
+ }
+ }
+ };
+
+ struct CB_ObjectOperation_decodewatchersneo {
+ std::vector<neorados::ObjWatcher>* pwatchers;
+ int* prval;
+ boost::system::error_code* pec;
+ CB_ObjectOperation_decodewatchersneo(std::vector<neorados::ObjWatcher>* pw,
+ int* pr,
+ boost::system::error_code* pec)
+ : pwatchers(pw), prval(pr), pec(pec) {}
+ void operator()(boost::system::error_code ec, int r,
+ const ceph::buffer::list& bl) {
+ if (r >= 0) {
+ auto p = bl.cbegin();
+ try {
+ obj_list_watch_response_t resp;
+ decode(resp, p);
+ if (pwatchers) {
+ for (const auto& watch_item : resp.entries) {
+ neorados::ObjWatcher ow;
+ ow.addr = watch_item.addr.get_legacy_str();
+ ow.watcher_id = watch_item.name.num();
+ ow.cookie = watch_item.cookie;
+ ow.timeout_seconds = watch_item.timeout_seconds;
+ pwatchers->push_back(std::move(ow));
+ }
+ }
+ } catch (const ceph::buffer::error& e) {
+ if (prval)
+ *prval = -EIO;
+ if (pec)
+ *pec = e.code();
+ }
+ }
+ }
+ };
+
+
+ struct CB_ObjectOperation_decodesnaps {
+ librados::snap_set_t *psnaps;
+ neorados::SnapSet *neosnaps;
+ int *prval;
+ boost::system::error_code* pec;
+ CB_ObjectOperation_decodesnaps(librados::snap_set_t* ps,
+ neorados::SnapSet* ns, int* pr,
+ boost::system::error_code* pec)
+ : psnaps(ps), neosnaps(ns), prval(pr), pec(pec) {}
+ void operator()(boost::system::error_code ec, int r, const ceph::buffer::list& bl) {
+ if (r >= 0) {
+ using ceph::decode;
+ auto p = bl.cbegin();
+ try {
+ obj_list_snap_response_t resp;
+ decode(resp, p);
+ if (psnaps) {
+ psnaps->clones.clear();
+ for (auto ci = resp.clones.begin();
+ ci != resp.clones.end();
+ ++ci) {
+ librados::clone_info_t clone;
+
+ clone.cloneid = ci->cloneid;
+ clone.snaps.reserve(ci->snaps.size());
+ clone.snaps.insert(clone.snaps.end(), ci->snaps.begin(),
+ ci->snaps.end());
+ clone.overlap = ci->overlap;
+ clone.size = ci->size;
+
+ psnaps->clones.push_back(clone);
+ }
+ psnaps->seq = resp.seq;
+ }
+
+ if (neosnaps) {
+ neosnaps->clones.clear();
+ for (auto&& c : resp.clones) {
+ neorados::CloneInfo clone;
+
+ clone.cloneid = std::move(c.cloneid);
+ clone.snaps.reserve(c.snaps.size());
+ std::move(c.snaps.begin(), c.snaps.end(),
+ std::back_inserter(clone.snaps));
+ clone.overlap = c.overlap;
+ clone.size = c.size;
+ neosnaps->clones.push_back(std::move(clone));
+ }
+ neosnaps->seq = resp.seq;
+ }
+ } catch (const ceph::buffer::error& e) {
+ if (prval)
+ *prval = -EIO;
+ if (pec)
+ *pec = e.code();
+ }
+ }
+ }
+ };
+ void getxattrs(std::map<std::string,ceph::buffer::list> *pattrs, int *prval) {
+ add_op(CEPH_OSD_OP_GETXATTRS);
+ if (pattrs || prval) {
+ set_handler(CB_ObjectOperation_decodevals(0, pattrs, nullptr, prval,
+ nullptr));
+ out_rval.back() = prval;
+ }
+ }
+ void getxattrs(boost::system::error_code* ec,
+ boost::container::flat_map<std::string, ceph::buffer::list> *pattrs) {
+ add_op(CEPH_OSD_OP_GETXATTRS);
+ set_handler(CB_ObjectOperation_decodevals(0, pattrs, nullptr, nullptr, ec));
+ out_ec.back() = ec;
+ }
+ void setxattr(const char *name, const ceph::buffer::list& bl) {
+ add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
+ }
+ void setxattr(std::string_view name, const ceph::buffer::list& bl) {
+ add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
+ }
+ void setxattr(const char *name, const std::string& s) {
+ ceph::buffer::list bl;
+ bl.append(s);
+ add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
+ }
+ void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode,
+ const ceph::buffer::list& bl) {
+ add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, bl);
+ }
+ void cmpxattr(std::string_view name, uint8_t cmp_op, uint8_t cmp_mode,
+ const ceph::buffer::list& bl) {
+ add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, bl);
+ }
+ void rmxattr(const char *name) {
+ ceph::buffer::list bl;
+ add_xattr(CEPH_OSD_OP_RMXATTR, name, bl);
+ }
+ void rmxattr(std::string_view name) {
+ ceph::buffer::list bl;
+ add_xattr(CEPH_OSD_OP_RMXATTR, name, bl);
+ }
+ void setxattrs(std::map<std::string, ceph::buffer::list>& attrs) {
+ using ceph::encode;
+ ceph::buffer::list bl;
+ encode(attrs, bl);
+ add_xattr(CEPH_OSD_OP_RESETXATTRS, 0, bl.length());
+ }
+ void resetxattrs(const char *prefix, std::map<std::string, ceph::buffer::list>& attrs) {
+ using ceph::encode;
+ ceph::buffer::list bl;
+ encode(attrs, bl);
+ add_xattr(CEPH_OSD_OP_RESETXATTRS, prefix, bl);
+ }
+
+ // trivialmap
+ void tmap_update(ceph::buffer::list& bl) {
+ add_data(CEPH_OSD_OP_TMAPUP, 0, 0, bl);
+ }
+
+ // objectmap
+ void omap_get_keys(const std::string &start_after,
+ uint64_t max_to_get,
+ std::set<std::string> *out_set,
+ bool *ptruncated,
+ int *prval) {
+ using ceph::encode;
+ OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS);
+ ceph::buffer::list bl;
+ encode(start_after, bl);
+ encode(max_to_get, bl);
+ op.op.extent.offset = 0;
+ op.op.extent.length = bl.length();
+ op.indata.claim_append(bl);
+ if (prval || ptruncated || out_set) {
+ set_handler(CB_ObjectOperation_decodekeys(max_to_get, out_set, ptruncated, prval,
+ nullptr));
+ out_rval.back() = prval;
+ }
+ }
+ void omap_get_keys(std::optional<std::string_view> start_after,
+ uint64_t max_to_get,
+ boost::system::error_code* ec,
+ boost::container::flat_set<std::string> *out_set,
+ bool *ptruncated) {
+ OSDOp& op = add_op(CEPH_OSD_OP_OMAPGETKEYS);
+ ceph::buffer::list bl;
+ encode(start_after ? *start_after : std::string_view{}, bl);
+ encode(max_to_get, bl);
+ op.op.extent.offset = 0;
+ op.op.extent.length = bl.length();
+ op.indata.claim_append(bl);
+ set_handler(
+ CB_ObjectOperation_decodekeys(max_to_get, out_set, ptruncated, nullptr,
+ ec));
+ out_ec.back() = ec;
+ }
+
+ void omap_get_vals(const std::string &start_after,
+ const std::string &filter_prefix,
+ uint64_t max_to_get,
+ std::map<std::string, ceph::buffer::list> *out_set,
+ bool *ptruncated,
+ int *prval) {
+ using ceph::encode;
+ OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALS);
+ ceph::buffer::list bl;
+ encode(start_after, bl);
+ encode(max_to_get, bl);
+ encode(filter_prefix, bl);
+ op.op.extent.offset = 0;
+ op.op.extent.length = bl.length();
+ op.indata.claim_append(bl);
+ if (prval || out_set || ptruncated) {
+ set_handler(CB_ObjectOperation_decodevals(max_to_get, out_set, ptruncated,
+ prval, nullptr));
+ out_rval.back() = prval;
+ }
+ }
+
+ void omap_get_vals(std::optional<std::string_view> start_after,
+ std::optional<std::string_view> filter_prefix,
+ uint64_t max_to_get,
+ boost::system::error_code* ec,
+ boost::container::flat_map<std::string, ceph::buffer::list> *out_set,
+ bool *ptruncated) {
+ OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALS);
+ ceph::buffer::list bl;
+ encode(start_after ? *start_after : std::string_view{}, bl);
+ encode(max_to_get, bl);
+ encode(filter_prefix ? *start_after : std::string_view{}, bl);
+ op.op.extent.offset = 0;
+ op.op.extent.length = bl.length();
+ op.indata.claim_append(bl);
+ set_handler(CB_ObjectOperation_decodevals(max_to_get, out_set, ptruncated,
+ nullptr, ec));
+ out_ec.back() = ec;
+ }
+
+ void omap_get_vals_by_keys(const std::set<std::string> &to_get,
+ std::map<std::string, ceph::buffer::list> *out_set,
+ int *prval) {
+ OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS);
+ ceph::buffer::list bl;
+ encode(to_get, bl);
+ op.op.extent.offset = 0;
+ op.op.extent.length = bl.length();
+ op.indata.claim_append(bl);
+ if (prval || out_set) {
+ set_handler(CB_ObjectOperation_decodevals(0, out_set, nullptr, prval,
+ nullptr));
+ out_rval.back() = prval;
+ }
+ }
+
+ void omap_get_vals_by_keys(
+ const boost::container::flat_set<std::string>& to_get,
+ boost::system::error_code* ec,
+ boost::container::flat_map<std::string, ceph::buffer::list> *out_set) {
+ OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS);
+ ceph::buffer::list bl;
+ encode(to_get, bl);
+ op.op.extent.offset = 0;
+ op.op.extent.length = bl.length();
+ op.indata.claim_append(bl);
+ set_handler(CB_ObjectOperation_decodevals(0, out_set, nullptr, nullptr,
+ ec));
+ out_ec.back() = ec;
+ }
+
+ void omap_cmp(const std::map<std::string, std::pair<ceph::buffer::list,int> > &assertions,
+ int *prval) {
+ using ceph::encode;
+ OSDOp &op = add_op(CEPH_OSD_OP_OMAP_CMP);
+ ceph::buffer::list bl;
+ encode(assertions, bl);
+ op.op.extent.offset = 0;
+ op.op.extent.length = bl.length();
+ op.indata.claim_append(bl);
+ if (prval) {
+ unsigned p = ops.size() - 1;
+ out_rval[p] = prval;
+ }
+ }
+
+ void omap_cmp(const boost::container::flat_map<
+ std::string, std::pair<ceph::buffer::list, int>>& assertions,
+ boost::system::error_code *ec) {
+ OSDOp &op = add_op(CEPH_OSD_OP_OMAP_CMP);
+ ceph::buffer::list bl;
+ encode(assertions, bl);
+ op.op.extent.offset = 0;
+ op.op.extent.length = bl.length();
+ op.indata.claim_append(bl);
+ out_ec.back() = ec;
+ }
+
+ struct C_ObjectOperation_copyget : public Context {
+ ceph::buffer::list bl;
+ object_copy_cursor_t *cursor;
+ uint64_t *out_size;
+ ceph::real_time *out_mtime;
+ std::map<std::string,ceph::buffer::list,std::less<>> *out_attrs;
+ ceph::buffer::list *out_data, *out_omap_header, *out_omap_data;
+ std::vector<snapid_t> *out_snaps;
+ snapid_t *out_snap_seq;
+ uint32_t *out_flags;
+ uint32_t *out_data_digest;
+ uint32_t *out_omap_digest;
+ mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t> > *out_reqids;
+ mempool::osd_pglog::map<uint32_t, int> *out_reqid_return_codes;
+ uint64_t *out_truncate_seq;
+ uint64_t *out_truncate_size;
+ int *prval;
+ C_ObjectOperation_copyget(object_copy_cursor_t *c,
+ uint64_t *s,
+ ceph::real_time *m,
+ std::map<std::string,ceph::buffer::list,std::less<>> *a,
+ ceph::buffer::list *d, ceph::buffer::list *oh,
+ ceph::buffer::list *o,
+ std::vector<snapid_t> *osnaps,
+ snapid_t *osnap_seq,
+ uint32_t *flags,
+ uint32_t *dd,
+ uint32_t *od,
+ mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t> > *oreqids,
+ mempool::osd_pglog::map<uint32_t, int> *oreqid_return_codes,
+ uint64_t *otseq,
+ uint64_t *otsize,
+ int *r)
+ : cursor(c),
+ out_size(s), out_mtime(m),
+ out_attrs(a), out_data(d), out_omap_header(oh),
+ out_omap_data(o), out_snaps(osnaps), out_snap_seq(osnap_seq),
+ out_flags(flags), out_data_digest(dd), out_omap_digest(od),
+ out_reqids(oreqids),
+ out_reqid_return_codes(oreqid_return_codes),
+ out_truncate_seq(otseq),
+ out_truncate_size(otsize),
+ prval(r) {}
+ void finish(int r) override {
+ using ceph::decode;
+ // reqids are copied on ENOENT
+ if (r < 0 && r != -ENOENT)
+ return;
+ try {
+ auto p = bl.cbegin();
+ object_copy_data_t copy_reply;
+ decode(copy_reply, p);
+ if (r == -ENOENT) {
+ if (out_reqids)
+ *out_reqids = copy_reply.reqids;
+ return;
+ }
+ if (out_size)
+ *out_size = copy_reply.size;
+ if (out_mtime)
+ *out_mtime = ceph::real_clock::from_ceph_timespec(copy_reply.mtime);
+ if (out_attrs)
+ *out_attrs = copy_reply.attrs;
+ if (out_data)
+ out_data->claim_append(copy_reply.data);
+ if (out_omap_header)
+ out_omap_header->claim_append(copy_reply.omap_header);
+ if (out_omap_data)
+ *out_omap_data = copy_reply.omap_data;
+ if (out_snaps)
+ *out_snaps = copy_reply.snaps;
+ if (out_snap_seq)
+ *out_snap_seq = copy_reply.snap_seq;
+ if (out_flags)
+ *out_flags = copy_reply.flags;
+ if (out_data_digest)
+ *out_data_digest = copy_reply.data_digest;
+ if (out_omap_digest)
+ *out_omap_digest = copy_reply.omap_digest;
+ if (out_reqids)
+ *out_reqids = copy_reply.reqids;
+ if (out_reqid_return_codes)
+ *out_reqid_return_codes = copy_reply.reqid_return_codes;
+ if (out_truncate_seq)
+ *out_truncate_seq = copy_reply.truncate_seq;
+ if (out_truncate_size)
+ *out_truncate_size = copy_reply.truncate_size;
+ *cursor = copy_reply.cursor;
+ } catch (const ceph::buffer::error& e) {
+ if (prval)
+ *prval = -EIO;
+ }
+ }
+ };
+
+ void copy_get(object_copy_cursor_t *cursor,
+ uint64_t max,
+ uint64_t *out_size,
+ ceph::real_time *out_mtime,
+ std::map<std::string,ceph::buffer::list,std::less<>> *out_attrs,
+ ceph::buffer::list *out_data,
+ ceph::buffer::list *out_omap_header,
+ ceph::buffer::list *out_omap_data,
+ std::vector<snapid_t> *out_snaps,
+ snapid_t *out_snap_seq,
+ uint32_t *out_flags,
+ uint32_t *out_data_digest,
+ uint32_t *out_omap_digest,
+ mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t> > *out_reqids,
+ mempool::osd_pglog::map<uint32_t, int> *out_reqid_return_codes,
+ uint64_t *truncate_seq,
+ uint64_t *truncate_size,
+ int *prval) {
+ using ceph::encode;
+ OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET);
+ osd_op.op.copy_get.max = max;
+ encode(*cursor, osd_op.indata);
+ encode(max, osd_op.indata);
+ unsigned p = ops.size() - 1;
+ out_rval[p] = prval;
+ C_ObjectOperation_copyget *h =
+ new C_ObjectOperation_copyget(cursor, out_size, out_mtime,
+ out_attrs, out_data, out_omap_header,
+ out_omap_data, out_snaps, out_snap_seq,
+ out_flags, out_data_digest,
+ out_omap_digest, out_reqids,
+ out_reqid_return_codes, truncate_seq,
+ truncate_size, prval);
+ out_bl[p] = &h->bl;
+ set_handler(h);
+ }
+
+ void undirty() {
+ add_op(CEPH_OSD_OP_UNDIRTY);
+ }
+
+ struct C_ObjectOperation_isdirty : public Context {
+ ceph::buffer::list bl;
+ bool *pisdirty;
+ int *prval;
+ C_ObjectOperation_isdirty(bool *p, int *r)
+ : pisdirty(p), prval(r) {}
+ void finish(int r) override {
+ using ceph::decode;
+ if (r < 0)
+ return;
+ try {
+ auto p = bl.cbegin();
+ bool isdirty;
+ decode(isdirty, p);
+ if (pisdirty)
+ *pisdirty = isdirty;
+ } catch (const ceph::buffer::error& e) {
+ if (prval)
+ *prval = -EIO;
+ }
+ }
+ };
+
+ void is_dirty(bool *pisdirty, int *prval) {
+ add_op(CEPH_OSD_OP_ISDIRTY);
+ unsigned p = ops.size() - 1;
+ out_rval[p] = prval;
+ C_ObjectOperation_isdirty *h =
+ new C_ObjectOperation_isdirty(pisdirty, prval);
+ out_bl[p] = &h->bl;
+ set_handler(h);
+ }
+
+ struct C_ObjectOperation_hit_set_ls : public Context {
+ ceph::buffer::list bl;
+ std::list< std::pair<time_t, time_t> > *ptls;
+ std::list< std::pair<ceph::real_time, ceph::real_time> > *putls;
+ int *prval;
+ C_ObjectOperation_hit_set_ls(std::list< std::pair<time_t, time_t> > *t,
+ std::list< std::pair<ceph::real_time,
+ ceph::real_time> > *ut,
+ int *r)
+ : ptls(t), putls(ut), prval(r) {}
+ void finish(int r) override {
+ using ceph::decode;
+ if (r < 0)
+ return;
+ try {
+ auto p = bl.cbegin();
+ std::list< std::pair<ceph::real_time, ceph::real_time> > ls;
+ decode(ls, p);
+ if (ptls) {
+ ptls->clear();
+ for (auto p = ls.begin(); p != ls.end(); ++p)
+ // round initial timestamp up to the next full second to
+ // keep this a valid interval.
+ ptls->push_back(
+ std::make_pair(ceph::real_clock::to_time_t(
+ ceph::ceil(p->first,
+ // Sadly, no time literals until C++14.
+ std::chrono::seconds(1))),
+ ceph::real_clock::to_time_t(p->second)));
+ }
+ if (putls)
+ putls->swap(ls);
+ } catch (const ceph::buffer::error& e) {
+ r = -EIO;
+ }
+ if (prval)
+ *prval = r;
+ }
+ };
+
+ /**
+ * std::list available HitSets.
+ *
+ * We will get back a std::list of time intervals. Note that the most
+ * recent range may have an empty end timestamp if it is still
+ * accumulating.
+ *
+ * @param pls [out] std::list of time intervals
+ * @param prval [out] return value
+ */
+ void hit_set_ls(std::list< std::pair<time_t, time_t> > *pls, int *prval) {
+ add_op(CEPH_OSD_OP_PG_HITSET_LS);
+ unsigned p = ops.size() - 1;
+ out_rval[p] = prval;
+ C_ObjectOperation_hit_set_ls *h =
+ new C_ObjectOperation_hit_set_ls(pls, NULL, prval);
+ out_bl[p] = &h->bl;
+ set_handler(h);
+ }
+ void hit_set_ls(std::list<std::pair<ceph::real_time, ceph::real_time> > *pls,
+ int *prval) {
+ add_op(CEPH_OSD_OP_PG_HITSET_LS);
+ unsigned p = ops.size() - 1;
+ out_rval[p] = prval;
+ C_ObjectOperation_hit_set_ls *h =
+ new C_ObjectOperation_hit_set_ls(NULL, pls, prval);
+ out_bl[p] = &h->bl;
+ set_handler(h);
+ }
+
+ /**
+ * get HitSet
+ *
+ * Return an encoded HitSet that includes the provided time
+ * interval.
+ *
+ * @param stamp [in] timestamp
+ * @param pbl [out] target buffer for encoded HitSet
+ * @param prval [out] return value
+ */
+ void hit_set_get(ceph::real_time stamp, ceph::buffer::list *pbl, int *prval) {
+ OSDOp& op = add_op(CEPH_OSD_OP_PG_HITSET_GET);
+ op.op.hit_set_get.stamp = ceph::real_clock::to_ceph_timespec(stamp);
+ unsigned p = ops.size() - 1;
+ out_rval[p] = prval;
+ out_bl[p] = pbl;
+ }
+
+ void omap_get_header(ceph::buffer::list *bl, int *prval) {
+ add_op(CEPH_OSD_OP_OMAPGETHEADER);
+ unsigned p = ops.size() - 1;
+ out_bl[p] = bl;
+ out_rval[p] = prval;
+ }
+
+ void omap_get_header(boost::system::error_code* ec, ceph::buffer::list *bl) {
+ add_op(CEPH_OSD_OP_OMAPGETHEADER);
+ out_bl.back() = bl;
+ out_ec.back() = ec;
+ }
+
+ void omap_set(const std::map<std::string, ceph::buffer::list> &map) {
+ ceph::buffer::list bl;
+ encode(map, bl);
+ add_data(CEPH_OSD_OP_OMAPSETVALS, 0, bl.length(), bl);
+ }
+
+ void omap_set(const boost::container::flat_map<std::string, ceph::buffer::list>& map) {
+ ceph::buffer::list bl;
+ encode(map, bl);
+ add_data(CEPH_OSD_OP_OMAPSETVALS, 0, bl.length(), bl);
+ }
+
+ void omap_set_header(ceph::buffer::list &bl) {
+ add_data(CEPH_OSD_OP_OMAPSETHEADER, 0, bl.length(), bl);
+ }
+
+ void omap_clear() {
+ add_op(CEPH_OSD_OP_OMAPCLEAR);
+ }
+
+ void omap_rm_keys(const std::set<std::string> &to_remove) {
+ using ceph::encode;
+ ceph::buffer::list bl;
+ encode(to_remove, bl);
+ add_data(CEPH_OSD_OP_OMAPRMKEYS, 0, bl.length(), bl);
+ }
+ void omap_rm_keys(const boost::container::flat_set<std::string>& to_remove) {
+ ceph::buffer::list bl;
+ encode(to_remove, bl);
+ add_data(CEPH_OSD_OP_OMAPRMKEYS, 0, bl.length(), bl);
+ }
+
+ void omap_rm_range(std::string_view key_begin, std::string_view key_end) {
+ ceph::buffer::list bl;
+ using ceph::encode;
+ encode(key_begin, bl);
+ encode(key_end, bl);
+ add_data(CEPH_OSD_OP_OMAPRMKEYRANGE, 0, bl.length(), bl);
+ }
+
+ // object classes
+ void call(const char *cname, const char *method, ceph::buffer::list &indata) {
+ add_call(CEPH_OSD_OP_CALL, cname, method, indata, NULL, NULL, NULL);
+ }
+
+ void call(const char *cname, const char *method, ceph::buffer::list &indata,
+ ceph::buffer::list *outdata, Context *ctx, int *prval) {
+ add_call(CEPH_OSD_OP_CALL, cname, method, indata, outdata, ctx, prval);
+ }
+
+ void call(std::string_view cname, std::string_view method,
+ const ceph::buffer::list& indata, boost::system::error_code* ec) {
+ add_call(CEPH_OSD_OP_CALL, cname, method, indata, NULL, NULL, NULL);
+ out_ec.back() = ec;
+ }
+
+ void call(std::string_view cname, std::string_view method, const ceph::buffer::list& indata,
+ boost::system::error_code* ec, ceph::buffer::list *outdata) {
+ add_call(CEPH_OSD_OP_CALL, cname, method, indata, outdata, nullptr, nullptr);
+ out_ec.back() = ec;
+ }
+ void call(std::string_view cname, std::string_view method,
+ const ceph::buffer::list& indata,
+ fu2::unique_function<void (boost::system::error_code,
+ const ceph::buffer::list&) &&> f) {
+ add_call(CEPH_OSD_OP_CALL, cname, method, indata, std::move(f));
+ }
+ void call(std::string_view cname, std::string_view method,
+ const ceph::buffer::list& indata,
+ fu2::unique_function<void (boost::system::error_code, int,
+ const ceph::buffer::list&) &&> f) {
+ add_call(CEPH_OSD_OP_CALL, cname, method, indata, std::move(f));
+ }
+
+ // watch/notify
+ void watch(uint64_t cookie, __u8 op, uint32_t timeout = 0) {
+ OSDOp& osd_op = add_op(CEPH_OSD_OP_WATCH);
+ osd_op.op.watch.cookie = cookie;
+ osd_op.op.watch.op = op;
+ osd_op.op.watch.timeout = timeout;
+ }
+
+ void notify(uint64_t cookie, uint32_t prot_ver, uint32_t timeout,
+ ceph::buffer::list &bl, ceph::buffer::list *inbl) {
+ using ceph::encode;
+ OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY);
+ osd_op.op.notify.cookie = cookie;
+ encode(prot_ver, *inbl);
+ encode(timeout, *inbl);
+ encode(bl, *inbl);
+ osd_op.indata.append(*inbl);
+ }
+
+ void notify_ack(uint64_t notify_id, uint64_t cookie,
+ ceph::buffer::list& reply_bl) {
+ using ceph::encode;
+ OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY_ACK);
+ ceph::buffer::list bl;
+ encode(notify_id, bl);
+ encode(cookie, bl);
+ encode(reply_bl, bl);
+ osd_op.indata.append(bl);
+ }
+
+ void list_watchers(std::list<obj_watch_t> *out,
+ int *prval) {
+ add_op(CEPH_OSD_OP_LIST_WATCHERS);
+ if (prval || out) {
+ set_handler(CB_ObjectOperation_decodewatchers(out, prval, nullptr));
+ out_rval.back() = prval;
+ }
+ }
+ void list_watchers(std::vector<neorados::ObjWatcher>* out,
+ boost::system::error_code* ec) {
+ add_op(CEPH_OSD_OP_LIST_WATCHERS);
+ set_handler(CB_ObjectOperation_decodewatchersneo(out, nullptr, ec));
+ out_ec.back() = ec;
+ }
+
+ void list_snaps(librados::snap_set_t *out, int *prval,
+ boost::system::error_code* ec = nullptr) {
+ add_op(CEPH_OSD_OP_LIST_SNAPS);
+ if (prval || out || ec) {
+ set_handler(CB_ObjectOperation_decodesnaps(out, nullptr, prval, ec));
+ out_rval.back() = prval;
+ out_ec.back() = ec;
+ }
+ }
+
+ void list_snaps(neorados::SnapSet *out, int *prval,
+ boost::system::error_code* ec = nullptr) {
+ add_op(CEPH_OSD_OP_LIST_SNAPS);
+ if (prval || out || ec) {
+ set_handler(CB_ObjectOperation_decodesnaps(nullptr, out, prval, ec));
+ out_rval.back() = prval;
+ out_ec.back() = ec;
+ }
+ }
+
+ void assert_version(uint64_t ver) {
+ OSDOp& osd_op = add_op(CEPH_OSD_OP_ASSERT_VER);
+ osd_op.op.assert_ver.ver = ver;
+ }
+
+ void cmpxattr(const char *name, const ceph::buffer::list& val,
+ int op, int mode) {
+ add_xattr(CEPH_OSD_OP_CMPXATTR, name, val);
+ OSDOp& o = *ops.rbegin();
+ o.op.xattr.cmp_op = op;
+ o.op.xattr.cmp_mode = mode;
+ }
+
+ void rollback(uint64_t snapid) {
+ OSDOp& osd_op = add_op(CEPH_OSD_OP_ROLLBACK);
+ osd_op.op.snap.snapid = snapid;
+ }
+
+ void copy_from(object_t src, snapid_t snapid, object_locator_t src_oloc,
+ version_t src_version, unsigned flags,
+ unsigned src_fadvise_flags) {
+ using ceph::encode;
+ OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_FROM);
+ osd_op.op.copy_from.snapid = snapid;
+ osd_op.op.copy_from.src_version = src_version;
+ osd_op.op.copy_from.flags = flags;
+ osd_op.op.copy_from.src_fadvise_flags = src_fadvise_flags;
+ encode(src, osd_op.indata);
+ encode(src_oloc, osd_op.indata);
+ }
+ void copy_from2(object_t src, snapid_t snapid, object_locator_t src_oloc,
+ version_t src_version, unsigned flags,
+ uint32_t truncate_seq, uint64_t truncate_size,
+ unsigned src_fadvise_flags) {
+ using ceph::encode;
+ OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_FROM2);
+ osd_op.op.copy_from.snapid = snapid;
+ osd_op.op.copy_from.src_version = src_version;
+ osd_op.op.copy_from.flags = flags;
+ osd_op.op.copy_from.src_fadvise_flags = src_fadvise_flags;
+ encode(src, osd_op.indata);
+ encode(src_oloc, osd_op.indata);
+ encode(truncate_seq, osd_op.indata);
+ encode(truncate_size, osd_op.indata);
+ }
+
+ /**
+ * writeback content to backing tier
+ *
+ * If object is marked dirty in the cache tier, write back content
+ * to backing tier. If the object is clean this is a no-op.
+ *
+ * If writeback races with an update, the update will block.
+ *
+ * use with IGNORE_CACHE to avoid triggering promote.
+ */
+ void cache_flush() {
+ add_op(CEPH_OSD_OP_CACHE_FLUSH);
+ }
+
+ /**
+ * writeback content to backing tier
+ *
+ * If object is marked dirty in the cache tier, write back content
+ * to backing tier. If the object is clean this is a no-op.
+ *
+ * If writeback races with an update, return EAGAIN. Requires that
+ * the SKIPRWLOCKS flag be set.
+ *
+ * use with IGNORE_CACHE to avoid triggering promote.
+ */
+ void cache_try_flush() {
+ add_op(CEPH_OSD_OP_CACHE_TRY_FLUSH);
+ }
+
+ /**
+ * evict object from cache tier
+ *
+ * If object is marked clean, remove the object from the cache tier.
+ * Otherwise, return EBUSY.
+ *
+ * use with IGNORE_CACHE to avoid triggering promote.
+ */
+ void cache_evict() {
+ add_op(CEPH_OSD_OP_CACHE_EVICT);
+ }
+
+ /*
+ * Extensible tier
+ */
+ void set_redirect(object_t tgt, snapid_t snapid, object_locator_t tgt_oloc,
+ version_t tgt_version, int flag) {
+ using ceph::encode;
+ OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_REDIRECT);
+ osd_op.op.copy_from.snapid = snapid;
+ osd_op.op.copy_from.src_version = tgt_version;
+ encode(tgt, osd_op.indata);
+ encode(tgt_oloc, osd_op.indata);
+ set_last_op_flags(flag);
+ }
+
+ void set_chunk(uint64_t src_offset, uint64_t src_length, object_locator_t tgt_oloc,
+ object_t tgt_oid, uint64_t tgt_offset, int flag) {
+ using ceph::encode;
+ OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_CHUNK);
+ encode(src_offset, osd_op.indata);
+ encode(src_length, osd_op.indata);
+ encode(tgt_oloc, osd_op.indata);
+ encode(tgt_oid, osd_op.indata);
+ encode(tgt_offset, osd_op.indata);
+ set_last_op_flags(flag);
+ }
+
+ void tier_promote() {
+ add_op(CEPH_OSD_OP_TIER_PROMOTE);
+ }
+
+ void unset_manifest() {
+ add_op(CEPH_OSD_OP_UNSET_MANIFEST);
+ }
+
+ void tier_flush() {
+ add_op(CEPH_OSD_OP_TIER_FLUSH);
+ }
+
+ void tier_evict() {
+ add_op(CEPH_OSD_OP_TIER_EVICT);
+ }
+
+ void set_alloc_hint(uint64_t expected_object_size,
+ uint64_t expected_write_size,
+ uint32_t flags) {
+ add_alloc_hint(CEPH_OSD_OP_SETALLOCHINT, expected_object_size,
+ expected_write_size, flags);
+
+ // CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
+ // not worth a feature bit. Set FAILOK per-op flag to make
+ // sure older osds don't trip over an unsupported opcode.
+ set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
+ }
+
+ template<typename V>
+ void dup(V& sops) {
+ ops.clear();
+ std::copy(sops.begin(), sops.end(),
+ std::back_inserter(ops));
+ out_bl.resize(sops.size());
+ out_handler.resize(sops.size());
+ out_rval.resize(sops.size());
+ out_ec.resize(sops.size());
+ for (uint32_t i = 0; i < sops.size(); i++) {
+ out_bl[i] = &sops[i].outdata;
+ out_rval[i] = &sops[i].rval;
+ out_ec[i] = nullptr;
+ }
+ }
+
+ /**
+ * Pin/unpin an object in cache tier
+ */
+ void cache_pin() {
+ add_op(CEPH_OSD_OP_CACHE_PIN);
+ }
+
+ void cache_unpin() {
+ add_op(CEPH_OSD_OP_CACHE_UNPIN);
+ }
+};
+
+inline std::ostream& operator <<(std::ostream& m, const ObjectOperation& oo) {
+ auto i = oo.ops.cbegin();
+ m << '[';
+ while (i != oo.ops.cend()) {
+ if (i != oo.ops.cbegin())
+ m << ' ';
+ m << *i;
+ ++i;
+ }
+ m << ']';
+ return m;
+}
+
+
+// ----------------
+
+class Objecter : public md_config_obs_t, public Dispatcher {
+ using MOSDOp = _mosdop::MOSDOp<osdc_opvec>;
+public:
+ using OpSignature = void(boost::system::error_code);
+ using OpCompletion = ceph::async::Completion<OpSignature>;
+
+ // config observer bits
+ const char** get_tracked_conf_keys() const override;
+ void handle_conf_change(const ConfigProxy& conf,
+ const std::set <std::string> &changed) override;
+
+public:
+ Messenger *messenger;
+ MonClient *monc;
+ boost::asio::io_context& service;
+ // The guaranteed sequenced, one-at-a-time execution and apparently
+ // people sometimes depend on this.
+ boost::asio::io_context::strand finish_strand{service};
+ ZTracer::Endpoint trace_endpoint{"0.0.0.0", 0, "Objecter"};
+private:
+ std::unique_ptr<OSDMap> osdmap{std::make_unique<OSDMap>()};
+public:
+ using Dispatcher::cct;
+ std::multimap<std::string,std::string> crush_location;
+
+ std::atomic<bool> initialized{false};
+
+private:
+ std::atomic<uint64_t> last_tid{0};
+ std::atomic<unsigned> inflight_ops{0};
+ std::atomic<int> client_inc{-1};
+ uint64_t max_linger_id{0};
+ std::atomic<unsigned> num_in_flight{0};
+ std::atomic<int> global_op_flags{0}; // flags which are applied to each IO op
+ bool keep_balanced_budget = false;
+ bool honor_pool_full = true;
+
+ // If this is true, accumulate a set of blocklisted entities
+ // to be drained by consume_blocklist_events.
+ bool blocklist_events_enabled = false;
+ std::set<entity_addr_t> blocklist_events;
+ struct pg_mapping_t {
+ epoch_t epoch = 0;
+ std::vector<int> up;
+ int up_primary = -1;
+ std::vector<int> acting;
+ int acting_primary = -1;
+
+ pg_mapping_t() {}
+ pg_mapping_t(epoch_t epoch, const std::vector<int>& up, int up_primary,
+ const std::vector<int>& acting, int acting_primary)
+ : epoch(epoch), up(up), up_primary(up_primary),
+ acting(acting), acting_primary(acting_primary) {}
+ };
+ ceph::shared_mutex pg_mapping_lock =
+ ceph::make_shared_mutex("Objecter::pg_mapping_lock");
+ // pool -> pg mapping
+ std::map<int64_t, std::vector<pg_mapping_t>> pg_mappings;
+
+ // convenient accessors
+ bool lookup_pg_mapping(const pg_t& pg, epoch_t epoch, std::vector<int> *up,
+ int *up_primary, std::vector<int> *acting,
+ int *acting_primary) {
+ std::shared_lock l{pg_mapping_lock};
+ auto it = pg_mappings.find(pg.pool());
+ if (it == pg_mappings.end())
+ return false;
+ auto& mapping_array = it->second;
+ if (pg.ps() >= mapping_array.size())
+ return false;
+ if (mapping_array[pg.ps()].epoch != epoch) // stale
+ return false;
+ auto& pg_mapping = mapping_array[pg.ps()];
+ *up = pg_mapping.up;
+ *up_primary = pg_mapping.up_primary;
+ *acting = pg_mapping.acting;
+ *acting_primary = pg_mapping.acting_primary;
+ return true;
+ }
+ void update_pg_mapping(const pg_t& pg, pg_mapping_t&& pg_mapping) {
+ std::lock_guard l{pg_mapping_lock};
+ auto& mapping_array = pg_mappings[pg.pool()];
+ ceph_assert(pg.ps() < mapping_array.size());
+ mapping_array[pg.ps()] = std::move(pg_mapping);
+ }
+ void prune_pg_mapping(const mempool::osdmap::map<int64_t,pg_pool_t>& pools) {
+ std::lock_guard l{pg_mapping_lock};
+ for (auto& pool : pools) {
+ auto& mapping_array = pg_mappings[pool.first];
+ size_t pg_num = pool.second.get_pg_num();
+ if (mapping_array.size() != pg_num) {
+ // catch both pg_num increasing & decreasing
+ mapping_array.resize(pg_num);
+ }
+ }
+ for (auto it = pg_mappings.begin(); it != pg_mappings.end(); ) {
+ if (!pools.count(it->first)) {
+ // pool is gone
+ pg_mappings.erase(it++);
+ continue;
+ }
+ it++;
+ }
+ }
+
+public:
+ void maybe_request_map();
+
+ void enable_blocklist_events();
+private:
+
+ void _maybe_request_map();
+
+ version_t last_seen_osdmap_version = 0;
+ version_t last_seen_pgmap_version = 0;
+
+ mutable ceph::shared_mutex rwlock =
+ ceph::make_shared_mutex("Objecter::rwlock");
+ ceph::timer<ceph::coarse_mono_clock> timer;
+
+ PerfCounters* logger = nullptr;
+
+ uint64_t tick_event = 0;
+
+ void start_tick();
+ void tick();
+ void update_crush_location();
+
+ class RequestStateHook;
+
+ RequestStateHook *m_request_state_hook = nullptr;
+
+public:
+ /*** track pending operations ***/
+ // read
+
+ struct OSDSession;
+
+ struct op_target_t {
+ int flags = 0;
+
+ epoch_t epoch = 0; ///< latest epoch we calculated the mapping
+
+ object_t base_oid;
+ object_locator_t base_oloc;
+ object_t target_oid;
+ object_locator_t target_oloc;
+
+ ///< true if we are directed at base_pgid, not base_oid
+ bool precalc_pgid = false;
+
+ ///< true if we have ever mapped to a valid pool
+ bool pool_ever_existed = false;
+
+ ///< explcit pg target, if any
+ pg_t base_pgid;
+
+ pg_t pgid; ///< last (raw) pg we mapped to
+ spg_t actual_pgid; ///< last (actual) spg_t we mapped to
+ unsigned pg_num = 0; ///< last pg_num we mapped to
+ unsigned pg_num_mask = 0; ///< last pg_num_mask we mapped to
+ unsigned pg_num_pending = 0; ///< last pg_num we mapped to
+ std::vector<int> up; ///< set of up osds for last pg we mapped to
+ std::vector<int> acting; ///< set of acting osds for last pg we mapped to
+ int up_primary = -1; ///< last up_primary we mapped to
+ int acting_primary = -1; ///< last acting_primary we mapped to
+ int size = -1; ///< the size of the pool when were were last mapped
+ int min_size = -1; ///< the min size of the pool when were were last mapped
+ bool sort_bitwise = false; ///< whether the hobject_t sort order is bitwise
+ bool recovery_deletes = false; ///< whether the deletes are performed during recovery instead of peering
+ uint32_t peering_crush_bucket_count = 0;
+ uint32_t peering_crush_bucket_target = 0;
+ uint32_t peering_crush_bucket_barrier = 0;
+ int32_t peering_crush_mandatory_member = CRUSH_ITEM_NONE;
+
+ bool used_replica = false;
+ bool paused = false;
+
+ int osd = -1; ///< the final target osd, or -1
+
+ epoch_t last_force_resend = 0;
+
+ op_target_t(const object_t& oid, const object_locator_t& oloc, int flags)
+ : flags(flags),
+ base_oid(oid),
+ base_oloc(oloc)
+ {}
+
+ explicit op_target_t(pg_t pgid)
+ : base_oloc(pgid.pool(), pgid.ps()),
+ precalc_pgid(true),
+ base_pgid(pgid)
+ {}
+
+ op_target_t() = default;
+
+ hobject_t get_hobj() {
+ return hobject_t(target_oid,
+ target_oloc.key,
+ CEPH_NOSNAP,
+ target_oloc.hash >= 0 ? target_oloc.hash : pgid.ps(),
+ target_oloc.pool,
+ target_oloc.nspace);
+ }
+
+ bool contained_by(const hobject_t& begin, const hobject_t& end) {
+ hobject_t h = get_hobj();
+ int r = cmp(h, begin);
+ return r == 0 || (r > 0 && h < end);
+ }
+
+ bool respects_full() const {
+ return
+ (flags & (CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_RWORDERED)) &&
+ !(flags & (CEPH_OSD_FLAG_FULL_TRY | CEPH_OSD_FLAG_FULL_FORCE));
+ }
+
+ void dump(ceph::Formatter *f) const;
+ };
+
+ std::unique_ptr<ceph::async::Completion<void(boost::system::error_code)>>
+ OpContextVert(Context* c) {
+ if (c)
+ return ceph::async::Completion<void(boost::system::error_code)>::create(
+ service.get_executor(),
+ [c = std::unique_ptr<Context>(c)]
+ (boost::system::error_code e) mutable {
+ c.release()->complete(e);
+ });
+ else
+ return nullptr;
+ }
+
+ template<typename T>
+ std::unique_ptr<ceph::async::Completion<void(boost::system::error_code, T)>>
+ OpContextVert(Context* c, T* p) {
+
+ if (c || p)
+ return
+ ceph::async::Completion<void(boost::system::error_code, T)>::create(
+ service.get_executor(),
+ [c = std::unique_ptr<Context>(c), p]
+ (boost::system::error_code e, T r) mutable {
+ if (p)
+ *p = std::move(r);
+ if (c)
+ c.release()->complete(ceph::from_error_code(e));
+ });
+ else
+ return nullptr;
+ }
+
+ template<typename T>
+ std::unique_ptr<ceph::async::Completion<void(boost::system::error_code, T)>>
+ OpContextVert(Context* c, T& p) {
+ if (c)
+ return ceph::async::Completion<
+ void(boost::system::error_code, T)>::create(
+ service.get_executor(),
+ [c = std::unique_ptr<Context>(c), &p]
+ (boost::system::error_code e, T r) mutable {
+ p = std::move(r);
+ if (c)
+ c.release()->complete(ceph::from_error_code(e));
+ });
+ else
+ return nullptr;
+ }
+
+ struct Op : public RefCountedObject {
+ OSDSession *session = nullptr;
+ int incarnation = 0;
+
+ op_target_t target;
+
+ ConnectionRef con = nullptr; // for rx buffer only
+ uint64_t features = CEPH_FEATURES_SUPPORTED_DEFAULT; // explicitly specified op features
+
+ osdc_opvec ops;
+
+ snapid_t snapid = CEPH_NOSNAP;
+ SnapContext snapc;
+ ceph::real_time mtime;
+
+ ceph::buffer::list *outbl = nullptr;
+ boost::container::small_vector<ceph::buffer::list*, osdc_opvec_len> out_bl;
+ boost::container::small_vector<
+ fu2::unique_function<void(boost::system::error_code, int,
+ const ceph::buffer::list& bl) &&>,
+ osdc_opvec_len> out_handler;
+ boost::container::small_vector<int*, osdc_opvec_len> out_rval;
+ boost::container::small_vector<boost::system::error_code*,
+ osdc_opvec_len> out_ec;
+
+ int priority = 0;
+ using OpSig = void(boost::system::error_code);
+ using OpComp = ceph::async::Completion<OpSig>;
+ // Due to an irregularity of cmpxattr, we actualy need the 'int'
+ // value for onfinish for legacy librados users. As such just
+ // preserve the Context* in this one case. That way we can have
+ // our callers just pass in a unique_ptr<OpComp> and not deal with
+ // our signature in Objecter being different than the exposed
+ // signature in RADOS.
+ //
+ // Add a function for the linger case, where we want better
+ // semantics than Context, but still need to be under the completion_lock.
+ std::variant<std::unique_ptr<OpComp>, fu2::unique_function<OpSig>,
+ Context*> onfinish;
+ uint64_t ontimeout = 0;
+
+ ceph_tid_t tid = 0;
+ int attempts = 0;
+
+ version_t *objver;
+ epoch_t *reply_epoch = nullptr;
+
+ ceph::coarse_mono_time stamp;
+
+ epoch_t map_dne_bound = 0;
+
+ int budget = -1;
+
+ /// true if we should resend this message on failure
+ bool should_resend = true;
+
+ /// true if the throttle budget is get/put on a series of OPs,
+ /// instead of per OP basis, when this flag is set, the budget is
+ /// acquired before sending the very first OP of the series and
+ /// released upon receiving the last OP reply.
+ bool ctx_budgeted = false;
+
+ int *data_offset;
+
+ osd_reqid_t reqid; // explicitly setting reqid
+ ZTracer::Trace trace;
+
+ static bool has_completion(decltype(onfinish)& f) {
+ return std::visit([](auto&& arg) { return bool(arg);}, f);
+ }
+ bool has_completion() {
+ return has_completion(onfinish);
+ }
+
+ static void complete(decltype(onfinish)&& f, boost::system::error_code ec,
+ int r) {
+ std::visit([ec, r](auto&& arg) {
+ if constexpr (std::is_same_v<std::decay_t<decltype(arg)>,
+ Context*>) {
+ arg->complete(r);
+ } else if constexpr (std::is_same_v<std::decay_t<decltype(arg)>,
+ fu2::unique_function<OpSig>>) {
+ std::move(arg)(ec);
+ } else {
+ arg->defer(std::move(arg), ec);
+ }
+ }, std::move(f));
+ }
+ void complete(boost::system::error_code ec, int r) {
+ complete(std::move(onfinish), ec, r);
+ }
+
+ Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops,
+ int f, std::unique_ptr<OpComp>&& fin,
+ version_t *ov, int *offset = nullptr,
+ ZTracer::Trace *parent_trace = nullptr) :
+ target(o, ol, f),
+ ops(std::move(_ops)),
+ out_bl(ops.size(), nullptr),
+ out_handler(ops.size()),
+ out_rval(ops.size(), nullptr),
+ out_ec(ops.size(), nullptr),
+ onfinish(std::move(fin)),
+ objver(ov),
+ data_offset(offset) {
+ if (target.base_oloc.key == o)
+ target.base_oloc.key.clear();
+ if (parent_trace && parent_trace->valid()) {
+ trace.init("op", nullptr, parent_trace);
+ trace.event("start");
+ }
+ }
+
+ Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops,
+ int f, Context* fin, version_t *ov, int *offset = nullptr,
+ ZTracer::Trace *parent_trace = nullptr) :
+ target(o, ol, f),
+ ops(std::move(_ops)),
+ out_bl(ops.size(), nullptr),
+ out_handler(ops.size()),
+ out_rval(ops.size(), nullptr),
+ out_ec(ops.size(), nullptr),
+ onfinish(fin),
+ objver(ov),
+ data_offset(offset) {
+ if (target.base_oloc.key == o)
+ target.base_oloc.key.clear();
+ if (parent_trace && parent_trace->valid()) {
+ trace.init("op", nullptr, parent_trace);
+ trace.event("start");
+ }
+ }
+
+ Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops,
+ int f, fu2::unique_function<OpSig>&& fin, version_t *ov, int *offset = nullptr,
+ ZTracer::Trace *parent_trace = nullptr) :
+ target(o, ol, f),
+ ops(std::move(_ops)),
+ out_bl(ops.size(), nullptr),
+ out_handler(ops.size()),
+ out_rval(ops.size(), nullptr),
+ out_ec(ops.size(), nullptr),
+ onfinish(std::move(fin)),
+ objver(ov),
+ data_offset(offset) {
+ if (target.base_oloc.key == o)
+ target.base_oloc.key.clear();
+ if (parent_trace && parent_trace->valid()) {
+ trace.init("op", nullptr, parent_trace);
+ trace.event("start");
+ }
+ }
+
+ bool operator<(const Op& other) const {
+ return tid < other.tid;
+ }
+
+ private:
+ ~Op() override {
+ trace.event("finish");
+ }
+ };
+
+ struct CB_Op_Map_Latest {
+ Objecter *objecter;
+ ceph_tid_t tid;
+ CB_Op_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t) {}
+ void operator()(boost::system::error_code err, version_t latest, version_t);
+ };
+
+ struct CB_Command_Map_Latest {
+ Objecter *objecter;
+ uint64_t tid;
+ CB_Command_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t) {}
+ void operator()(boost::system::error_code err, version_t latest, version_t);
+ };
+
+ struct C_Stat : public Context {
+ ceph::buffer::list bl;
+ uint64_t *psize;
+ ceph::real_time *pmtime;
+ Context *fin;
+ C_Stat(uint64_t *ps, ceph::real_time *pm, Context *c) :
+ psize(ps), pmtime(pm), fin(c) {}
+ void finish(int r) override {
+ using ceph::decode;
+ if (r >= 0) {
+ auto p = bl.cbegin();
+ uint64_t s;
+ ceph::real_time m;
+ decode(s, p);
+ decode(m, p);
+ if (psize)
+ *psize = s;
+ if (pmtime)
+ *pmtime = m;
+ }
+ fin->complete(r);
+ }
+ };
+
+ struct C_GetAttrs : public Context {
+ ceph::buffer::list bl;
+ std::map<std::string,ceph::buffer::list>& attrset;
+ Context *fin;
+ C_GetAttrs(std::map<std::string, ceph::buffer::list>& set, Context *c) : attrset(set),
+ fin(c) {}
+ void finish(int r) override {
+ using ceph::decode;
+ if (r >= 0) {
+ auto p = bl.cbegin();
+ decode(attrset, p);
+ }
+ fin->complete(r);
+ }
+ };
+
+
+ // Pools and statistics
+ struct NListContext {
+ collection_list_handle_t pos;
+
+ // these are for !sortbitwise compat only
+ int current_pg = 0;
+ int starting_pg_num = 0;
+ bool sort_bitwise = false;
+
+ bool at_end_of_pool = false; ///< publicly visible end flag
+
+ int64_t pool_id = -1;
+ int pool_snap_seq = 0;
+ uint64_t max_entries = 0;
+ std::string nspace;
+
+ ceph::buffer::list bl; // raw data read to here
+ std::list<librados::ListObjectImpl> list;
+
+ ceph::buffer::list filter;
+
+ // The budget associated with this context, once it is set (>= 0),
+ // the budget is not get/released on OP basis, instead the budget
+ // is acquired before sending the first OP and released upon receiving
+ // the last op reply.
+ int ctx_budget = -1;
+
+ bool at_end() const {
+ return at_end_of_pool;
+ }
+
+ uint32_t get_pg_hash_position() const {
+ return pos.get_hash();
+ }
+ };
+
+ struct C_NList : public Context {
+ NListContext *list_context;
+ Context *final_finish;
+ Objecter *objecter;
+ epoch_t epoch;
+ C_NList(NListContext *lc, Context * finish, Objecter *ob) :
+ list_context(lc), final_finish(finish), objecter(ob), epoch(0) {}
+ void finish(int r) override {
+ if (r >= 0) {
+ objecter->_nlist_reply(list_context, r, final_finish, epoch);
+ } else {
+ final_finish->complete(r);
+ }
+ }
+ };
+
+ struct PoolStatOp {
+ ceph_tid_t tid;
+ std::vector<std::string> pools;
+ using OpSig = void(boost::system::error_code,
+ boost::container::flat_map<std::string, pool_stat_t>,
+ bool);
+ using OpComp = ceph::async::Completion<OpSig>;
+ std::unique_ptr<OpComp> onfinish;
+ std::uint64_t ontimeout;
+ ceph::coarse_mono_time last_submit;
+ };
+
+ struct StatfsOp {
+ ceph_tid_t tid;
+ std::optional<int64_t> data_pool;
+ using OpSig = void(boost::system::error_code,
+ const struct ceph_statfs);
+ using OpComp = ceph::async::Completion<OpSig>;
+
+ std::unique_ptr<OpComp> onfinish;
+ uint64_t ontimeout;
+
+ ceph::coarse_mono_time last_submit;
+ };
+
+ struct PoolOp {
+ ceph_tid_t tid = 0;
+ int64_t pool = 0;
+ std::string name;
+ using OpSig = void(boost::system::error_code, ceph::buffer::list);
+ using OpComp = ceph::async::Completion<OpSig>;
+ std::unique_ptr<OpComp> onfinish;
+ uint64_t ontimeout = 0;
+ int pool_op = 0;
+ int16_t crush_rule = 0;
+ snapid_t snapid = 0;
+ ceph::coarse_mono_time last_submit;
+
+ PoolOp() {}
+ };
+
+ // -- osd commands --
+ struct CommandOp : public RefCountedObject {
+ OSDSession *session = nullptr;
+ ceph_tid_t tid = 0;
+ std::vector<std::string> cmd;
+ ceph::buffer::list inbl;
+
+ // target_osd == -1 means target_pg is valid
+ const int target_osd = -1;
+ const pg_t target_pg;
+
+ op_target_t target;
+
+ epoch_t map_dne_bound = 0;
+ int map_check_error = 0; // error to return if std::map check fails
+ const char *map_check_error_str = nullptr;
+
+ using OpSig = void(boost::system::error_code, std::string,
+ ceph::buffer::list);
+ using OpComp = ceph::async::Completion<OpSig>;
+ std::unique_ptr<OpComp> onfinish;
+
+ uint64_t ontimeout = 0;
+ ceph::coarse_mono_time last_submit;
+
+ CommandOp(
+ int target_osd,
+ std::vector<std::string>&& cmd,
+ ceph::buffer::list&& inbl,
+ decltype(onfinish)&& onfinish)
+ : cmd(std::move(cmd)),
+ inbl(std::move(inbl)),
+ target_osd(target_osd),
+ onfinish(std::move(onfinish)) {}
+
+ CommandOp(
+ pg_t pgid,
+ std::vector<std::string>&& cmd,
+ ceph::buffer::list&& inbl,
+ decltype(onfinish)&& onfinish)
+ : cmd(std::move(cmd)),
+ inbl(std::move(inbl)),
+ target_pg(pgid),
+ target(pgid),
+ onfinish(std::move(onfinish)) {}
+ };
+
+ void submit_command(CommandOp *c, ceph_tid_t *ptid);
+ int _calc_command_target(CommandOp *c,
+ ceph::shunique_lock<ceph::shared_mutex> &sul);
+ void _assign_command_session(CommandOp *c,
+ ceph::shunique_lock<ceph::shared_mutex> &sul);
+ void _send_command(CommandOp *c);
+ int command_op_cancel(OSDSession *s, ceph_tid_t tid,
+ boost::system::error_code ec);
+ void _finish_command(CommandOp *c, boost::system::error_code ec,
+ std::string&& rs, ceph::buffer::list&& bl);
+ void handle_command_reply(MCommandReply *m);
+
+ // -- lingering ops --
+
+ struct LingerOp : public RefCountedObject {
+ Objecter *objecter;
+ uint64_t linger_id{0};
+ op_target_t target{object_t(), object_locator_t(), 0};
+ snapid_t snap{CEPH_NOSNAP};
+ SnapContext snapc;
+ ceph::real_time mtime;
+
+ osdc_opvec ops;
+ ceph::buffer::list inbl;
+ version_t *pobjver{nullptr};
+
+ bool is_watch{false};
+ ceph::coarse_mono_time watch_valid_thru; ///< send time for last acked ping
+ boost::system::error_code last_error; ///< error from last failed ping|reconnect, if any
+ ceph::shared_mutex watch_lock;
+
+ // queue of pending async operations, with the timestamp of
+ // when they were queued.
+ std::list<ceph::coarse_mono_time> watch_pending_async;
+
+ uint32_t register_gen{0};
+ bool registered{false};
+ bool canceled{false};
+ using OpSig = void(boost::system::error_code, ceph::buffer::list);
+ using OpComp = ceph::async::Completion<OpSig>;
+ std::unique_ptr<OpComp> on_reg_commit;
+ std::unique_ptr<OpComp> on_notify_finish;
+ uint64_t notify_id{0};
+
+ fu2::unique_function<void(boost::system::error_code,
+ uint64_t notify_id,
+ uint64_t cookie,
+ uint64_t notifier_id,
+ ceph::buffer::list&& bl)> handle;
+ OSDSession *session{nullptr};
+
+ int ctx_budget{-1};
+ ceph_tid_t register_tid{0};
+ ceph_tid_t ping_tid{0};
+ epoch_t map_dne_bound{0};
+
+ void _queued_async() {
+ // watch_lock ust be locked unique
+ watch_pending_async.push_back(ceph::coarse_mono_clock::now());
+ }
+ void finished_async() {
+ std::unique_lock l(watch_lock);
+ ceph_assert(!watch_pending_async.empty());
+ watch_pending_async.pop_front();
+ }
+
+ LingerOp(Objecter *o, uint64_t linger_id);
+ const LingerOp& operator=(const LingerOp& r) = delete;
+ LingerOp(const LingerOp& o) = delete;
+
+ uint64_t get_cookie() {
+ return reinterpret_cast<uint64_t>(this);
+ }
+ };
+
+ struct CB_Linger_Commit {
+ Objecter *objecter;
+ boost::intrusive_ptr<LingerOp> info;
+ ceph::buffer::list outbl; // used for notify only
+ CB_Linger_Commit(Objecter *o, LingerOp *l) : objecter(o), info(l) {}
+ ~CB_Linger_Commit() = default;
+
+ void operator()(boost::system::error_code ec) {
+ objecter->_linger_commit(info.get(), ec, outbl);
+ }
+ };
+
+ struct CB_Linger_Reconnect {
+ Objecter *objecter;
+ boost::intrusive_ptr<LingerOp> info;
+ CB_Linger_Reconnect(Objecter *o, LingerOp *l) : objecter(o), info(l) {}
+ ~CB_Linger_Reconnect() = default;
+
+ void operator()(boost::system::error_code ec) {
+ objecter->_linger_reconnect(info.get(), ec);
+ info.reset();
+ }
+ };
+
+ struct CB_Linger_Ping {
+ Objecter *objecter;
+ boost::intrusive_ptr<LingerOp> info;
+ ceph::coarse_mono_time sent;
+ uint32_t register_gen;
+ CB_Linger_Ping(Objecter *o, LingerOp *l, ceph::coarse_mono_time s)
+ : objecter(o), info(l), sent(s), register_gen(info->register_gen) {}
+ void operator()(boost::system::error_code ec) {
+ objecter->_linger_ping(info.get(), ec, sent, register_gen);
+ info.reset();
+ }
+ };
+
+ struct CB_Linger_Map_Latest {
+ Objecter *objecter;
+ uint64_t linger_id;
+ CB_Linger_Map_Latest(Objecter *o, uint64_t id) : objecter(o), linger_id(id) {}
+ void operator()(boost::system::error_code err, version_t latest, version_t);
+ };
+
+ // -- osd sessions --
+ struct OSDBackoff {
+ spg_t pgid;
+ uint64_t id;
+ hobject_t begin, end;
+ };
+
+ struct OSDSession : public RefCountedObject {
+ // pending ops
+ std::map<ceph_tid_t,Op*> ops;
+ std::map<uint64_t, LingerOp*> linger_ops;
+ std::map<ceph_tid_t,CommandOp*> command_ops;
+
+ // backoffs
+ std::map<spg_t,std::map<hobject_t,OSDBackoff>> backoffs;
+ std::map<uint64_t,OSDBackoff*> backoffs_by_id;
+
+ int osd;
+ // NB locking two sessions at the same time is only safe because
+ // it is only done in _recalc_linger_op_target with s and
+ // linger_op->session, and it holds rwlock for write. We disable
+ // lockdep (using std::sharedMutex) because lockdep doesn't know
+ // that.
+ std::shared_mutex lock;
+
+ int incarnation;
+ ConnectionRef con;
+ int num_locks;
+ std::unique_ptr<std::mutex[]> completion_locks;
+
+ OSDSession(CephContext *cct, int o) :
+ osd(o), incarnation(0), con(NULL),
+ num_locks(cct->_conf->objecter_completion_locks_per_session),
+ completion_locks(new std::mutex[num_locks]) {}
+
+ ~OSDSession() override;
+
+ bool is_homeless() { return (osd == -1); }
+
+ std::unique_lock<std::mutex> get_lock(object_t& oid);
+ };
+ std::map<int,OSDSession*> osd_sessions;
+
+ bool osdmap_full_flag() const;
+ bool osdmap_pool_full(const int64_t pool_id) const;
+
+
+ private:
+
+ /**
+ * Test pg_pool_t::FLAG_FULL on a pool
+ *
+ * @return true if the pool exists and has the flag set, or
+ * the global full flag is set, else false
+ */
+ bool _osdmap_pool_full(const int64_t pool_id) const;
+ bool _osdmap_pool_full(const pg_pool_t &p) const {
+ return p.has_flag(pg_pool_t::FLAG_FULL) && honor_pool_full;
+ }
+ void update_pool_full_map(std::map<int64_t, bool>& pool_full_map);
+
+ std::map<uint64_t, LingerOp*> linger_ops;
+ // we use this just to confirm a cookie is valid before dereferencing the ptr
+ std::set<LingerOp*> linger_ops_set;
+
+ std::map<ceph_tid_t,PoolStatOp*> poolstat_ops;
+ std::map<ceph_tid_t,StatfsOp*> statfs_ops;
+ std::map<ceph_tid_t,PoolOp*> pool_ops;
+ std::atomic<unsigned> num_homeless_ops{0};
+
+ OSDSession* homeless_session = new OSDSession(cct, -1);
+
+
+ // ops waiting for an osdmap with a new pool or confirmation that
+ // the pool does not exist (may be expanded to other uses later)
+ std::map<uint64_t, LingerOp*> check_latest_map_lingers;
+ std::map<ceph_tid_t, Op*> check_latest_map_ops;
+ std::map<ceph_tid_t, CommandOp*> check_latest_map_commands;
+
+ std::map<epoch_t,
+ std::vector<std::pair<std::unique_ptr<OpCompletion>,
+ boost::system::error_code>>> waiting_for_map;
+
+ ceph::timespan mon_timeout;
+ ceph::timespan osd_timeout;
+
+ MOSDOp *_prepare_osd_op(Op *op);
+ void _send_op(Op *op);
+ void _send_op_account(Op *op);
+ void _cancel_linger_op(Op *op);
+ void _finish_op(Op *op, int r);
+ static bool is_pg_changed(
+ int oldprimary,
+ const std::vector<int>& oldacting,
+ int newprimary,
+ const std::vector<int>& newacting,
+ bool any_change=false);
+ enum recalc_op_target_result {
+ RECALC_OP_TARGET_NO_ACTION = 0,
+ RECALC_OP_TARGET_NEED_RESEND,
+ RECALC_OP_TARGET_POOL_DNE,
+ RECALC_OP_TARGET_OSD_DNE,
+ RECALC_OP_TARGET_OSD_DOWN,
+ RECALC_OP_TARGET_POOL_EIO,
+ };
+ bool _osdmap_full_flag() const;
+ bool _osdmap_has_pool_full() const;
+ void _prune_snapc(
+ const mempool::osdmap::map<int64_t, snap_interval_set_t>& new_removed_snaps,
+ Op *op);
+
+ bool target_should_be_paused(op_target_t *op);
+ int _calc_target(op_target_t *t, Connection *con,
+ bool any_change = false);
+ int _map_session(op_target_t *op, OSDSession **s,
+ ceph::shunique_lock<ceph::shared_mutex>& lc);
+
+ void _session_op_assign(OSDSession *s, Op *op);
+ void _session_op_remove(OSDSession *s, Op *op);
+ void _session_linger_op_assign(OSDSession *to, LingerOp *op);
+ void _session_linger_op_remove(OSDSession *from, LingerOp *op);
+ void _session_command_op_assign(OSDSession *to, CommandOp *op);
+ void _session_command_op_remove(OSDSession *from, CommandOp *op);
+
+ int _assign_op_target_session(Op *op, ceph::shunique_lock<ceph::shared_mutex>& lc,
+ bool src_session_locked,
+ bool dst_session_locked);
+ int _recalc_linger_op_target(LingerOp *op,
+ ceph::shunique_lock<ceph::shared_mutex>& lc);
+
+ void _linger_submit(LingerOp *info,
+ ceph::shunique_lock<ceph::shared_mutex>& sul);
+ void _send_linger(LingerOp *info,
+ ceph::shunique_lock<ceph::shared_mutex>& sul);
+ void _linger_commit(LingerOp *info, boost::system::error_code ec,
+ ceph::buffer::list& outbl);
+ void _linger_reconnect(LingerOp *info, boost::system::error_code ec);
+ void _send_linger_ping(LingerOp *info);
+ void _linger_ping(LingerOp *info, boost::system::error_code ec,
+ ceph::coarse_mono_time sent, uint32_t register_gen);
+ boost::system::error_code _normalize_watch_error(boost::system::error_code ec);
+
+ friend class CB_Objecter_GetVersion;
+ friend class CB_DoWatchError;
+public:
+ template<typename CT>
+ auto linger_callback_flush(CT&& ct) {
+ boost::asio::async_completion<CT, void(void)> init(ct);
+ boost::asio::defer(finish_strand, std::move(init.completion_handler));
+ return init.result.get();
+ }
+
+private:
+ void _check_op_pool_dne(Op *op, std::unique_lock<std::shared_mutex> *sl);
+ void _check_op_pool_eio(Op *op, std::unique_lock<std::shared_mutex> *sl);
+ void _send_op_map_check(Op *op);
+ void _op_cancel_map_check(Op *op);
+ void _check_linger_pool_dne(LingerOp *op, bool *need_unregister);
+ void _check_linger_pool_eio(LingerOp *op);
+ void _send_linger_map_check(LingerOp *op);
+ void _linger_cancel_map_check(LingerOp *op);
+ void _check_command_map_dne(CommandOp *op);
+ void _send_command_map_check(CommandOp *op);
+ void _command_cancel_map_check(CommandOp *op);
+
+ void _kick_requests(OSDSession *session, std::map<uint64_t, LingerOp *>& lresend);
+ void _linger_ops_resend(std::map<uint64_t, LingerOp *>& lresend,
+ std::unique_lock<ceph::shared_mutex>& ul);
+
+ int _get_session(int osd, OSDSession **session,
+ ceph::shunique_lock<ceph::shared_mutex>& sul);
+ void put_session(OSDSession *s);
+ void get_session(OSDSession *s);
+ void _reopen_session(OSDSession *session);
+ void close_session(OSDSession *session);
+
+ void _nlist_reply(NListContext *list_context, int r, Context *final_finish,
+ epoch_t reply_epoch);
+
+ void resend_mon_ops();
+
+ /**
+ * handle a budget for in-flight ops
+ * budget is taken whenever an op goes into the ops std::map
+ * and returned whenever an op is removed from the std::map
+ * If throttle_op needs to throttle it will unlock client_lock.
+ */
+ int calc_op_budget(const boost::container::small_vector_base<OSDOp>& ops);
+ void _throttle_op(Op *op, ceph::shunique_lock<ceph::shared_mutex>& sul,
+ int op_size = 0);
+ int _take_op_budget(Op *op, ceph::shunique_lock<ceph::shared_mutex>& sul) {
+ ceph_assert(sul && sul.mutex() == &rwlock);
+ int op_budget = calc_op_budget(op->ops);
+ if (keep_balanced_budget) {
+ _throttle_op(op, sul, op_budget);
+ } else { // update take_linger_budget to match this!
+ op_throttle_bytes.take(op_budget);
+ op_throttle_ops.take(1);
+ }
+ op->budget = op_budget;
+ return op_budget;
+ }
+ int take_linger_budget(LingerOp *info);
+ void put_op_budget_bytes(int op_budget) {
+ ceph_assert(op_budget >= 0);
+ op_throttle_bytes.put(op_budget);
+ op_throttle_ops.put(1);
+ }
+ void put_nlist_context_budget(NListContext *list_context);
+ Throttle op_throttle_bytes{cct, "objecter_bytes",
+ static_cast<int64_t>(
+ cct->_conf->objecter_inflight_op_bytes)};
+ Throttle op_throttle_ops{cct, "objecter_ops",
+ static_cast<int64_t>(
+ cct->_conf->objecter_inflight_ops)};
+ public:
+ Objecter(CephContext *cct, Messenger *m, MonClient *mc,
+ boost::asio::io_context& service);
+ ~Objecter() override;
+
+ void init();
+ void start(const OSDMap *o = nullptr);
+ void shutdown();
+
+ // These two templates replace osdmap_(get)|(put)_read. Simply wrap
+ // whatever functionality you want to use the OSDMap in a lambda like:
+ //
+ // with_osdmap([](const OSDMap& o) { o.do_stuff(); });
+ //
+ // or
+ //
+ // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); });
+ //
+ // Do not call into something that will try to lock the OSDMap from
+ // here or you will have great woe and misery.
+
+ template<typename Callback, typename...Args>
+ decltype(auto) with_osdmap(Callback&& cb, Args&&... args) {
+ std::shared_lock l(rwlock);
+ return std::forward<Callback>(cb)(*osdmap, std::forward<Args>(args)...);
+ }
+
+
+ /**
+ * Tell the objecter to throttle outgoing ops according to its
+ * budget (in _conf). If you do this, ops can block, in
+ * which case it will unlock client_lock and sleep until
+ * incoming messages reduce the used budget low enough for
+ * the ops to continue going; then it will lock client_lock again.
+ */
+ void set_balanced_budget() { keep_balanced_budget = true; }
+ void unset_balanced_budget() { keep_balanced_budget = false; }
+
+ void set_honor_pool_full() { honor_pool_full = true; }
+ void unset_honor_pool_full() { honor_pool_full = false; }
+
+ void _scan_requests(
+ OSDSession *s,
+ bool skipped_map,
+ bool cluster_full,
+ std::map<int64_t, bool> *pool_full_map,
+ std::map<ceph_tid_t, Op*>& need_resend,
+ std::list<LingerOp*>& need_resend_linger,
+ std::map<ceph_tid_t, CommandOp*>& need_resend_command,
+ ceph::shunique_lock<ceph::shared_mutex>& sul);
+
+ int64_t get_object_hash_position(int64_t pool, const std::string& key,
+ const std::string& ns);
+ int64_t get_object_pg_hash_position(int64_t pool, const std::string& key,
+ const std::string& ns);
+
+ // messages
+ public:
+ bool ms_dispatch(Message *m) override;
+ bool ms_can_fast_dispatch_any() const override {
+ return true;
+ }
+ bool ms_can_fast_dispatch(const Message *m) const override {
+ switch (m->get_type()) {
+ case CEPH_MSG_OSD_OPREPLY:
+ case CEPH_MSG_WATCH_NOTIFY:
+ return true;
+ default:
+ return false;
+ }
+ }
+ void ms_fast_dispatch(Message *m) override {
+ if (!ms_dispatch(m)) {
+ m->put();
+ }
+ }
+
+ void handle_osd_op_reply(class MOSDOpReply *m);
+ void handle_osd_backoff(class MOSDBackoff *m);
+ void handle_watch_notify(class MWatchNotify *m);
+ void handle_osd_map(class MOSDMap *m);
+ void wait_for_osd_map(epoch_t e=0);
+
+ template<typename CompletionToken>
+ auto wait_for_osd_map(CompletionToken&& token) {
+ boost::asio::async_completion<CompletionToken, void()> init(token);
+ std::unique_lock l(rwlock);
+ if (osdmap->get_epoch()) {
+ l.unlock();
+ boost::asio::post(std::move(init.completion_handler));
+ } else {
+ waiting_for_map[0].emplace_back(
+ OpCompletion::create(
+ service.get_executor(),
+ [c = std::move(init.completion_handler)]
+ (boost::system::error_code) mutable {
+ std::move(c)();
+ }), boost::system::error_code{});
+ l.unlock();
+ }
+ return init.result.get();
+ }
+
+
+ /**
+ * Get std::list of entities blocklisted since this was last called,
+ * and reset the std::list.
+ *
+ * Uses a std::set because typical use case is to compare some
+ * other std::list of clients to see which overlap with the blocklisted
+ * addrs.
+ *
+ */
+ void consume_blocklist_events(std::set<entity_addr_t> *events);
+
+ int pool_snap_by_name(int64_t poolid,
+ const char *snap_name,
+ snapid_t *snap) const;
+ int pool_snap_get_info(int64_t poolid, snapid_t snap,
+ pool_snap_info_t *info) const;
+ int pool_snap_list(int64_t poolid, std::vector<uint64_t> *snaps);
+private:
+
+ void emit_blocklist_events(const OSDMap::Incremental &inc);
+ void emit_blocklist_events(const OSDMap &old_osd_map,
+ const OSDMap &new_osd_map);
+
+ // low-level
+ void _op_submit(Op *op, ceph::shunique_lock<ceph::shared_mutex>& lc,
+ ceph_tid_t *ptid);
+ void _op_submit_with_budget(Op *op,
+ ceph::shunique_lock<ceph::shared_mutex>& lc,
+ ceph_tid_t *ptid,
+ int *ctx_budget = NULL);
+ // public interface
+public:
+ void op_submit(Op *op, ceph_tid_t *ptid = NULL, int *ctx_budget = NULL);
+ bool is_active() {
+ std::shared_lock l(rwlock);
+ return !((!inflight_ops) && linger_ops.empty() &&
+ poolstat_ops.empty() && statfs_ops.empty());
+ }
+
+ /**
+ * Output in-flight requests
+ */
+ void _dump_active(OSDSession *s);
+ void _dump_active();
+ void dump_active();
+ void dump_requests(ceph::Formatter *fmt);
+ void _dump_ops(const OSDSession *s, ceph::Formatter *fmt);
+ void dump_ops(ceph::Formatter *fmt);
+ void _dump_linger_ops(const OSDSession *s, ceph::Formatter *fmt);
+ void dump_linger_ops(ceph::Formatter *fmt);
+ void _dump_command_ops(const OSDSession *s, ceph::Formatter *fmt);
+ void dump_command_ops(ceph::Formatter *fmt);
+ void dump_pool_ops(ceph::Formatter *fmt) const;
+ void dump_pool_stat_ops(ceph::Formatter *fmt) const;
+ void dump_statfs_ops(ceph::Formatter *fmt) const;
+
+ int get_client_incarnation() const { return client_inc; }
+ void set_client_incarnation(int inc) { client_inc = inc; }
+
+ bool have_map(epoch_t epoch);
+
+ struct CB_Objecter_GetVersion {
+ Objecter *objecter;
+ std::unique_ptr<OpCompletion> fin;
+
+ CB_Objecter_GetVersion(Objecter *o, std::unique_ptr<OpCompletion> c)
+ : objecter(o), fin(std::move(c)) {}
+ void operator()(boost::system::error_code ec, version_t newest,
+ version_t oldest) {
+ if (ec == boost::system::errc::resource_unavailable_try_again) {
+ // try again as instructed
+ objecter->_wait_for_latest_osdmap(std::move(*this));
+ } else if (ec) {
+ ceph::async::post(std::move(fin), ec);
+ } else {
+ auto l = std::unique_lock(objecter->rwlock);
+ objecter->_get_latest_version(oldest, newest, std::move(fin),
+ std::move(l));
+ }
+ }
+ };
+
+ template<typename CompletionToken>
+ auto wait_for_map(epoch_t epoch, CompletionToken&& token) {
+ boost::asio::async_completion<CompletionToken, OpSignature> init(token);
+
+ if (osdmap->get_epoch() >= epoch) {
+ boost::asio::post(service,
+ ceph::async::bind_handler(
+ std::move(init.completion_handler),
+ boost::system::error_code()));
+ } else {
+ monc->get_version("osdmap",
+ CB_Objecter_GetVersion(
+ this,
+ OpCompletion::create(service.get_executor(),
+ std::move(init.completion_handler))));
+ }
+ return init.result.get();
+ }
+
+ void _wait_for_new_map(std::unique_ptr<OpCompletion>, epoch_t epoch,
+ boost::system::error_code = {});
+
+private:
+ void _wait_for_latest_osdmap(CB_Objecter_GetVersion&& c) {
+ monc->get_version("osdmap", std::move(c));
+ }
+
+public:
+
+ template<typename CompletionToken>
+ auto wait_for_latest_osdmap(CompletionToken&& token) {
+ boost::asio::async_completion<CompletionToken, OpSignature> init(token);
+
+ monc->get_version("osdmap",
+ CB_Objecter_GetVersion(
+ this,
+ OpCompletion::create(service.get_executor(),
+ std::move(init.completion_handler))));
+ return init.result.get();
+ }
+
+ void wait_for_latest_osdmap(std::unique_ptr<OpCompletion> c) {
+ monc->get_version("osdmap",
+ CB_Objecter_GetVersion(this, std::move(c)));
+ }
+
+ template<typename CompletionToken>
+ auto get_latest_version(epoch_t oldest, epoch_t newest,
+ CompletionToken&& token) {
+ boost::asio::async_completion<CompletionToken, OpSignature> init(token);
+ {
+ std::unique_lock wl(rwlock);
+ _get_latest_version(oldest, newest,
+ OpCompletion::create(
+ service.get_executor(),
+ std::move(init.completion_handler)),
+ std::move(wl));
+ }
+ return init.result.get();
+ }
+
+ void _get_latest_version(epoch_t oldest, epoch_t neweset,
+ std::unique_ptr<OpCompletion> fin,
+ std::unique_lock<ceph::shared_mutex>&& ul);
+
+ /** Get the current set of global op flags */
+ int get_global_op_flags() const { return global_op_flags; }
+ /** Add a flag to the global op flags, not really atomic operation */
+ void add_global_op_flags(int flag) {
+ global_op_flags.fetch_or(flag);
+ }
+ /** Clear the passed flags from the global op flag set */
+ void clear_global_op_flag(int flags) {
+ global_op_flags.fetch_and(~flags);
+ }
+
+ /// cancel an in-progress request with the given return code
+private:
+ int op_cancel(OSDSession *s, ceph_tid_t tid, int r);
+ int _op_cancel(ceph_tid_t tid, int r);
+public:
+ int op_cancel(ceph_tid_t tid, int r);
+ int op_cancel(const std::vector<ceph_tid_t>& tidls, int r);
+
+ /**
+ * Any write op which is in progress at the start of this call shall no
+ * longer be in progress when this call ends. Operations started after the
+ * start of this call may still be in progress when this call ends.
+ *
+ * @return the latest possible epoch in which a cancelled op could have
+ * existed, or -1 if nothing was cancelled.
+ */
+ epoch_t op_cancel_writes(int r, int64_t pool=-1);
+
+ // commands
+ void osd_command(int osd, std::vector<std::string> cmd,
+ ceph::buffer::list inbl, ceph_tid_t *ptid,
+ decltype(CommandOp::onfinish)&& onfinish) {
+ ceph_assert(osd >= 0);
+ auto c = new CommandOp(
+ osd,
+ std::move(cmd),
+ std::move(inbl),
+ std::move(onfinish));
+ submit_command(c, ptid);
+ }
+ template<typename CompletionToken>
+ auto osd_command(int osd, std::vector<std::string> cmd,
+ ceph::buffer::list inbl, ceph_tid_t *ptid,
+ CompletionToken&& token) {
+ boost::asio::async_completion<CompletionToken,
+ CommandOp::OpSig> init(token);
+ osd_command(osd, std::move(cmd), std::move(inbl), ptid,
+ CommandOp::OpComp::create(service.get_executor(),
+ std::move(init.completion_handler)));
+ return init.result.get();
+ }
+
+ void pg_command(pg_t pgid, std::vector<std::string> cmd,
+ ceph::buffer::list inbl, ceph_tid_t *ptid,
+ decltype(CommandOp::onfinish)&& onfinish) {
+ auto *c = new CommandOp(
+ pgid,
+ std::move(cmd),
+ std::move(inbl),
+ std::move(onfinish));
+ submit_command(c, ptid);
+ }
+
+ template<typename CompletionToken>
+ auto pg_command(pg_t pgid, std::vector<std::string> cmd,
+ ceph::buffer::list inbl, ceph_tid_t *ptid,
+ CompletionToken&& token) {
+ boost::asio::async_completion<CompletionToken,
+ CommandOp::OpSig> init(token);
+ pg_command(pgid, std::move(cmd), std::move(inbl), ptid,
+ CommandOp::OpComp::create(service.get_executor(),
+ std::move(init.completion_handler)));
+ return init.result.get();
+ }
+
+ // mid-level helpers
+ Op *prepare_mutate_op(
+ const object_t& oid, const object_locator_t& oloc,
+ ObjectOperation& op, const SnapContext& snapc,
+ ceph::real_time mtime, int flags,
+ Context *oncommit, version_t *objver = NULL,
+ osd_reqid_t reqid = osd_reqid_t(),
+ ZTracer::Trace *parent_trace = nullptr) {
+ Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_WRITE, oncommit, objver,
+ nullptr, parent_trace);
+ o->priority = op.priority;
+ o->mtime = mtime;
+ o->snapc = snapc;
+ o->out_rval.swap(op.out_rval);
+ o->out_bl.swap(op.out_bl);
+ o->out_handler.swap(op.out_handler);
+ o->out_ec.swap(op.out_ec);
+ o->reqid = reqid;
+ op.clear();
+ return o;
+ }
+ ceph_tid_t mutate(
+ const object_t& oid, const object_locator_t& oloc,
+ ObjectOperation& op, const SnapContext& snapc,
+ ceph::real_time mtime, int flags,
+ Context *oncommit, version_t *objver = NULL,
+ osd_reqid_t reqid = osd_reqid_t()) {
+ Op *o = prepare_mutate_op(oid, oloc, op, snapc, mtime, flags,
+ oncommit, objver, reqid);
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+
+ void mutate(const object_t& oid, const object_locator_t& oloc,
+ ObjectOperation&& op, const SnapContext& snapc,
+ ceph::real_time mtime, int flags,
+ std::unique_ptr<Op::OpComp>&& oncommit,
+ version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t(),
+ ZTracer::Trace *parent_trace = nullptr) {
+ Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_WRITE, std::move(oncommit), objver,
+ nullptr, parent_trace);
+ o->priority = op.priority;
+ o->mtime = mtime;
+ o->snapc = snapc;
+ o->out_bl.swap(op.out_bl);
+ o->out_handler.swap(op.out_handler);
+ o->out_rval.swap(op.out_rval);
+ o->out_ec.swap(op.out_ec);
+ o->reqid = reqid;
+ op.clear();
+ op_submit(o);
+ }
+
+ Op *prepare_read_op(
+ const object_t& oid, const object_locator_t& oloc,
+ ObjectOperation& op,
+ snapid_t snapid, ceph::buffer::list *pbl, int flags,
+ Context *onack, version_t *objver = NULL,
+ int *data_offset = NULL,
+ uint64_t features = 0,
+ ZTracer::Trace *parent_trace = nullptr) {
+ Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_READ, onack, objver,
+ data_offset, parent_trace);
+ o->priority = op.priority;
+ o->snapid = snapid;
+ o->outbl = pbl;
+ if (!o->outbl && op.size() == 1 && op.out_bl[0] && op.out_bl[0]->length())
+ o->outbl = op.out_bl[0];
+ o->out_bl.swap(op.out_bl);
+ o->out_handler.swap(op.out_handler);
+ o->out_rval.swap(op.out_rval);
+ o->out_ec.swap(op.out_ec);
+ op.clear();
+ return o;
+ }
+ ceph_tid_t read(
+ const object_t& oid, const object_locator_t& oloc,
+ ObjectOperation& op,
+ snapid_t snapid, ceph::buffer::list *pbl, int flags,
+ Context *onack, version_t *objver = NULL,
+ int *data_offset = NULL,
+ uint64_t features = 0) {
+ Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver,
+ data_offset);
+ if (features)
+ o->features = features;
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+
+ void read(const object_t& oid, const object_locator_t& oloc,
+ ObjectOperation&& op, snapid_t snapid, ceph::buffer::list *pbl,
+ int flags, std::unique_ptr<Op::OpComp>&& onack,
+ version_t *objver = nullptr, int *data_offset = nullptr,
+ uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr) {
+ Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_READ, std::move(onack), objver,
+ data_offset, parent_trace);
+ o->priority = op.priority;
+ o->snapid = snapid;
+ o->outbl = pbl;
+ // XXX
+ if (!o->outbl && op.size() == 1 && op.out_bl[0] && op.out_bl[0]->length()) {
+ o->outbl = op.out_bl[0];
+ }
+ o->out_bl.swap(op.out_bl);
+ o->out_handler.swap(op.out_handler);
+ o->out_rval.swap(op.out_rval);
+ o->out_ec.swap(op.out_ec);
+ if (features)
+ o->features = features;
+ op.clear();
+ op_submit(o);
+ }
+
+
+ Op *prepare_pg_read_op(
+ uint32_t hash, object_locator_t oloc,
+ ObjectOperation& op, ceph::buffer::list *pbl, int flags,
+ Context *onack, epoch_t *reply_epoch,
+ int *ctx_budget) {
+ Op *o = new Op(object_t(), oloc,
+ std::move(op.ops),
+ flags | global_op_flags | CEPH_OSD_FLAG_READ |
+ CEPH_OSD_FLAG_IGNORE_OVERLAY,
+ onack, NULL);
+ o->target.precalc_pgid = true;
+ o->target.base_pgid = pg_t(hash, oloc.pool);
+ o->priority = op.priority;
+ o->snapid = CEPH_NOSNAP;
+ o->outbl = pbl;
+ o->out_bl.swap(op.out_bl);
+ o->out_handler.swap(op.out_handler);
+ o->out_rval.swap(op.out_rval);
+ o->out_ec.swap(op.out_ec);
+ o->reply_epoch = reply_epoch;
+ if (ctx_budget) {
+ // budget is tracked by listing context
+ o->ctx_budgeted = true;
+ }
+ op.clear();
+ return o;
+ }
+ ceph_tid_t pg_read(
+ uint32_t hash, object_locator_t oloc,
+ ObjectOperation& op, ceph::buffer::list *pbl, int flags,
+ Context *onack, epoch_t *reply_epoch,
+ int *ctx_budget) {
+ Op *o = prepare_pg_read_op(hash, oloc, op, pbl, flags,
+ onack, reply_epoch, ctx_budget);
+ ceph_tid_t tid;
+ op_submit(o, &tid, ctx_budget);
+ return tid;
+ }
+
+ ceph_tid_t pg_read(
+ uint32_t hash, object_locator_t oloc,
+ ObjectOperation& op, ceph::buffer::list *pbl, int flags,
+ std::unique_ptr<Op::OpComp>&& onack, epoch_t *reply_epoch, int *ctx_budget) {
+ ceph_tid_t tid;
+ Op *o = new Op(object_t(), oloc,
+ std::move(op.ops),
+ flags | global_op_flags | CEPH_OSD_FLAG_READ |
+ CEPH_OSD_FLAG_IGNORE_OVERLAY,
+ std::move(onack), nullptr);
+ o->target.precalc_pgid = true;
+ o->target.base_pgid = pg_t(hash, oloc.pool);
+ o->priority = op.priority;
+ o->snapid = CEPH_NOSNAP;
+ o->outbl = pbl;
+ o->out_bl.swap(op.out_bl);
+ o->out_handler.swap(op.out_handler);
+ o->out_rval.swap(op.out_rval);
+ o->out_ec.swap(op.out_ec);
+ o->reply_epoch = reply_epoch;
+ if (ctx_budget) {
+ // budget is tracked by listing context
+ o->ctx_budgeted = true;
+ }
+ op_submit(o, &tid, ctx_budget);
+ op.clear();
+ return tid;
+ }
+
+ // caller owns a ref
+ LingerOp *linger_register(const object_t& oid, const object_locator_t& oloc,
+ int flags);
+ ceph_tid_t linger_watch(LingerOp *info,
+ ObjectOperation& op,
+ const SnapContext& snapc, ceph::real_time mtime,
+ ceph::buffer::list& inbl,
+ decltype(info->on_reg_commit)&& oncommit,
+ version_t *objver);
+ ceph_tid_t linger_watch(LingerOp *info,
+ ObjectOperation& op,
+ const SnapContext& snapc, ceph::real_time mtime,
+ ceph::buffer::list& inbl,
+ Context* onfinish,
+ version_t *objver) {
+ return linger_watch(info, op, snapc, mtime, inbl,
+ OpContextVert<ceph::buffer::list>(onfinish, nullptr), objver);
+ }
+ ceph_tid_t linger_notify(LingerOp *info,
+ ObjectOperation& op,
+ snapid_t snap, ceph::buffer::list& inbl,
+ decltype(LingerOp::on_reg_commit)&& onfinish,
+ version_t *objver);
+ ceph_tid_t linger_notify(LingerOp *info,
+ ObjectOperation& op,
+ snapid_t snap, ceph::buffer::list& inbl,
+ ceph::buffer::list *poutbl,
+ Context* onack,
+ version_t *objver) {
+ return linger_notify(info, op, snap, inbl,
+ OpContextVert(onack, poutbl),
+ objver);
+ }
+ tl::expected<ceph::timespan,
+ boost::system::error_code> linger_check(LingerOp *info);
+ void linger_cancel(LingerOp *info); // releases a reference
+ void _linger_cancel(LingerOp *info);
+
+ void _do_watch_notify(boost::intrusive_ptr<LingerOp> info,
+ boost::intrusive_ptr<MWatchNotify> m);
+
+ /**
+ * set up initial ops in the op std::vector, and allocate a final op slot.
+ *
+ * The caller is responsible for filling in the final ops_count ops.
+ *
+ * @param ops op std::vector
+ * @param ops_count number of final ops the caller will fill in
+ * @param extra_ops pointer to [array of] initial op[s]
+ * @return index of final op (for caller to fill in)
+ */
+ int init_ops(boost::container::small_vector_base<OSDOp>& ops, int ops_count,
+ ObjectOperation *extra_ops) {
+ int i;
+ int extra = 0;
+
+ if (extra_ops)
+ extra = extra_ops->ops.size();
+
+ ops.resize(ops_count + extra);
+
+ for (i=0; i<extra; i++) {
+ ops[i] = extra_ops->ops[i];
+ }
+
+ return i;
+ }
+
+
+ // high-level helpers
+ Op *prepare_stat_op(
+ const object_t& oid, const object_locator_t& oloc,
+ snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
+ int flags, Context *onfinish, version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL) {
+ osdc_opvec ops;
+ int i = init_ops(ops, 1, extra_ops);
+ ops[i].op.op = CEPH_OSD_OP_STAT;
+ C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
+ Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_READ, fin, objver);
+ o->snapid = snap;
+ o->outbl = &fin->bl;
+ return o;
+ }
+ ceph_tid_t stat(
+ const object_t& oid, const object_locator_t& oloc,
+ snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
+ int flags, Context *onfinish, version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL) {
+ Op *o = prepare_stat_op(oid, oloc, snap, psize, pmtime, flags,
+ onfinish, objver, extra_ops);
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+
+ Op *prepare_read_op(
+ const object_t& oid, const object_locator_t& oloc,
+ uint64_t off, uint64_t len, snapid_t snap, ceph::buffer::list *pbl,
+ int flags, Context *onfinish, version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL, int op_flags = 0,
+ ZTracer::Trace *parent_trace = nullptr) {
+ osdc_opvec ops;
+ int i = init_ops(ops, 1, extra_ops);
+ ops[i].op.op = CEPH_OSD_OP_READ;
+ ops[i].op.extent.offset = off;
+ ops[i].op.extent.length = len;
+ ops[i].op.extent.truncate_size = 0;
+ ops[i].op.extent.truncate_seq = 0;
+ ops[i].op.flags = op_flags;
+ Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_READ, onfinish, objver,
+ nullptr, parent_trace);
+ o->snapid = snap;
+ o->outbl = pbl;
+ return o;
+ }
+ ceph_tid_t read(
+ const object_t& oid, const object_locator_t& oloc,
+ uint64_t off, uint64_t len, snapid_t snap, ceph::buffer::list *pbl,
+ int flags, Context *onfinish, version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL, int op_flags = 0) {
+ Op *o = prepare_read_op(oid, oloc, off, len, snap, pbl, flags,
+ onfinish, objver, extra_ops, op_flags);
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+
+ Op *prepare_cmpext_op(
+ const object_t& oid, const object_locator_t& oloc,
+ uint64_t off, ceph::buffer::list &cmp_bl,
+ snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL, int op_flags = 0) {
+ osdc_opvec ops;
+ int i = init_ops(ops, 1, extra_ops);
+ ops[i].op.op = CEPH_OSD_OP_CMPEXT;
+ ops[i].op.extent.offset = off;
+ ops[i].op.extent.length = cmp_bl.length();
+ ops[i].op.extent.truncate_size = 0;
+ ops[i].op.extent.truncate_seq = 0;
+ ops[i].indata = cmp_bl;
+ ops[i].op.flags = op_flags;
+ Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_READ, onfinish, objver);
+ o->snapid = snap;
+ return o;
+ }
+
+ ceph_tid_t cmpext(
+ const object_t& oid, const object_locator_t& oloc,
+ uint64_t off, ceph::buffer::list &cmp_bl,
+ snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL, int op_flags = 0) {
+ Op *o = prepare_cmpext_op(oid, oloc, off, cmp_bl, snap,
+ flags, onfinish, objver, extra_ops, op_flags);
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+
+ ceph_tid_t read_trunc(const object_t& oid, const object_locator_t& oloc,
+ uint64_t off, uint64_t len, snapid_t snap,
+ ceph::buffer::list *pbl, int flags, uint64_t trunc_size,
+ __u32 trunc_seq, Context *onfinish,
+ version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL, int op_flags = 0) {
+ osdc_opvec ops;
+ int i = init_ops(ops, 1, extra_ops);
+ ops[i].op.op = CEPH_OSD_OP_READ;
+ ops[i].op.extent.offset = off;
+ ops[i].op.extent.length = len;
+ ops[i].op.extent.truncate_size = trunc_size;
+ ops[i].op.extent.truncate_seq = trunc_seq;
+ ops[i].op.flags = op_flags;
+ Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_READ, onfinish, objver);
+ o->snapid = snap;
+ o->outbl = pbl;
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+ ceph_tid_t mapext(const object_t& oid, const object_locator_t& oloc,
+ uint64_t off, uint64_t len, snapid_t snap, ceph::buffer::list *pbl,
+ int flags, Context *onfinish, version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL) {
+ osdc_opvec ops;
+ int i = init_ops(ops, 1, extra_ops);
+ ops[i].op.op = CEPH_OSD_OP_MAPEXT;
+ ops[i].op.extent.offset = off;
+ ops[i].op.extent.length = len;
+ ops[i].op.extent.truncate_size = 0;
+ ops[i].op.extent.truncate_seq = 0;
+ Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_READ, onfinish, objver);
+ o->snapid = snap;
+ o->outbl = pbl;
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+ ceph_tid_t getxattr(const object_t& oid, const object_locator_t& oloc,
+ const char *name, snapid_t snap, ceph::buffer::list *pbl, int flags,
+ Context *onfinish,
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ osdc_opvec ops;
+ int i = init_ops(ops, 1, extra_ops);
+ ops[i].op.op = CEPH_OSD_OP_GETXATTR;
+ ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
+ ops[i].op.xattr.value_len = 0;
+ if (name)
+ ops[i].indata.append(name, ops[i].op.xattr.name_len);
+ Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_READ, onfinish, objver);
+ o->snapid = snap;
+ o->outbl = pbl;
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+
+ ceph_tid_t getxattrs(const object_t& oid, const object_locator_t& oloc,
+ snapid_t snap, std::map<std::string,ceph::buffer::list>& attrset,
+ int flags, Context *onfinish, version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL) {
+ osdc_opvec ops;
+ int i = init_ops(ops, 1, extra_ops);
+ ops[i].op.op = CEPH_OSD_OP_GETXATTRS;
+ C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish);
+ Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_READ, fin, objver);
+ o->snapid = snap;
+ o->outbl = &fin->bl;
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+
+ ceph_tid_t read_full(const object_t& oid, const object_locator_t& oloc,
+ snapid_t snap, ceph::buffer::list *pbl, int flags,
+ Context *onfinish, version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL) {
+ return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags |
+ CEPH_OSD_FLAG_READ, onfinish, objver, extra_ops);
+ }
+
+
+ // writes
+ ceph_tid_t _modify(const object_t& oid, const object_locator_t& oloc,
+ osdc_opvec& ops,
+ ceph::real_time mtime,
+ const SnapContext& snapc, int flags,
+ Context *oncommit,
+ version_t *objver = NULL) {
+ Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
+ o->mtime = mtime;
+ o->snapc = snapc;
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+ Op *prepare_write_op(
+ const object_t& oid, const object_locator_t& oloc,
+ uint64_t off, uint64_t len, const SnapContext& snapc,
+ const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
+ Context *oncommit, version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL, int op_flags = 0,
+ ZTracer::Trace *parent_trace = nullptr) {
+ osdc_opvec ops;
+ int i = init_ops(ops, 1, extra_ops);
+ ops[i].op.op = CEPH_OSD_OP_WRITE;
+ ops[i].op.extent.offset = off;
+ ops[i].op.extent.length = len;
+ ops[i].op.extent.truncate_size = 0;
+ ops[i].op.extent.truncate_seq = 0;
+ ops[i].indata = bl;
+ ops[i].op.flags = op_flags;
+ Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_WRITE, std::move(oncommit), objver,
+ nullptr, parent_trace);
+ o->mtime = mtime;
+ o->snapc = snapc;
+ return o;
+ }
+ ceph_tid_t write(
+ const object_t& oid, const object_locator_t& oloc,
+ uint64_t off, uint64_t len, const SnapContext& snapc,
+ const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
+ Context *oncommit, version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL, int op_flags = 0) {
+ Op *o = prepare_write_op(oid, oloc, off, len, snapc, bl, mtime, flags,
+ oncommit, objver, extra_ops, op_flags);
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+ Op *prepare_append_op(
+ const object_t& oid, const object_locator_t& oloc,
+ uint64_t len, const SnapContext& snapc,
+ const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
+ Context *oncommit,
+ version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL) {
+ osdc_opvec ops;
+ int i = init_ops(ops, 1, extra_ops);
+ ops[i].op.op = CEPH_OSD_OP_APPEND;
+ ops[i].op.extent.offset = 0;
+ ops[i].op.extent.length = len;
+ ops[i].op.extent.truncate_size = 0;
+ ops[i].op.extent.truncate_seq = 0;
+ ops[i].indata = bl;
+ Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
+ o->mtime = mtime;
+ o->snapc = snapc;
+ return o;
+ }
+ ceph_tid_t append(
+ const object_t& oid, const object_locator_t& oloc,
+ uint64_t len, const SnapContext& snapc,
+ const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
+ Context *oncommit,
+ version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL) {
+ Op *o = prepare_append_op(oid, oloc, len, snapc, bl, mtime, flags,
+ oncommit, objver, extra_ops);
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+ ceph_tid_t write_trunc(const object_t& oid, const object_locator_t& oloc,
+ uint64_t off, uint64_t len, const SnapContext& snapc,
+ const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
+ uint64_t trunc_size, __u32 trunc_seq,
+ Context *oncommit,
+ version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL, int op_flags = 0) {
+ osdc_opvec ops;
+ int i = init_ops(ops, 1, extra_ops);
+ ops[i].op.op = CEPH_OSD_OP_WRITE;
+ ops[i].op.extent.offset = off;
+ ops[i].op.extent.length = len;
+ ops[i].op.extent.truncate_size = trunc_size;
+ ops[i].op.extent.truncate_seq = trunc_seq;
+ ops[i].indata = bl;
+ ops[i].op.flags = op_flags;
+ Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
+ o->mtime = mtime;
+ o->snapc = snapc;
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+ Op *prepare_write_full_op(
+ const object_t& oid, const object_locator_t& oloc,
+ const SnapContext& snapc, const ceph::buffer::list &bl,
+ ceph::real_time mtime, int flags,
+ Context *oncommit, version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL, int op_flags = 0) {
+ osdc_opvec ops;
+ int i = init_ops(ops, 1, extra_ops);
+ ops[i].op.op = CEPH_OSD_OP_WRITEFULL;
+ ops[i].op.extent.offset = 0;
+ ops[i].op.extent.length = bl.length();
+ ops[i].indata = bl;
+ ops[i].op.flags = op_flags;
+ Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
+ o->mtime = mtime;
+ o->snapc = snapc;
+ return o;
+ }
+ ceph_tid_t write_full(
+ const object_t& oid, const object_locator_t& oloc,
+ const SnapContext& snapc, const ceph::buffer::list &bl,
+ ceph::real_time mtime, int flags,
+ Context *oncommit, version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL, int op_flags = 0) {
+ Op *o = prepare_write_full_op(oid, oloc, snapc, bl, mtime, flags,
+ oncommit, objver, extra_ops, op_flags);
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+ Op *prepare_writesame_op(
+ const object_t& oid, const object_locator_t& oloc,
+ uint64_t write_len, uint64_t off,
+ const SnapContext& snapc, const ceph::buffer::list &bl,
+ ceph::real_time mtime, int flags,
+ Context *oncommit, version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL, int op_flags = 0) {
+
+ osdc_opvec ops;
+ int i = init_ops(ops, 1, extra_ops);
+ ops[i].op.op = CEPH_OSD_OP_WRITESAME;
+ ops[i].op.writesame.offset = off;
+ ops[i].op.writesame.length = write_len;
+ ops[i].op.writesame.data_length = bl.length();
+ ops[i].indata = bl;
+ ops[i].op.flags = op_flags;
+ Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
+ o->mtime = mtime;
+ o->snapc = snapc;
+ return o;
+ }
+ ceph_tid_t writesame(
+ const object_t& oid, const object_locator_t& oloc,
+ uint64_t write_len, uint64_t off,
+ const SnapContext& snapc, const ceph::buffer::list &bl,
+ ceph::real_time mtime, int flags,
+ Context *oncommit, version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL, int op_flags = 0) {
+
+ Op *o = prepare_writesame_op(oid, oloc, write_len, off, snapc, bl,
+ mtime, flags, oncommit, objver,
+ extra_ops, op_flags);
+
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+ ceph_tid_t trunc(const object_t& oid, const object_locator_t& oloc,
+ const SnapContext& snapc, ceph::real_time mtime, int flags,
+ uint64_t trunc_size, __u32 trunc_seq,
+ Context *oncommit, version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL) {
+ osdc_opvec ops;
+ int i = init_ops(ops, 1, extra_ops);
+ ops[i].op.op = CEPH_OSD_OP_TRUNCATE;
+ ops[i].op.extent.offset = trunc_size;
+ ops[i].op.extent.truncate_size = trunc_size;
+ ops[i].op.extent.truncate_seq = trunc_seq;
+ Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
+ o->mtime = mtime;
+ o->snapc = snapc;
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+ ceph_tid_t zero(const object_t& oid, const object_locator_t& oloc,
+ uint64_t off, uint64_t len, const SnapContext& snapc,
+ ceph::real_time mtime, int flags, Context *oncommit,
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ osdc_opvec ops;
+ int i = init_ops(ops, 1, extra_ops);
+ ops[i].op.op = CEPH_OSD_OP_ZERO;
+ ops[i].op.extent.offset = off;
+ ops[i].op.extent.length = len;
+ Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
+ o->mtime = mtime;
+ o->snapc = snapc;
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+ ceph_tid_t rollback_object(const object_t& oid, const object_locator_t& oloc,
+ const SnapContext& snapc, snapid_t snapid,
+ ceph::real_time mtime, Context *oncommit,
+ version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL) {
+ osdc_opvec ops;
+ int i = init_ops(ops, 1, extra_ops);
+ ops[i].op.op = CEPH_OSD_OP_ROLLBACK;
+ ops[i].op.snap.snapid = snapid;
+ Op *o = new Op(oid, oloc, std::move(ops), CEPH_OSD_FLAG_WRITE, oncommit, objver);
+ o->mtime = mtime;
+ o->snapc = snapc;
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+ ceph_tid_t create(const object_t& oid, const object_locator_t& oloc,
+ const SnapContext& snapc, ceph::real_time mtime, int global_flags,
+ int create_flags, Context *oncommit,
+ version_t *objver = NULL,
+ ObjectOperation *extra_ops = NULL) {
+ osdc_opvec ops;
+ int i = init_ops(ops, 1, extra_ops);
+ ops[i].op.op = CEPH_OSD_OP_CREATE;
+ ops[i].op.flags = create_flags;
+ Op *o = new Op(oid, oloc, std::move(ops), global_flags | global_op_flags |
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
+ o->mtime = mtime;
+ o->snapc = snapc;
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+ Op *prepare_remove_op(
+ const object_t& oid, const object_locator_t& oloc,
+ const SnapContext& snapc, ceph::real_time mtime, int flags,
+ Context *oncommit,
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ osdc_opvec ops;
+ int i = init_ops(ops, 1, extra_ops);
+ ops[i].op.op = CEPH_OSD_OP_DELETE;
+ Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
+ o->mtime = mtime;
+ o->snapc = snapc;
+ return o;
+ }
+ ceph_tid_t remove(
+ const object_t& oid, const object_locator_t& oloc,
+ const SnapContext& snapc, ceph::real_time mtime, int flags,
+ Context *oncommit,
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ Op *o = prepare_remove_op(oid, oloc, snapc, mtime, flags,
+ oncommit, objver, extra_ops);
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+
+ ceph_tid_t setxattr(const object_t& oid, const object_locator_t& oloc,
+ const char *name, const SnapContext& snapc, const ceph::buffer::list &bl,
+ ceph::real_time mtime, int flags,
+ Context *oncommit,
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ osdc_opvec ops;
+ int i = init_ops(ops, 1, extra_ops);
+ ops[i].op.op = CEPH_OSD_OP_SETXATTR;
+ ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
+ ops[i].op.xattr.value_len = bl.length();
+ if (name)
+ ops[i].indata.append(name, ops[i].op.xattr.name_len);
+ ops[i].indata.append(bl);
+ Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_WRITE, oncommit,
+ objver);
+ o->mtime = mtime;
+ o->snapc = snapc;
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+ ceph_tid_t removexattr(const object_t& oid, const object_locator_t& oloc,
+ const char *name, const SnapContext& snapc,
+ ceph::real_time mtime, int flags,
+ Context *oncommit,
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ osdc_opvec ops;
+ int i = init_ops(ops, 1, extra_ops);
+ ops[i].op.op = CEPH_OSD_OP_RMXATTR;
+ ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
+ ops[i].op.xattr.value_len = 0;
+ if (name)
+ ops[i].indata.append(name, ops[i].op.xattr.name_len);
+ Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
+ o->mtime = mtime;
+ o->snapc = snapc;
+ ceph_tid_t tid;
+ op_submit(o, &tid);
+ return tid;
+ }
+
+ void list_nobjects(NListContext *p, Context *onfinish);
+ uint32_t list_nobjects_seek(NListContext *p, uint32_t pos);
+ uint32_t list_nobjects_seek(NListContext *list_context, const hobject_t& c);
+ void list_nobjects_get_cursor(NListContext *list_context, hobject_t *c);
+
+ hobject_t enumerate_objects_begin();
+ hobject_t enumerate_objects_end();
+
+ template<typename T>
+ friend struct EnumerationContext;
+ template<typename T>
+ friend struct CB_EnumerateReply;
+ template<typename T>
+ void enumerate_objects(
+ int64_t pool_id,
+ std::string_view ns,
+ hobject_t start,
+ hobject_t end,
+ const uint32_t max,
+ const ceph::buffer::list& filter_bl,
+ fu2::unique_function<void(boost::system::error_code,
+ std::vector<T>,
+ hobject_t) &&> on_finish);
+ template<typename T>
+ void _issue_enumerate(hobject_t start,
+ std::unique_ptr<EnumerationContext<T>>);
+ template<typename T>
+ void _enumerate_reply(
+ ceph::buffer::list&& bl,
+ boost::system::error_code ec,
+ std::unique_ptr<EnumerationContext<T>>&& ectx);
+
+ // -------------------------
+ // pool ops
+private:
+ void pool_op_submit(PoolOp *op);
+ void _pool_op_submit(PoolOp *op);
+ void _finish_pool_op(PoolOp *op, int r);
+ void _do_delete_pool(int64_t pool,
+ decltype(PoolOp::onfinish)&& onfinish);
+
+public:
+ void create_pool_snap(int64_t pool, std::string_view snapName,
+ decltype(PoolOp::onfinish)&& onfinish);
+ void create_pool_snap(int64_t pool, std::string_view snapName,
+ Context* c) {
+ create_pool_snap(pool, snapName,
+ OpContextVert<ceph::buffer::list>(c, nullptr));
+ }
+ void allocate_selfmanaged_snap(int64_t pool,
+ std::unique_ptr<ceph::async::Completion<
+ void(boost::system::error_code,
+ snapid_t)>> onfinish);
+ void allocate_selfmanaged_snap(int64_t pool, snapid_t* psnapid,
+ Context* c) {
+ allocate_selfmanaged_snap(pool,
+ OpContextVert(c, psnapid));
+ }
+ void delete_pool_snap(int64_t pool, std::string_view snapName,
+ decltype(PoolOp::onfinish)&& onfinish);
+ void delete_pool_snap(int64_t pool, std::string_view snapName,
+ Context* c) {
+ delete_pool_snap(pool, snapName,
+ OpContextVert<ceph::buffer::list>(c, nullptr));
+ }
+
+ void delete_selfmanaged_snap(int64_t pool, snapid_t snap,
+ decltype(PoolOp::onfinish)&& onfinish);
+ void delete_selfmanaged_snap(int64_t pool, snapid_t snap,
+ Context* c) {
+ delete_selfmanaged_snap(pool, snap,
+ OpContextVert<ceph::buffer::list>(c, nullptr));
+ }
+
+
+ void create_pool(std::string_view name,
+ decltype(PoolOp::onfinish)&& onfinish,
+ int crush_rule=-1);
+ void create_pool(std::string_view name, Context *onfinish,
+ int crush_rule=-1) {
+ create_pool(name,
+ OpContextVert<ceph::buffer::list>(onfinish, nullptr),
+ crush_rule);
+ }
+ void delete_pool(int64_t pool,
+ decltype(PoolOp::onfinish)&& onfinish);
+ void delete_pool(int64_t pool,
+ Context* onfinish) {
+ delete_pool(pool, OpContextVert<ceph::buffer::list>(onfinish, nullptr));
+ }
+
+ void delete_pool(std::string_view name,
+ decltype(PoolOp::onfinish)&& onfinish);
+
+ void delete_pool(std::string_view name,
+ Context* onfinish) {
+ delete_pool(name, OpContextVert<ceph::buffer::list>(onfinish, nullptr));
+ }
+
+ void handle_pool_op_reply(MPoolOpReply *m);
+ int pool_op_cancel(ceph_tid_t tid, int r);
+
+ // --------------------------
+ // pool stats
+private:
+ void _poolstat_submit(PoolStatOp *op);
+public:
+ void handle_get_pool_stats_reply(MGetPoolStatsReply *m);
+ void get_pool_stats(const std::vector<std::string>& pools,
+ decltype(PoolStatOp::onfinish)&& onfinish);
+ template<typename CompletionToken>
+ auto get_pool_stats(const std::vector<std::string>& pools,
+ CompletionToken&& token) {
+ boost::asio::async_completion<CompletionToken,
+ PoolStatOp::OpSig> init(token);
+ get_pool_stats(pools,
+ PoolStatOp::OpComp::create(
+ service.get_executor(),
+ std::move(init.completion_handler)));
+ return init.result.get();
+ }
+ int pool_stat_op_cancel(ceph_tid_t tid, int r);
+ void _finish_pool_stat_op(PoolStatOp *op, int r);
+
+ // ---------------------------
+ // df stats
+private:
+ void _fs_stats_submit(StatfsOp *op);
+public:
+ void handle_fs_stats_reply(MStatfsReply *m);
+ void get_fs_stats(std::optional<int64_t> poolid,
+ decltype(StatfsOp::onfinish)&& onfinish);
+ template<typename CompletionToken>
+ auto get_fs_stats(std::optional<int64_t> poolid,
+ CompletionToken&& token) {
+ boost::asio::async_completion<CompletionToken, StatfsOp::OpSig> init(token);
+ get_fs_stats(poolid,
+ StatfsOp::OpComp::create(service.get_executor(),
+ std::move(init.completion_handler)));
+ return init.result.get();
+ }
+ void get_fs_stats(struct ceph_statfs& result, std::optional<int64_t> poolid,
+ Context *onfinish) {
+ get_fs_stats(poolid, OpContextVert(onfinish, result));
+ }
+ int statfs_op_cancel(ceph_tid_t tid, int r);
+ void _finish_statfs_op(StatfsOp *op, int r);
+
+ // ---------------------------
+ // some scatter/gather hackery
+
+ void _sg_read_finish(std::vector<ObjectExtent>& extents,
+ std::vector<ceph::buffer::list>& resultbl,
+ ceph::buffer::list *bl, Context *onfinish);
+
+ struct C_SGRead : public Context {
+ Objecter *objecter;
+ std::vector<ObjectExtent> extents;
+ std::vector<ceph::buffer::list> resultbl;
+ ceph::buffer::list *bl;
+ Context *onfinish;
+ C_SGRead(Objecter *ob,
+ std::vector<ObjectExtent>& e, std::vector<ceph::buffer::list>& r, ceph::buffer::list *b,
+ Context *c) :
+ objecter(ob), bl(b), onfinish(c) {
+ extents.swap(e);
+ resultbl.swap(r);
+ }
+ void finish(int r) override {
+ objecter->_sg_read_finish(extents, resultbl, bl, onfinish);
+ }
+ };
+
+ void sg_read_trunc(std::vector<ObjectExtent>& extents, snapid_t snap,
+ ceph::buffer::list *bl, int flags, uint64_t trunc_size,
+ __u32 trunc_seq, Context *onfinish, int op_flags = 0) {
+ if (extents.size() == 1) {
+ read_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
+ extents[0].length, snap, bl, flags, extents[0].truncate_size,
+ trunc_seq, onfinish, 0, 0, op_flags);
+ } else {
+ C_GatherBuilder gather(cct);
+ std::vector<ceph::buffer::list> resultbl(extents.size());
+ int i=0;
+ for (auto p = extents.begin(); p != extents.end(); ++p) {
+ read_trunc(p->oid, p->oloc, p->offset, p->length, snap, &resultbl[i++],
+ flags, p->truncate_size, trunc_seq, gather.new_sub(),
+ 0, 0, op_flags);
+ }
+ gather.set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish));
+ gather.activate();
+ }
+ }
+
+ void sg_read(std::vector<ObjectExtent>& extents, snapid_t snap, ceph::buffer::list *bl,
+ int flags, Context *onfinish, int op_flags = 0) {
+ sg_read_trunc(extents, snap, bl, flags, 0, 0, onfinish, op_flags);
+ }
+
+ void sg_write_trunc(std::vector<ObjectExtent>& extents, const SnapContext& snapc,
+ const ceph::buffer::list& bl, ceph::real_time mtime, int flags,
+ uint64_t trunc_size, __u32 trunc_seq,
+ Context *oncommit, int op_flags = 0) {
+ if (extents.size() == 1) {
+ write_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
+ extents[0].length, snapc, bl, mtime, flags,
+ extents[0].truncate_size, trunc_seq, oncommit,
+ 0, 0, op_flags);
+ } else {
+ C_GatherBuilder gcom(cct, oncommit);
+ auto it = bl.cbegin();
+ for (auto p = extents.begin(); p != extents.end(); ++p) {
+ ceph::buffer::list cur;
+ for (auto bit = p->buffer_extents.begin();
+ bit != p->buffer_extents.end();
+ ++bit) {
+ if (it.get_off() != bit->first) {
+ it.seek(bit->first);
+ }
+ it.copy(bit->second, cur);
+ }
+ ceph_assert(cur.length() == p->length);
+ write_trunc(p->oid, p->oloc, p->offset, p->length,
+ snapc, cur, mtime, flags, p->truncate_size, trunc_seq,
+ oncommit ? gcom.new_sub():0,
+ 0, 0, op_flags);
+ }
+ gcom.activate();
+ }
+ }
+
+ void sg_write(std::vector<ObjectExtent>& extents, const SnapContext& snapc,
+ const ceph::buffer::list& bl, ceph::real_time mtime, int flags,
+ Context *oncommit, int op_flags = 0) {
+ sg_write_trunc(extents, snapc, bl, mtime, flags, 0, 0, oncommit,
+ op_flags);
+ }
+
+ void ms_handle_connect(Connection *con) override;
+ bool ms_handle_reset(Connection *con) override;
+ void ms_handle_remote_reset(Connection *con) override;
+ bool ms_handle_refused(Connection *con) override;
+
+ void blocklist_self(bool set);
+
+private:
+ epoch_t epoch_barrier = 0;
+ bool retry_writes_after_first_reply =
+ cct->_conf->objecter_retry_writes_after_first_reply;
+
+public:
+ void set_epoch_barrier(epoch_t epoch);
+
+ PerfCounters *get_logger() {
+ return logger;
+ }
+};
+
+#endif