summaryrefslogtreecommitdiffstats
path: root/lib/livestatus/livestatusquery.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/livestatus/livestatusquery.cpp')
-rw-r--r--lib/livestatus/livestatusquery.cpp648
1 files changed, 648 insertions, 0 deletions
diff --git a/lib/livestatus/livestatusquery.cpp b/lib/livestatus/livestatusquery.cpp
new file mode 100644
index 0000000..0f9b3da
--- /dev/null
+++ b/lib/livestatus/livestatusquery.cpp
@@ -0,0 +1,648 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include "livestatus/livestatusquery.hpp"
+#include "livestatus/countaggregator.hpp"
+#include "livestatus/sumaggregator.hpp"
+#include "livestatus/minaggregator.hpp"
+#include "livestatus/maxaggregator.hpp"
+#include "livestatus/avgaggregator.hpp"
+#include "livestatus/stdaggregator.hpp"
+#include "livestatus/invsumaggregator.hpp"
+#include "livestatus/invavgaggregator.hpp"
+#include "livestatus/attributefilter.hpp"
+#include "livestatus/negatefilter.hpp"
+#include "livestatus/orfilter.hpp"
+#include "livestatus/andfilter.hpp"
+#include "icinga/externalcommandprocessor.hpp"
+#include "base/debug.hpp"
+#include "base/convert.hpp"
+#include "base/objectlock.hpp"
+#include "base/logger.hpp"
+#include "base/exception.hpp"
+#include "base/utility.hpp"
+#include "base/json.hpp"
+#include "base/serializer.hpp"
+#include "base/timer.hpp"
+#include "base/initialize.hpp"
+#include <boost/algorithm/string/replace.hpp>
+#include <boost/algorithm/string/join.hpp>
+
+using namespace icinga;
+
+static int l_ExternalCommands = 0;
+static std::mutex l_QueryMutex;
+
+LivestatusQuery::LivestatusQuery(const std::vector<String>& lines, const String& compat_log_path)
+ : m_KeepAlive(false), m_OutputFormat("csv"), m_ColumnHeaders(true), m_Limit(-1), m_ErrorCode(0),
+ m_LogTimeFrom(0), m_LogTimeUntil(static_cast<long>(Utility::GetTime()))
+{
+ if (lines.size() == 0) {
+ m_Verb = "ERROR";
+ m_ErrorCode = LivestatusErrorQuery;
+ m_ErrorMessage = "Empty Query. Aborting.";
+ return;
+ }
+
+ String msg;
+ for (const String& line : lines) {
+ msg += line + "\n";
+ }
+ Log(LogDebug, "LivestatusQuery", msg);
+
+ m_CompatLogPath = compat_log_path;
+
+ /* default separators */
+ m_Separators.emplace_back("\n");
+ m_Separators.emplace_back(";");
+ m_Separators.emplace_back(",");
+ m_Separators.emplace_back("|");
+
+ String line = lines[0];
+
+ size_t sp_index = line.FindFirstOf(" ");
+
+ if (sp_index == String::NPos)
+ BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus header must contain a verb."));
+
+ String verb = line.SubStr(0, sp_index);
+ String target = line.SubStr(sp_index + 1);
+
+ m_Verb = verb;
+
+ if (m_Verb == "COMMAND") {
+ m_KeepAlive = true;
+ m_Command = target;
+ } else if (m_Verb == "GET") {
+ m_Table = target;
+ } else {
+ m_Verb = "ERROR";
+ m_ErrorCode = LivestatusErrorQuery;
+ m_ErrorMessage = "Unknown livestatus verb: " + m_Verb;
+ return;
+ }
+
+ std::deque<Filter::Ptr> filters, stats;
+ std::deque<Aggregator::Ptr> aggregators;
+
+ for (unsigned int i = 1; i < lines.size(); i++) {
+ line = lines[i];
+
+ size_t col_index = line.FindFirstOf(":");
+ String header = line.SubStr(0, col_index);
+ String params;
+
+ //OutputFormat:json or OutputFormat: json
+ if (line.GetLength() > col_index + 1)
+ params = line.SubStr(col_index + 1).Trim();
+
+ if (header == "ResponseHeader")
+ m_ResponseHeader = params;
+ else if (header == "OutputFormat")
+ m_OutputFormat = params;
+ else if (header == "KeepAlive")
+ m_KeepAlive = (params == "on");
+ else if (header == "Columns") {
+ m_ColumnHeaders = false; // Might be explicitly re-enabled later on
+ m_Columns = params.Split(" ");
+ } else if (header == "Separators") {
+ std::vector<String> separators = params.Split(" ");
+
+ /* ugly ascii long to char conversion, but works */
+ if (separators.size() > 0)
+ m_Separators[0] = String(1, static_cast<char>(Convert::ToLong(separators[0])));
+ if (separators.size() > 1)
+ m_Separators[1] = String(1, static_cast<char>(Convert::ToLong(separators[1])));
+ if (separators.size() > 2)
+ m_Separators[2] = String(1, static_cast<char>(Convert::ToLong(separators[2])));
+ if (separators.size() > 3)
+ m_Separators[3] = String(1, static_cast<char>(Convert::ToLong(separators[3])));
+ } else if (header == "ColumnHeaders")
+ m_ColumnHeaders = (params == "on");
+ else if (header == "Limit")
+ m_Limit = Convert::ToLong(params);
+ else if (header == "Filter") {
+ Filter::Ptr filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
+
+ if (!filter) {
+ m_Verb = "ERROR";
+ m_ErrorCode = LivestatusErrorQuery;
+ m_ErrorMessage = "Invalid filter specification: " + line;
+ return;
+ }
+
+ filters.push_back(filter);
+ } else if (header == "Stats") {
+ m_ColumnHeaders = false; // Might be explicitly re-enabled later on
+
+ std::vector<String> tokens = params.Split(" ");
+
+ if (tokens.size() < 2) {
+ m_Verb = "ERROR";
+ m_ErrorCode = LivestatusErrorQuery;
+ m_ErrorMessage = "Missing aggregator column name: " + line;
+ return;
+ }
+
+ String aggregate_arg = tokens[0];
+ String aggregate_attr = tokens[1];
+
+ Aggregator::Ptr aggregator;
+ Filter::Ptr filter;
+
+ if (aggregate_arg == "sum") {
+ aggregator = new SumAggregator(aggregate_attr);
+ } else if (aggregate_arg == "min") {
+ aggregator = new MinAggregator(aggregate_attr);
+ } else if (aggregate_arg == "max") {
+ aggregator = new MaxAggregator(aggregate_attr);
+ } else if (aggregate_arg == "avg") {
+ aggregator = new AvgAggregator(aggregate_attr);
+ } else if (aggregate_arg == "std") {
+ aggregator = new StdAggregator(aggregate_attr);
+ } else if (aggregate_arg == "suminv") {
+ aggregator = new InvSumAggregator(aggregate_attr);
+ } else if (aggregate_arg == "avginv") {
+ aggregator = new InvAvgAggregator(aggregate_attr);
+ } else {
+ filter = ParseFilter(params, m_LogTimeFrom, m_LogTimeUntil);
+
+ if (!filter) {
+ m_Verb = "ERROR";
+ m_ErrorCode = LivestatusErrorQuery;
+ m_ErrorMessage = "Invalid filter specification: " + line;
+ return;
+ }
+
+ aggregator = new CountAggregator();
+ }
+
+ aggregator->SetFilter(filter);
+ aggregators.push_back(aggregator);
+
+ stats.push_back(filter);
+ } else if (header == "Or" || header == "And" || header == "StatsOr" || header == "StatsAnd") {
+ std::deque<Filter::Ptr>& deq = (header == "Or" || header == "And") ? filters : stats;
+
+ unsigned int num = Convert::ToLong(params);
+ CombinerFilter::Ptr filter;
+
+ if (header == "Or" || header == "StatsOr") {
+ filter = new OrFilter();
+ Log(LogDebug, "LivestatusQuery")
+ << "Add OR filter for " << params << " column(s). " << deq.size() << " filters available.";
+ } else {
+ filter = new AndFilter();
+ Log(LogDebug, "LivestatusQuery")
+ << "Add AND filter for " << params << " column(s). " << deq.size() << " filters available.";
+ }
+
+ if (num > deq.size()) {
+ m_Verb = "ERROR";
+ m_ErrorCode = 451;
+ m_ErrorMessage = "Or/StatsOr is referencing " + Convert::ToString(num) + " filters; stack only contains " + Convert::ToString(static_cast<long>(deq.size())) + " filters";
+ return;
+ }
+
+ while (num > 0 && num--) {
+ filter->AddSubFilter(deq.back());
+ Log(LogDebug, "LivestatusQuery")
+ << "Add " << num << " filter.";
+ deq.pop_back();
+ if (&deq == &stats)
+ aggregators.pop_back();
+ }
+
+ deq.emplace_back(filter);
+ if (&deq == &stats) {
+ Aggregator::Ptr aggregator = new CountAggregator();
+ aggregator->SetFilter(filter);
+ aggregators.push_back(aggregator);
+ }
+ } else if (header == "Negate" || header == "StatsNegate") {
+ std::deque<Filter::Ptr>& deq = (header == "Negate") ? filters : stats;
+
+ if (deq.empty()) {
+ m_Verb = "ERROR";
+ m_ErrorCode = 451;
+ m_ErrorMessage = "Negate/StatsNegate used, however the filter stack is empty";
+ return;
+ }
+
+ Filter::Ptr filter = deq.back();
+ deq.pop_back();
+
+ if (!filter) {
+ m_Verb = "ERROR";
+ m_ErrorCode = 451;
+ m_ErrorMessage = "Negate/StatsNegate used, however last stats doesn't have a filter";
+ return;
+ }
+
+ deq.push_back(new NegateFilter(filter));
+
+ if (deq == stats) {
+ Aggregator::Ptr aggregator = aggregators.back();
+ aggregator->SetFilter(filter);
+ }
+ }
+ }
+
+ /* Combine all top-level filters into a single filter. */
+ AndFilter::Ptr top_filter = new AndFilter();
+
+ for (const Filter::Ptr& filter : filters) {
+ top_filter->AddSubFilter(filter);
+ }
+
+ m_Filter = top_filter;
+ m_Aggregators.swap(aggregators);
+}
+
+int LivestatusQuery::GetExternalCommands()
+{
+ std::unique_lock<std::mutex> lock(l_QueryMutex);
+
+ return l_ExternalCommands;
+}
+
+Filter::Ptr LivestatusQuery::ParseFilter(const String& params, unsigned long& from, unsigned long& until)
+{
+ /*
+ * time >= 1382696656
+ * type = SERVICE FLAPPING ALERT
+ */
+ std::vector<String> tokens;
+ size_t sp_index;
+ String temp_buffer = params;
+
+ /* extract attr and op */
+ for (int i = 0; i < 2; i++) {
+ sp_index = temp_buffer.FindFirstOf(" ");
+
+ /* check if this is the last argument */
+ if (sp_index == String::NPos) {
+ /* 'attr op' or 'attr op val' is valid */
+ if (i < 1)
+ BOOST_THROW_EXCEPTION(std::runtime_error("Livestatus filter '" + params + "' does not contain all required fields."));
+
+ break;
+ }
+
+ tokens.emplace_back(temp_buffer.SubStr(0, sp_index));
+ temp_buffer = temp_buffer.SubStr(sp_index + 1);
+ }
+
+ /* add the rest as value */
+ tokens.emplace_back(std::move(temp_buffer));
+
+ if (tokens.size() == 2)
+ tokens.emplace_back("");
+
+ if (tokens.size() < 3)
+ return nullptr;
+
+ bool negate = false;
+ String attr = tokens[0];
+ String op = tokens[1];
+ String val = tokens[2];
+
+ if (op == "!=") {
+ op = "=";
+ negate = true;
+ } else if (op == "!~") {
+ op = "~";
+ negate = true;
+ } else if (op == "!=~") {
+ op = "=~";
+ negate = true;
+ } else if (op == "!~~") {
+ op = "~~";
+ negate = true;
+ }
+
+ Filter::Ptr filter = new AttributeFilter(attr, op, val);
+
+ if (negate)
+ filter = new NegateFilter(filter);
+
+ /* pre-filter log time duration */
+ if (attr == "time") {
+ if (op == "<" || op == "<=") {
+ until = Convert::ToLong(val);
+ } else if (op == ">" || op == ">=") {
+ from = Convert::ToLong(val);
+ }
+ }
+
+ Log(LogDebug, "LivestatusQuery")
+ << "Parsed filter with attr: '" << attr << "' op: '" << op << "' val: '" << val << "'.";
+
+ return filter;
+}
+
+void LivestatusQuery::BeginResultSet(std::ostream& fp) const
+{
+ if (m_OutputFormat == "json" || m_OutputFormat == "python")
+ fp << "[";
+}
+
+void LivestatusQuery::EndResultSet(std::ostream& fp) const
+{
+ if (m_OutputFormat == "json" || m_OutputFormat == "python")
+ fp << "]";
+}
+
+void LivestatusQuery::AppendResultRow(std::ostream& fp, const Array::Ptr& row, bool& first_row) const
+{
+ if (m_OutputFormat == "csv") {
+ bool first = true;
+
+ ObjectLock rlock(row);
+ for (const Value& value : row) {
+ if (first)
+ first = false;
+ else
+ fp << m_Separators[1];
+
+ if (value.IsObjectType<Array>())
+ PrintCsvArray(fp, value, 0);
+ else
+ fp << value;
+ }
+
+ fp << m_Separators[0];
+ } else if (m_OutputFormat == "json") {
+ if (!first_row)
+ fp << ", ";
+
+ fp << JsonEncode(row);
+ } else if (m_OutputFormat == "python") {
+ if (!first_row)
+ fp << ", ";
+
+ PrintPythonArray(fp, row);
+ }
+
+ first_row = false;
+}
+
+void LivestatusQuery::PrintCsvArray(std::ostream& fp, const Array::Ptr& array, int level) const
+{
+ bool first = true;
+
+ ObjectLock olock(array);
+ for (const Value& value : array) {
+ if (first)
+ first = false;
+ else
+ fp << ((level == 0) ? m_Separators[2] : m_Separators[3]);
+
+ if (value.IsObjectType<Array>())
+ PrintCsvArray(fp, value, level + 1);
+ else if (value.IsBoolean())
+ fp << Convert::ToLong(value);
+ else
+ fp << value;
+ }
+}
+
+void LivestatusQuery::PrintPythonArray(std::ostream& fp, const Array::Ptr& rs) const
+{
+ fp << "[ ";
+
+ bool first = true;
+
+ for (const Value& value : rs) {
+ if (first)
+ first = false;
+ else
+ fp << ", ";
+
+ if (value.IsObjectType<Array>())
+ PrintPythonArray(fp, value);
+ else if (value.IsNumber())
+ fp << value;
+ else
+ fp << QuoteStringPython(value);
+ }
+
+ fp << " ]";
+}
+
+String LivestatusQuery::QuoteStringPython(const String& str) {
+ String result = str;
+ boost::algorithm::replace_all(result, "\"", "\\\"");
+ return "r\"" + result + "\"";
+}
+
+void LivestatusQuery::ExecuteGetHelper(const Stream::Ptr& stream)
+{
+ Log(LogNotice, "LivestatusQuery")
+ << "Table: " << m_Table;
+
+ Table::Ptr table = Table::GetByName(m_Table, m_CompatLogPath, m_LogTimeFrom, m_LogTimeUntil);
+
+ if (!table) {
+ SendResponse(stream, LivestatusErrorNotFound, "Table '" + m_Table + "' does not exist.");
+
+ return;
+ }
+
+ std::vector<LivestatusRowValue> objects = table->FilterRows(m_Filter, m_Limit);
+ std::vector<String> columns;
+
+ if (m_Columns.size() > 0)
+ columns = m_Columns;
+ else
+ columns = table->GetColumnNames();
+
+ std::ostringstream result;
+ bool first_row = true;
+ BeginResultSet(result);
+
+ if (m_Aggregators.empty()) {
+ typedef std::pair<String, Column> ColumnPair;
+
+ std::vector<ColumnPair> column_objs;
+ column_objs.reserve(columns.size());
+
+ for (const String& columnName : columns)
+ column_objs.emplace_back(columnName, table->GetColumn(columnName));
+
+ ArrayData header;
+
+ for (const LivestatusRowValue& object : objects) {
+ ArrayData row;
+
+ row.reserve(column_objs.size());
+
+ for (const ColumnPair& cv : column_objs) {
+ if (m_ColumnHeaders)
+ header.push_back(cv.first);
+
+ row.push_back(cv.second.ExtractValue(object.Row, object.GroupByType, object.GroupByObject));
+ }
+
+ if (m_ColumnHeaders) {
+ AppendResultRow(result, new Array(std::move(header)), first_row);
+ m_ColumnHeaders = false;
+ }
+
+ AppendResultRow(result, new Array(std::move(row)), first_row);
+ }
+ } else {
+ std::map<std::vector<Value>, std::vector<AggregatorState *> > allStats;
+
+ /* add aggregated stats */
+ for (const LivestatusRowValue& object : objects) {
+ std::vector<Value> statsKey;
+
+ for (const String& columnName : m_Columns) {
+ Column column = table->GetColumn(columnName);
+ statsKey.emplace_back(column.ExtractValue(object.Row, object.GroupByType, object.GroupByObject));
+ }
+
+ auto it = allStats.find(statsKey);
+
+ if (it == allStats.end()) {
+ std::vector<AggregatorState *> newStats(m_Aggregators.size(), nullptr);
+ it = allStats.insert(std::make_pair(statsKey, newStats)).first;
+ }
+
+ auto& stats = it->second;
+
+ int index = 0;
+
+ for (const Aggregator::Ptr& aggregator : m_Aggregators) {
+ aggregator->Apply(table, object.Row, &stats[index]);
+ index++;
+ }
+ }
+
+ /* add column headers both for raw and aggregated data */
+ if (m_ColumnHeaders) {
+ ArrayData header;
+
+ for (const String& columnName : m_Columns) {
+ header.push_back(columnName);
+ }
+
+ for (size_t i = 1; i <= m_Aggregators.size(); i++) {
+ header.push_back("stats_" + Convert::ToString(i));
+ }
+
+ AppendResultRow(result, new Array(std::move(header)), first_row);
+ }
+
+ for (const auto& kv : allStats) {
+ ArrayData row;
+
+ row.reserve(m_Columns.size() + m_Aggregators.size());
+
+ for (const Value& keyPart : kv.first) {
+ row.push_back(keyPart);
+ }
+
+ auto& stats = kv.second;
+
+ for (size_t i = 0; i < m_Aggregators.size(); i++)
+ row.push_back(m_Aggregators[i]->GetResultAndFreeState(stats[i]));
+
+ AppendResultRow(result, new Array(std::move(row)), first_row);
+ }
+
+ /* add a bogus zero value if aggregated is empty*/
+ if (allStats.empty()) {
+ ArrayData row;
+
+ row.reserve(m_Aggregators.size());
+
+ for (size_t i = 1; i <= m_Aggregators.size(); i++) {
+ row.push_back(0);
+ }
+
+ AppendResultRow(result, new Array(std::move(row)), first_row);
+ }
+ }
+
+ EndResultSet(result);
+
+ SendResponse(stream, LivestatusErrorOK, result.str());
+}
+
+void LivestatusQuery::ExecuteCommandHelper(const Stream::Ptr& stream)
+{
+ {
+ std::unique_lock<std::mutex> lock(l_QueryMutex);
+
+ l_ExternalCommands++;
+ }
+
+ Log(LogNotice, "LivestatusQuery")
+ << "Executing command: " << m_Command;
+ ExternalCommandProcessor::Execute(m_Command);
+ SendResponse(stream, LivestatusErrorOK, "");
+}
+
+void LivestatusQuery::ExecuteErrorHelper(const Stream::Ptr& stream)
+{
+ Log(LogDebug, "LivestatusQuery")
+ << "ERROR: Code: '" << m_ErrorCode << "' Message: '" << m_ErrorMessage << "'.";
+ SendResponse(stream, m_ErrorCode, m_ErrorMessage);
+}
+
+void LivestatusQuery::SendResponse(const Stream::Ptr& stream, int code, const String& data)
+{
+ if (m_ResponseHeader == "fixed16")
+ PrintFixed16(stream, code, data);
+
+ if (m_ResponseHeader == "fixed16" || code == LivestatusErrorOK) {
+ try {
+ stream->Write(data.CStr(), data.GetLength());
+ } catch (const std::exception&) {
+ Log(LogCritical, "LivestatusQuery", "Cannot write query response to socket.");
+ }
+ }
+}
+
+void LivestatusQuery::PrintFixed16(const Stream::Ptr& stream, int code, const String& data)
+{
+ ASSERT(code >= 100 && code <= 999);
+
+ String sCode = Convert::ToString(code);
+ String sLength = Convert::ToString(static_cast<long>(data.GetLength()));
+
+ String header = sCode + String(16 - 3 - sLength.GetLength() - 1, ' ') + sLength + m_Separators[0];
+
+ try {
+ stream->Write(header.CStr(), header.GetLength());
+ } catch (const std::exception&) {
+ Log(LogCritical, "LivestatusQuery", "Cannot write to TCP socket.");
+ }
+}
+
+bool LivestatusQuery::Execute(const Stream::Ptr& stream)
+{
+ try {
+ Log(LogNotice, "LivestatusQuery")
+ << "Executing livestatus query: " << m_Verb;
+
+ if (m_Verb == "GET")
+ ExecuteGetHelper(stream);
+ else if (m_Verb == "COMMAND")
+ ExecuteCommandHelper(stream);
+ else if (m_Verb == "ERROR")
+ ExecuteErrorHelper(stream);
+ else
+ BOOST_THROW_EXCEPTION(std::runtime_error("Invalid livestatus query verb."));
+ } catch (const std::exception& ex) {
+ SendResponse(stream, LivestatusErrorQuery, DiagnosticInformation(ex));
+ }
+
+ if (!m_KeepAlive) {
+ stream->Close();
+ return false;
+ }
+
+ return true;
+}