summaryrefslogtreecommitdiffstats
path: root/wsrep-lib/src/reporter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'wsrep-lib/src/reporter.cpp')
-rw-r--r--wsrep-lib/src/reporter.cpp304
1 files changed, 304 insertions, 0 deletions
diff --git a/wsrep-lib/src/reporter.cpp b/wsrep-lib/src/reporter.cpp
new file mode 100644
index 00000000..2a5c6f56
--- /dev/null
+++ b/wsrep-lib/src/reporter.cpp
@@ -0,0 +1,304 @@
+/*
+ * Copyright (C) 2021 Codership Oy <info@codership.com>
+ *
+ * This file is part of wsrep-lib.
+ *
+ * Wsrep-lib is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Wsrep-lib is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "wsrep/reporter.hpp"
+#include "wsrep/logger.hpp"
+
+#include <sstream>
+#include <iomanip>
+
+#include <cstring> // strerror()
+#include <cstdlib> // mkstemp()
+#include <cerrno> // errno
+#include <unistd.h> // write()
+#include <cstdio> // rename(), snprintf()
+#include <ctime> // clock_gettime()
+#include <cmath> // floor()
+
+static std::string const TEMP_EXTENSION(".XXXXXX");
+
+static std::string make_progress_string(int const from, int const to,
+ int const total,int const done,
+ int const indefinite)
+{
+ std::ostringstream os;
+
+ os << "{ \"from\": " << from << ", "
+ << "\"to\": " << to << ", "
+ << "\"total\": " << total << ", "
+ << "\"done\": " << done << ", "
+ << "\"indefinite\": " << indefinite << " }";
+
+ return os.str();
+}
+
+static std::string const indefinite_progress
+ (make_progress_string(-1, -1, -1, -1, -1));
+static std::string const steady_state
+ (make_progress_string(-1, -1, 0, 0, -1));
+
+static inline double
+timestamp()
+{
+ struct timespec time;
+ clock_gettime(CLOCK_REALTIME, &time);
+ return (double(time.tv_sec) + double(time.tv_nsec)*1.0e-9);
+}
+
+wsrep::reporter::reporter(wsrep::mutex& mutex,
+ const std::string& file_name,
+ size_t const max_msg)
+ : mutex_(mutex)
+ , file_name_(file_name)
+ , progress_(indefinite_progress)
+ , template_(new char [file_name_.length() + TEMP_EXTENSION.length() + 1])
+ , state_(wsrep::reporter::s_disconnected_disconnected)
+ , initialized_(false)
+ , err_msg_()
+ , warn_msg_()
+ , max_msg_(max_msg)
+{
+ template_[file_name_.length() + TEMP_EXTENSION.length()] = '\0';
+ write_file(timestamp());
+}
+
+wsrep::reporter::~reporter()
+{
+ delete [] template_;
+}
+
+wsrep::reporter::substates
+wsrep::reporter::substate_map(enum wsrep::server_state::state const state)
+{
+ switch (state)
+ {
+ case wsrep::server_state::s_disconnected:
+ initialized_ = false;
+ return s_disconnected_disconnected;
+ case wsrep::server_state::s_initializing:
+ if (s_disconnected_disconnected == state_)
+ return s_disconnected_initializing;
+ else if (s_joining_sst == state_)
+ return s_joining_initializing;
+ else if (s_joining_initializing == state_)
+ return s_joining_initializing; // continuation
+ else
+ {
+ assert(0);
+ return state_;
+ }
+ case wsrep::server_state::s_initialized:
+ initialized_ = true;
+ if (s_disconnected_initializing >= state_)
+ return s_disconnected_initialized;
+ else if (s_joining_initializing == state_)
+ return s_joining_ist;
+ else if (s_joining_ist == state_)
+ return s_joining_ist; // continuation
+ else
+ {
+ assert(0);
+ return state_;
+ }
+ case wsrep::server_state::s_connected:
+ return s_connected_waiting;
+ case wsrep::server_state::s_joiner:
+ if (initialized_)
+ return s_joining_initialized;
+ else
+ return s_joining_sst;
+ case wsrep::server_state::s_joined:
+ return s_joined_syncing;
+ case wsrep::server_state::s_donor:
+ return s_donor_sending;
+ case wsrep::server_state::s_synced:
+ return s_synced_running;
+ case wsrep::server_state::s_disconnecting:
+ return s_disconnecting_disconnecting;
+ default:
+ assert(0);
+ return state_;
+ }
+}
+
+void
+wsrep::reporter::write_log_msg(std::ostream& os,
+ const log_msg& msg)
+{
+ os << "\t\t{\n";
+ os << "\t\t\t\"timestamp\": " << std::showpoint << std::setprecision(18)
+ << msg.tstamp << ",\n";
+ os << "\t\t\t\"msg\": \"" << msg.msg << "\"\n";
+ os << "\t\t}";
+}
+
+void
+wsrep::reporter::write_log_msgs(std::ostream& os,
+ const std::string& label,
+ const std::deque<log_msg>& msgs)
+{
+ os << "\t\"" << label << "\": [\n";
+ for (size_t i(0); i < msgs.size(); ++i)
+ {
+ write_log_msg(os, msgs[i]);
+ os << (i+1 < msgs.size() ? ",\n" : "\n");
+ }
+ os << "\t],\n";
+}
+
+// write data to temporary file and then rename it to target file for atomicity
+void
+wsrep::reporter::write_file(double const tstamp)
+{
+ enum progress_type {
+ t_indefinite = -1, // indefinite wait
+ t_progressive, // measurable progress
+ t_final // final state
+ };
+
+ struct strings {
+ const char* state;
+ const char* comment;
+ progress_type type;
+ };
+
+ static const struct strings strings[substates_max] =
+ {
+ { "DISCONNECTED", "Disconnected", t_indefinite },
+ { "DISCONNECTED", "Initializing", t_indefinite },
+ { "DISCONNECTED", "Connecting", t_indefinite },
+ { "CONNECTED", "Waiting", t_indefinite },
+ { "JOINING", "Receiving state", t_progressive },
+ { "JOINING", "Receiving SST", t_progressive },
+ { "JOINING", "Initializing", t_progressive },
+ { "JOINING", "Receiving IST", t_progressive },
+ { "JOINED", "Syncing", t_progressive },
+ { "SYNCED", "Operational", t_final },
+ { "DONOR", "Donating SST", t_progressive },
+ { "DISCONNECTING", "Disconnecting", t_indefinite }
+ };
+
+ // prepare template for mkstemp()
+ file_name_.copy(template_, file_name_.length());
+ TEMP_EXTENSION.copy(template_ +file_name_.length(),TEMP_EXTENSION.length());
+
+ int const fd(mkstemp(template_));
+ if (fd < 0)
+ {
+ std::cerr << "Reporter could not open temporary file `" << template_
+ << "': " << strerror(errno) << " (" << errno << ")\n";
+ return;
+ }
+
+ double const seconds(floor(tstamp));
+ time_t const tt = time_t(seconds);
+ struct tm date;
+ localtime_r(&tt, &date);
+
+ char date_str[85] = { '\0', };
+ snprintf(date_str, sizeof(date_str) - 1,
+ "%04d-%02d-%02d %02d:%02d:%02d.%03d",
+ date.tm_year + 1900, date.tm_mon + 1, date.tm_mday,
+ date.tm_hour, date.tm_min, date.tm_sec,
+ (int)((tstamp-seconds)*1000));
+
+ std::ostringstream os;
+ os << "{\n";
+ os << "\t\"date\": \"" << date_str << "\",\n";
+ os << "\t\"timestamp\": " << std::showpoint << std::setprecision(18)
+ << tstamp << ",\n";
+ write_log_msgs(os, "errors", err_msg_);
+ write_log_msgs(os, "warnings", warn_msg_);
+ os << "\t\"status\": {\n";
+ os << "\t\t\"state\": \"" << strings[state_].state << "\",\n";
+ os << "\t\t\"comment\": \"" << strings[state_].comment << "\",\n";
+ os << "\t\t\"progress\": " << progress_ << "\n";
+ os << "\t}\n";
+ os << "}\n";
+
+ std::string str(os.str());
+ ssize_t err(write(fd, str.c_str(), str.length()));
+ if (err < 0)
+ {
+ std::cerr << "Could not write " << str.length()
+ << " bytes to temporary file '"
+ << template_ << "': " << strerror(errno)
+ << " (" << errno << ")\n";
+ return;
+ }
+
+ rename(template_, file_name_.c_str());
+}
+
+void
+wsrep::reporter::report_state(enum server_state::state const s)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+
+ substates const state(substate_map(s));
+
+ if (state != state_)
+ {
+ state_ = state;
+
+ if (state_ == s_synced_running)
+ progress_ = steady_state;
+ else
+ progress_ = indefinite_progress;
+
+ write_file(timestamp());
+ }
+}
+
+void
+wsrep::reporter::report_progress(const std::string& json)
+{
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+
+ if (json != progress_)
+ {
+ if (state_ != s_synced_running)
+ {
+ // ignore any progress in SYNCED state
+ progress_ = json;
+ write_file(timestamp());
+ }
+ }
+}
+
+void
+wsrep::reporter::report_log_msg(log_level const lvl,
+ const std::string& msg,
+ double tstamp)
+{
+ std::deque<log_msg>& deque(lvl == error ? err_msg_ : warn_msg_);
+
+ wsrep::unique_lock<wsrep::mutex> lock(mutex_);
+
+ if (deque.empty() || msg != deque.back().msg)
+ {
+ if (deque.size() == max_msg_) deque.pop_front();
+
+ if (tstamp <= undefined) tstamp = timestamp();
+
+ log_msg entry({tstamp, msg});
+ deque.push_back(entry);
+ write_file(tstamp);
+ }
+}