summaryrefslogtreecommitdiffstats
path: root/src/rgw/services/svc_mdlog.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/rgw/services/svc_mdlog.cc
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rgw/services/svc_mdlog.cc')
-rw-r--r--src/rgw/services/svc_mdlog.cc549
1 files changed, 549 insertions, 0 deletions
diff --git a/src/rgw/services/svc_mdlog.cc b/src/rgw/services/svc_mdlog.cc
new file mode 100644
index 000000000..09a68d3d7
--- /dev/null
+++ b/src/rgw/services/svc_mdlog.cc
@@ -0,0 +1,549 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include "svc_mdlog.h"
+#include "svc_rados.h"
+#include "svc_zone.h"
+#include "svc_sys_obj.h"
+
+#include "rgw_tools.h"
+#include "rgw_mdlog.h"
+#include "rgw_coroutine.h"
+#include "rgw_cr_rados.h"
+#include "rgw_zone.h"
+
+#include "common/errno.h"
+
+#include <boost/asio/yield.hpp>
+
+#define dout_subsys ceph_subsys_rgw
+
+using namespace std;
+
+using Svc = RGWSI_MDLog::Svc;
+using Cursor = RGWPeriodHistory::Cursor;
+
+RGWSI_MDLog::RGWSI_MDLog(CephContext *cct, bool _run_sync) : RGWServiceInstance(cct), run_sync(_run_sync) {
+}
+
+RGWSI_MDLog::~RGWSI_MDLog() {
+}
+
+int RGWSI_MDLog::init(RGWSI_RADOS *_rados_svc, RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc, RGWSI_Cls *_cls_svc)
+{
+ svc.zone = _zone_svc;
+ svc.sysobj = _sysobj_svc;
+ svc.mdlog = this;
+ svc.rados = _rados_svc;
+ svc.cls = _cls_svc;
+
+ return 0;
+}
+
+int RGWSI_MDLog::do_start(optional_yield y, const DoutPrefixProvider *dpp)
+{
+ auto& current_period = svc.zone->get_current_period();
+
+ current_log = get_log(current_period.get_id());
+
+ period_puller.reset(new RGWPeriodPuller(svc.zone, svc.sysobj));
+ period_history.reset(new RGWPeriodHistory(cct, period_puller.get(),
+ current_period));
+
+ if (run_sync &&
+ svc.zone->need_to_sync()) {
+ // initialize the log period history
+ svc.mdlog->init_oldest_log_period(y, dpp);
+ }
+ return 0;
+}
+
+int RGWSI_MDLog::read_history(RGWMetadataLogHistory *state,
+ RGWObjVersionTracker *objv_tracker,
+ optional_yield y,
+ const DoutPrefixProvider *dpp) const
+{
+ auto& pool = svc.zone->get_zone_params().log_pool;
+ const auto& oid = RGWMetadataLogHistory::oid;
+ bufferlist bl;
+ int ret = rgw_get_system_obj(svc.sysobj, pool, oid, bl, objv_tracker, nullptr, y, dpp);
+ if (ret < 0) {
+ return ret;
+ }
+ if (bl.length() == 0) {
+ /* bad history object, remove it */
+ rgw_raw_obj obj(pool, oid);
+ auto sysobj = svc.sysobj->get_obj(obj);
+ ret = sysobj.wop().remove(dpp, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: meta history is empty, but cannot remove it (" << cpp_strerror(-ret) << ")" << dendl;
+ return ret;
+ }
+ return -ENOENT;
+ }
+ try {
+ auto p = bl.cbegin();
+ state->decode(p);
+ } catch (buffer::error& e) {
+ ldpp_dout(dpp, 1) << "failed to decode the mdlog history: "
+ << e.what() << dendl;
+ return -EIO;
+ }
+ return 0;
+}
+
+int RGWSI_MDLog::write_history(const DoutPrefixProvider *dpp,
+ const RGWMetadataLogHistory& state,
+ RGWObjVersionTracker *objv_tracker,
+ optional_yield y, bool exclusive)
+{
+ bufferlist bl;
+ state.encode(bl);
+
+ auto& pool = svc.zone->get_zone_params().log_pool;
+ const auto& oid = RGWMetadataLogHistory::oid;
+ return rgw_put_system_obj(dpp, svc.sysobj, pool, oid, bl,
+ exclusive, objv_tracker, real_time{}, y);
+}
+
+namespace mdlog {
+
+using Cursor = RGWPeriodHistory::Cursor;
+
+namespace {
+template <class T>
+class SysObjReadCR : public RGWSimpleCoroutine {
+ const DoutPrefixProvider *dpp;
+ RGWAsyncRadosProcessor *async_rados;
+ RGWSI_SysObj *svc;
+
+ rgw_raw_obj obj;
+ T *result;
+ /// on ENOENT, call handle_data() with an empty object instead of failing
+ const bool empty_on_enoent;
+ RGWObjVersionTracker *objv_tracker;
+ RGWAsyncGetSystemObj *req{nullptr};
+
+public:
+ SysObjReadCR(const DoutPrefixProvider *_dpp,
+ RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
+ const rgw_raw_obj& _obj,
+ T *_result, bool empty_on_enoent = true,
+ RGWObjVersionTracker *objv_tracker = nullptr)
+ : RGWSimpleCoroutine(_svc->ctx()), dpp(_dpp), async_rados(_async_rados), svc(_svc),
+ obj(_obj), result(_result),
+ empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {}
+
+ ~SysObjReadCR() override {
+ try {
+ request_cleanup();
+ } catch (const boost::container::length_error_t& e) {
+ ldpp_dout(dpp, 0) << "ERROR: " << __func__ <<
+ ": reference counted object mismatched, \"" << e.what() <<
+ "\"" << dendl;
+ }
+ }
+
+ void request_cleanup() override {
+ if (req) {
+ req->finish();
+ req = NULL;
+ }
+ }
+
+ int send_request(const DoutPrefixProvider *dpp) {
+ req = new RGWAsyncGetSystemObj(dpp, this, stack->create_completion_notifier(), svc,
+ objv_tracker, obj, false, false);
+ async_rados->queue(req);
+ return 0;
+ }
+
+ int request_complete() {
+ int ret = req->get_ret_status();
+ retcode = ret;
+ if (ret == -ENOENT && empty_on_enoent) {
+ *result = T();
+ } else {
+ if (ret < 0) {
+ return ret;
+ }
+ if (objv_tracker) { // copy the updated version
+ *objv_tracker = req->objv_tracker;
+ }
+ try {
+ auto iter = req->bl.cbegin();
+ if (iter.end()) {
+ // allow successful reads with empty buffers. ReadSyncStatus
+ // coroutines depend on this to be able to read without
+ // locking, because the cls lock from InitSyncStatus will
+ // create an empty object if it didn't exist
+ *result = T();
+ } else {
+ decode(*result, iter);
+ }
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+ }
+ return handle_data(*result);
+ }
+
+ virtual int handle_data(T& data) {
+ return 0;
+ }
+};
+
+template <class T>
+class SysObjWriteCR : public RGWSimpleCoroutine {
+ const DoutPrefixProvider *dpp;
+ RGWAsyncRadosProcessor *async_rados;
+ RGWSI_SysObj *svc;
+ bufferlist bl;
+ rgw_raw_obj obj;
+ RGWObjVersionTracker *objv_tracker;
+ bool exclusive;
+ RGWAsyncPutSystemObj *req{nullptr};
+
+public:
+ SysObjWriteCR(const DoutPrefixProvider *_dpp,
+ RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
+ const rgw_raw_obj& _obj, const T& _data,
+ RGWObjVersionTracker *objv_tracker = nullptr,
+ bool exclusive = false)
+ : RGWSimpleCoroutine(_svc->ctx()), dpp(_dpp), async_rados(_async_rados),
+ svc(_svc), obj(_obj), objv_tracker(objv_tracker), exclusive(exclusive) {
+ encode(_data, bl);
+ }
+
+ ~SysObjWriteCR() override {
+ try {
+ request_cleanup();
+ } catch (const boost::container::length_error_t& e) {
+ ldpp_dout(dpp, 0) << "ERROR: " << __func__ <<
+ ": reference counted object mismatched, \"" << e.what() <<
+ "\"" << dendl;
+ }
+ }
+
+ void request_cleanup() override {
+ if (req) {
+ req->finish();
+ req = NULL;
+ }
+ }
+
+ int send_request(const DoutPrefixProvider *dpp) override {
+ req = new RGWAsyncPutSystemObj(dpp, this, stack->create_completion_notifier(),
+ svc, objv_tracker, obj, exclusive, std::move(bl));
+ async_rados->queue(req);
+ return 0;
+ }
+
+ int request_complete() override {
+ if (objv_tracker) { // copy the updated version
+ *objv_tracker = req->objv_tracker;
+ }
+ return req->get_ret_status();
+ }
+};
+}
+
+/// read the mdlog history and use it to initialize the given cursor
+class ReadHistoryCR : public RGWCoroutine {
+ const DoutPrefixProvider *dpp;
+ Svc svc;
+ Cursor *cursor;
+ RGWObjVersionTracker *objv_tracker;
+ RGWMetadataLogHistory state;
+ RGWAsyncRadosProcessor *async_processor;
+
+ public:
+ ReadHistoryCR(const DoutPrefixProvider *dpp,
+ const Svc& svc,
+ Cursor *cursor,
+ RGWObjVersionTracker *objv_tracker)
+ : RGWCoroutine(svc.zone->ctx()), dpp(dpp), svc(svc),
+ cursor(cursor),
+ objv_tracker(objv_tracker),
+ async_processor(svc.rados->get_async_processor())
+ {}
+
+ int operate(const DoutPrefixProvider *dpp) {
+ reenter(this) {
+ yield {
+ rgw_raw_obj obj{svc.zone->get_zone_params().log_pool,
+ RGWMetadataLogHistory::oid};
+ constexpr bool empty_on_enoent = false;
+
+ using ReadCR = SysObjReadCR<RGWMetadataLogHistory>;
+ call(new ReadCR(dpp, async_processor, svc.sysobj, obj,
+ &state, empty_on_enoent, objv_tracker));
+ }
+ if (retcode < 0) {
+ ldpp_dout(dpp, 1) << "failed to read mdlog history: "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+ *cursor = svc.mdlog->period_history->lookup(state.oldest_realm_epoch);
+ if (!*cursor) {
+ return set_cr_error(cursor->get_error());
+ }
+
+ ldpp_dout(dpp, 10) << "read mdlog history with oldest period id="
+ << state.oldest_period_id << " realm_epoch="
+ << state.oldest_realm_epoch << dendl;
+ return set_cr_done();
+ }
+ return 0;
+ }
+};
+
+/// write the given cursor to the mdlog history
+class WriteHistoryCR : public RGWCoroutine {
+ const DoutPrefixProvider *dpp;
+ Svc svc;
+ Cursor cursor;
+ RGWObjVersionTracker *objv;
+ RGWMetadataLogHistory state;
+ RGWAsyncRadosProcessor *async_processor;
+
+ public:
+ WriteHistoryCR(const DoutPrefixProvider *dpp,
+ Svc& svc,
+ const Cursor& cursor,
+ RGWObjVersionTracker *objv)
+ : RGWCoroutine(svc.zone->ctx()), dpp(dpp), svc(svc),
+ cursor(cursor), objv(objv),
+ async_processor(svc.rados->get_async_processor())
+ {}
+
+ int operate(const DoutPrefixProvider *dpp) {
+ reenter(this) {
+ state.oldest_period_id = cursor.get_period().get_id();
+ state.oldest_realm_epoch = cursor.get_epoch();
+
+ yield {
+ rgw_raw_obj obj{svc.zone->get_zone_params().log_pool,
+ RGWMetadataLogHistory::oid};
+
+ using WriteCR = SysObjWriteCR<RGWMetadataLogHistory>;
+ call(new WriteCR(dpp, async_processor, svc.sysobj, obj, state, objv));
+ }
+ if (retcode < 0) {
+ ldpp_dout(dpp, 1) << "failed to write mdlog history: "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+
+ ldpp_dout(dpp, 10) << "wrote mdlog history with oldest period id="
+ << state.oldest_period_id << " realm_epoch="
+ << state.oldest_realm_epoch << dendl;
+ return set_cr_done();
+ }
+ return 0;
+ }
+};
+
+/// update the mdlog history to reflect trimmed logs
+class TrimHistoryCR : public RGWCoroutine {
+ const DoutPrefixProvider *dpp;
+ Svc svc;
+ const Cursor cursor; //< cursor to trimmed period
+ RGWObjVersionTracker *objv; //< to prevent racing updates
+ Cursor next; //< target cursor for oldest log period
+ Cursor existing; //< existing cursor read from disk
+
+ public:
+ TrimHistoryCR(const DoutPrefixProvider *dpp, const Svc& svc, Cursor cursor, RGWObjVersionTracker *objv)
+ : RGWCoroutine(svc.zone->ctx()), dpp(dpp), svc(svc),
+ cursor(cursor), objv(objv), next(cursor) {
+ next.next(); // advance past cursor
+ }
+
+ int operate(const DoutPrefixProvider *dpp) {
+ reenter(this) {
+ // read an existing history, and write the new history if it's newer
+ yield call(new ReadHistoryCR(dpp, svc, &existing, objv));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ // reject older trims with ECANCELED
+ if (cursor.get_epoch() < existing.get_epoch()) {
+ ldpp_dout(dpp, 4) << "found oldest log epoch=" << existing.get_epoch()
+ << ", rejecting trim at epoch=" << cursor.get_epoch() << dendl;
+ return set_cr_error(-ECANCELED);
+ }
+ // overwrite with updated history
+ yield call(new WriteHistoryCR(dpp, svc, next, objv));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ return set_cr_done();
+ }
+ return 0;
+ }
+};
+
+} // mdlog namespace
+
+// traverse all the way back to the beginning of the period history, and
+// return a cursor to the first period in a fully attached history
+Cursor RGWSI_MDLog::find_oldest_period(const DoutPrefixProvider *dpp, optional_yield y)
+{
+ auto cursor = period_history->get_current();
+
+ while (cursor) {
+ // advance to the period's predecessor
+ if (!cursor.has_prev()) {
+ auto& predecessor = cursor.get_period().get_predecessor();
+ if (predecessor.empty()) {
+ // this is the first period, so our logs must start here
+ ldpp_dout(dpp, 10) << "find_oldest_period returning first "
+ "period " << cursor.get_period().get_id() << dendl;
+ return cursor;
+ }
+ // pull the predecessor and add it to our history
+ RGWPeriod period;
+ int r = period_puller->pull(dpp, predecessor, period, y);
+ if (r < 0) {
+ return cursor;
+ }
+ auto prev = period_history->insert(std::move(period));
+ if (!prev) {
+ return prev;
+ }
+ ldpp_dout(dpp, 20) << "find_oldest_period advancing to "
+ "predecessor period " << predecessor << dendl;
+ ceph_assert(cursor.has_prev());
+ }
+ cursor.prev();
+ }
+ ldpp_dout(dpp, 10) << "find_oldest_period returning empty cursor" << dendl;
+ return cursor;
+}
+
+Cursor RGWSI_MDLog::init_oldest_log_period(optional_yield y, const DoutPrefixProvider *dpp)
+{
+ // read the mdlog history
+ RGWMetadataLogHistory state;
+ RGWObjVersionTracker objv;
+ int ret = read_history(&state, &objv, y, dpp);
+
+ if (ret == -ENOENT) {
+ // initialize the mdlog history and write it
+ ldpp_dout(dpp, 10) << "initializing mdlog history" << dendl;
+ auto cursor = find_oldest_period(dpp, y);
+ if (!cursor) {
+ return cursor;
+ }
+ // write the initial history
+ state.oldest_realm_epoch = cursor.get_epoch();
+ state.oldest_period_id = cursor.get_period().get_id();
+
+ constexpr bool exclusive = true; // don't overwrite
+ int ret = write_history(dpp, state, &objv, y, exclusive);
+ if (ret < 0 && ret != -EEXIST) {
+ ldpp_dout(dpp, 1) << "failed to write mdlog history: "
+ << cpp_strerror(ret) << dendl;
+ return Cursor{ret};
+ }
+ return cursor;
+ } else if (ret < 0) {
+ ldpp_dout(dpp, 1) << "failed to read mdlog history: "
+ << cpp_strerror(ret) << dendl;
+ return Cursor{ret};
+ }
+
+ // if it's already in the history, return it
+ auto cursor = period_history->lookup(state.oldest_realm_epoch);
+ if (cursor) {
+ return cursor;
+ } else {
+ cursor = find_oldest_period(dpp, y);
+ state.oldest_realm_epoch = cursor.get_epoch();
+ state.oldest_period_id = cursor.get_period().get_id();
+ ldpp_dout(dpp, 10) << "rewriting mdlog history" << dendl;
+ ret = write_history(dpp, state, &objv, y);
+ if (ret < 0 && ret != -ECANCELED) {
+ ldpp_dout(dpp, 1) << "failed to write mdlog history: "
+ << cpp_strerror(ret) << dendl;
+ return Cursor{ret};
+ }
+ return cursor;
+ }
+
+ // pull the oldest period by id
+ RGWPeriod period;
+ ret = period_puller->pull(dpp, state.oldest_period_id, period, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "failed to read period id=" << state.oldest_period_id
+ << " for mdlog history: " << cpp_strerror(ret) << dendl;
+ return Cursor{ret};
+ }
+ // verify its realm_epoch
+ if (period.get_realm_epoch() != state.oldest_realm_epoch) {
+ ldpp_dout(dpp, 1) << "inconsistent mdlog history: read period id="
+ << period.get_id() << " with realm_epoch=" << period.get_realm_epoch()
+ << ", expected realm_epoch=" << state.oldest_realm_epoch << dendl;
+ return Cursor{-EINVAL};
+ }
+ // attach the period to our history
+ return period_history->attach(dpp, std::move(period), y);
+}
+
+Cursor RGWSI_MDLog::read_oldest_log_period(optional_yield y, const DoutPrefixProvider *dpp) const
+{
+ RGWMetadataLogHistory state;
+ int ret = read_history(&state, nullptr, y, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "failed to read mdlog history: "
+ << cpp_strerror(ret) << dendl;
+ return Cursor{ret};
+ }
+
+ ldpp_dout(dpp, 10) << "read mdlog history with oldest period id="
+ << state.oldest_period_id << " realm_epoch="
+ << state.oldest_realm_epoch << dendl;
+
+ return period_history->lookup(state.oldest_realm_epoch);
+}
+
+RGWCoroutine* RGWSI_MDLog::read_oldest_log_period_cr(const DoutPrefixProvider *dpp,
+ Cursor *period, RGWObjVersionTracker *objv) const
+{
+ return new mdlog::ReadHistoryCR(dpp, svc, period, objv);
+}
+
+RGWCoroutine* RGWSI_MDLog::trim_log_period_cr(const DoutPrefixProvider *dpp,
+ Cursor period, RGWObjVersionTracker *objv) const
+{
+ return new mdlog::TrimHistoryCR(dpp, svc, period, objv);
+}
+
+RGWMetadataLog* RGWSI_MDLog::get_log(const std::string& period)
+{
+ // construct the period's log in place if it doesn't exist
+ auto insert = md_logs.emplace(std::piecewise_construct,
+ std::forward_as_tuple(period),
+ std::forward_as_tuple(cct, svc.zone, svc.cls, period));
+ return &insert.first->second;
+}
+
+int RGWSI_MDLog::add_entry(const DoutPrefixProvider *dpp, const string& hash_key, const string& section, const string& key, bufferlist& bl)
+{
+ ceph_assert(current_log); // must have called init()
+ return current_log->add_entry(dpp, hash_key, section, key, bl);
+}
+
+int RGWSI_MDLog::get_shard_id(const string& hash_key, int *shard_id)
+{
+ ceph_assert(current_log); // must have called init()
+ return current_log->get_shard_id(hash_key, shard_id);
+}
+
+int RGWSI_MDLog::pull_period(const DoutPrefixProvider *dpp, const std::string& period_id, RGWPeriod& period,
+ optional_yield y)
+{
+ return period_puller->pull(dpp, period_id, period, y);
+}
+