diff options
Diffstat (limited to 'src/messages/MOSDOpReply.h')
-rw-r--r-- | src/messages/MOSDOpReply.h | 353 |
1 files changed, 353 insertions, 0 deletions
diff --git a/src/messages/MOSDOpReply.h b/src/messages/MOSDOpReply.h new file mode 100644 index 000000000..54c5157c2 --- /dev/null +++ b/src/messages/MOSDOpReply.h @@ -0,0 +1,353 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#ifndef CEPH_MOSDOPREPLY_H +#define CEPH_MOSDOPREPLY_H + +#include "msg/Message.h" + +#include "MOSDOp.h" +#include "common/errno.h" + +/* + * OSD op reply + * + * oid - object id + * op - OSD_OP_DELETE, etc. + * + */ + +class MOSDOpReply final : public Message { +private: + static constexpr int HEAD_VERSION = 8; + static constexpr int COMPAT_VERSION = 2; + + object_t oid; + pg_t pgid; + std::vector<OSDOp> ops; + bool bdata_encode; + int64_t flags = 0; + errorcode32_t result; + eversion_t bad_replay_version; + eversion_t replay_version; + version_t user_version = 0; + epoch_t osdmap_epoch = 0; + int32_t retry_attempt = -1; + bool do_redirect; + request_redirect_t redirect; + +public: + const object_t& get_oid() const { return oid; } + const pg_t& get_pg() const { return pgid; } + int get_flags() const { return flags; } + + bool is_ondisk() const { return get_flags() & CEPH_OSD_FLAG_ONDISK; } + bool is_onnvram() const { return get_flags() & CEPH_OSD_FLAG_ONNVRAM; } + + int get_result() const { return result; } + const eversion_t& get_replay_version() const { return replay_version; } + const version_t& get_user_version() const { return user_version; } + + void set_result(int r) { result = r; } + + void set_reply_versions(eversion_t v, version_t uv) { + replay_version = v; + user_version = uv; + /* We go through some shenanigans here for backwards compatibility + * with old clients, who do not look at our replay_version and + * user_version but instead see what we now call the + * bad_replay_version. On pools without caching + * the user_version infrastructure is a slightly-laggy copy of + * the regular pg version/at_version infrastructure; the difference + * being it is not updated on watch ops like that is -- but on updates + * it is set equal to at_version. This means that for non-watch write ops + * on classic pools, all three of replay_version, user_version, and + * bad_replay_version are identical. But for watch ops the replay_version + * has been updated, while the user_at_version has not, and the semantics + * we promised old clients are that the version they see is not an update. + * So set the bad_replay_version to be the same as the user_at_version. */ + bad_replay_version = v; + if (uv) { + bad_replay_version.version = uv; + } + } + + /* Don't fill in replay_version for non-write ops */ + void set_enoent_reply_versions(const eversion_t& v, const version_t& uv) { + user_version = uv; + bad_replay_version = v; + } + + void set_redirect(const request_redirect_t& redir) { redirect = redir; } + const request_redirect_t& get_redirect() const { return redirect; } + bool is_redirect_reply() const { return do_redirect; } + + void add_flags(int f) { flags |= f; } + + void claim_op_out_data(std::vector<OSDOp>& o) { + ceph_assert(ops.size() == o.size()); + for (unsigned i = 0; i < o.size(); i++) { + ops[i].outdata = std::move(o[i].outdata); + } + } + void claim_ops(std::vector<OSDOp>& o) { + o.swap(ops); + bdata_encode = false; + } + void set_op_returns(const std::vector<pg_log_op_return_item_t>& op_returns) { + if (op_returns.size()) { + ceph_assert(ops.empty() || ops.size() == op_returns.size()); + ops.resize(op_returns.size()); + for (unsigned i = 0; i < op_returns.size(); ++i) { + ops[i].rval = op_returns[i].rval; + ops[i].outdata = op_returns[i].bl; + } + } + } + + /** + * get retry attempt + * + * If we don't know the attempt (because the server is old), return -1. + */ + int get_retry_attempt() const { + return retry_attempt; + } + + // osdmap + epoch_t get_map_epoch() const { return osdmap_epoch; } + + /*osd_reqid_t get_reqid() { return osd_reqid_t(get_dest(), + head.client_inc, + head.tid); } + */ + +public: + MOSDOpReply() + : Message{CEPH_MSG_OSD_OPREPLY, HEAD_VERSION, COMPAT_VERSION}, + bdata_encode(false) { + do_redirect = false; + } + MOSDOpReply(const MOSDOp *req, int r, epoch_t e, int acktype, + bool ignore_out_data) + : Message{CEPH_MSG_OSD_OPREPLY, HEAD_VERSION, COMPAT_VERSION}, + oid(req->hobj.oid), pgid(req->pgid.pgid), ops(req->ops), + bdata_encode(false) { + + set_tid(req->get_tid()); + result = r; + flags = + (req->flags & ~(CEPH_OSD_FLAG_ONDISK|CEPH_OSD_FLAG_ONNVRAM|CEPH_OSD_FLAG_ACK)) | acktype; + osdmap_epoch = e; + user_version = 0; + retry_attempt = req->get_retry_attempt(); + do_redirect = false; + + for (unsigned i = 0; i < ops.size(); i++) { + // zero out input data + ops[i].indata.clear(); + if (ignore_out_data) { + // original request didn't set the RETURNVEC flag + ops[i].outdata.clear(); + } + } + } +private: + ~MOSDOpReply() final {} + +public: + void encode_payload(uint64_t features) override { + using ceph::encode; + if(false == bdata_encode) { + OSDOp::merge_osd_op_vector_out_data(ops, data); + bdata_encode = true; + } + + if ((features & CEPH_FEATURE_PGID64) == 0) { + header.version = 1; + ceph_osd_reply_head head; + memset(&head, 0, sizeof(head)); + head.layout.ol_pgid = pgid.get_old_pg().v; + head.flags = flags; + head.osdmap_epoch = osdmap_epoch; + head.reassert_version = bad_replay_version; + head.result = result; + head.num_ops = ops.size(); + head.object_len = oid.name.length(); + encode(head, payload); + for (unsigned i = 0; i < head.num_ops; i++) { + encode(ops[i].op, payload); + } + ceph::encode_nohead(oid.name, payload); + } else { + header.version = HEAD_VERSION; + encode(oid, payload); + encode(pgid, payload); + encode(flags, payload); + encode(result, payload); + encode(bad_replay_version, payload); + encode(osdmap_epoch, payload); + + __u32 num_ops = ops.size(); + encode(num_ops, payload); + for (unsigned i = 0; i < num_ops; i++) + encode(ops[i].op, payload); + + encode(retry_attempt, payload); + + for (unsigned i = 0; i < num_ops; i++) + encode(ops[i].rval, payload); + + encode(replay_version, payload); + encode(user_version, payload); + if ((features & CEPH_FEATURE_NEW_OSDOPREPLY_ENCODING) == 0) { + header.version = 6; + encode(redirect, payload); + } else { + do_redirect = !redirect.empty(); + encode(do_redirect, payload); + if (do_redirect) { + encode(redirect, payload); + } + } + encode_trace(payload, features); + } + } + void decode_payload() override { + using ceph::decode; + auto p = payload.cbegin(); + + // Always keep here the newest version of decoding order/rule + if (header.version == HEAD_VERSION) { + decode(oid, p); + decode(pgid, p); + decode(flags, p); + decode(result, p); + decode(bad_replay_version, p); + decode(osdmap_epoch, p); + + __u32 num_ops = ops.size(); + decode(num_ops, p); + ops.resize(num_ops); + for (unsigned i = 0; i < num_ops; i++) + decode(ops[i].op, p); + decode(retry_attempt, p); + + for (unsigned i = 0; i < num_ops; ++i) + decode(ops[i].rval, p); + + OSDOp::split_osd_op_vector_out_data(ops, data); + + decode(replay_version, p); + decode(user_version, p); + decode(do_redirect, p); + if (do_redirect) + decode(redirect, p); + decode_trace(p); + } else if (header.version < 2) { + ceph_osd_reply_head head; + decode(head, p); + ops.resize(head.num_ops); + for (unsigned i = 0; i < head.num_ops; i++) { + decode(ops[i].op, p); + } + ceph::decode_nohead(head.object_len, oid.name, p); + pgid = pg_t(head.layout.ol_pgid); + result = (int32_t)head.result; + flags = head.flags; + replay_version = head.reassert_version; + user_version = replay_version.version; + osdmap_epoch = head.osdmap_epoch; + retry_attempt = -1; + } else { + decode(oid, p); + decode(pgid, p); + decode(flags, p); + decode(result, p); + decode(bad_replay_version, p); + decode(osdmap_epoch, p); + + __u32 num_ops = ops.size(); + decode(num_ops, p); + ops.resize(num_ops); + for (unsigned i = 0; i < num_ops; i++) + decode(ops[i].op, p); + + if (header.version >= 3) + decode(retry_attempt, p); + else + retry_attempt = -1; + + if (header.version >= 4) { + for (unsigned i = 0; i < num_ops; ++i) + decode(ops[i].rval, p); + + OSDOp::split_osd_op_vector_out_data(ops, data); + } + + if (header.version >= 5) { + decode(replay_version, p); + decode(user_version, p); + } else { + replay_version = bad_replay_version; + user_version = replay_version.version; + } + + if (header.version == 6) { + decode(redirect, p); + do_redirect = !redirect.empty(); + } + if (header.version >= 7) { + decode(do_redirect, p); + if (do_redirect) { + decode(redirect, p); + } + } + if (header.version >= 8) { + decode_trace(p); + } + } + } + + std::string_view get_type_name() const override { return "osd_op_reply"; } + + void print(std::ostream& out) const override { + out << "osd_op_reply(" << get_tid() + << " " << oid << " " << ops + << " v" << get_replay_version() + << " uv" << get_user_version(); + if (is_ondisk()) + out << " ondisk"; + else if (is_onnvram()) + out << " onnvram"; + else + out << " ack"; + out << " = " << get_result(); + if (get_result() < 0) { + out << " (" << cpp_strerror(get_result()) << ")"; + } + if (is_redirect_reply()) { + out << " redirect: { " << redirect << " }"; + } + out << ")"; + } + +private: + template<class T, typename... Args> + friend boost::intrusive_ptr<T> ceph::make_message(Args&&... args); +}; + + +#endif |