summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_sync_trace.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rgw/rgw_sync_trace.cc')
-rw-r--r--src/rgw/rgw_sync_trace.cc285
1 files changed, 285 insertions, 0 deletions
diff --git a/src/rgw/rgw_sync_trace.cc b/src/rgw/rgw_sync_trace.cc
new file mode 100644
index 000000000..e99fdcf50
--- /dev/null
+++ b/src/rgw/rgw_sync_trace.cc
@@ -0,0 +1,285 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#ifndef CEPH_RGW_SYNC_TRACE_H
+#define CEPH_RGW_SYNC_TRACE_H
+
+#include <regex>
+
+#include "common/debug.h"
+#include "common/ceph_json.h"
+
+#include "rgw_sync_trace.h"
+#include "rgw_rados.h"
+#include "rgw_worker.h"
+
+
+#define dout_context g_ceph_context
+
+RGWSyncTraceNode::RGWSyncTraceNode(CephContext *_cct, uint64_t _handle,
+ const RGWSyncTraceNodeRef& _parent,
+ const string& _type, const string& _id) : cct(_cct),
+ parent(_parent),
+ type(_type),
+ id(_id),
+ handle(_handle),
+ history(cct->_conf->rgw_sync_trace_per_node_log_size)
+{
+ if (parent.get()) {
+ prefix = parent->get_prefix();
+ }
+
+ if (!type.empty()) {
+ prefix += type;
+ if (!id.empty()) {
+ prefix += "[" + id + "]";
+ }
+ prefix += ":";
+ }
+}
+
+void RGWSyncTraceNode::log(int level, const string& s)
+{
+ status = s;
+ history.push_back(status);
+ /* dump output on either rgw_sync, or rgw -- but only once */
+ if (cct->_conf->subsys.should_gather(ceph_subsys_rgw_sync, level)) {
+ lsubdout(cct, rgw_sync,
+ ceph::dout::need_dynamic(level)) << "RGW-SYNC:" << to_str() << dendl;
+ } else {
+ lsubdout(cct, rgw,
+ ceph::dout::need_dynamic(level)) << "RGW-SYNC:" << to_str() << dendl;
+ }
+}
+
+
+class RGWSyncTraceServiceMapThread : public RGWRadosThread {
+ RGWRados *store;
+ RGWSyncTraceManager *manager;
+
+ uint64_t interval_msec() override {
+ return cct->_conf->rgw_sync_trace_servicemap_update_interval * 1000;
+ }
+public:
+ RGWSyncTraceServiceMapThread(RGWRados *_store, RGWSyncTraceManager *_manager)
+ : RGWRadosThread(_store, "sync-trace"), store(_store), manager(_manager) {}
+
+ int process(const DoutPrefixProvider *dpp) override;
+};
+
+int RGWSyncTraceServiceMapThread::process(const DoutPrefixProvider *dpp)
+{
+ map<string, string> status;
+ status["current_sync"] = manager->get_active_names();
+ int ret = store->update_service_map(std::move(status));
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: update_service_map() returned ret=" << ret << dendl;
+ }
+ return 0;
+}
+
+RGWSyncTraceNodeRef RGWSyncTraceManager::add_node(const RGWSyncTraceNodeRef& parent,
+ const std::string& type,
+ const std::string& id)
+{
+ shunique_lock wl(lock, ceph::acquire_unique);
+ auto handle = alloc_handle();
+ RGWSyncTraceNodeRef& ref = nodes[handle];
+ ref.reset(new RGWSyncTraceNode(cct, handle, parent, type, id));
+ // return a separate shared_ptr that calls finish() on the node instead of
+ // deleting it. the lambda capture holds a reference to the original 'ref'
+ auto deleter = [ref, this] (RGWSyncTraceNode *node) { finish_node(node); };
+ return {ref.get(), deleter};
+}
+
+bool RGWSyncTraceNode::match(const string& search_term, bool search_history)
+{
+ try {
+ std::regex expr(search_term);
+ std::smatch m;
+
+ if (regex_search(prefix, m, expr)) {
+ return true;
+ }
+ if (regex_search(status, m,expr)) {
+ return true;
+ }
+ if (!search_history) {
+ return false;
+ }
+
+ for (auto h : history) {
+ if (regex_search(h, m, expr)) {
+ return true;
+ }
+ }
+ } catch (const std::regex_error& e) {
+ ldout(cct, 5) << "NOTICE: sync trace: bad expression: bad regex search term" << dendl;
+ }
+
+ return false;
+}
+
+void RGWSyncTraceManager::init(RGWRados *store)
+{
+ service_map_thread = new RGWSyncTraceServiceMapThread(store, this);
+ service_map_thread->start();
+}
+
+RGWSyncTraceManager::~RGWSyncTraceManager()
+{
+ cct->get_admin_socket()->unregister_commands(this);
+ service_map_thread->stop();
+ delete service_map_thread;
+
+ nodes.clear();
+}
+
+int RGWSyncTraceManager::hook_to_admin_command()
+{
+ AdminSocket *admin_socket = cct->get_admin_socket();
+
+ admin_commands = { { "sync trace show name=search,type=CephString,req=false", "sync trace show [filter_str]: show current multisite tracing information" },
+ { "sync trace history name=search,type=CephString,req=false", "sync trace history [filter_str]: show history of multisite tracing information" },
+ { "sync trace active name=search,type=CephString,req=false", "show active multisite sync entities information" },
+ { "sync trace active_short name=search,type=CephString,req=false", "show active multisite sync entities entries" } };
+ for (auto cmd : admin_commands) {
+ int r = admin_socket->register_command(cmd[0], this,
+ cmd[1]);
+ if (r < 0) {
+ lderr(cct) << "ERROR: fail to register admin socket command (r=" << r << ")" << dendl;
+ return r;
+ }
+ }
+ return 0;
+}
+
+static void dump_node(RGWSyncTraceNode *entry, bool show_history, Formatter *f)
+{
+ f->open_object_section("entry");
+ ::encode_json("status", entry->to_str(), f);
+ if (show_history) {
+ f->open_array_section("history");
+ for (auto h : entry->get_history()) {
+ ::encode_json("entry", h, f);
+ }
+ f->close_section();
+ }
+ f->close_section();
+}
+
+string RGWSyncTraceManager::get_active_names()
+{
+ shunique_lock rl(lock, ceph::acquire_shared);
+
+ stringstream ss;
+ JSONFormatter f;
+
+ f.open_array_section("result");
+ for (auto n : nodes) {
+ auto& entry = n.second;
+
+ if (!entry->test_flags(RGW_SNS_FLAG_ACTIVE)) {
+ continue;
+ }
+ const string& name = entry->get_resource_name();
+ if (!name.empty()) {
+ ::encode_json("entry", name, &f);
+ }
+ f.flush(ss);
+ }
+ f.close_section();
+ f.flush(ss);
+
+ return ss.str();
+}
+
+int RGWSyncTraceManager::call(std::string_view command, const cmdmap_t& cmdmap,
+ Formatter *f,
+ std::ostream& ss,
+ bufferlist& out) {
+
+ bool show_history = (command == "sync trace history");
+ bool show_short = (command == "sync trace active_short");
+ bool show_active = (command == "sync trace active") || show_short;
+
+ string search;
+
+ auto si = cmdmap.find("search");
+ if (si != cmdmap.end()) {
+ search = boost::get<string>(si->second);
+ }
+
+ shunique_lock rl(lock, ceph::acquire_shared);
+
+ f->open_object_section("result");
+ f->open_array_section("running");
+ for (auto n : nodes) {
+ auto& entry = n.second;
+
+ if (!search.empty() && !entry->match(search, show_history)) {
+ continue;
+ }
+ if (show_active && !entry->test_flags(RGW_SNS_FLAG_ACTIVE)) {
+ continue;
+ }
+ if (show_short) {
+ const string& name = entry->get_resource_name();
+ if (!name.empty()) {
+ ::encode_json("entry", name, f);
+ }
+ } else {
+ dump_node(entry.get(), show_history, f);
+ }
+ f->flush(out);
+ }
+ f->close_section();
+
+ f->open_array_section("complete");
+ for (auto& entry : complete_nodes) {
+ if (!search.empty() && !entry->match(search, show_history)) {
+ continue;
+ }
+ if (show_active && !entry->test_flags(RGW_SNS_FLAG_ACTIVE)) {
+ continue;
+ }
+ dump_node(entry.get(), show_history, f);
+ f->flush(out);
+ }
+ f->close_section();
+
+ f->close_section();
+
+ return 0;
+}
+
+void RGWSyncTraceManager::finish_node(RGWSyncTraceNode *node)
+{
+ RGWSyncTraceNodeRef old_node;
+
+ {
+ shunique_lock wl(lock, ceph::acquire_unique);
+ if (!node) {
+ return;
+ }
+ auto iter = nodes.find(node->handle);
+ if (iter == nodes.end()) {
+ /* not found, already finished */
+ return;
+ }
+
+ if (complete_nodes.full()) {
+ /* take a reference to the entry that is going to be evicted,
+ * can't let it get evicted under lock held, otherwise
+ * it's a deadlock as it will call finish_node()
+ */
+ old_node = complete_nodes.front();
+ }
+
+ complete_nodes.push_back(iter->second);
+ nodes.erase(iter);
+ }
+};
+
+#endif
+