summaryrefslogtreecommitdiffstats
path: root/lib/perfdata/influxdbcommonwriter.hpp
blob: 380b20c9f6433afb16fc71a234813a3c95337e54 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/* Icinga 2 | (c) 2021 Icinga GmbH | GPLv2+ */

#ifndef INFLUXDBCOMMONWRITER_H
#define INFLUXDBCOMMONWRITER_H

#include "perfdata/influxdbcommonwriter-ti.hpp"
#include "icinga/service.hpp"
#include "base/configobject.hpp"
#include "base/perfdatavalue.hpp"
#include "base/tcpsocket.hpp"
#include "base/timer.hpp"
#include "base/tlsstream.hpp"
#include "base/workqueue.hpp"
#include "remote/url.hpp"
#include <boost/beast/http/message.hpp>
#include <boost/beast/http/string_body.hpp>
#include <atomic>
#include <fstream>

namespace icinga
{

/**
 * Common base class for InfluxDB v1/v2 writers.
 *
 * @ingroup perfdata
 */
class InfluxdbCommonWriter : public ObjectImpl<InfluxdbCommonWriter>
{
public:
	DECLARE_OBJECT(InfluxdbCommonWriter);

	template<class InfluxWriter>
	static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);

	void ValidateHostTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override;
	void ValidateServiceTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override;

protected:
	void OnConfigLoaded() override;
	void Resume() override;
	void Pause() override;

	boost::beast::http::request<boost::beast::http::string_body> AssembleBaseRequest(String body);
	Url::Ptr AssembleBaseUrl();
	virtual boost::beast::http::request<boost::beast::http::string_body> AssembleRequest(String body) = 0;
	virtual Url::Ptr AssembleUrl() = 0;

private:
	boost::signals2::connection m_HandleCheckResults;
	Timer::Ptr m_FlushTimer;
	WorkQueue m_WorkQueue{10000000, 1};
	std::vector<String> m_DataBuffer;
	std::atomic_size_t m_DataBufferSize{0};

	void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
	void CheckResultHandlerWQ(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
	void SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl,
		const String& label, const Dictionary::Ptr& fields, double ts);
	void FlushTimeout();
	void FlushTimeoutWQ();
	void FlushWQ();

	static String EscapeKeyOrTagValue(const String& str);
	static String EscapeValue(const Value& value);

	OptionalTlsStream Connect();

	void AssertOnWorkQueue();

	void ExceptionHandler(boost::exception_ptr exp);
};

template<class InfluxWriter>
void InfluxdbCommonWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
	DictionaryData nodes;
	auto typeName (InfluxWriter::TypeInstance->GetName().ToLower());

	for (const typename InfluxWriter::Ptr& influxwriter : ConfigType::GetObjectsByType<InfluxWriter>()) {
		size_t workQueueItems = influxwriter->m_WorkQueue.GetLength();
		double workQueueItemRate = influxwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
		size_t dataBufferItems = influxwriter->m_DataBufferSize;

		nodes.emplace_back(influxwriter->GetName(), new Dictionary({
			{ "work_queue_items", workQueueItems },
			{ "work_queue_item_rate", workQueueItemRate },
			{ "data_buffer_items", dataBufferItems }
		}));

		perfdata->Add(new PerfdataValue(typeName + "_" + influxwriter->GetName() + "_work_queue_items", workQueueItems));
		perfdata->Add(new PerfdataValue(typeName + "_" + influxwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
		perfdata->Add(new PerfdataValue(typeName + "_" + influxwriter->GetName() + "_data_queue_items", dataBufferItems));
	}

	status->Set(typeName, new Dictionary(std::move(nodes)));
}

}

#endif /* INFLUXDBCOMMONWRITER_H */